【最佳实践】阿里云Logstash JDBC实现Elasticsearch与关系型数据库保持数据同步-阿里云开发者社区

开发者社区> Elasticsearch 技术团队> 正文
登录阅读全文

【最佳实践】阿里云Logstash JDBC实现Elasticsearch与关系型数据库保持数据同步

简介: 为了充分利用阿里云 Elasticsearch 提供的强大搜索功能,很多公司都会在关系型数据库的基础上,部署 Elasticsearch。这种情况下,则需要确保 Elasticsearch 与所关联关系型数据库中的数据保持同步。 在本篇博文中,我会演示如何使用 Logstash 高效复制数据,将关系型数据库阿里云 RDS 中的数据更新同步到 Elasticsearch 中。

整体概述:

阿里云 logstash-input-jdbc 插件是实现阿里云 Elasticsearch 与 RDS 关系型数据库数据同步的关键,本质是通过 Logstash JDBC 输入插件,运行一个循环来定期对 RDS 进行轮询,从而找到在此次循环上次迭代后插入或更改的记录,如让其正确运行,必须满足如下条件:

1、 在将 RDS 中的文档写入 Elasticsearch 时,Elasticsearch 中的 "_id" 字段必须设置为 RDS 中的 "id" 字段。这可在 RDS 记录与 Elasticsearch 文档之间建立一个直接映射关系,如果在 RDS 中更新了某条记录,那么将会在 Elasticsearch 中覆盖整条相关记录。

注意,在 Elasticsearch 中覆盖文档的效率与更新操作的效率一样高,因为从内部原理上来讲,更新便包括删除旧文档以及随后对全新文档进行索引。

2、在 RDS 中插入或者更新数据时,该记录必须有一个包含更新或插入时间的字段。通过此字段,便可允许 Logstash 仅请求获得在轮询循环的上次迭代后编辑或插入的文档,Logstash 每次对 RDS 进行轮询时,都会保存其从 RDS 所读取最后一条记录的更新或插入时间。在下一次迭代时,Logstash 便知道其仅需请求获得符合下列条件的记录:更新或插入时间晚于在轮询循环中的上一次迭代中所收到的最后一条记录。

注意,Logstash-input-jdbc 插件无法实现“sql delete”操作,需要手动在 Elasticsearch 侧做删除。如果满足上述条件,我们便可配置 Logstash,以定期请求从 RDS 获得新增或已编辑的全部记录,然后将它们写入 Elasticsearch 中。

前期准备:

1、准备阿里云 Elasticsearch 6.7 版本环境,并使用创建的账号密码登录Kibana;
2、准备阿里云 Logstash 6.7 版本环境
3、开通 RDS 服务,并开通阿里云logstash vpc白名单。

阿里云 RDS 准备数据

准备一张学生表(student)表结构如下:

