【Flume】(一)Flume 高可用的、高可靠的、分布式日志收集系统1

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 【Flume】(一)Flume 高可用的、高可靠的、分布式日志收集系统1

文章目录


一、初识 Flume

二、安装 Flume

三、简单案例实现(单节点实现)

四、Flume Source

1、netcat 源

2、avro 源

3、exec 源

4、JMS 源

5、Spooling Directory 源

6、Kafka 源

五、Flume Channel

六、Flume Sinks


一、初识 Flume


Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统, Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。


为什么选用Flume


20200907223402284.png


Flume最重要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS。


架构:


Flume 组成架构如图所示


20200207175040463.png


运行机制:


Flume 的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,Flume 在删除自己缓存的数据。


核心的角色是 Agent, Agent 本身是一个 Java 进程, 一般运行在日志收集节点。 Flume 采集系统就是由一个个 Agent 所连接起来形成。


agent有三个组件:


Source:采集源,用于跟数据源对接,以获取数据。source输入端常见的类型有:spooling directory、exec、syslog、avro、netcat等。

Channel: Agent 内部的数据传输通道,是位于Source和Sink之间的缓冲区。

Sink:下沉地,采集数据的传送目的地,用于往下一级 agent 传递数据或者往最终存储系统传递数据。sink组件常见的目的地包括:HDFS、Kafka、logger、File等。


二、安装 Flume


下载安装包

下载地址:http://flume.apache.org/download.html


解压并配置

# 1、上传软件包
# 2、解压
tar -zxvf flume-ng-1.6.0-cdh5.14.2.tar.gz
mv apache-flume-1.6.0-cdh5.14.2-bin soft/flume160 # 移动到我集群 /opt/soft/下
# 3、修改conf/flume-env.sh  
 ## 将配置jdk环境变量的注释放开,并修改滑稽变量
 ## 删除 docs目录, docs 保存了这个版本的官方文档 , 可以通过浏览器查看, 但是在虚拟机中无法查看,在分布式配置分发时会影响分发效率(图1 )
rm -rf docs/
注意:JAVA_OPTS 配置  如果我们传输文件过大 报内存溢出时 需要修改这个配置项
cp flume-env.sh.template flume-env.sh
vi flume-env.sh
#配置java路径
JAVA_HOME=/opt/soft/jdk180
# 4、验证安装是否成功
./flume-ng version
#安装netcat
yum -y install nmap-ncat


20200207183820100.png


三、简单案例实现(单节点实现)


注意: 现在版本已经更新到1.9 ,而本人使用的是1.6的版本, 配置方式早已相差甚远

因此可以通过在Windows下打开该软件包中的docs/ 目录下的index.html查看相应版本的教程


官网教程配置


博主这里在 /opt/soft/ 路径下新建了 flumeconf 文件夹


20200207184911530.png


配置一个properties , 内容如下:

#组件别名
a1.sources=r1  #//数据来源,可以多个,中间用空格分隔
a1.sinks=k1  #//传输管道,一般只有一个,可以多个
a1.channels=c1 #//数据沉淀,可以多个,中间用空格分隔
#使用监控
a1.sources.r1.type=netcat  #//数据来源类型是输入
#指定IP
a1.sources.r1.bind=localhost  
# 指定端口号
a1.sources.r1.port=44444  
a1.sinks.k1.type=logger
#管道描述
a1.channels.c1.type=memory    #//传输管道的参数,类型是内存传输
#capacity:默认该通道中最大的可以存储的 event 数量
a1.channels.c1.capacity=1000 
# trasactionCapacity:每次最大可以从 source 中拿到或者送到 sink 中的 event数量
a1.channels.c1.transactionCapacity=100
# 绑定组件
a1.sources.r1.channels=c1   #//指定传输管道
a1.sinks.k1.channel=c1


运行该文件, 前置运行


#运行该案例,-conf-file 文件名,生成后的文件名 a1, -Dflume.root.logger 日志输出街边,console在控制台输出

flume-ng agent --conf-file option --name a1 -Dflume.root.logger=INFO,console

./flume-ng agent -n a1 -c conf -f /opt/soft/flumeconf/demo.properties -Dflume.root.logger=INFO,console


[root@zj1 ~]# nc localhost 44444


20200207193847999.png


可以看到数据被完整的显示, 以及每个字符的16进制表示。


四、Flume Source


