如何在E-MapReduce上进行Kafka集群间数据复制

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文介绍如何使用社区的Kafka MirrorMaker工具进行集群间的数据复制。

1. 问题背景

我们在使用Kafka的时候,有时候会遇到以下几种场景:

  • 原有Kafka集群机型配置过旧,需要升级换代,重新规划一个Kafka集群,将老集群的数据迁移到新集群上
  • 数据上云,云下Kafka集群数据迁移到云上Kafka集群/Kafka服务
  • 多个Kafka集群数据汇总到一个Kafka集群
  • 基于业务Kafka集群,构建一个Kafka灾备集群

总结一下,以上场景可以抽象成两类:

  • 数据迁移
  • 数据灾备

Kafka社区提供了一个工具,即MirrorMaker,它可以满足用户的数据迁移需求,同时一定程度的满足数据灾备需求。当然除了原生MirrorMaker工具,也存在着各种衍生版本的数据同步工具。下面就简单介绍一下社区版的MirrorMaker工具。

2. MirrorMaker工具介绍

image

MirrorMaker工具不过是将Kafka Consumer和Producer打包在一起。一个MirrorMaker进程会起若干个Consumer和一个Producer,Consumer负责从源Kafka集群消费数据,Producer负责将数据写到目标Kafka集群。

The source and destination clusters are completely independent entities: they can have different numbers of partitions and the offsets will not be the same.

上面是Kafka官方文档原文。在目标Kafka集群中,Topic可以有不同的分区数,offset也可能不一样。

3. 如何创建镜像集群

在E-MapReduce上创建两个Kafka集群,这里使用EMR-3.18.1版本,分别命名为Kafka-A和Kafka-B。建议将这两个集群建在一个安全组内,避免网络配置问题。

在Kafka-A 集群创建一个Topic "test":

kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.1.1 --topic test --partitions 100 
--replication-factor 2 --create

并向其中发送数据:

kafka-producer-perf-test.sh --topic test --num-records 10000000000 --throughput 10000 
--producer-props bootstrap.servers=emr-worker-1:9092 --record-size 10

在Kafka-B集群同样创建一个Topic "test",改变一下分区数目:

kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.1.1 --topic test --partitions 50 
--replication-factor 2 --create

同时在Kafka-B集群启动一个MirrorMaker进程,这个进程会启动4个消费线程消费数据:

kafka-mirror-maker.sh --consumer.config consumer.conf --producer.config producer.conf 
--new.consumer --num.streams 4 --whitelist test

这时,我们就可以在Kafka-B集群看到同步过来的数据。我们在Kafka-A发送一条有意义的数据“hello world!”,在Kafka-B消费看看:
image

注意:

  • 在实际使用中,建议不要开启"auto.topic.create",虽然这样避免我们在目标集群手动创建topic,并且可以自动应对新增topic。但是默认创建的topic,分区数目,副本数都是固定的,不一定就合适。建议按照实际需求手动创建topic,或者至少手动创建那些核心topic,自动创建那些非核心topic。
  • 一个MirrorMaker进程的处理能力是有限的,我们可以在一台机器多启动几个MirrorMaker进程。
  • MirrorMaker所在的机器的带宽能力是有限的,我们可以在多台机器同时部署MirrorMaker进程。
  • 所有的MirrorMaker进程必选使用相同的配置。

4. MirrorMaker参数说明

简单介绍下工具的各个参数:

配置项 说明
abort.on.send.failure 当发送数据失败时,MirrorMaker进程退出,默认true。
blacklist 无需复制数据的Topic列表,支持Java正则表达。注意,只有使用旧的Consumer时才生效。
whitelist 需要复制数据的Topic列表,支持Java正则表达。
consumer.config Consumer客户端配置文件。注意,当配置了"zookeeper.connect"时表示使用旧的Consumer。
producer.config Producer客户端配置文件
consumer.rebalance.listener 自定义的consumer rebalance listener,一般无需配置
rebalance.listener.args 自定义的consumer rebalance listener参数
message.handler 自定义的消息处理器,源端数据经过处理后再写到目的端
message.handler.args 自定义的消息处理器参数
new.consumer 指定使用新的Consumer
num.streams Consumer并发数
offset.commit.interval.ms Offset commit间隔

