Flume+Kafka整合案例实现

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Flume+Kafka整合案例实现

一、为什么要集成Flume和Kafka

我们很多人在在使用Flume和kafka时,都会问一句为什么要将Flume和Kafka集成?那首先就应该明白业务需求,一般使用Flume+Kafka架构都是希望完成实时流式的日志处理,后面再连接上Flink/Storm/Spark Streaming等流式实时处理技术,从而完成日志实时解析的目标。第一、如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,从广义上理解,把它当做一个数据库,可以存放一段时间的数据。第二、Kafka属于中间件,一个明显的优势就是使各层解耦,使得出错时不会干扰其他组件。

因此数据从数据源到flume再到Kafka时,数据一方面可以同步到HDFS做离线计算,另一方面可以做实时计算,可实现数据多分发。

二、概念剖析Flume+Kafka

Flume 是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,通过监控整个文件目录或者某一个特定文件,用于收集数据;同时Flume也 提供数据写到各种数据接受方(可定制)的能力,用于转发数据。Flume 的易用性在于通过读取配置文件,可以自动收集日志文件,在大数据处理及各种复杂的情况下,flume 经常被用来作为数据处理的工具。

Source:Flume 搜集日志的方式多种多样,比如可以检测文件夹的变化spool Source,可以监测端口信息 Netcat Source,可以监控某各文件新增的内容 Exec Source等等,通常使用检测文件夹变化的方式来实时收集信息,所以本例中我们也将使用Spool Source。

Channel:提供了一层缓冲机制,来实现数据的事务性传输,最大限度保证数据的安全传输。常用的有MemoryChannel:所有的events 被保存在内存中,优点是高吞吐,缺点是容量有限并且Agent 死掉时会丢失内存中的数据;FileChannel:所有的Events 被保存在文件中,优点是容量较大且死掉时数据可恢复,缺点是速度较慢。因此为了保证Event 在数据流点对点传输中是可靠地,要注意Channel 的选择。目前为了提高速度,我们暂时采用MemoryChannel,之后的目标是实现一个自定义channel—doubleChannel,解决上述的两个痛点问题。

Sink:将数据转发到目的地,或者继续将数据转发到另外一个source,实现接力传输,多层之间通过AVRO Sink来实现。本例中,我们的最终目标是实现日志实时处理,因此实时的采集数据流就把数据发送到Kafka 中。

那么小结一下,使用的是对文件夹中文件变化进行监测的Spooling DirectorySource,channel 是用的MemoryChannel,sink 是自定义的kafkasink,用于向kafka 发送数据。

Kafka 是由LinkedIn 开发的开源分布式消息系统,主要用于处理LinkedIn 的活跃数据,说白了也就是用户访问日志数据。这些数据主要包括PV、UV、用户行为(登陆、浏览、搜索、分享、点击)、系统运行日志(CPU、内存、磁盘、进程、网络)等方面的数据。这些数据通常以日志的形式进行存储,现有的消息队列系统可以很好的用于日志分析系统对于实时数据的处理,提高日志解析效率。那么说到Kafka,就必须掌握三个原理部分:Producer、Topic、Consumer:

Producer:消息和数据的生产者,向Kafka的一个topic发布消息的过程即为生产过程,在本例中Flume应该是Producer;

Topic:主题,Kafka处理的消息的不同分类(逻辑概念),可以根据Topic的不同,去区分处理不同的消息。说的更直白一些,Topic就是起到资源隔离的作用,Producer向指定Topic中产生消息,Consumer再从指定的Topic中消费消息。

Consumer:消息和数据的消费者,订阅topic并处理其发布的消息的过程即为消费过程。

大概的一个过程:如下图

1.创建Zookeeper集群请看我的其他博客。

2.创建Kafaka单节点多节点集群请看我的其他博客。

  • 1.解压kafka的压缩包。
  • 2.vi /config/server.properties,在其中进行下面4,5,6步
  • 3.broker.id=0 ,这个是每台机器的标识,不可重复。
  • 4.delete.topic.enable=true,这个是确保删除Topic否则删除时不仅要清除本地数据,还要清除ZK上的数据。
  • 5.log.dirs=/opt/module/kafka/logs,配置自己想存储的路径,这里不仅是日志,它也是数据存储的地方。
  • 6.zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181,配置ZK集群的ip地址及端口号。
  • 7.配置环境变量,即在/etc/profile下增加PATH。
  • 8.分发一下kafka到集群其他机器上,记得修改broker.id。
  • 9.在每台机器的kafka文件夹下,使用bin/kafka-server-start.sh config/server.properties & 打开kafka服务。

3.Flume安装

3.1安装

#重命名
[root@hadoop-01 local]# mv apache-flume-1.8.0-bin flume
[root@hadoop-01 local]# cd flume/
[root@hadoop-01 flume]# cd conf/
#配置文件重命名
[root@hadoop-01 conf]# mv flume-env.sh.template flume-env.sh
#添加jdk路径
[root@hadoop-01 conf]# vim flume-env.sh 
export JAVA_HOME=/usr/lib/java

3.1配置Flume:在Flume文件夹下,创建一个job文件夹

mkdir job

3.2 进入job目录创建flume-kafka.conf 并且进行编辑

[root@MiWiFi-R4-srv job]# vim flume-kafka.conf

3.3配置文件如下:

# setting
a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
# Describe/configure the source
a1.sources.r1.type=exec
#监听的日志文件
a1.sources.r1.command=tail -F /home/dns/java.log
 
# Describe the sink
#a1.sinks.k1.type=logger
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
#kafka创建的为test的消息主题
a1.sinks.k1.topic=test
a1.sinks.k1.brokerList=localhost:9092
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=20
 
# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

3.4在Flume文件中进行启动

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-kafka.conf  
 
#文件中的a1正是对应了这点的name

4.测试

4.1测试kafka生产者和消费者

#启动生产者,并输入以下文字
[root@MiWiFi-R4-srv bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
>hhh
>
#启动消费者,进行实时消费
[root@MiWiFi-R4-srv bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
hhh

4.1测试Flume实时日志到Kafka中

#其中Flume
[root@MiWiFi-R4-srv apache-flume-1.9.0-bin]# bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-kafka.conf
Info: Including Hive libraries found via () for Hive access
+ exec /usr/local/java/bin/java -Xmx20m -cp '/usr/local/apache-flume-1.9.0-bin/conf:/usr/local/apache-flume-1.9.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --name a1 --conf-file job/flume-kafka.conf
 
# 跟日志中添加数据
[root@MiWiFi-R4-srv dns]# echo "测试" >> java.log 
# kafka进行了消费
[root@MiWiFi-R4-srv bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
hhh
  
sds
测试

 

相关文章
|
7月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
186 0
|
2月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
49 3
|
2月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
41 2
|
2月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
60 1
|
2月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
194 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
59 0
|
4月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
196 4
|
6月前
|
消息中间件 数据挖掘 Kafka
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
153 2
|
7月前
|
XML 数据格式
Flume【付诸实践 01】flume1.9.0版 配置格式说明+常用案例分享(ExecSource+SpoolingDirectorySource+HDFSSink+AvroSourceSink)
【2月更文挑战第19天】Flume【付诸实践 01】flume1.9.0版 配置格式说明+常用案例分享(ExecSource+SpoolingDirectorySource+HDFSSink+AvroSourceSink)
150 1
下一篇
DataWorks