通过Kafka Connect进行数据迁移

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 在流式数据处理过程中,E-MapReduce经常需要在Kafka与其他系统间进行数据同步或者在Kafka集群间进行数据迁移。本节向您介绍如何在E-MapReduce上通过Kafka Connect快速的实现Kafka集群间的数据同步或者数据迁移。

在流式数据处理过程中,E-MapReduce经常需要在Kafka与其他系统间进行数据同步或者在Kafka集群间进行数据迁移。本节向您介绍如何在E-MapReduce上通过Kafka Connect快速的实现Kafka集群间的数据同步或者数据迁移。

前提条件

  • 已注册云账号,详情请参见注册云账号
  • 已开通E-MapReduce服务。
  • 已完成云账号的授权,详情请参见角色授权

背景信息

Kafka Connect是一种可扩展的、可靠的,用于在Kafka和其他系统之间快速的进行流式数据传输的工具。例如,Kafka Connect可以获取数据库的binlog数据,将数据库数据同步至Kafka集群,从而达到迁移数据库数据的目的。由于Kafka集群可对接流式处理系统,所以还可以间接实现数据库对接下游流式处理系统的目的。同时,Kafka Connect还提供了REST API接口,方便您创建和管理Kafka Connect。

kafka Connect分为standalone和distributed两种运行模式。在standalone模式下,所有的worker都在一个进程中运行。相比于standalone模式,distributed模式更具扩展性和容错性,是最常用的方式,也是生产环境推荐使用的模式。

本文介绍如何在E-MapReduce上使用Kafka Connect的REST API接口在Kafka集群间进行数据迁移,kafka Connect使用distributed模式。

步骤一 创建Kafka集群

在EMR上创建源Kafka集群和目的Kafka集群。Kafka Connect安装在Task节点上,所以目的Kafka集群必须创建Task节点。集群创建好后,Task节点上Kafka Connect服务会默认启动,端口号为8083。

推荐您将源Kafka集群和目的kafka集群创建在同一个安全组下。如果源Kafka集群和目的kafka集群不在同一个安全组下,则两者的网络默认是不互通的,您需要对两者的安全组分别进行相关配置,以使两者的网络互通。

  1. 登录阿里云 E-MapReduce 控制台
  2. 创建源Kafka集群和目的Kafka集群,详情请参见创建集群。

说明 创建目的Kafka集群时,必须开启Task实例,即创建Task节点。

image.png

步骤二 准备待迁移数据Topic

在源Kafka集群上创建一个名称为connect的Topic。

  1. 以SSH方式登录到源Kafka集群的header节点(本例为emr-header-1)。
  2. 以root用户运行如下命令创建一个名称为connect的Topic。
kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 2 --partitions 10 --topic connect

image.png

说明 完成上述操作后,请保留该登录窗口,后续仍将使用。

步骤三 创建Kafka Connect的connector

在目的Kafka集群的Task节点上,使用curl命令通过JSON数据创建一个Kafka Connect的connector。

  1. 以SSH方式登录到目的Kafka集群的Task节点(本节为emr-worker-3)。
  2. 可选: 自定义Kafka Connect配置。
    进入目的Kafka集群Kafka服务的配置页面,在connect-distributed.properties中自定义offset.storage.topic、config.storage.topic和status.storage.topic三个配置项,详情请参见组件参数配置。

Kafka Connect会将offsets、configs和任务状态保存在Topic中,Topic名对应offset.storage.topic、config.storage.topic和status.storage.topic三个配置项。Kafka Connect会自动使用默认的partition和replication factor创建这三个Topic,其中partition和repication factor配置项保存在/etc/ecm/kafka-conf/connect-distributed.properties文件中。

  1. 以root用户运行如下命令创建一个Kafka Connect。
curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-test", "config": { "connector.class": "EMRReplicatorSourceConnector", "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "src.kafka.bootstrap.servers": "${src-kafka-ip}:9092", "src.zookeeper.connect": "${src-kafka-curator-ip}:2181", "dest.zookeeper.connect": "${dest-kafka-curator-ip}:2181", "topic.whitelist": "${source-topic}", "topic.rename.format": "${dest-topic}", "src.kafka.max.poll.records": "300" } }' http://emr-worker-3:8083/connectors

在JSON数据中,name字段代表创建的Kafka Connect的名称,本例为connect-test;config字段需要根据实际情况进行配置,关键变量的说明如下:image.png

说明 完成上述操作后,请保留该登录窗口,后续仍将使用。

步骤四 查看Kafka Connect和Task节点状态

查看Kafka Connect和Task节点信息,确保两者的状态正常。

  1. 返回到目的Kafka集群的Task节点(本节为emr-worker-3)的登录窗口。
  2. 以root用户运行如下命令查看所有的Kafka Connect。
curl emr-worker-3:8083/connectors

image.png

  1. 以root用户运行如下命令查看本例创建的Kafka Connect(本例为connect-test)的状态。
curl emr-worker-3:8083/connectors/connect-test/status

