Flume+Kafka+Storm实战:二、Flume与Kafka整合

简介: Flume+Kafka+Storm实战:二、Flume与Kafka整合

0x00 文章内容


  1. Flume准备
  2. Kafka准备
  3. 校验结果


PS:请自行准备好Flume、Kafka的环境。由于本教程是属于整合教程,所以,我们可以直接在原来的基础上进行升级即可。过程是将教程:Flume入门案例之NetCat-Souces里的Sink修改为Kafka,而这里的Kafka用的其实是教程:Flume+Kafka+Storm实战:一、Kakfa与Storm整合里面的topic。


0x01 Flume准备


1. 编写Flume配置文件

a. 拷贝一份example

cd ~/bigdata/apache-flume-1.8.0-bin

cp conf/example.conf conf/kafka.conf

b. 然后修改Sink(点击跳转官网参考

vi conf/kafka.conf


image.png


image.png


c. 完整配置文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = word-count-input
a1.sinks.k1.kafka.bootstrap.servers = master:9092
a1.sinks.k1.kafka.flumeBatchSize = 5
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


0x02 Kafka准备


1. 创建topic(如已操作过可跳过)

a. 启动Kafka与ZK(三台都执行)

启动ZK:

zkServer.sh start

启动Kafka:

nohup ~/bigdata/kafka_2.11-1.0.0/bin/kafka-server-start.sh ~/bigdata/kafka_2.11-1.0.0/config/server.properties >~/bigdata/kafka_2.11-1.0.0/logs/server.log 2>&1 &


目前的进程情况(call_all.sh脚本请参考:大数据常用管理集群脚本集合):


image.png


b. 创建Topic:word-count-input(如果已创建则忽略)

查看Topic(在上一教程已经创建):

~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --list --zookeeper master:2181


image.png


如果没创建则创建:

~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic word-count-input


0x03 校验结果


1. 启动Flume


a. 在master启动

bin/flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/kafka.conf --name a1


image.png


2. 启动Kafka消费者

a. 首先应确保先将Kafka与ZK启动起来(已启动则忽略,三台都执行)

启动ZK:

zkServer.sh start

启动Kafka:

nohup ~/bigdata/kafka_2.11-1.0.0/bin/kafka-server-start.sh ~/bigdata/kafka_2.11-1.0.0/config/server.properties >~/bigdata/kafka_2.11-1.0.0/logs/server.log 2>&1 &

b. 在master启动Kafka消费者

kafka-console-consumer.sh --bootstrap-server master:9092 --topic word-count-input


3. 测试结果

a. 测试过程与教程:Flume入门案例之NetCat-Souces 一样,可以看到我们的内容可以在Kafka的消费者端接收到。


image.png


image.png


0xFF 总结


其实内容也比较简单,重点是要结合官网来进行配置,其余的都是学习过的内容。

实战的上一篇教程:Flume+Kafka+Storm实战:一、Kakfa与Storm整合,学习没有先后顺序,但是应该具备Flume、Kafka等基础。

不同的版本,配置与结果都有可能不同,关于flumeBatchSize的配置项问题,请参考文章:Flume1.7及以上版本的Kafka Sink batchsize(flumeBatchSize) 配置问题,flumeBatchSize为分批次进行发送配置。

Flume1.6.0版本参考:


先测试能否输出到控制台再对接Spark Streaming
技术选型一:avro-memory-logger
技术选型二:avro-memory-kafka
参考:
技术选型一:
#streaming_logger.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=localhost
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.log-sink.type=logger
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming_logger.conf \
--name agent1 \
-Dflume.root.logger=INFO,console
参考:
技术选型二:
#streaming_kafka.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = streaming_topic
agent1.sinks.kafka-sink.brokerList = localhost:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
执行脚本:
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming_kafka.conf \
--name agent1 \
相关文章
|
6月前
|
数据采集 消息中间件 监控
Flume数据采集系统设计与配置实战:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入探讨Apache Flume的数据采集系统设计,涵盖Flume Agent、Source、Channel、Sink的核心概念及其配置实战。通过实例展示了文件日志收集、网络数据接收、命令行实时数据捕获等场景。此外,还讨论了Flume与同类工具的对比、实际项目挑战及解决方案,以及未来发展趋势。提供配置示例帮助理解Flume在数据集成、日志收集中的应用,为面试准备提供扎实的理论与实践支持。
265 1
|
6月前
|
消息中间件 存储 监控
Flume+Kafka整合案例实现
Flume+Kafka整合案例实现
120 1
|
6月前
|
消息中间件 缓存 Java
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
76 0
|
6月前
|
存储 SQL Shell
bigdata-13-Flume实战
bigdata-13-Flume实战
47 0
|
18天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
64 5
|
1月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
40 3
|
3月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
71 8
|
5月前
|
消息中间件 存储 Kafka
go语言并发实战——日志收集系统(二) Kafka简介
go语言并发实战——日志收集系统(二) Kafka简介
117 1
|
5月前
|
消息中间件 数据挖掘 Kafka
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
126 2