如何使用Kafka Connect实现同步RDS binlog数据

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文介绍如何在E-MapReduce上使用Kafka Connect实现同步RDS binlog数据

1. 背景

在我们的业务开发中,往往会碰到下面这个场景:

  • 业务更新数据写到数据库中
  • 业务更新数据需要实时传递给下游依赖处理

所以传统的处理架构可能会这样:

image

但这个架构也存在着不少弊端:我们需要在项目中维护很多发送消息的代码。新增或者更新消息都会带来不少维护成本。所以,更好的处理方式应该是直接将数据库的数据接入到流式系统中,如下图:
image

本文将演示如何在E-MapReduce上实现将RDS binlog实时同步到Kafka集群中。

2. 环境准备

实验中使用VPC网络环境,以下实例创建时默认都是在VPC环境下。

2.1 准备一个测试RDS数据库

创建一个RDS实例,版本选择5.7。这里不赘述如何创建RDS,详细流程请参考RDS文档。创建完如图:
image

2.2 准备一个Kafka集群

创建一个E-MapReduce Kafka集群,版本选择EMR-3.11.0。需要注意,这里必须选择EMR-3.11.0以上版本,否则不会默认安装启动Kafka Connect服务。详细创建流程请参考E-MapReduce文档。创建完如图:
image

注意:RDS实例和E-MapReduce Kafka集群最好在同一个VPC中,否则需要打通两个VPC之间的网络。

3. Kafka Connect

3.1 Connector

Kafka Connect是一个用于Kafka和其他数据系统之间进行数据传输的工具,它可以实现基于Kafka的数据管道,打通上下游数据源。我们需要做的就是在Kafka Connect服务上运行一个Connector,这个Connector是具体实现如何从/向数据源中读/写数据。Confluent提供了很多Connector实现,你可以在这里下载。不过今天我们使用Debezium提供的一个MySQL Connector插件,下载地址

  • 下载这个插件,并将解压出来的jar包全部拷贝到kafka lib目录下。注意:需要将这些jar包拷贝到Kafka集群所有机器上。
  • 在Kafka集群的服务列表中重启Kafka Connect组件。
    image

3.2 启动Connector

在创建connector前,我们需要做一番配置,这里罗列一些Debezium MySQL Connector的主要配置项:

database.hostname=x.x.x.x
database.port=3306
database.user=tom
database.password=password
database.server.id=123456
database.server.name=fullfillment
database.whitelist=inventory
database.history.kafka.bootstrap.servers=y.y.y.y:9092
database.history.kafka.topic=dbhistory.fullfillment
include.schema.changes=true

登录到Kafka集群,配置并创建一个connector,命令如下:

curl -X POST -H "Content-Type: application/json" 
--data '{"name": "rds-binlog", 
"config": {"connector.class":"io.debezium.connector.mysql.MySqlConnector", 
"database.hostname": "x.x.x.x", "database.port": "3306", 
"database.user": "tom", "database.password": "password", 
"database.server.id": "123456", "database.server.name": "fulfillment", 
"database.history.kafka.bootstrap.servers": "y.y.y.y:9092", 
"database.history.kafka.topic": "dbhistory.fullfillment", 
"include.schema.changes": "true"}}' 
http://emr-worker-1:8083/connectors

这时,我们可以看到一个创建好的connector,如图:
image

3.3 注意事项

  • server_id是多少?:你可以在RDS执行"SELECT @@server_id;"查到。
  • 创建connector时可能会出现连接失败,请确保RDS的白名单已经授权了Kafka集群机器访问。

4 测试

4.1 创建一张表

image

一会之后,Kafka集群中会自动创建一个对应的topic
image

插入几条数据

image

查看binlog数据

查看fulfillment.mugen.students这个topic,是否有刚刚新插入的数据

kafka-console-consumer.sh --zookeeper emr-header-1:2181/kafka-1.0.1 
--topic fulfillment.mugen.students --from-beginning

结果如图所示:

image

5. 资料

目录
相关文章
|
10天前
|
关系型数据库 MySQL 数据库
ORM对mysql数据库中数据进行操作报错解决
ORM对mysql数据库中数据进行操作报错解决
35 2
|
11天前
|
SQL 关系型数据库 MySQL
MySQL如何排查和删除重复数据
该文章介绍了在MySQL中如何排查和删除重复数据的方法,包括通过组合字段生成唯一标识符以及使用子查询和聚合函数来定位并删除重复记录的具体步骤。
29 2
|
5天前
|
消息中间件 canal 关系型数据库
Maxwell:binlog 解析器,轻松同步 MySQL 数据
Maxwell:binlog 解析器,轻松同步 MySQL 数据
39 11
|
4天前
|
关系型数据库 MySQL 数据库
MySQL的语法涵盖了数据定义、数据操作、数据查询和数据控制等多个方面
MySQL的语法涵盖了数据定义、数据操作、数据查询和数据控制等多个方面
18 5
|
11天前
|
关系型数据库 MySQL 数据库
Python MySQL查询返回字典类型数据的方法
通过使用 `mysql-connector-python`库并选择 `MySQLCursorDict`作为游标类型,您可以轻松地将MySQL查询结果以字典类型返回。这种方式提高了代码的可读性,使得数据操作更加直观和方便。上述步骤和示例代码展示了如何实现这一功能,希望对您的项目开发有所帮助。
37 4
|
8天前
|
存储 SQL 关系型数据库
mysql删除 所有数据
mysql删除 所有数据
|
18天前
|
NoSQL 关系型数据库 MySQL
微服务架构下的数据库选择:MySQL、PostgreSQL 还是 NoSQL?
在微服务架构中,数据库的选择至关重要。不同类型的数据库适用于不同的需求和场景。在本文章中,我们将深入探讨传统的关系型数据库(如 MySQL 和 PostgreSQL)与现代 NoSQL 数据库的优劣势,并分析在微服务架构下的最佳实践。
|
20天前
|
存储 SQL 关系型数据库
使用MySQL Workbench进行数据库备份
【9月更文挑战第13天】以下是使用MySQL Workbench进行数据库备份的步骤:启动软件后,通过“Database”菜单中的“管理连接”选项配置并选择要备份的数据库。随后,选择“数据导出”,确认导出的数据库及格式(推荐SQL格式),设置存储路径,点击“开始导出”。完成后,可在指定路径找到备份文件,建议定期备份并存储于安全位置。
160 11
|
2月前
|
弹性计算 关系型数据库 数据库
手把手带你从自建 MySQL 迁移到云数据库,一步就能脱胎换骨
阿里云瑶池数据库来开课啦!自建数据库迁移至云数据库 RDS原来只要一步操作就能搞定!点击阅读原文完成实验就可获得一本日历哦~
|
15天前
|
存储 SQL 关系型数据库
MySQL的安装&数据库的简单操作
本文介绍了数据库的基本概念及MySQL的安装配置。首先解释了数据库、数据库管理系统和SQL的概念,接着详细描述了MySQL的安装步骤及其全局配置文件my.ini的调整方法。文章还介绍了如何启动MySQL服务,包括配置环境变量和使用命令行的方法。最后,详细说明了数据库的各种操作,如创建、选择和删除数据库的SQL语句,并提供了实际操作示例。
58 13
MySQL的安装&数据库的简单操作
下一篇
无影云桌面