使用EMR-Kafka Connect进行数据迁移

简介: 流式处理中经常会遇到Kafka与其他系统进行数据同步或者Kafka集群间数据迁移的情景。使用EMR Kafka Connect可以方便快速的实现数据同步或者数据迁移。本文介绍使用EMR Kafka Connect的REST API接口在Kafka集群间进行数据迁移。

1.背景

流式处理中经常会遇到Kafka与其他系统进行数据同步或者Kafka集群间数据迁移的情景。使用EMR Kafka Connect可以方便快速的实现数据同步或者数据迁移。

Kafka Connect是一种可扩展的、可靠的,用于在Kafka和其他系统之间快速地进行流式数据传输的工具。例如可以使用Kafka Connect获取数据库的binglog数据,将数据库的数据迁入Kafka集群,以同步数据库的数据,或者对接下游的流式处理系统。同时,Kafka Connect提供的REST API接口可以方便的进行Kafka Connect的创建和管理。
Kafka Connect分为standalone和distributed两种运行模式。standalone模式下,所有的worker都在一个进程中运行;相比之下,distributed模式更具扩展性和容错性,是最常用的方式,也是生产环境推荐使用的模式。

本文介绍使用EMR Kafka Connect的REST API接口在Kafka集群间进行数据迁移,使用distributed模式。

2.环境准备

创建两个EMR集群,集群类型为Kafka。EMR Kafka Connect安装在task节点上,进行数据迁移的目的Kafka集群需要创建task节点。集群创建好后,task节点上EMR Kafka Connect服务会默认启动,端口号为8083。

注意要保证两个集群的网路互通,详细的创建流程见创建集群

3.数据迁移

3.1准备工作

EMR Kafka Connect的配置文件路径为/etc/ecm/kafka-conf/connect-distributed.properties,可以查看所有的配置。修改配置项请参考组件参数配置

在源Kafka集群创建需要同步的topic,例如

1

另外,Kafka Connect会将offsets, configs和任务状态保存在topic中,topic名对应配置文件中的offset.storage.topic、config.storage.topic 和status.storage.topic三个配置项。默认的,Kafka Connect会自动的使用默认的partition和replication factor创建这三个topic。

3.2创建Kafka Connect

在目的Kafka集群的task节点(例如emr-worker-3节点),使用curl命令通过json数据创建一个Kafka Connect。

curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-test", "config": { "connector.class": "EMRReplicatorSourceConnector", "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "src.kafka.bootstrap.servers": "${src-kafka-ip}:9092", "src.zookeeper.connect": "${src-kafka-curator-ip}:2181", "dest.zookeeper.connect": "${dest-kafka-curator-ip}:2181", "topic.whitelist": "${source-topic}", "topic.rename.format": "${dest-topic}", "src.kafka.max.poll.records": "300" } }' http://emr-worker-3:8083/connectors

json数据中,name字段代表创建的connect的名称,此处为connect-test;config字段需要根据实际情况进行配置,其中的变量说明如下表

字段 说明
topic.whitelist 源Kafka集群中需要同步的topic,多个topic用逗号隔开,例如connect
topic.rename.format 可选配置项,目的Kafka集群中同步后的topic,默认值为${topic.whitelist}.replica。例如源topic为connect,同步后的topic为connect.replica
src.kafka.bootstrap.servers 源Kafka集群broker地址
src.zookeeper.connect 源Kafka集群安装了zookeeper服务的节点内网IP
dest.zookeeper.connect 目的Kafka集群安装了zookeeper服务的节点内网IP

3.3查看Kafka Connect

查看所有的Kafka Connect

2

查看创建的connect-test的状态

3

查看task的信息

4

3.4数据同步

在源Kafka集群创建需要同步的数据。

5

3.5查看同步结果

在目的Kafka集群消费同步的数据。

6

可以看到,在源Kafka集群发送的100000条数据已经迁移到了目的Kafka集群。

4.小结

本文介绍并演示了使用EMR kafka Connect在Kafka集群间进行数据迁移的方法,关于Kafka Connect更详细的使用请参考Kafka官网资料REST API使用


目录
相关文章
lda模型和bert模型的文本主题情感分类实战
lda模型和bert模型的文本主题情感分类实战
559 0
|
Java Android开发
Rockchip系列之CAN APP测试应用实现(4)
Rockchip系列之CAN APP测试应用实现(4)
547 1
|
NoSQL MongoDB 数据库
MongoDB 删除数据库
10月更文挑战第13天
373 0
|
SQL 分布式计算 DataWorks
实时数仓 Hologres产品使用合集之查询分区表的生命周期(即之前设置的'auto_partitioning.num_retention'值)的SQL语句,可以使用什么查询
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
364 0
|
存储 Linux KVM
2021最新版KVM虚拟机安装详解
2021最新版KVM虚拟机安装详解
1024 0
|
前端开发 JavaScript 小程序
【休闲益智】【HTML】看字说颜色
【休闲益智】【HTML】看字说颜色
3566 0
【休闲益智】【HTML】看字说颜色
|
存储 人工智能 分布式计算
阿里云云原生一体化数仓 — 离线实时一体化新能力解读
介绍MaxCompute+Hologres离线和实时数仓一体化优于之前有离线、有在线、有很多不同的引擎的实现方案,通过用实时的引擎做预处理,实现离线实时数据入仓后做更加实时的服务化BI分析实践。
3187 1
阿里云云原生一体化数仓 — 离线实时一体化新能力解读
|
存储 NoSQL 安全
分布式锁中-基于 Redis 的实现如何防重入
分布式锁中-基于 Redis 的实现如何防重入
628 0
分布式锁中-基于 Redis 的实现如何防重入
|
机器学习/深度学习 数据采集 存储
数据集
【7月更文挑战第10天】数据集
2306 1
下一篇
开通oss服务