如何在E-MapReduce上进行Kafka集群间数据复制

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文介绍如何使用社区的Kafka MirrorMaker工具进行集群间的数据复制。

1. 问题背景

我们在使用Kafka的时候,有时候会遇到以下几种场景:

  • 原有Kafka集群机型配置过旧,需要升级换代,重新规划一个Kafka集群,将老集群的数据迁移到新集群上
  • 数据上云,云下Kafka集群数据迁移到云上Kafka集群/Kafka服务
  • 多个Kafka集群数据汇总到一个Kafka集群
  • 基于业务Kafka集群,构建一个Kafka灾备集群

总结一下,以上场景可以抽象成两类:

  • 数据迁移
  • 数据灾备

Kafka社区提供了一个工具,即MirrorMaker,它可以满足用户的数据迁移需求,同时一定程度的满足数据灾备需求。当然除了原生MirrorMaker工具,也存在着各种衍生版本的数据同步工具。下面就简单介绍一下社区版的MirrorMaker工具。

2. MirrorMaker工具介绍

image

MirrorMaker工具不过是将Kafka Consumer和Producer打包在一起。一个MirrorMaker进程会起若干个Consumer和一个Producer,Consumer负责从源Kafka集群消费数据,Producer负责将数据写到目标Kafka集群。

The source and destination clusters are completely independent entities: they can have different numbers of partitions and the offsets will not be the same.

上面是Kafka官方文档原文。在目标Kafka集群中,Topic可以有不同的分区数,offset也可能不一样。

3. 如何创建镜像集群

在E-MapReduce上创建两个Kafka集群,这里使用EMR-3.18.1版本,分别命名为Kafka-A和Kafka-B。建议将这两个集群建在一个安全组内,避免网络配置问题。

在Kafka-A 集群创建一个Topic "test":

kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.1.1 --topic test --partitions 100 
--replication-factor 2 --create

并向其中发送数据:

kafka-producer-perf-test.sh --topic test --num-records 10000000000 --throughput 10000 
--producer-props bootstrap.servers=emr-worker-1:9092 --record-size 10

在Kafka-B集群同样创建一个Topic "test",改变一下分区数目:

kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.1.1 --topic test --partitions 50 
--replication-factor 2 --create

同时在Kafka-B集群启动一个MirrorMaker进程,这个进程会启动4个消费线程消费数据:

kafka-mirror-maker.sh --consumer.config consumer.conf --producer.config producer.conf 
--new.consumer --num.streams 4 --whitelist test

这时,我们就可以在Kafka-B集群看到同步过来的数据。我们在Kafka-A发送一条有意义的数据“hello world!”,在Kafka-B消费看看:
image

注意:

  • 在实际使用中,建议不要开启"auto.topic.create",虽然这样避免我们在目标集群手动创建topic,并且可以自动应对新增topic。但是默认创建的topic,分区数目,副本数都是固定的,不一定就合适。建议按照实际需求手动创建topic,或者至少手动创建那些核心topic,自动创建那些非核心topic。
  • 一个MirrorMaker进程的处理能力是有限的,我们可以在一台机器多启动几个MirrorMaker进程。
  • MirrorMaker所在的机器的带宽能力是有限的,我们可以在多台机器同时部署MirrorMaker进程。
  • 所有的MirrorMaker进程必选使用相同的配置。

4. MirrorMaker参数说明

简单介绍下工具的各个参数:

配置项 说明
abort.on.send.failure 当发送数据失败时,MirrorMaker进程退出,默认true。
blacklist 无需复制数据的Topic列表,支持Java正则表达。注意,只有使用旧的Consumer时才生效。
whitelist 需要复制数据的Topic列表,支持Java正则表达。
consumer.config Consumer客户端配置文件。注意,当配置了"zookeeper.connect"时表示使用旧的Consumer。
producer.config Producer客户端配置文件
consumer.rebalance.listener 自定义的consumer rebalance listener,一般无需配置
rebalance.listener.args 自定义的consumer rebalance listener参数
message.handler 自定义的消息处理器,源端数据经过处理后再写到目的端
message.handler.args 自定义的消息处理器参数
new.consumer 指定使用新的Consumer
num.streams Consumer并发数
offset.commit.interval.ms Offset commit间隔

注意:

  • 目标Kafka集群需要同步源Kakfa集群的配置,譬如最大消息Size等。
  • Consumer配置:

    • 根据最大消息,调整max.partition.fetch.bytes和fetch.max.bytes参数等等。
    • 根据网络环境,适当增大session.timeout.ms和request.timeout.ms。当网络环境恶劣时,适当增加heartbeat.interval.ms,避免不必要的rebalance,当网络环境比较好时,则适当减小heartbeat.interval.ms,以使得服务端更快响应Client的异常。
  • Producer配置:

    • 根据最大消息,调整max.request.size等参数。
    • reties=Int.MAX_VALUE,acks=-1(all)。
    • 根据网络环境,适当增大request.timeout.ms。

5. 小结

本文介绍了如何使用社区的Kafka MirrorMaker工具进行集群间的数据复制。E-MapReduce Kafka基于Kafka Conenct实现了新的一种数据同步工具,支持数据复制和灾备集群构建。Kafka社区当前正在进行MirrorMaker v2.0版本开发,同样也是基于Kafka Conenct来实施。后续将专门写一篇文章来介绍。

目录
相关文章
|
15天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
47 4
|
1月前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
49 2
|
13天前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
1月前
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
59 6
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
88 3
|
1月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
32 1
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
46 1
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
80 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
36 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
46 0