【Kafka】(九)项目中使用 Kafka 整合 Flume

简介: 【Kafka】(九)项目中使用 Kafka 整合 Flume

文章目录


一、启动 Kafka

二、创建 Topic 消息队列

三、查询 kafka 消息队列

四、启动 consumer 监控窗口

五、写 Flume 自定义配置文件

六、开启 Flume

七、结果分析


一、启动 Kafka



kafka-server-start.sh /opt/soft/kafka211/config/server.properties


二、创建 Topic 消息队列



kafka-topics.sh --create --zookeeper 192.168.56.137:2181 --topic demo12 --replication-factor 1 --partitions 1

三、查询 kafka 消息队列



kafka-topics.sh --zookeeper 192.168.56.137:2181 --list


做完前三步,我们可以看下结果大致如下:


image.png


四、启动 consumer 监控窗口



kafka-console-consumer.sh --bootstrap-server 192.168.56.137:9092 --topic demo3 --from-beginning

五、写 Flume 自定义配置文件


这里的路径是:/opt/soft/flumconf


a7.sources=r7
a7.sinks=k7
a7.channels=c7
a7.sources.r7.type=spooldir
a7.sources.r7.spoolDir=/opt/soft/data/user_friends
a7.sources.r7.deserializer.maxLineLength=150000
a7.sources.r7.interceptors=f1
a7.sources.r7.interceptors.f1.type=regex_filter
a7.sources.r7.interceptors.f1.regex=^(\\s*user\\s*,\\s*friends\\s*)$
a7.sources.r7.interceptors.f1.excludeEvents=true
a7.sinks.k7.type=org.apache.flume.sink.kafka.KafkaSink
a7.sinks.k7.topic=demo13
a7.sinks.k7.kafka.bootstrap.servers=192.168.56.137:9092
a7.sinks.k7.serializer.class=kafka.serializer.StringEncoder
a7.channels.c7.type=memory
a7.channels.c7.capacity=1000
a7.channels.c7.transactionCapacity=100
a7.sources.r7.channels=c7
a7.sinks.k7.channel=c7


/

这是 CSV 数据源的路径,如下:


image.png


六、开启 Flume



flume-ng agent -n a6 -c conf -f /opt/soft/flumeconf/demo5.properties


此时我们会发现监控窗口那里开始有csv文件的数据


image.png


七、结果分析


用下面的重置命令,可以将消息队列的指针重置到开头


kafka-run-class.sh kafka.tools.GetOffsetShell --topic demo3 --time -1 --broker-list 192.168.56.137:9092 --partitions 0


需要注意的是:


-1 指针移动到开头的位置
-2 指针移动到尾部


image.png


首先我们查看了数据源的数据,发下有38203条数据,再通过kafka 消息重置命令 查询,发现只有 38202 条数据,说明我们在flume 自定义配置文件中成功完成对数据源表头的去除,并成功将数据导入到kafka消息队列中。


目录
相关文章
|
7月前
|
消息中间件 存储 监控
Flume+Kafka整合案例实现
Flume+Kafka整合案例实现
128 1
|
6月前
|
消息中间件 Java Kafka
集成Kafka到Spring Boot项目中的步骤和配置
集成Kafka到Spring Boot项目中的步骤和配置
285 7
|
6月前
|
消息中间件 数据挖掘 Kafka
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
133 2
|
5月前
|
消息中间件 负载均衡 监控
基于kafka项目之Keepalived高可用详细介绍
基于kafka项目之Keepalived高可用详细介绍
|
5月前
|
消息中间件 NoSQL Kafka
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(2)
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(2)
|
5月前
|
消息中间件 应用服务中间件 Kafka
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(1)
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(1)
|
7月前
|
数据可视化 JavaScript 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
129 0
|
7月前
|
SQL 消息中间件 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
192 0
|
7月前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
131 0
|
7月前
|
消息中间件 存储 数据采集
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
105 0