注意:

  • 目标Kafka集群需要同步源Kakfa集群的配置,譬如最大消息Size等。
  • Consumer配置:

    • 根据最大消息,调整max.partition.fetch.bytes和fetch.max.bytes参数等等。
    • 根据网络环境,适当增大session.timeout.ms和request.timeout.ms。当网络环境恶劣时,适当增加heartbeat.interval.ms,避免不必要的rebalance,当网络环境比较好时,则适当减小heartbeat.interval.ms,以使得服务端更快响应Client的异常。
  • Producer配置:

    • 根据最大消息,调整max.request.size等参数。
    • reties=Int.MAX_VALUE,acks=-1(all)。
    • 根据网络环境,适当增大request.timeout.ms。

5. 小结

本文介绍了如何使用社区的Kafka MirrorMaker工具进行集群间的数据复制。E-MapReduce Kafka基于Kafka Conenct实现了新的一种数据同步工具,支持数据复制和灾备集群构建。Kafka社区当前正在进行MirrorMaker v2.0版本开发,同样也是基于Kafka Conenct来实施。后续将专门写一篇文章来介绍。

目录
相关文章
|
19天前
|
消息中间件 Kafka 测试技术
【Kafka揭秘】Leader选举大揭秘!如何打造一个不丢失消息的强大Kafka集群?
【8月更文挑战第24天】Apache Kafka是一款高性能分布式消息系统,利用分区机制支持数据并行处理。每个分区含一个Leader处理所有读写请求,并可有多个副本确保数据安全与容错。关键的Leader选举机制保障了系统的高可用性和数据一致性。选举发生于分区创建、Leader故障或被手动移除时。Kafka提供多种选举策略:内嵌机制自动选择最新数据副本为新Leader;Unclean选举快速恢复服务但可能丢失数据;Delayed Unclean选举则避免短暂故障下的Unclean选举;Preferred选举允许基于性能或地理位置偏好指定特定副本为首选Leader。
37 5
|
17天前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
19天前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
24 2
|
9天前
|
分布式计算 资源调度 Hadoop
在YARN集群上运行部署MapReduce分布式计算框架
主要介绍了如何在YARN集群上配置和运行MapReduce分布式计算框架,包括准备数据、运行MapReduce任务、查看任务日志,并启动HistoryServer服务以便于日志查看。
23 0
|
2月前
|
消息中间件 Kafka
kafka 集群环境搭建
kafka 集群环境搭建
57 8
|
30天前
|
消息中间件 Java Kafka
Linux——Kafka集群搭建
Linux——Kafka集群搭建
34 0
|
1月前
|
消息中间件 Kafka Apache
部署安装kafka集群
部署安装kafka集群
|
2月前
|
消息中间件 监控 Java
使用 JMX 监控 Kafka 集群性能指标
使用 JMX 监控 Kafka 集群性能指标
217 1
|
2月前
|
消息中间件 存储 负载均衡
Kafka高可用性指南:提高数据一致性和集群容错能力!
**Kafka高可用性概览** - 创建Topic时设置`--replication-factor 3`确保数据冗余和高可用。 - 分配角色:Leader处理读写,Follower同步数据,简化管理和客户端逻辑。 - ISR(In-Sync Replicas)保持与Leader同步的副本列表,确保数据一致性和可靠性。 - 设置`acks=all`保证消息被所有副本确认,防止数据丢失,增强一致性。 - 通过这些机制,Kafka实现了分布式环境中的数据可靠性、一致性及服务的高可用性。
246 0
|
2月前
|
消息中间件 监控 Kafka
查询Kafka集群中消费组(group)信息和对应topic的消费情况
查询Kafka集群中消费组(group)信息和对应topic的消费情况
781 0