kafka 连接器实现 Mysql 数据同步 Elasticsearch

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: kafka 连接器实现 Mysql 数据同步 Elasticsearch

为什么需要将 Mysql 数据同步到 Elasticsearch

Mysql 作为传统的关系型数据库,主要面向 OLTP,性能优异,支持事务,但是在一些全文检索,复杂查询上面并不快。Elasticsearch 底层基于 Lucense 实现,天然分布式,采用倒排索引存储数据,全文检索效率很高,使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。

kafka 连接器同步方案

Debezium 是捕获数据实时动态变化(change data capture,CDC)的开源的分布式同步平台。能实时捕获到数据源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、删除(deletes)操作,实时同步到Kafka,稳定性强且速度非常快。Debezium 是基于 kafka Connect 的开源项目。

Elasticsearch-Connector 使用主题+分区+偏移量作为事件的唯一标识符,然后在 Elasticsearch 中转换为唯一的文档。它支持使用 Kafka 消息中的键值作为 Elasticsearch 中的文档 Id,并且确保更新按顺序写入 Elasticsearch。image.png如图,Mysql 到 ES 的同步策略,采取“曲线救国”机制。

  • 步骤1:基 Debezium 的binlog 机制,将 Mysql 数据同步到Kafka。
  • 步骤2:基于 Kafka_connector 机制,将 Kafka 数据同步到 Elasticsearch。

MySQL 配置

开启 binlog

Debezium 使用 MySQL 的 binlog 机制实现数据动态变化监测,所以需要 Mysql 提前配置 binlog。

编辑 /etc/my.cnf 的 mysqld 下添加如下配置:

server-id         = 7777
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10

然后,重启一下 Mysql 以使得 binlog 生效。

systemctl restart mysqld.service

检查 binlog 是否开启:

[root@mysql-5 ~]# mysqladmin variables -uroot@123456 | grep log_bin
| log_bin                                                | ON

创建用户

创建用户 debezium,密码 dbz,并授予相关权限:

mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

创建表并插入数据

mysql> create database school;
mysql> use school;
mysql> create table student (name varchar(20),age int);
mysql> insert into student values('tom',18),('jack',19),('lisa',18);

使用 Debezium 同步 MySQL 数据到 Kafka

安装 Debezium

下载 Debezium 压缩包:

https://www.confluent.io/hub/debezium/debezium-connector-mysql

image.png将压缩包解压到自定义的目录,只要 libs 目录中的 jar 包即可:

[root@kafka1 connect]# ls -l /usr/local/kafka/connect/debezium-connector-mysql
total 9412
-rw-r--r--. 1 root root  337904 Apr  3 22:54 antlr4-runtime-4.7.2.jar
-rw-r--r--. 1 root root   20270 Apr  3 22:54 debezium-api-1.4.0.Final.jar
-rw-r--r--. 1 root root  264910 Apr  3 22:54 debezium-connector-mysql-1.4.0.Final.jar
-rw-r--r--. 1 root root  823056 Apr  3 22:54 debezium-core-1.4.0.Final.jar
-rw-r--r--. 1 root root 2733898 Apr  3 22:54 debezium-ddl-parser-1.4.0.Final.jar
-rw-r--r--. 1 root root    4617 Apr  3 22:54 failureaccess-1.0.1.jar
-rw-r--r--. 1 root root 2858426 Apr  3 22:54 guava-30.0-jre.jar
-rw-r--r--. 1 root root  182602 Apr  3 22:54 mysql-binlog-connector-java-0.23.1.jar
-rw-r--r--. 1 root root 2397321 Apr  3 22:54 mysql-connector-java-8.0.21.jar

修改 Kafka 的 config/connect-distributed.properties 文件,在最后添加如下内容,这里注意 plugin.path 只写到放 jar 包的上一层目录:

plugin.path=/usr/local/kafka/connect

启动 Kafka 连接器

