如何使用Kafka Connect实现同步RDS binlog数据

本文涉及的产品
EMR Serverless Spark 免费试用,1000 CU*H 有效期3个月
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文介绍如何在E-MapReduce上使用Kafka Connect实现同步RDS binlog数据

1. 背景

在我们的业务开发中,往往会碰到下面这个场景:

  • 业务更新数据写到数据库中
  • 业务更新数据需要实时传递给下游依赖处理

所以传统的处理架构可能会这样:

image

但这个架构也存在着不少弊端:我们需要在项目中维护很多发送消息的代码。新增或者更新消息都会带来不少维护成本。所以,更好的处理方式应该是直接将数据库的数据接入到流式系统中,如下图:
image

本文将演示如何在E-MapReduce上实现将RDS binlog实时同步到Kafka集群中。

2. 环境准备

实验中使用VPC网络环境,以下实例创建时默认都是在VPC环境下。

2.1 准备一个测试RDS数据库

创建一个RDS实例,版本选择5.7。这里不赘述如何创建RDS,详细流程请参考RDS文档。创建完如图:
image

2.2 准备一个Kafka集群

创建一个E-MapReduce Kafka集群,版本选择EMR-3.11.0。需要注意,这里必须选择EMR-3.11.0以上版本,否则不会默认安装启动Kafka Connect服务。详细创建流程请参考E-MapReduce文档。创建完如图:
image

注意:RDS实例和E-MapReduce Kafka集群最好在同一个VPC中,否则需要打通两个VPC之间的网络。

3. Kafka Connect

3.1 Connector

Kafka Connect是一个用于Kafka和其他数据系统之间进行数据传输的工具,它可以实现基于Kafka的数据管道,打通上下游数据源。我们需要做的就是在Kafka Connect服务上运行一个Connector,这个Connector是具体实现如何从/向数据源中读/写数据。Confluent提供了很多Connector实现,你可以在这里下载。不过今天我们使用Debezium提供的一个MySQL Connector插件,下载地址

  • 下载这个插件,并将解压出来的jar包全部拷贝到kafka lib目录下。注意:需要将这些jar包拷贝到Kafka集群所有机器上。
  • 在Kafka集群的服务列表中重启Kafka Connect组件。
    image

3.2 启动Connector

在创建connector前,我们需要做一番配置,这里罗列一些Debezium MySQL Connector的主要配置项:

database.hostname=x.x.x.x
database.port=3306
database.user=tom
database.password=password
database.server.id=123456
database.server.name=fullfillment
database.whitelist=inventory
database.history.kafka.bootstrap.servers=y.y.y.y:9092
database.history.kafka.topic=dbhistory.fullfillment
include.schema.changes=true

登录到Kafka集群,配置并创建一个connector,命令如下:

curl -X POST -H "Content-Type: application/json" 
--data '{"name": "rds-binlog", 
"config": {"connector.class":"io.debezium.connector.mysql.MySqlConnector", 
"database.hostname": "x.x.x.x", "database.port": "3306", 
"database.user": "tom", "database.password": "password", 
"database.server.id": "123456", "database.server.name": "fulfillment", 
"database.history.kafka.bootstrap.servers": "y.y.y.y:9092", 
"database.history.kafka.topic": "dbhistory.fullfillment", 
"include.schema.changes": "true"}}' 
http://emr-worker-1:8083/connectors

这时,我们可以看到一个创建好的connector,如图:
image

3.3 注意事项

  • server_id是多少?:你可以在RDS执行"SELECT @@server_id;"查到。
  • 创建connector时可能会出现连接失败,请确保RDS的白名单已经授权了Kafka集群机器访问。

4 测试

4.1 创建一张表

image

一会之后,Kafka集群中会自动创建一个对应的topic
image

插入几条数据

image

查看binlog数据

查看fulfillment.mugen.students这个topic,是否有刚刚新插入的数据

kafka-console-consumer.sh --zookeeper emr-header-1:2181/kafka-1.0.1 
--topic fulfillment.mugen.students --from-beginning

结果如图所示:

image

5. 资料