Source是从其他生产数据的应用中接受数据的组件。Source可以监听一个或者多个网络端口,用于接受数据或者从本地文件系统中读取数据,每个Source必须至少连接一个Channel。当然一个Source也可以连接多个Channnel,这取决于系统设计的需要。


所有的Flume Source如下 ,下面将介绍一些主要的源:


image.png


1、netcat 源


netcat的源在给定端口上侦听并将每一行文本转换为事件。表现得像数控nc -k -l [host] [port].…换句话说,它打开指定的端口并侦听数据。期望提供的数据是换行符分隔的文本。每一行文本都被转换成一个sink事件,并通过连接的通道发送。

常用于单节点的配置


2、avro 源


侦听Avro端口并从外部Avro客户端流接收事件。当在另一个(前一跳)sink代理上与内置的Avro Sink配对时,它可以创建分层的集合拓扑。

我们搭建多Agent流的环境使用的就是avro源


3、exec 源


exec源在启动时运行给定的unix命令,并期望该进程在标准输出上不断生成数据(stderr被简单丢弃,除非属性logStdErr设置为true)。如果进程因任何原因退出,源也会退出,并且不会产生进一步的数据。

这意味着配置例如 cat [named pipe] or tail -F [file]将产生预期的结果日期可能不会-前两个命令生成数据流,后者生成单个事件并退出。


利用exec源监控某个文件


利用node2上的 flume 进行配置

官方介绍如下


1.编写自定义配置文件 option-exec

[root@node2 dirflume]# vim option-exec
# 配置文件内容
# 主要是通过 a1.sources.r1.command = tail -F /root/log.txt 这条配置来监控log.txt文件中的内容
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/log.txt
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


2.创建被监控的文件( 文件没有会自动创建, 但是下面演示目录不会)

vim  /root/log.txt
# 文件内容如下
hello flume


3.启动 flume ,查看结果( 图1)

flume-ng agent  --conf-file option-exec --name a1 -Dflume.root.logger=INFO,console


4.我们可以通过echo 向文件中追加内容 ,查看node2的 flume的阻塞式界面是否显示数据(图2,图3)

echo 'hello flume' >> /root/log.txt
echo 'hello flume' >> /root/log.txt
echo 'hello flume' >> /root/log.txt
....


注意 :

a.我们通常在项目中使用exec源来监控某些日志文件的数据

b.我们可以通过修改配置文件中的a1.sources.r1.command = tail -F /root/log.txt配置来决定是否在一开始读取时读取全部文件,如果我们使用的是 tail -f -n 3 /root/log.txt 则是从倒数第三行开始输出


图1


20200207212039245.png


图2


20200207212051261.png


图3


20200207212103471.png


4、JMS 源


JMS源从JMS目的地(如队列或主题)读取消息。作为JMS应用程序,它应该与任何JMS提供程序一起工作,但只在ActiveMQ中进行了测试。JMS源提供可配置的批处理大小、消息选择器、用户/传递和消息到Flume事件转换器。请注意,供应商提供的JMS JAR应该使用命令行上的plugins.d目录(首选)、-classpath或Flume_CLASSPATH变量(flume-env.sh)包含在Flume类路径中

现在来说用处不大


5、Spooling Directory 源


通过此源,您可以通过将要摄取的文件放入磁盘上的“Spooling”目录中来摄取数据。该源将监视指定目录中的新文件,并从出现的新文件中解析事件。事件解析逻辑是可插入的。将给定文件完全读入通道后,将其重命名以指示完成(或选择删除)。


与Exec源不同,此源是可靠的,即使Flume重新启动或终止,它也不会丢失数据。为了获得这种可靠性,必须仅将不可变的唯一命名的文件放入Spooling目录中。Flume尝试检测这些问题情况,如果违反这些条件,将返回失败:


如果将文件放入Spooling目录后写入文件,Flume将在其日志文件中打印错误并停止处理。


如果以后再使用文件名,Flume将在其日志文件中打印错误并停止处理。

为避免上述问题,将唯一的标识符(例如时间戳)添加到日志文件名称(当它们移到Spooling目录中时)可能会很有用。


尽管有此来源的可靠性保证,但是在某些情况下,如果发生某些下游故障,则事件可能会重复。这与Flume其他组件提供的保证是一致的。


官方介绍如下


利用Spooling Directory源监控目录


1.修改自定义配置文件( vim .option-spooldir),内容如下

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/log
a1.sources.r1.fileHeader = false
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


2.根据配置文件中a1.sources.r1.spoolDir = /root/log 的配置,创建 .root/log目录