bin/connect-distributed.sh config/connect-distributed.properties

启动完成后,可以查看刚刚安装的 debezium 插件:

[root@kafka1 connect]# curl http://kafka1:8083/connector-plugins -s | jq
[
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.4.0.Final"
  }
]

新增 connector 连接器实例

为了方便起见,先编辑一个文件 mysql-connector.json:

{
    "name": "mysql-connector", #自定义连接器实例名
    "config":
    {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", #连接器类库
        "database.hostname": "192.168.1.14", #mysql地址
        "database.port": "3306", #mysql端口号
        "database.user": "debezium",  #用户名
        "database.password": "dbz", #密码
        "database.server.id": "7777",  #对应mysql中的server-id的配置。
        "database.server.name": "cr7-demo", #逻辑名称,每个connector确保唯一,作为写入数据的kafka topic的前缀名称
        "database.history.kafka.bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092", #kafka集群地址
        "database.history.kafka.topic": "cr7-schema-changes-inventory", #存储数据库的Shcema的记录信息,而非写入数据的topic
        "include.schema.changes": "true",
        "database.whitelist": "school", #待同步的mysql数据库名
        "table.whitlelist": "student" #待同步的mysq表名
    }
}

通过 Http Post 请求新增 connector 连接器实例:

curl -X POST -H "Content-Type:application/json" --data @mysql-connector.json http://kafka1:8083/connectors

查看新增的连接器实例:

[root@kafka1 connect]# curl http://kafka1:8083/connectors -s | jq
[
  "mysql-connector"
]

查看连接器实例运行状态:

[root@kafka1 connect]# curl http://kafka1:8083/connectors/mysql-connector/status  -s | jq
{
  "name": "mysql-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.1.87:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "192.168.1.87:8083"
    }
  ],
  "type": "source"
}

查看 Kafka 数据

使用下面命令可以消费到 Debezium 根据 binlog 更新写入到 Kafka Topic 中的数据: --from-beginning 表示从头开始消费,如果不加该参数,就只能消费到新增的消息。

kafka-console-consumer.sh \
--bootstrap-server kafka1:9092 \
--topic cr7-demo.school.student \
--from-beginning

Kafka 数据同步到 Elasticsearch

安装 elasticsearch-connector

下载 elasticsearch-connector 压缩包:

https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch

image.png下载完成后解压到自定义目录,只要 libs 目录下的 jar 包即可,然后重启 Kafka 连接器:

[root@kafka1 kafka]# ls -l /usr/local/kafka/connect/elasticsearch-connector
total 27048
-rw-r--r--. 1 root root    59860 Apr  3 20:18 aggs-matrix-stats-client-7.0.1.jar
-rw-r--r--. 1 root root   353793 Apr  3 20:18 commons-codec-1.15.jar
-rw-r--r--. 1 root root    61829 Apr  3 20:18 commons-logging-1.2.jar
-rw-r--r--. 1 root root    17265 Apr  3 20:18 common-utils-6.0.1.jar
-rw-r--r--. 1 root root    99939 Apr  3 20:18 compiler-0.9.3.jar
-rw-r--r--. 1 root root 10997301 Apr  3 20:18 elasticsearch-7.0.1.jar
-rw-r--r--. 1 root root    16058 Apr  3 20:18 elasticsearch-cli-7.0.1.jar
-rw-r--r--. 1 root root    38776 Apr  3 20:18 elasticsearch-core-7.0.1.jar
-rw-r--r--. 1 root root    31303 Apr  3 20:18 elasticsearch-geo-7.0.1.jar
-rw-r--r--. 1 root root    62091 Apr  3 20:18 elasticsearch-rest-client-7.0.1.jar
-rw-r--r--. 1 root root   989767 Apr  3 20:18 elasticsearch-rest-high-level-client-7.0.1.jar
-rw-r--r--. 1 root root    10876 Apr  3 20:18 elasticsearch-secure-sm-7.0.1.jar
-rw-r--r--. 1 root root   117634 Apr  3 20:18 elasticsearch-x-content-7.0.1.jar
......