目录
相关文章
|
2月前
|
关系型数据库 MySQL Shell
MySQL 备份 Shell 脚本:支持远程同步与阿里云 OSS 备份
一款自动化 MySQL 备份 Shell 脚本,支持本地存储、远程服务器同步(SSH+rsync)、阿里云 OSS 备份,并自动清理过期备份。适用于数据库管理员和开发者,帮助确保数据安全。
|
5月前
|
存储 SQL 关系型数据库
mysql 的ReLog和BinLog区别
MySQL中的重做日志和二进制日志是确保数据库稳定性和可靠性的关键组件。重做日志主要用于事务的持久性和原子性,通过记录数据页的物理修改信息来恢复未提交的事务;而二进制日志记录SQL语句的逻辑变化,支持数据复制、恢复和审计。两者在写入时机、存储方式及配置参数等方面存在显著差异。
143 6
|
1月前
|
SQL 运维 关系型数据库
MySQL Binlog 日志查看方法及查看内容解析
本文介绍了 MySQL 的 Binlog(二进制日志)功能及其使用方法。Binlog 记录了数据库的所有数据变更操作,如 INSERT、UPDATE 和 DELETE,对数据恢复、主从复制和审计至关重要。文章详细说明了如何开启 Binlog 功能、查看当前日志文件及内容,并解析了常见的事件类型,包括 Format_desc、Query、Table_map、Write_rows、Update_rows 和 Delete_rows 等,帮助用户掌握数据库变化历史,提升维护和排障能力。
|
3月前
|
存储 SQL 关系型数据库
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log、原理、写入过程;binlog与redolog区别、update语句的执行流程、两阶段提交、主从复制、三种日志的使用场景;查询日志、慢查询日志、错误日志等其他几类日志
237 35
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
|
2月前
|
存储 SQL 关系型数据库
mysql的undo log、redo log、bin log、buffer pool
MySQL的undo log、redo log、bin log和buffer pool是确保数据库高效、安全和可靠运行的关键组件。理解这些组件的工作原理和作用,对于优化数据库性能和保障数据安全具有重要意义。通过适当的配置和优化,可以显著提升MySQL的运行效率和数据可靠性。
64 16
|
2月前
|
存储 SQL 关系型数据库
mysql的undo log、redo log、bin log、buffer pool
MySQL的undo log、redo log、bin log和buffer pool是确保数据库高效、安全和可靠运行的关键组件。理解这些组件的工作原理和作用,对于优化数据库性能和保障数据安全具有重要意义。通过适当的配置和优化,可以显著提升MySQL的运行效率和数据可靠性。
49 4
|
3月前
|
SQL 监控 关系型数据库
MySQL原理简介—12.MySQL主从同步
本文介绍了四种为MySQL搭建主从复制架构的方法:异步复制、半同步复制、GTID复制和并行复制。异步复制通过配置主库和从库实现简单的主从架构,但存在数据丢失风险;半同步复制确保日志复制到从库后再提交事务,提高了数据安全性;GTID复制简化了配置过程,增强了复制的可靠性和管理性;并行复制通过多线程技术降低主从同步延迟,保证数据一致性。此外,还讨论了如何使用工具监控主从延迟及应对策略,如强制读主库以确保即时读取最新数据。
MySQL原理简介—12.MySQL主从同步
|
2月前
|
SQL 存储 关系型数据库
简单聊聊MySQL的三大日志(Redo Log、Binlog和Undo Log)各有什么区别
在MySQL数据库管理中,理解Redo Log(重做日志)、Binlog(二进制日志)和Undo Log(回滚日志)至关重要。Redo Log确保数据持久性和崩溃恢复;Binlog用于主从复制和数据恢复,记录逻辑操作;Undo Log支持事务的原子性和隔离性,实现回滚与MVCC。三者协同工作,保障事务ACID特性。文章还详细解析了日志写入流程及可能的异常情况,帮助深入理解数据库日志机制。
206 0
|
6月前
|
消息中间件 关系型数据库 Kafka
一种小资源情况下RDS数据实时同步StarRocks方案
使用一台4C8 G服务器轻松实现2个MySQL实例中通过负责分库分表规则之后的5000多张表的数据实时同步到StarRocks
265 67
|
5月前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
284 17