08【在线日志分析】之Flume Agent(聚合节点) sink to kafka cluster

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 1.创建logtopic[root@sht-sgmhadoopdn-01 kafka]# bin/kafka-topics.sh --create --zookeeper 172.

1.创建logtopic
[root@sht-sgmhadoopdn-01 kafka]# bin/kafka-topics.sh --create --zookeeper 172.16.101.58:2181,172.16.101.59:2181,172.16.101.60:2181/kafka --replication-factor 3 --partitions 1 --topic logtopic


2.创建avro_memory_kafka.properties (kafka sink)
[root@sht-sgmhadoopcm-01 ~]# cd /tmp/flume-ng/conf
[root@sht-sgmhadoopcm-01 conf]# cp avro_memory_hdfs.properties avro_memory_kafka.properties
[root@sht-sgmhadoopcm-01 conf]# vi avro_memory_kafka.properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 172.16.101.54
a1.sources.r1.port = 4545


# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = logtopic
a1.sinks.k1.kafka.bootstrap.servers = 172.16.101.58:9092,172.16.101.59:9092,172.16.101.60:9092
a1.sinks.k1.kafka.flumeBatchSize = 6000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.keep-alive = 90
a1.channels.c1.capacity = 2000000
a1.channels.c1.transactionCapacity = 6000


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


3.后台启动 flume-ng agent(聚合节点)和查看nohup.out
[root@sht-sgmhadoopcm-01 ~]# source /etc/profile
[root@sht-sgmhadoopcm-01 ~]# cd /tmp/flume-ng/
[root@sht-sgmhadoopcm-01 flume-ng]# nohup  flume-ng agent -c conf -f /tmp/flume-ng/conf/avro_memory_kafka.properties  -n a1 -Dflume.root.logger=INFO,console &
[1] 4971
[root@sht-sgmhadoopcm-01 flume-ng]# nohup: ignoring input and appending output to `nohup.out'

[root@sht-sgmhadoopcm-01 flume-ng]#
[root@sht-sgmhadoopcm-01 flume-ng]#
[root@sht-sgmhadoopcm-01 flume-ng]# cat nohup.out


4.检查log收集的三台(收集节点)开启没
[hdfs@flume-agent-01 flume-ng]$ . ~/.bash_profile
[hdfs@flume-agent-02 flume-ng]$ . ~/.bash_profile
[hdfs@flume-agent-03 flume-ng]$ . ~/.bash_profile


[hdfs@flume-agent-01 flume-ng]$ nohup  flume-ng agent -c /tmp/flume-ng/conf -f /tmp/flume-ng/conf/exec_memory_avro.properties -n a1 -Dflume.root.logger=INFO,console &
[hdfs@flume-agent-01 flume-ng]$ nohup  flume-ng agent -c /tmp/flume-ng/conf -f /tmp/flume-ng/conf/exec_memory_avro.properties -n a1 -Dflume.root.logger=INFO,console &
[hdfs@flume-agent-01 flume-ng]$ nohup  flume-ng agent -c /tmp/flume-ng/conf -f /tmp/flume-ng/conf/exec_memory_avro.properties -n a1 -Dflume.root.logger=INFO,console &


5.打开kafka manager监控
http://172.16.101.55:9999

目录
相关文章
|
2月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
2月前
|
消息中间件 存储 监控
Flume+Kafka整合案例实现
Flume+Kafka整合案例实现
68 1
|
2月前
|
消息中间件 存储 Kafka
阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来
阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来
|
2月前
|
消息中间件 分布式计算 Kafka
亿万级别Kafka演进之路:可靠性+事务+消息中间件+源码+日志
Kafka起初是由LinkedIn公司采用Scala语言开发的-一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
|
6天前
|
消息中间件 存储 Kafka
Kafka日志处理:深入了解偏移量查找与切分文件
**摘要:** 本文介绍了如何在Kafka中查找偏移量为23的消息,涉及ConcurrentSkipListMap的查询、索引文件的二分查找及日志分段的物理位置搜索。还探讨了Kafka日志分段的切分策略,包括大小、时间、索引大小和偏移量达到特定阈值时的切分条件。理解这些对于优化Kafka的性能和管理日志至关重要。
16 2
|
2月前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之在DataWorks中设置了一个任务节点的调度时间,并将其发布到生产环境,但到了指定时间(例如17:30)却没有产生运行实例和相关日志如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
2月前
|
消息中间件 数据可视化 关系型数据库
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
175 0
|
26天前
|
消息中间件 存储 Kafka
go语言并发实战——日志收集系统(二) Kafka简介
go语言并发实战——日志收集系统(二) Kafka简介
|
1月前
|
消息中间件 数据挖掘 Kafka
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
|
12天前
|
消息中间件 NoSQL Kafka
基于Kafka的nginx日志收集分析与监控平台(3)
基于Kafka的nginx日志收集分析与监控平台(3)