Skip to main content

indexer - 数据自动同步工具

indexer 是一个实现将客户数据(支持包括 SQL 等多种数据库、文件目录、队列)自动同步到 indexea 的工具。indexer 采用 rust 语言开发,体积小、支持跨平台,简单轻便,资源占用少,同时对业务系统的侵入很小。indexer 提供非常灵活的配置 (indexer.yml),以非常高效方式将业务数据同步到 indexea

该工具基于 MIT 许可证开源,开源地址:https://gitee.com/indexea/indexer

使用说明

配置文件

indexer 使用 YAML 文件(indexer.yml)进行灵活的自定义配置,配置包括三部分内容:

序号配置名配置说明其他说明
1datasource数据源定义允许配置多个数据库,当前支持 MySQL 和 Postgres
2endpointIndexea 服务配置只能配置单个服务,如果需要使用多个服务,可以运行多个 indexer 的实例
3tasks同步任务配置配置 datasource 的数据如何同步到 endpoint 中,可以配置一到多个同步任务

indexer.yml

datasource:
mydb: # datasource name
source: 'mysql' # mysql,postgres,oracle
url: 'mysql://account:passwd@localhost:3306/mydb'

endpoint:
url: 'https://api.indexea.com/v1/' # Indexea Gateway API
access_token: 't0ezsaje7zbz2wvzxtdlzafcr2q0ur2ligql1spm'
indices:
repositories: # index name
app: 'tw6mi8il' # app ient
index: 1005 # index id
blogs:
app: 'tw6mi8il'
index: 1006

tasks:
repositories: # task name
datasource: 'mydb' # datasource name, refer to $datasource.mydb
index: 'repositories' # index name, refer to $endpoint.indices.repositories
table: 'repositories' # table name
primary: 'id' # primary field name of table
interval: 1000 # check interval in mills-second
blogs:
datasource: 'mydb'
index: 'blogs'
table: 'blogs'
primary: 'id'
sql: 'SELECT b.*, a.name AS author_name FROM blogs b LEFT JOIN authors a ON b.author = a.id' ##指定 SQL ,否则默认为 SELECT * FROM <table>
fields:
author_name: 'user' # 字段名映射,例:在 indexea 将使用 user 字段名来表示 author_name 的值
password: '' # 删除某个字段,一些敏感信息不需要进行搜索的可以通过这种方式避免同步到 indexea
status: # 自定义字段值映射
0: 'pending'
1: 'open'
2: 'block'
3: 'delete'
interval: 2000

配置文件详解

1. 定义数据源

当前 indexer 的数据源支持 MySQL 和 Postgres,你可以在 datasource 中定义多个数据源,每个数据源需要指定一个唯一的名称,然后在 tasks 中的 datasource 引用该名称。

2. 定义 Indexea 服务信息

