如何使用Kafka Connect实现同步RDS binlog数据

简介: 本文介绍如何在E-MapReduce上使用Kafka Connect实现同步RDS binlog数据

1. 背景

在我们的业务开发中,往往会碰到下面这个场景:

  • 业务更新数据写到数据库中
  • 业务更新数据需要实时传递给下游依赖处理

所以传统的处理架构可能会这样:

image

但这个架构也存在着不少弊端:我们需要在项目中维护很多发送消息的代码。新增或者更新消息都会带来不少维护成本。所以,更好的处理方式应该是直接将数据库的数据接入到流式系统中,如下图:
image

本文将演示如何在E-MapReduce上实现将RDS binlog实时同步到Kafka集群中。

2. 环境准备

实验中使用VPC网络环境,以下实例创建时默认都是在VPC环境下。

2.1 准备一个测试RDS数据库

创建一个RDS实例,版本选择5.7。这里不赘述如何创建RDS,详细流程请参考RDS文档。创建完如图:
image

2.2 准备一个Kafka集群

创建一个E-MapReduce Kafka集群,版本选择EMR-3.11.0。需要注意,这里必须选择EMR-3.11.0以上版本,否则不会默认安装启动Kafka Connect服务。详细创建流程请参考E-MapReduce文档。创建完如图:
image

注意:RDS实例和E-MapReduce Kafka集群最好在同一个VPC中,否则需要打通两个VPC之间的网络。

3. Kafka Connect

3.1 Connector

Kafka Connect是一个用于Kafka和其他数据系统之间进行数据传输的工具,它可以实现基于Kafka的数据管道,打通上下游数据源。我们需要做的就是在Kafka Connect服务上运行一个Connector,这个Connector是具体实现如何从/向数据源中读/写数据。Confluent提供了很多Connector实现,你可以在这里下载。不过今天我们使用Debezium提供的一个MySQL Connector插件,下载地址

  • 下载这个插件,并将解压出来的jar包全部拷贝到kafka lib目录下。注意:需要将这些jar包拷贝到Kafka集群所有机器上。
  • 在Kafka集群的服务列表中重启Kafka Connect组件。
    image

3.2 启动Connector

在创建connector前,我们需要做一番配置,这里罗列一些Debezium MySQL Connector的主要配置项:

database.hostname=x.x.x.x
database.port=3306
database.user=tom
database.password=password
database.server.id=123456
database.server.name=fullfillment
database.whitelist=inventory
database.history.kafka.bootstrap.servers=y.y.y.y:9092
database.history.kafka.topic=dbhistory.fullfillment
include.schema.changes=true

登录到Kafka集群,配置并创建一个connector,命令如下:

curl -X POST -H "Content-Type: application/json" 
--data '{"name": "rds-binlog", 
"config": {"connector.class":"io.debezium.connector.mysql.MySqlConnector", 
"database.hostname": "x.x.x.x", "database.port": "3306", 
"database.user": "tom", "database.password": "password", 
"database.server.id": "123456", "database.server.name": "fulfillment", 
"database.history.kafka.bootstrap.servers": "y.y.y.y:9092", 
"database.history.kafka.topic": "dbhistory.fullfillment", 
"include.schema.changes": "true"}}' 
http://emr-worker-1:8083/connectors

这时,我们可以看到一个创建好的connector,如图:
image

3.3 注意事项

  • server_id是多少?:你可以在RDS执行"SELECT @@server_id;"查到。
  • 创建connector时可能会出现连接失败,请确保RDS的白名单已经授权了Kafka集群机器访问。

4 测试

4.1 创建一张表

image

一会之后,Kafka集群中会自动创建一个对应的topic
image

插入几条数据

image

查看binlog数据

查看fulfillment.mugen.students这个topic,是否有刚刚新插入的数据

kafka-console-consumer.sh --zookeeper emr-header-1:2181/kafka-1.0.1 
--topic fulfillment.mugen.students --from-beginning

结果如图所示:

image

5. 资料

目录
相关文章
|
3天前
|
关系型数据库 MySQL API
实时计算 Flink版产品使用合集之可以通过mysql-cdc动态监听MySQL数据库的数据变动吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
79 0
|
3天前
|
关系型数据库 MySQL OLAP
实时计算 Flink版产品使用合集之可以支持 MySQL 数据源的增量同步到 Hudi 吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
27 4
|
1天前
|
自然语言处理 监控 关系型数据库
mysql造数据占用临时表空间
【5月更文挑战第20天】MySQL在处理复杂查询时可能使用临时表,可能导致性能下降。临时表用于排序、分组和连接操作。常见问题包括内存限制、未优化的查询、数据类型不当和临时表清理。避免过度占用的策略包括优化查询、调整系统参数、优化数据类型和事务管理。使用并行查询、分区表和监控工具也能帮助管理临时表空间。通过智能问答工具如通义灵码,可实时续写SQL和获取优化建议。注意监控`Created_tmp_tables`和`Created_tmp_disk_tables`以了解临时表使用状况。
110 5
|
2天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版操作报错合集之用CTAS从mysql同步数据到hologres,改了字段长度,报错提示需要全部重新同步如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
45 8
|
3天前
|
SQL 资源调度 关系型数据库
实时计算 Flink版产品使用合集之在抓取 MySQL binlog 数据时,datetime 字段会被自动转换为时间戳形式如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
12 2
|
3天前
|
消息中间件 关系型数据库 MySQL
MySQL 到 Kafka 实时数据同步实操分享(1),字节面试官职级
MySQL 到 Kafka 实时数据同步实操分享(1),字节面试官职级
|
4天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用合集之2.2.1版本同步mysql数据写入doris2.0 ,同步完了之后增量的数据延迟能达到20分钟甚至一直不写入如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
14 1
|
4天前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用合集之在同步MySQL的时候卡在某个binlog文件处如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
15 1
实时计算 Flink版产品使用合集之在同步MySQL的时候卡在某个binlog文件处如何解决
|
6天前
|
存储 安全 关系型数据库
Mysql 的binlog日志的优缺点
MySQL的binlog(二进制日志)是一个记录数据库更改的日志文件,它包含了所有对数据库执行的更改操作,如INSERT、UPDATE和DELETE等。binlog的主要目的是复制和恢复。以下是binlog日志的优缺点: ### 优点: 1. **数据恢复**:当数据库出现意外故障或数据丢失时,可以利用binlog进行点恢复(point-in-time recovery),将数据恢复到某一特定时间点。 2. **主从复制**:binlog是实现MySQL主从复制功能的核心组件。主服务器将binlog中的事件发送到从服务器,从服务器再重放这些事件,从而实现数据的同步。 3. **审计**:b
|
6天前
|
SQL 关系型数据库 MySQL
mysql的binlog恢复数据
mysql的binlog恢复数据
34 0