create table student(
id BIGINT(20) UNSIGNED NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY unique_id (id),
stuName nvarchar(5) not null,
stuSex nchar(1) check (stuSex in('男', '女')) default '男' ,
stuAge int check(stuAge>1),
stuDept nvarchar(20),
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

在上面 RDS 配置中,有如下参数需要特别注意。

1、“student”:是表的名称,数据会从这里读取出来并同步到 Elasticsearch。

2、 “id”:是该条记录的唯一标识符。由于本实验需将id映射为elasticsearch _id,所以需要保证数据的唯一性,这里将“id”定义为 PRIMARY KEY(主键)和 UNIQUE KEY(唯一键),确保每个 “id” 仅在当前表格中出现一次。其将会转换为 “_id”,以用于更新 Elasticsearch 中的文档及向 Elasticsearch 中插入文档。

3、 “update_time”:在 表中插入或更改任何记录时,都会将这个所定义字段的值设置为编辑时间。有了这个编辑时间,我们便能提取自从上次 Logstash 请求从 RDS 获取记录后被编辑的任何记录。

4、 “create_time”:数据首次插入时间,其来跟踪记录最初插入到表中的时间。
其他字段均为用户自定义数据,可按照需求自定义多种字段。

创建后表如下:

image.png

Logstash 同步配置

下列 Logstash 管道会实施在前一部分中所描述的同步代码:

input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_driver_library => "/ssd/1/share/ls-cn-4591f1y6i003/logstash/current/config/custom/mysql-connector-java-8.0.17.jar"
    jdbc_connection_string => "jdbc:mysql://rm-bp1p4vl011t9w9ksf.mysql.rds.aliyuncs.com:3306/terms?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
    jdbc_user => "zl_manager"
    jdbc_password => "Elastic@123"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => 50
    statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM student WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
    record_last_run => true
    clean_run => true
    tracking_column_type => "numeric"
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    last_run_metadata_path => "/ssd/1/ls-cn-4591f1y6i003/logstash/data/student"
    schedule => "*/5 * * * * *"
  }
}
filter {
  mutate{
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
 elasticsearch {
    hosts => "es-cn-mp91kzb8m0009pjzh.elasticsearch.aliyuncs.com:9200"
    index => "student"
    user => "elastic"
    password => "Elastic@123"
    doc_as_upsert => true
    action => "update"
    document_id => "%{[@metadata][_id]}"
    
  }
}

重点说明以下部分:

1、“tracking_column”:此字段指定 unix_ts_in_secs,用于标记 Logstash 从数据库读取的最后一个文档,存储在last_run_metadata_path指定的文件下,该值将会用于确定 Logstash 在其轮询循环的下一次迭代中所请求文档的起始值。

2、“unix_ts_in_secs”:这是一个由上述 SELECT 语句生成的字段,包含可作为标准 Unix 时间戳(自 Epoch 起秒数)的 “update_time”。我们刚讨论的 “tracking column” 会引用该字段。Unix 时间戳用于跟踪进度,而非作为简单的时间戳;如将其作为简单时间戳,可能会导致错误,因为在 UMT 和本地时区之间正确地来回转换是一个十分复杂的过程。

3、“sql_last_value”:这是一个内置参数,包括 Logstash 轮询循环中当前迭代的起始点,上面 JDBC 输入配置中的 SELECT 语句便会引用这一参数。该字段会设置为 “unix_ts_in_secs”的最新值。在 Logstash 轮询循环内所执行的查询中,其会用作所返回文档的起点。通过在查询中加入这一变量,能够确保不会将之前传播到 Elasticsearch 的插入或更新内容重新发送到 Elasticsearch。

4、“schedule”:其会使用 cron 语法来指定 Logstash 应当以什么频率对 RDS 进行轮询以查找变更。这里所指定的 "/5 " 会告诉 Logstash 每 5 秒钟联系一次表。

5、“filter”:在这一部分,我们只需简单地将 RDS 记录中的 “id” 值复制到名为 “_id” 的元数据字段,因为我们之后输出时会引用这一字段,以确保写入 Elasticsearch 的每个文档都有正确的 “_id” 值。通过使用元数据字段,可以确保这一临时值不会导致创建新的字段。我们还从文档中删除了 “id”、“@version” 和 “unix_ts_in_secs” 字段,因为我们不希望将这些字段写入到 Elasticsearch 中。

6、“output”:在这一部分,我们指定每个文档都应当写入 Elasticsearch,还需为其分配一个 “_id”(需从我们在筛选部分所创建的元数据字段提取出来)。

SELECT 语句正确性分析

在这一部分,我们会详加解释为什么在 SELECT 语句中添加 update_time < NOW() 至关重要。为帮助解释这一概念,我们首先给出几个反面例子,向您演示为什么两种最直观的方法行不通。然后会解释为什么添加 update_time < NOW() 能够克服那两种直观方法所导致的问题。

直观方法应用情况:一

在这一部分,我们会演示如果 WHERE 子句中不包括 update_time < NOW(),而仅仅指定 UNIX_TIMESTAMP(update_time) > :sql_last_value 的话,会发生什么情况。在这种情况下,SELECT 语句如下:

statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM student WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"

乍看起来,上面的方法好像可以正常运行,但是对于一些边缘情况,可能会错过一些文档。举例说明,我们假设 RDS 现在每秒插入两个文档,Logstash 每 5 秒执行一次 SELECT 语句。具体如下图所示,T0 到 T10 分别代表每一秒, RDS 中的数据则以 R1 到 R22 表示。我们假定 Logstash 轮询循环的第一个迭代发生在 T5,其会读取文档 R1 到 R11,如蓝绿色的方框所示。在 sql_last_value 中存储的值现在是 T5,因为这是所读取最后一条记录 (R11) 的时间戳。我们还假设在 Logstash 从 RDS 读取完文件后,另一个时间戳为 T5 的文档 R12 立即插入到了 RDS 中。

在上述 SELECT 语句的下一个迭代中,我们仅会提取时间晚于 T5 的文档(因为 WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value) 就是如此规定的),这也就意味着将会跳过记录 R12。您可以参看下面的图表,其中蓝绿色方框表示 Logstash 在当前迭代中读取的记录,灰色方框表示 Logstash 之前读取的记录。

