indexer - 数据自动同步工具
indexer 是一个实现将客户数据(支持包括 SQL 等多种数据库、文件目录、队列)自动同步到 indexea
的工具。indexer 采用 rust 语言开发,体积小、支持跨平台,简单轻便,资源占用少,同时对业务系统的侵入很小。indexer 提供非常灵活的配置 (indexer.yml
),以非常高效方式将业务数据同步到 indexea
。
该工具基于 MIT 许可证开源,开源地址:https://gitee.com/indexea/indexer
使用说明
配置文件
indexer
使用 YAML 文件(indexer.yml
)进行灵活的自定义配置,配置包括三部分内容:
序号 | 配置名 | 配置说明 | 其他说明 |
---|---|---|---|
1 | datasource | 数据源定义 | 允许配置多个数据库,当前支持 MySQL 和 Postgres |
2 | endpoint | Indexea 服务配置 | 只能配置单个服务,如果需要使用多个服务,可以运行多个 indexer 的实例 |
3 | tasks | 同步任务配置 | 配置 datasource 的数据如何同步到 endpoint 中,可以配置一到多个同步任务 |
indexer.yml
datasource:
mydb: # datasource name
source: 'mysql' # mysql,postgres,oracle
url: 'mysql://account:passwd@localhost:3306/mydb'
endpoint:
url: 'https://api.indexea.com/v1/' # Indexea Gateway API
access_token: 't0ezsaje7zbz2wvzxtdlzafcr2q0ur2ligql1spm'
indices:
repositories: # index name
app: 'tw6mi8il' # app ient
index: 1005 # index id
blogs:
app: 'tw6mi8il'
index: 1006
tasks:
repositories: # task name
datasource: 'mydb' # datasource name, refer to $datasource.mydb
index: 'repositories' # index name, refer to $endpoint.indices.repositories
table: 'repositories' # table name
primary: 'id' # primary field name of table
interval: 1000 # check interval in mills-second
blogs:
datasource: 'mydb'
index: 'blogs'
table: 'blogs'
primary: 'id'
sql: 'SELECT b.*, a.name AS author_name FROM blogs b LEFT JOIN authors a ON b.author = a.id' ##指定 SQL ,否则默认为 SELECT * FROM <table>
fields:
author_name: 'user' # 字段名映射,例:在 indexea 将使用 user 字段名来表示 author_name 的值
password: '' # 删除某个字段,一些敏感信息不需要进行搜索的可以通过这种方式避免同步到 indexea
status: # 自定义字段值映射
0: 'pending'
1: 'open'
2: 'block'
3: 'delete'
interval: 2000
配置文件详解
1. 定义数据源
当前 indexer 的数据源支持 MySQL 和 Postgres,你可以在 datasource
中定义多个数据源,每个数据源需要指定一个唯一的名称,然后在 tasks
中的 datasource
引用该名称。
2. 定义 Indexea 服务信息
在 endpoint
中定义 Indexea 服务的信息,包括服务 URL 地址( 一般为 https://api.indexea.com/v1/
,如果你使用的是私有部署的 Indexea 则填写该服务的访问地址即可 )、access_token
访问令牌,关于访问令牌的获取请参考 访问令牌。
同时需要定义需要同步的目标索引信息,每一个索引信息包括:名称、应用标识、索引编号,你可以在 Indexea 管理后台中查看应用标识和索引编号。形式如下:
indices:
repositories: # index name
app: 'tw6mi8il' # app ient
index: 1005 # index id
blogs:
app: 'tw6mi8il'
index: 1006
3. 定义同步任务
有了数据源和 Indexea 服务信息后,接下来就可以定义同步任务了,对应的配置项是 tasks
。每个同步任务需要指定所用的数据源名称 datasource
和目标索引名称 index
。同时需要指定同步的数据表名称 table
和表的主键字段名称 primary
,这两个字段是必须的。
这是最基本的配置,用来将某个表的数据同步到 Indexea 中,表中的字段名对应索引的字段名,每一行记录相当于索引中的一个文档。但经常这样的配置是不够的,因为你可能希望在同步数据前对数据做一些简单的处理,这就需要使用 sql
和 fields
配置进行定义。
首先我们可以通过 sql
指令来指定数据的获取,因为经常我们需要通过关联表来获取额外的信息,例如一个博客表中只有作者的 ID,我们需要通过作者表来获取作者的名称,这时候就可以通过 sql
来指定获取数据的 SQL 语句,例如:
SELECT b.*, a.name AS author_name FROM blogs b LEFT JOIN authors a ON b.author = a.id
在定义完数据获取的方式后,我们还需要定义字段的映射关系,indexer
支持以下几种字段的映射关系:
-
字段名映射(改字段名):
fields
中的key
为数据源中的字段名,value
为索引中的字段名,例如:author_name: 'user'
表示将 author_name 的值同步到索引中的 user 字段中 -
对字段内容 进行 HTML 标签清理:如果你的字段内容是一段 HTML 内容(例如博客文章的内容一般以 HTML 格式存储),但在搜索中这些 HTML 标签会对搜索结果产生干扰,因此需要进行清理,使用 HTML 清理的配置如下:
fields:
body:
html_strip:
new_field_name: 'body_text' # 清理后的字段名这个过程你还可以顺手将 HTML 内容中的图像提取出来并存为一个新的数组字段,例如:
fields:
body:
html_strip:
extract_images_to_field: 'images' -
对字段值进行切分 :有些字段的值可能是一个字符串,但我们希望将其切分为一个数组,例如:博客文章的标签字段使用逗号隔开多个标签,我们希望将其切分为一个数组,这时候可以使用
split
配置,例如:fields:
tags:
split:
separator: ',' # 分隔符 -
对字段进行值映射:例如你希望将某个数值字段的值映射为一个字符串,例如:博客文章的状态字段,0 表示待审核,1 表示已发布,2 表示已屏蔽,3 表示已删除,这时候可以使用如下配置:
fields:
status:
0: 'pending'
1: 'open'
2: 'block'
3: 'delete' -
Markdown 字段值预处理: 配置方式与 html_strip 一致,只需将 html_strip 替换成 markdown 即可。
-
删除字段:有些字段是敏感信息,不希望同步到索引中,可以通过如下配置来删除:
fields:
password: '' # 删除 password 字段
对超大量记录的提速处理
indexer 在读取原数据时是分批读取的,越读到后面的记录,读取的速度越慢,这是因为数据库的索引机制导致的,为了解决这个问题,可以通过 scroll_by_id: true
配置来指定通过主键字段进行记录为条件进行分批次读取,例如:
repositories:
datasource: 'mysql'
index: 'repositories'
table: 'repositories'
primary: 'id' # 主键字段名称
scroll_by_id: true
程序参数
-c [indexer.yml]
指定配置文件路径
-l [indexer.log]
指定日志文件路径
-t --test
测试配置是否正确
-i --init
在本地数据源上建立 indexer 必备的信息,例如在数据库上建 indexea_tasks
表和触发器等。
然后执行数据首次同步,也就是全量的数据同步,如果使用该参数,需要用户进行二次确认
-d --start
进入守护进程方式开始增量数据同步
--clean
清除 indexer 生成的表、函数和触发器,
当 不再使用 indexer 时可以使用该参数来清除数据。
--stop
停止守护进程
数据同步流程
初始化和全量同步
命令:indexer -i
首次使用前需要进行配置,你可以运行 indexer -t
来测试配置文件是否正确,然后运行 indexer -i
来执行初始化。
初始化主要包括三部分:
- 在你指定的数据源中建
indexea_tasks
任务表,表结构如下表所示 - 创建每个任务对应的数据增删改的触发器,触发器的主要任务是将变更数据写入
indexea_tasks
表中 - 执行全量的数据同步(同步完成后 indexer 进程自动终止,同步的时间取决于数据量大小)
[indexea_tasks] 表结构 (MySQL)
字段名 | 类型 | 说明 | 其他说明 |
---|---|---|---|
id | bigint | 自增长主键 | |
task | varchar(32) | 对应 yml 配置中的任务名称 | |
field | varchar(32) | 对象主键字段名 | |
value | varchar(64) | 字段值 | |
type | tinyint | 字段类型 | 1 数值,2 字符串 |
ops | tinyint | 操作类型 | 1 添加,2 修改, 3 删除 |
status | tinyint | 处理状态 | 0 未处理,1 已处 理, 2 处理失败, 3 参数错误 |
其他数据库的表结构与 MySQL 同。
数据库触发器(MySQL)
CREATE TRIGGER indexea_after_insert_[task]
AFTER DELETE
ON <table> FOR EACH ROW
INSERT INTO indexea_tasks(`task`,`field`,`value`,`type`,`status`) VALUES('[task]', '[field]', NEW.<field>, 1, 0);
CREATE TRIGGER indexea_after_update_[task]
AFTER UPDATE
ON <table> FOR EACH ROW
INSERT INTO indexea_tasks(`task`,`field`,`value`,`type`,`status`) VALUES('[task]', '[field]', OLD.<field>, 2, 0);
CREATE TRIGGER indexea_after_delete_[task]
AFTER DELETE
ON <table> FOR EACH ROW
INSERT INTO indexea_tasks(`task`,`field`,`value`,`type`,`status`) VALUES('[task]', '[field]', OLD.<field>, 3, 0);
其中:
- [task] 对应 indexer.yml 中的任务名称
- [table] 对应 indexer.yml 中任务对应的表名
- [field] 对应 indexer.yml 中表配置的主键名称 (primary)
其他数据库原理与 MySQL 相同。
自动增量同步
一旦完成了初始化配置和全量同步后,你可以通过 indexer -d
命令启动服务进入自动增量同步的守护进程。
守护进程会定时读取 indexea_tasks
表中记录即时 同步到 indexea
。
你可以通过查看 indexea.log
来查看服务运行状态。
你可以使用 indexer --stop
或者 kill
命令来停止服务运行。