使用EMR-Flume同步Kafka数据到HDFS

简介: Flume是一个分布式、可靠和高效的数据汇聚系统,其source、channel和sink的结构设计,不仅实现了数据生产者与消费者的解耦,还提供了数据缓冲的功能。一个比较通用的使用场景是使用Flume将Kafka的数据按照时间分区同步至HDFS,进行实时的流式分析或离线统计。

使用EMR-Flume同步Kafka数据到HDFS

1. 背景

Flume是一个分布式、可靠和高效的数据汇聚系统,其source、channel和sink的结构设计,不仅实现了数据生产者与消费者的解耦,还提供了数据缓冲的功能。Flume支持多种source、channel和sink,也可以实现自定义source、channel和sink并以插件的方式加入Flume中。同时,Flume也支持数据处理、负载均衡、failover和数据高可靠等高级特性。E-MapReduce从3.19.0版本开始,提供了EMR-Flume集群管理、gateway扩展等多种特性方便Flume的使用。
一个比较通用的使用场景是使用Flume将Kafka的数据按照时间分区同步至HDFS,进行实时的流式分析,或者使用Hive等工具进行离线的统计。下面就详细的介绍使用Flume同步Kafka的数据到HDFS。

2.准备工作

创建Kafka集群并创建topic test,详细步骤参考Kafka快速入门
创建Hadoop集群,在可选软件中选择Flume,详细步骤参考创建集群

3. Flume配置

首先对agent和绑定关系进行如下配置

agent.sources source
agent.sinks sink
agent.channels channel
agent.sources.source1.channels channel
agent.sinks.k1.channel channel

3.1 Kafka source

因为写入HDFS的数据是按照时间分区的,如果在HDFS sink中配置useLocalTimeStamp将写入HDFS的时间作为分区时间,在Flume有数据滞后时,会将数据写入错误的分区。默认的,Kafka Source会在接收数据时将系统时间写入header中,可以使用该时间作为分区时间。Kafka source配置如下

agent.sources.source.type org.apache.flume.source.kafka.KafkaSource
agent.sources.source.batchSize 5000
agent.sources.source.kafka.bootstrap.servers ip:port
agent.sources.source.kafka.topics test
agent.sources.source.kafka.consumer.group.id test-group

其中,agent.sources.source.kafka.bootstrap.servers为Kafka broker的地址,根据实际配置。
在实际使用中,Kafka topic的数据量可能很大,超过一个Flume agent的负载,可以启动多个agent,使用相同的consumer group id来共同消费同一个topic的数据;同时,如果其中一个agent失败,其他agent也会继续消费topic的数据,达到容灾的效果。

3.2 channel

根据实际情况,需要在性能和可靠性做权衡。比如相比file channel,memory channel性能更高,但是在agent停止后channel中的数据会丢失;file channel虽然性能不如memory channel,但是持久化在磁盘的数据可以在agent停止后保证数据不丢失。此处使用file channel做说明

agent.channels.channel.transactionCapacity 51200
agent.channels.channel.checkpointDir /mnt/disk1/flume/file-channel/checkpoint
agent.channels.channel.dataDirs /mnt/disk1/flume/file-channel/data
agent.channels.channel.capacity 51200

3.3 HDFS sink

将数据以时间为分区写入HDFS。考虑到配合Hive进行查询,可以在路径中添加列名。例如添加datetime和hour列,如下所示

agent.sinks.sink.hdfs.path /tmp/flume-data/datetime=%y%m%d/hour=%H
agent.sinks.sink.hdfs.fileType DataStream
agent.sinks.sink.hdfs.rollSize 0
agent.sinks.sink.hdfs.rollCount 0
agent.sinks.sink.hdfs.rollInterval 3600
agent.sinks.sink.hdfs.batchSize 51200
agent.sinks.sink.hdfs.round true
agent.sinks.sink.hdfs.roundValue 60
agent.sinks.sink.hdfs.roundUnit minute

其中,batchSize的设置需要在发送效率和延迟中做出选择,设置过大会数据滞后,设置过小会影响HDFS的吞吐。
为防止生成过多小文件,此处按照时间(1小时)来生成文件,也可根据实际情况根据event数或者文件大小来生成文件。

4.运行Flume agent

参考Flume使用说明
成功运行agent之后,可以查看HDFS中存储的数据。如下图所示查看2019年4月9日20点的数据
1554818460665_f357cc07_0b08_4e00_b1ba_e7042a06ae97

5.load balance

为了保证下游sink的可靠性,可以配置多个sink并使用相同的load balance sink组。这样,在其中一个sink失败时,其他sink可以从channel拉取数据并sink到HDFS中。如下设置了两个avro sink同属于一个sink组load-balancer-sink-group。

agent.sinks avro-sink-1 avro-sink-2
agent.sinkgroups load-balancer-sink-group
agent.sinkgroups.load-balancer-sink-group.sinks avro-sink-1 avro-sink-2
agent.sinkgroups.load-balancer-sink-group.processor.type load_balance
agent.sinkgroups.load-balancer-sink-group.processor.selector random
agent.sinks.avro-sink-1.type avro
agent.sinks.avro-sink-1.hostname emr-worker-1
agent.sinks.avro-sink-1.port 19999
agent.sinks.avro-sink-2.type avro
agent.sinks.avro-sink-2.hostname emr-worker-2
agent.sinks.avro-sink-2.port 19999

使用sink组替代2.3 HDFS sink 中介绍的配置后,需要在emr-worker-1和emr-worker-2两个节点配置source为avro,sink为HDFS的Flume agent。如下所示

agent.sinks.sink.hdfs.path /tmp/flume-data/datetime=%y%m%d/hour=%H
agent.sinks.sink.hdfs.fileType DataStream
agent.sinks.sink.hdfs.rollSize 0
agent.sinks.sink.hdfs.rollCount 0
agent.sinks.sink.hdfs.rollInterval 3600
agent.sinks.sink.hdfs.batchSize 51200
agent.sources.source.bind 0.0.0.0
agent.sources.source.port 19999
agent.channels.channel.transactionCapacity 51200
agent.channels.channel.dataDirs /mnt/disk1/flume/file-channel/data
agent.channels.channel.checkpointDir /mnt/disk1/flume/file-channel/checkpoint
agent.channels.channel.capacity 51200
agent.sinks.sink.type hdfs
agent.channels.channel.type file
agent.sources.source.type avro
目录
相关文章
|
6月前
|
存储 运维 Serverless
千万级数据秒级响应!碧桂园基于 EMR Serverless StarRocks 升级存算分离架构实践
碧桂园服务通过引入 EMR Serverless StarRocks 存算分离架构,解决了海量数据处理中的资源利用率低、并发能力不足等问题,显著降低了硬件和运维成本。实时查询性能提升8倍,查询出错率减少30倍,集群数据 SLA 达99.99%。此次技术升级不仅优化了用户体验,还结合AI打造了“一看”和“—问”智能场景助力精准决策与风险预测。
537 69
|
12月前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
171 3
|
12月前
|
分布式计算 Java Hadoop
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
129 1
|
9月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
12月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
398 1
|
12月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
176 4
|
12月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
190 2
|
12月前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
139 2
|
12月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
274 1
|
12月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
287 0

热门文章

最新文章