image.png

请注意,如果使用这种情况中的 SELECT 语句,记录 R12 永远不会写到 Elasticsearch 中。

直观方法应用情况:二

为了解决上面的问题,您可能决定更改 WHERE 子句为 greater than or equals(晚于或等于),具体如下:

statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM student WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"

然而,这种实施策略也并不理想。这种情况下的问题是:在最近一个时间间隔内从 RDS 读取的最近文档会重复发送到 Elasticsearch。尽管这不会对结果的正确性造成任何影响,但的确做了无用功。和前一部分类似,在最初的 Logstash 轮询迭代后,下图显示了已经从 RDS 读取了哪些文档。

image.png

当执行后续的 Logstash 轮询迭代时,我们会将时间晚于或等于 T5 的文档全部提取出来。可以参见下面的图表。

请注意:记录 11(紫色显示)会再次发送到 Elasticsearch。

image.png

前面两种情况都不甚理想。在第一种情况中,会丢失数据,而在第二种情况中,会从 RDS 读取冗余数据并将这些数据发送到 Elasticsearch。

如何解决直观方法所带来的的问题

鉴于前面两种情况都不太理想,应该采用另一种办法。通过指定

(UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW())

我们会将每个文档都发送到 Elasticsearch,而且只发送一次。
请参见下面的图表,其中当前的 Logstash 轮询会在 T5 执行。请注意,由于必须满足

update_time < NOW()

所以只会从 RDS 中读取截至(但不包括)时间段 T5 的文档。由于我们已经提取了 T4 的全部文档,而未读取 T5 的任何文档,所以我们知道对于下一次的 Logstash 轮询迭代,sql_last_value 将会被设置为 T4。

image.png

下图演示了在 Logstash 轮询的下一次迭代中将会发生什么情况。由于 UNIX_TIMESTAMP(update_time) > :sql_last_value,并且 sql_last_value 设置为 T4,我们知道仅会从 T5 开始提取文档。此外,由于只会提取满足 update_time < NOW() 的文档,所以仅会提取到截至(含)T9 的文档。再说一遍,这意味着 T9 中的所有文档都已提取出来,而且对于下一次迭代 sql_last_value 将会设置为 T9。所以这一方法消除了对于任何给定时间间隔仅检索到 MySQL 文档的一个子集的风险。

测试系统

image.png

测试系统

JDBC 输入计划触发了从 RDS 读取记录的操作并将记录写入 Elasticsearch 后,我们即可运行下列 Elasticsearch 查询来查看 Elasticsearch 中的文档:

GET student/_search