3.启动 flume

flume-ng agent  --conf-file option-spooldir --name a1 -Dflume.root.logger=INFO,console


4.其他文件的目录下的文件移动到 /root/log文件夹下, 观察flume的阻塞式界面(图1)

可以看到,被读取后文件的后缀名会被修改( 图2 )


5.补充: 我们可以自定义这个后缀名 ,通过图3 设置 ,效果如图4 所示


图1


20191116214130845.png


图2


2019111621425467.png

图3

通过修改自定义文件的下面配置, 可以设置文件被读取后的后缀名 ,默认是 .completed


20191116214440532.png


图4

修改后,再次启动 flume,查看被读取目录下的文件,可以看到被读取的文件后缀变成了 .sxt结尾


20191116214659564.png


6、Kafka 源


KafkaSource是一个ApacheKafka消费者,负责阅读来自Kafka主题的信息。如果您有多个Kafka源正在运行,您可以使用相同的ConsumerGroup来配置它们,这样每个用户都会为主题读取一组唯一的分区。


注意: Kafka Source覆盖两个Kafka消费者参数:


auto.committee.Enable被源设置为“false”,我们提交每一批。为了提高性能,可以将其设置为“true”,但是,这可能导致数据使用者的丢失。

timeout.ms被设置为10 ms,所以当我们检查Kafka是否有新数据时,我们最多要等待10 ms才能到达,将其设置为更高的值可以降低CPU利用率(我们将在较少的紧循环中轮询Kafka),但也意味着写入通道的延迟更高(因为我们将等待更长的数据到达时间)


部分配置参数参考

ier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.zookeeperConnect = localhost:2181
tier1.sources.source1.topic = test1
tier1.sources.source1.groupId = flume
tier1.sources.source1.kafka.consumer.timeout.ms = 100


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
1月前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
81 4
|
2月前
|
存储 运维 负载均衡
构建高可用性GraphRAG系统:分布式部署与容错机制
【10月更文挑战第28天】作为一名数据科学家和系统架构师,我在构建和维护大规模分布式系统方面有着丰富的经验。最近,我负责了一个基于GraphRAG(Graph Retrieval-Augmented Generation)模型的项目,该模型用于构建一个高可用性的问答系统。在这个过程中,我深刻体会到分布式部署和容错机制的重要性。本文将详细介绍如何在生产环境中构建一个高可用性的GraphRAG系统,包括分布式部署方案、负载均衡、故障检测与恢复机制等方面的内容。
142 4
构建高可用性GraphRAG系统:分布式部署与容错机制
|
1月前
|
存储 运维 数据可视化
如何为微服务实现分布式日志记录
如何为微服务实现分布式日志记录
81 1
|
2月前
|
机器学习/深度学习 人工智能 分布式计算
【AI系统】分布式通信与 NVLink
进入大模型时代后,AI的核心转向大模型发展,训练这类模型需克服大量GPU资源及长时间的需求。面对单个GPU内存限制,跨多个GPU的分布式训练成为必要,这涉及到分布式通信和NVLink技术的应用。分布式通信允许多个节点协作完成任务,而NVLink则是一种高速、低延迟的通信技术,用于连接GPU或GPU与其它设备,以实现高性能计算。随着大模型的参数、数据规模扩大及算力需求增长,分布式并行策略,如数据并行和模型并行,变得至关重要。这些策略通过将模型或数据分割在多个GPU上处理,提高了训练效率。此外,NVLink和NVSwitch技术的持续演进,为GPU间的高效通信提供了更强的支持,推动了大模型训练的快
52 0
|
3月前
|
消息中间件 中间件 数据库
NServiceBus:打造企业级服务总线的利器——深度解析这一面向消息中间件如何革新分布式应用开发与提升系统可靠性
【10月更文挑战第9天】NServiceBus 是一个面向消息的中间件,专为构建分布式应用程序设计,特别适用于企业级服务总线(ESB)。它通过消息队列实现服务间的解耦,提高系统的可扩展性和容错性。在 .NET 生态中,NServiceBus 提供了强大的功能,支持多种传输方式如 RabbitMQ 和 Azure Service Bus。通过异步消息传递模式,各组件可以独立运作,即使某部分出现故障也不会影响整体系统。 示例代码展示了如何使用 NServiceBus 发送和接收消息,简化了系统的设计和维护。
79 3
|
3月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?
|
3月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
1月前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
132 5
|
2月前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
77 8
|
2月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
64 16