查看安装的 elasticsearch-connector 插件:

[root@kafka1 connect]# curl http://kafka1:8083/connector-plugins -s | jq
[
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "11.0.3"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.4.0.Final"
  }
]

为了方便起见,先编辑一个文件 elasticsearch-connector.json:

{
    "name": "elasticsearch-connector",  #自定义连接器实例名
    "config":
    {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",  #连接器类库
        "connection.url": "http://192.168.1.171:9200", #Elasticsearch地址
        "key.ignore": "true",  #Kafka 消息没有指定 key,因此要指定该参数,否则无法消费到 Elasticsearch
        "topics": "cr7-demo.school.student"  #kafka topic名字
    }
}

通过 Http Post 请求新增 connector 连接器实例:

curl -X POST -H "Content-Type:application/json" --data @elasticsearch-connector.json http://kafka1:8083/connectors

查看创建的连接器实例:

[root@kafka1 connect]# curl http://kafka1:8083/connectors -s | jq
[
  "mysql-connector",
  "elasticsearch-connector"
]

查看 Elasticsearch 数据

在 Elasticsearch 上查询 cr7-demo.school.student 索引可以看到数据:

GET cr7-demo.school.student/_search
#返回结果:
{
  "took" : 190,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 5,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "cr7-demo.school.student",
        "_type" : "_doc",
        "_id" : "cr7-demo.school.student+2+0",
        "_score" : 1.0,
        "_source" : {
          "before" : null,
          "after" : {   
            "name" : "tom", #字段内容
            "age" : 18
          },
          "source" : {
            "name" : "cr7-demo",
            "server_id" : 0,
            "ts_sec" : 0,
            "gtid" : null,
            "file" : "mysql-bin.000001", #binlog文件
            "pos" : 995,
            "row" : 0,
            "snapshot" : true,
            "thread" : null,
            "db" : "school", #数据库名
            "table" : "student"  #表名
          },
          "op" : "c",
          "ts_ms" : 1617450734795
        }
      },
     }
     ......
}

参考链接


目录
相关文章
|
3月前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
1月前
|
监控 关系型数据库 MySQL
深入了解MySQL主从复制:构建高效稳定的数据同步架构
深入了解MySQL主从复制:构建高效稳定的数据同步架构
120 1
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
131 0
|
2月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
621 4
|
3月前
|
关系型数据库 MySQL 数据库
【MySQL】手把手教你MySQL数据同步
【MySQL】手把手教你MySQL数据同步
|
29天前
|
消息中间件 NoSQL 关系型数据库
一文彻底搞定Redis与MySQL的数据同步
【10月更文挑战第21天】本文介绍了 Redis 与 MySQL 数据同步的原因及实现方式。同步的主要目的是为了优化性能和保持数据一致性。实现方式包括基于数据库触发器、应用层双写和使用消息队列。每种方式都有其优缺点,需根据具体场景选择合适的方法。此外,文章还强调了数据同步时需要注意的数据一致性、性能优化和异常处理等问题。
323 0
|
3月前
|
消息中间件 数据采集 关系型数据库
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
56 1
|
3月前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
337 1
|
3月前
|
SQL canal 关系型数据库
(二十四)全解MySQL之主从篇:死磕主从复制中数据同步原理与优化
兜兜转转,经过《全解MySQL专栏》前面二十多篇的内容讲解后,基本对MySQL单机模式下的各方面进阶知识做了详细阐述,同时在前面的《分库分表概念篇》、《分库分表隐患篇》两章中也首次提到了数据库的一些高可用方案,但前两章大多属于方法论,并未涵盖真正的实操过程。接下来的内容,会以目前这章作为分割点,开启MySQL高可用方案的落地实践分享的新章程!
1616 1
|
3月前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
742 0

热门文章

最新文章