image.png
确保Kafka Connect(本例为connect-test)的状态为RUNNING。

  1. 以root用户运行如下命令查看Task节点信息。
curl emr-worker-3:8083/connectors/connect-test/tasks

image.png
确保Task节点的返回信息中无错误信息。

步骤五 生成待迁移数据

通过命令向源集群中的connect Topic发送待迁移的数据。

  1. 返回到源Kafka集群的header节点(本例为emr-header-1)的登录窗口。
  2. 以root用户运行如下命令向connect Topic发送数据。
kafka-producer-perf-test.sh --topic connect --num-records 100000 --throughput 5000 --record-size 1000 --producer-props bootstrap.servers=emr-header-1:9092

image.png

步骤六 查看数据迁移结果

生成待迁移数据后,Kafka Connect会自动将这些数据迁移到目的集群的相应文件(本例为connect.replica)中。

  1. 返回到目的Kafka集群的Task节点(本节为emr-worker-3)的登录窗口。
  2. 以root用户运行如下命令查看数据迁移是否成功。
kafka-consumer-perf-test.sh --topic connect.replica --broker-list emr-header-1:9092 --messages 100000

image.png
从上述返回结果可以看出,在源Kafka集群发送的100000条数据已经迁移到了目的kafka集群。

小结

本文介绍并演示了使用Kafka Connect在Kafka集群间进行数据迁移的方法。如果需要了解Kafka Connect更详细的使用方法,请参见Kafka官网资料REST API


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。

image.png

相关文章
|
4月前
|
消息中间件 分布式计算 NoSQL
EMR-Kafka Connect:高效数据迁移的革新实践与应用探索
Kafka Connect是Kafka官方提供的一个可扩展的数据传输框架,它允许用户以声明式的方式在Kafka与其他数据源之间进行数据迁移,无需编写复杂的数据传输代码。
|
4月前
|
消息中间件 存储 Kafka
如何通过 CloudCanal 实现从 Kafka 到 AutoMQ 的数据迁移
随着大数据技术的飞速发展,Apache Kafka 作为一种高吞吐量、低延迟的分布式消息系统,已经成为企业实时数据处理的核心组件。然而,随着业务的扩展和技术的发展,企业面临着不断增加的存储成本和运维复杂性问题。为了更好地优化系统性能和降低运营成本,企业开始寻找更具优势的消息系统解决方案。其中,AutoMQ [1] 作为一种基于云重新设计的消息系统,凭借其显著的成本优势和弹性能力,成为了企业的理想选择。
33 0
如何通过 CloudCanal 实现从 Kafka 到 AutoMQ 的数据迁移
|
6月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
211 5
|
6月前
|
消息中间件 关系型数据库 MySQL
在kafka connect 同步 mysql 主从数据库
在kafka connect 同步 mysql 主从数据库
109 0
|
6月前
|
消息中间件 Kafka Windows
使用Kafka Connect 导入导出数据
使用Kafka Connect 导入导出数据
180 0
|
6月前
|
消息中间件 关系型数据库 MySQL
Kafka Connect :构建强大分布式数据集成方案
Kafka Connect 是 Apache Kafka 生态系统中的关键组件,专为构建可靠、高效的分布式数据集成解决方案而设计。本文将深入探讨 Kafka Connect 的核心架构、使用方法以及如何通过丰富的示例代码解决实际的数据集成挑战。
|
消息中间件 JSON 关系型数据库
[实战系列]SelectDB Cloud Kafka Connect 最佳实践张家锋
[实战系列]SelectDB Cloud Kafka Connect 最佳实践张家锋
155 1
|
消息中间件 存储 Kafka
Apache Kafka - 构建数据管道 Kafka Connect
Apache Kafka - 构建数据管道 Kafka Connect
152 0
|
消息中间件 JSON 监控
实时数据同步与共享:使用Apache Kafka Connect
在现代应用程序开发中,实时数据同步和共享变得越来越重要。而Apache Kafka Connect作为一个可靠的、分布式的数据集成工具,为我们提供了一种简单而强大的方式来实现实时数据的传输和共享。
1201 0
|
消息中间件 JSON Kubernetes
云原生系列五:Kafka 集群数据迁移基于Kubernetes的内部
​ 1.概述 Kafka的使用场景非常广泛,一些实时流数据业务场景,均依赖Kafka来做数据分流。而在分布式应用场景中,数据迁移是一个比较常见的问题。关于Kafka集群数据如何迁移,今天叶秋学长将为大家详细介绍。 2.内容 本篇博客为大家介绍两种迁移场景,分别是同集群数据迁移、跨集群数据迁移。如下图所示:  2.1 同集群迁移 同集群之间数据迁移,比如在已有的集群中新增了一个Broker节点,此时需要将原来集群中已有的Topic的数据迁移部分到新的集群中,缓解集群压力。 将新的节点添加到Kafka集群很简单,只需为它们分配一个唯一的Broker ID,并在新服务器上启动Kafka。
251 0
云原生系列五:Kafka 集群数据迁移基于Kubernetes的内部