其会返回类似下面回复的内容:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "student",
        "_type" : "doc",
        "_id" : "20205002",
        "_score" : 1.0,
        "_source" : {
          "update_time" : "2020-05-19T08:43:19.000Z",
          "stuage" : 25,
          "@timestamp" : "2020-05-19T09:08:02.173Z",
          "stuname" : "张雨",
          "studept" : "大个子",
          "create_time" : "2020-05-19T08:43:19.000Z",
          "stusex" : "男"
        }
      },
      {
        "_index" : "student",
        "_type" : "doc",
        "_id" : "20205005",
        "_score" : 1.0,
        "_source" : {
          "stuage" : 28,
          "stuname" : "大禹",
          "create_time" : "2020-05-19T12:12:22.000Z",
          "stusex" : "男",
          "update_time" : "2020-05-19T12:12:22.000Z",
          "studept" : "咖啡",
          "@timestamp" : "2020-05-19T12:13:00.160Z"
        }
      },
      {
        "_index" : "student",
        "_type" : "doc",
        "_id" : "20205001",
        "_score" : 1.0,
        "_source" : {
          "update_time" : "2020-05-19T08:42:39.000Z",
          "stuage" : 27,
          "@timestamp" : "2020-05-19T09:08:02.140Z",
          "stuname" : "赵弘景",
          "studept" : "健身",
          "create_time" : "2020-05-19T08:42:39.000Z",
          "stusex" : "男"
        }
      },
      {
        "_index" : "student",
        "_type" : "doc",
        "_id" : "20205004",
        "_score" : 1.0,
        "_source" : {
          "update_time" : "2020-05-19T08:44:54.000Z",
          "stuage" : 23,
          "@timestamp" : "2020-05-19T09:08:02.191Z",
          "stuname" : "潘多",
          "studept" : "跳舞",
          "create_time" : "2020-05-19T08:44:54.000Z",
          "stusex" : "女"
        }
      },
      {
        "_index" : "student",
        "_type" : "doc",
        "_id" : "20205003",
        "_score" : 1.0,
        "_source" : {
          "update_time" : "2020-05-19T08:44:11.000Z",
          "stuage" : 26,
          "@timestamp" : "2020-05-19T09:08:02.175Z",
          "stuname" : "黄磊",
          "studept" : "烹饪",
          "create_time" : "2020-05-19T08:44:11.000Z",
          "stusex" : "男"
        }
      }
    ]
  }
}

然后我们可以使用下列命令更新 RDS id=20205003的数据。

update `student` set `stuDept`='好男人,烹饪大厨' where `id`='20205003';

jdbc 会正确更新 _id 被标识为 20205003 的数据,通过以下命令查看 Elasticsearch 文档:

GET student/_doc/20205003

其会返回一个类似下面的文档:

{
  "_index" : "student",
  "_type" : "_doc",
  "_id" : "20205003",
  "_version" : 3,
  "_seq_no" : 2,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "update_time" : "2020-05-19T12:27:42.000Z",
    "stuage" : 26,
    "@timestamp" : "2020-05-19T12:28:00.125Z",
    "stuname" : "黄磊",
    "studept" : "好男人,烹饪大厨",
    "create_time" : "2020-05-19T08:44:11.000Z",
    "stusex" : "男"
  }
}

请注意 _version 现已设置为 3,update_time 现在已不同于 "create_time" ,并且 "stuname" 字段已正确更新至新值。在本例中,@timestamp 字段的用处并不大,由 Logstash 默认添加。

RDS 中的更新/插入 (upsert) 可通过下列命令完成,您可以验证正确信息是否会反映在 Elasticsearch 中:

insert `student`(`id`,`stuName`,`stuSex`,`stuAge`,`stuDept`) values('20205008','悦来','女',28,'跳跳');

声明:本文由“Logstash:如何使用 Logstash 和 JDBC 确保 Elasticsearch 与关系型数据库保持同步”基于阿里云服务环境授权改编

原文作者:Elastic 中国社区布道师——刘晓国
合作编辑:张蕾/大禹
出处链接:https://elasticstack.blog.csdn.net/


image.png

阿里云Elastic Stack】100%兼容开源ES,独有9大能力

相关活动


更多折扣活动,请访问阿里云 Elasticsearch 官网

阿里云 Elasticsearch 商业通用版,1核2G ,SSD 20G首月免费
阿里云 Logstash 2核4G首月免费


image.png

image.png

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:

Elasticsearch 作为一个分布式、高扩展、实时的搜索与数据分析引擎,因其轻量级、稳定、可靠、快速等特性受到越来越多开发者的青睐,在搜索、日志分析、运维监控和安全分析等领域得到广泛应用。

官方博客
友情链接