endpoint 中定义 Indexea 服务的信息,包括服务 URL 地址( 一般为 https://api.indexea.com/v1/ ,如果你使用的是私有部署的 Indexea 则填写该服务的访问地址即可 )、access_token 访问令牌,关于访问令牌的获取请参考 访问令牌

同时需要定义需要同步的目标索引信息,每一个索引信息包括:名称、应用标识、索引编号,你可以在 Indexea 管理后台中查看应用标识和索引编号。形式如下:

indices:
repositories: # index name
app: 'tw6mi8il' # app ient
index: 1005 # index id
blogs:
app: 'tw6mi8il'
index: 1006

3. 定义同步任务

有了数据源和 Indexea 服务信息后,接下来就可以定义同步任务了,对应的配置项是 tasks。每个同步任务需要指定所用的数据源名称 datasource 和目标索引名称 index 。同时需要指定同步的数据表名称 table 和表的主键字段名称 primary,这两个字段是必须的。

这是最基本的配置,用来将某个表的数据同步到 Indexea 中,表中的字段名对应索引的字段名,每一行记录相当于索引中的一个文档。但经常这样的配置是不够的,因为你可能希望在同步数据前对数据做一些简单的处理,这就需要使用 sqlfields 配置进行定义。

首先我们可以通过 sql 指令来指定数据的获取,因为经常我们需要通过关联表来获取额外的信息,例如一个博客表中只有作者的 ID,我们需要通过作者表来获取作者的名称,这时候就可以通过 sql 来指定获取数据的 SQL 语句,例如:

SELECT b.*, a.name AS author_name FROM blogs b LEFT JOIN authors a ON b.author = a.id

在定义完数据获取的方式后,我们还需要定义字段的映射关系,indexer 支持以下几种字段的映射关系:

  • 字段名映射(改字段名)fields 中的 key 为数据源中的字段名,value 为索引中的字段名,例如:author_name: 'user' 表示将 author_name 的值同步到索引中的 user 字段中

  • 对字段内容进行 HTML 标签清理:如果你的字段内容是一段 HTML 内容(例如博客文章的内容一般以 HTML 格式存储),但在搜索中这些 HTML 标签会对搜索结果产生干扰,因此需要进行清理,使用 HTML 清理的配置如下:

    fields:
    body:
    html_strip:
    new_field_name: 'body_text' # 清理后的字段名

    这个过程你还可以顺手将 HTML 内容中的图像提取出来并存为一个新的数组字段,例如:

    fields:
    body:
    html_strip:
    extract_images_to_field: 'images'
  • 对字段值进行切分 :有些字段的值可能是一个字符串,但我们希望将其切分为一个数组,例如:博客文章的标签字段使用逗号隔开多个标签,我们希望将其切分为一个数组,这时候可以使用 split 配置,例如:

    fields:
    tags:
    split:
    separator: ',' # 分隔符
  • 对字段进行值映射:例如你希望将某个数值字段的值映射为一个字符串,例如:博客文章的状态字段,0 表示待审核,1 表示已发布,2 表示已屏蔽,3 表示已删除,这时候可以使用如下配置:

    fields:
    status:
    0: 'pending'
    1: 'open'
    2: 'block'
    3: 'delete'
  • Markdown 字段值预处理: 配置方式与 html_strip 一致,只需将 html_strip 替换成 markdown 即可。

  • 删除字段:有些字段是敏感信息,不希望同步到索引中,可以通过如下配置来删除:

    fields:
    password: '' # 删除 password 字段

对超大量记录的提速处理

indexer 在读取原数据时是分批读取的,越读到后面的记录,读取的速度越慢,这是因为数据库的索引机制导致的,为了解决这个问题,可以通过 scroll_by_id: true 配置来指定通过主键字段进行记录为条件进行分批次读取,例如:

repositories:
datasource: 'mysql'
index: 'repositories'
table: 'repositories'
primary: 'id' # 主键字段名称
scroll_by_id: true

程序参数

-c [indexer.yml]
指定配置文件路径

-l [indexer.log]
指定日志文件路径

-t --test
测试配置是否正确

-i --init
在本地数据源上建立 indexer 必备的信息,例如在数据库上建 indexea_tasks 表和触发器等。 然后执行数据首次同步,也就是全量的数据同步,如果使用该参数,需要用户进行二次确认

-d --start
进入守护进程方式开始增量数据同步

--clean
清除 indexer 生成的表、函数和触发器,
当不再使用 indexer 时可以使用该参数来清除数据。

--stop
停止守护进程

数据同步流程

初始化和全量同步

命令:indexer -i
首次使用前需要进行配置,你可以运行 indexer -t 来测试配置文件是否正确,然后运行 indexer -i 来执行初始化。

初始化主要包括三部分:

  1. 在你指定的数据源中建 indexea_tasks 任务表,表结构如下表所示
  2. 创建每个任务对应的数据增删改的触发器,触发器的主要任务是将变更数据写入 indexea_tasks 表中
  3. 执行全量的数据同步(同步完成后 indexer 进程自动终止,同步的时间取决于数据量大小)

[indexea_tasks] 表结构 (MySQL)

字段名类型说明其他说明
idbigint自增长主键
taskvarchar(32)对应 yml 配置中的任务名称
fieldvarchar(32)对象主键字段名
valuevarchar(64)字段值
typetinyint字段类型1 数值,2 字符串
opstinyint操作类型1 添加,2 修改, 3 删除
statustinyint处理状态0 未处理,1 已处理, 2 处理失败, 3 参数错误

其他数据库的表结构与 MySQL 同。

数据库触发器(MySQL)

CREATE TRIGGER indexea_after_insert_[task]
AFTER DELETE
ON <table> FOR EACH ROW
INSERT INTO indexea_tasks(`task`,`field`,`value`,`type`,`status`) VALUES('[task]', '[field]', NEW.<field>, 1, 0);

CREATE TRIGGER indexea_after_update_[task]
AFTER UPDATE
ON <table> FOR EACH ROW
INSERT INTO indexea_tasks(`task`,`field`,`value`,`type`,`status`) VALUES('[task]', '[field]', OLD.<field>, 2, 0);

CREATE TRIGGER indexea_after_delete_[task]
AFTER DELETE
ON <table> FOR EACH ROW
INSERT INTO indexea_tasks(`task`,`field`,`value`,`type`,`status`) VALUES('[task]', '[field]', OLD.<field>, 3, 0);

其中:

  • [task] 对应 indexer.yml 中的任务名称
  • [table] 对应 indexer.yml 中任务对应的表名
  • [field] 对应 indexer.yml 中表配置的主键名称 (primary)

其他数据库原理与 MySQL 相同。

自动增量同步

一旦完成了初始化配置和全量同步后,你可以通过 indexer -d 命令启动服务进入自动增量同步的守护进程。 守护进程会定时读取 indexea_tasks 表中记录即时同步到 indexea

你可以通过查看 indexea.log 来查看服务运行状态。

你可以使用 indexer --stop 或者 kill 命令来停止服务运行。