flume分布式日志收集测试

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
日志服务 SLS,月写入数据量 50GB 1个月
网络型负载均衡 NLB,每月750个小时 15LCU
简介:

官方参考文档

https://flume.apache.org/FlumeUserGuide.html#file-channel

Flume NG是一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本。经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡。

架构设计要点

Flume的架构主要有一下几个核心概念:

  • Event:一个数据单元,带有一个可选的消息头

  • Flow:Event从源点到达目的点的迁移的抽象

  • Client:操作位于源点处的Event,将其发送到Flume Agent

  • Agent:一个独立的Flume进程,包含组件Source、Channel、Sink

  • Source:用来消费传递到该组件的Event

  • Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event

  • Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)

Flume NG架构,如图所示:

flume-ng-architecture


外部系统产生日志,直接通过Flume的Agent的Source组件将事件(如日志行)发送到中间临时的channel组件,最后传递给Sink组件,HDFS Sink组件可以直接把数据存储到HDFS集群上。
一个最基本Flow的配置,格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
 
# set channel for source
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ...
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ...
 
# set channel for sink
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>


尖括号里面的,我们可以根据实际需求或业务来修改名称。下面详细说明:
表示配置一个Agent的名称,一个Agent肯定有一个名称。与是Agent的Source组件的名称,消费传递过来的Event。与是Agent的Channel组件的名称。与是Agent的Sink组件的名称,从Channel中消费(移除)Event。
上面配置内容中,第一组中配置Source、Sink、Channel,它们的值可以有1个或者多个;第二组中配置Source将把数据存储(Put)到哪一个Channel中,可以存储到1个或多个Channel中,同一个Source将数据存储到多个Channel中,实际上是Replication;第三组中配置Sink从哪一个Channel中取(Task)数据,一个Sink只能从一个Channel中取数据。
下面,根据官网文档,我们展示几种Flow Pipeline,各自适应于什么样的应用场景:

  • 多个Agent顺序连接

flume-multiseq-agents
可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的Agent的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。

  • 多个Agent的数据汇聚到同一个Agent

flume-join-agent
这种情况应用的场景比较多,比如要收集Web网站的用户行为日志,Web网站为了可用性使用的负载均衡的集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。

  • 多路(Multiplexing)Agent

flume-multiplexing-agent
这种模式,有两种方式,一种是用来复制(Replication),另一种是用来分流(Multiplexing)。Replication方式,可以将最前端的数据源复制多份,分别传递到多个channel中,每个channel接收到的数据都是相同的,配置格式,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
 
# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
 
# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
 
<Agent>.sources.<Source1>.selector. type  = replicating


上面指定了selector的type的值为replication,其他的配置没有指定,使用的Replication方式,Source1会将数据分别存储到Channel1和Channel2,这两个channel里面存储的数据是相同的,然后数据被传递到Sink1和Sink2。
Multiplexing方式,selector可以根据header的值来确定数据传递到哪一个channel,配置格式,如下所示:

1
2
3
4
5
6
7
8
9
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector. type  = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
 
<Agent>.sources.<Source1>.selector.default = <Channel2>



上面selector的type的值为multiplexing,同时配置selector的header信息,还配置了多个selector的mapping的值,即header的值:如果header的值为Value1、Value2,数据从Source1路由到Channel1;如果header的值为Value2、Value3,数据从Source1路由到Channel2。

  • 实现load balance功能

flume-load-balance-agents
Load balancing Sink Processor能够实现load balance功能,上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上,示例配置,如下所示:


1
2
3
4
5
6
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor. type  = load_balance
a1.sinkgroups.g1.processor.backoff =  true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
  • 实现failover能

Failover Sink Processor能够实现failover功能,具体流程类似load balance,但是内部处理机制与load balance完全不同:Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。如果一个Sink能够成功处理Event,则会加入到一个Pool中,否则会被移出Pool并计算失败次数,设置一个惩罚因子,示例配置如下所示:

1
2
3
4
5
6
7
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor. type  = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000

基本功能

我们看一下,Flume NG都支持哪些功能(目前最新版本是1.5.0.1),了解它的功能集合,能够让我们在应用中更好地选择使用哪一种方案。说明Flume NG的功能,实际还是围绕着Agent的三个组件Source、Channel、Sink来看它能够支持哪些技术或协议。我们不再对各个组件支持的协议详细配置进行说明,通过列表的方式分别对三个组件进行概要说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Source类型                    说明
Avro Source                    支持Avro协议(实际上是Avro RPC),内置支持
Thrift Source                  支持Thrift协议,内置支持
Exec Source                    基于Unix的 command 在标准输出上生产数据
JMS Source                 从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过
Spooling Directory Source 监控指定目录内数据变更
Twitter 1% firehose Source   通过API持续下载Twitter数据,试验性质
Netcat Source                  监控某个端口,将流经端口的每一个文本行数据作为Event输入
Sequence Generator Source 序列生成器数据源,生产序列数据
Syslog Sources                 读取syslog数据,产生Event,支持UDP和TCP两种协议
HTTP Source                        基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式
Legacy Sources                 兼容老的Flume OG中Source(0.9.x版本)
Flume Channel
###############################################################
Channel类型                   说明
Memory Channel                 Event数据存储在内存中
JDBC Channel                   Event数据存储在持久化存储中,当前Flume Channel内置支持Derby
File Channel                   Event数据存储在磁盘文件中
Spillable Memory Channel  Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用)
Pseudo Transaction Channel    测试用途
Custom Channel                 自定义Channel实现
Flume Sink
###################################################################
Sink类型                  说明
HDFS Sink                  数据写入HDFS
Logger Sink                    数据写入日志文件
Avro Sink                  数据被转换成Avro Event,然后发送到配置的RPC端口上
Thrift Sink                    数据被转换成Thrift Event,然后发送到配置的RPC端口上
IRC Sink                   数据在IRC上进行回放
File Roll Sink            存储数据到本地文件系统
Null Sink                  丢弃到所有数据
HBase Sink                 数据写入HBase数据库
Morphline Solr Sink           数据发送到Solr搜索服务器(集群)
ElasticSearch Sink         数据发送到Elastic Search搜索服务器(集群)
Kite Dataset Sink         写数据到Kite Dataset,试验性质的
Custom Sink                    自定义Sink实现
#################################################################
另外还有Channel Selector、Sink Processor、Event Serializer、Interceptor等组件,可以参考官网提供的用户手册。
  1. 安装配置略,可以参考网上教程


  2. 下面是测试的配置文件

    agent 配置文件如下



  3. 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    # Name the  components on this agent
    a1.sources =  r1
    a1.sinks =  k1
    a1.channels  = c1
     
    a1.sinks.k1. type  = avro 
    a1.sinks.k1. hostname  = 127.0.0.1
    a1.sinks.k1.port = 44444 
    a1.sinks.k1.channel = c1
     
    # Use a  channel which buffers events in memory
    a1.channels.c1. type   = memory
    a1.channels.c1.capacity  = 1000
    a1.channels.c1.transactionCapacity  = 100
     
    a1.sources.r1. type   exec
    a1.sources.r1. command   tail  -F   /var/log/nginx/access .log
    a1.sources.r1.channels  = c1
    a1.sources.s.deserializer.maxLineLength=65535

server端配置文件如下:  测试复制(Replication)1个source 复制到多个channels 输出到多个sink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# Name the  components on this agent
b1.sources =  r1
b1.sinks =  k1 k2 k3 
b1.channels  = c1 c2 c3
b1.sources.r1.selector. type  = replicating
 
 
 
b1.sources.r1. type  = avro 
b1.sources.r1.channels = c1 c2 c3 
b1.sources.r1.bind = 0.0.0.0 
b1.sources.r1.port = 44444
 
 
b1.channels.c1. type  file 
b1.channels.c1.write-timeout = 10 
b1.channels.c1.keep-alive = 10 
b1.channels.c1.checkpointDir =  /flume/check
b1.channels.c1.useDualCheckpoints =  true 
b1.channels.c1.backupCheckpointDir =  /flume/backup
b1.channels.c1.dataDirs =  /flume
 
 
b1.channels.c2. type =memory  
b1.channels.c2.capacity=2000000  
b1.channels.c2.transactionCapacity=10000  
 
 
b1.channels.c3. type =memory  
b1.channels.c3.capacity=2000000  
b1.channels.c3.transactionCapacity=10000  
# Describe the sink 
b1.sinks.k1. type   = hdfs
b1.sinks.k1.channel  = c1
b1.sinks.k1.hdfs.path  = hdfs: //localhost :9000 /user/hadoop/flume/collected/
b1.sinks.k1.hdfs.filePrefix  = chen_test
b1.sinks.k1.hdfs.round  =  true
b1.sinks.k1.hdfs.roundValue  = 10
b1.sinks.k1.hdfs.roundUnit  = minute
 
 
b1.sinks.k2.channel = c2
b1.sinks.k2. type  = file_roll
b1.sinks.k2.batchSize = 100000000
b1.sinks.k2.rollInterval = 1000000
b1.sinks.k2.serializer = TEXT
b1.sinks.k2.sink.directory =  /var/log/flume
 
 
b1.sinks.k3.channel = c3 
b1.sinks.k3. type  = logger

启动测试命令

flume-ng agent -c . -f test.conf   -n b1  -Dflume.root.logger=INFO,console

-c  配置文件目录  -f配置文件  -n  节点名字  和配置文件对应     console打到终端



5.flume-ng负载均衡load-balance、failover集群搭建

参考链接

http://blog.csdn.net/lskyne/article/details/37662835


6. 测试   区分 flume日志合并在一起的日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
a1配置
[root@host_12  test ] # cat  a1.conf 
# Name the  components on this agent
a1.sources =  r1
a1.sinks =  k1
a1.channels  = c1
 
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1. type =static
a1.sources.r1.interceptors.i1.key=nginx
a1.sources.r1.interceptors.i1.value=nginx_1
a1.sources.r1.interceptors.i1.preserveExisting= false
 
 
#a1.sources.r1.interceptors = i1
#a1.sources.r1.interceptors.i1.type = host
#a1.sources.r1.interceptors.i1.hostHeader = hostname
 
a1.sinks.k1. type  = avro 
a1.sinks.k1. hostname  = 127.0.0.1
a1.sinks.k1.port = 44444 
a1.sinks.k1.channel = c1
 
# Use a  channel which buffers events in memory
a1.channels.c1. type   = memory
a1.channels.c1.capacity  = 1000
a1.channels.c1.transactionCapacity  = 100
 
a1.sources.r1. type   exec
###匹配shell  tomcat yyyy:mm:dd:hh格式的日志
a1.sources.r1.shell =  /bin/bash  -c
a1.sources.r1. command   tail  -f   /var/log/nginx_1/access_ ` date  +%Y%m%d%H`.log
a1.sources.r1.channels  = c1
 
#########匹配替换行里的文本的内容
#a1.sources.r1.interceptors = i1
#a1.sources.r1.interceptors.i1.type = search_replace
#a1.sources.r1.interceptors.i1.searchPattern = [0-9]+
#a1.sources.r1.interceptors.i1.replaceString = lxw1234
#a1.sources.r1.interceptors.i1.charset = UTF-8
###########################################
a2配置
 
[root@host_12  test ] # cat  a2.conf 
# Name the  components on this agent
a2.sources =  r1
a2.sinks =  k1
a2.channels  = c1
 
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1. type =static
a2.sources.r1.interceptors.i1.key=nginx
a2.sources.r1.interceptors.i1.value=nginx_2
a2.sources.r1.interceptors.i1.preserveExisting= false
 
 
a2.sinks.k1. type  = avro 
a2.sinks.k1. hostname  = 127.0.0.1
a2.sinks.k1.port = 44444 
a2.sinks.k1.channel = c1
 
# Use a  channel which buffers events in memory
a2.channels.c1. type   = memory
a2.channels.c1.capacity  = 1000
a2.channels.c1.transactionCapacity  = 100
 
a2.sources.r1. type   exec
a2.sources.r1.shell =  /bin/bash  -c
a2.sources.r1. command   tail  -f   /var/log/nginx_2/access_ ` date  +%Y%m%d%H`.log
a2.sources.r1.channels  = c1
 
####################################
server配置
[root@host_12  test ] # cat   h1.conf
# Name the  components on this agent
serv_1.sources =  r1
serv_1.sinks =   k2 k3 
serv_1.channels  =  c2 c3
#serv_1.sources.r1.selector.type = replicating
 
serv_1.sources.r1.selector. type  = multiplexing
serv_1.sources.r1.selector.header = nginx
serv_1.sources.r1.selector.mapping.nginx_1 = c2
serv_1.sources.r1.selector.mapping.nginx_2 = c3
  
  
  
serv_1.sources.r1. type  = avro 
serv_1.sources.r1.channels =  c2 c3 
serv_1.sources.r1.bind = 0.0.0.0 
serv_1.sources.r1.port = 44444
  
  
  
  
serv_1.channels.c2. type =memory  
serv_1.channels.c2.capacity=2000000  
serv_1.channels.c2.transactionCapacity=10000  
  
  
serv_1.channels.c3. type =memory  
serv_1.channels.c3.capacity=2000000  
serv_1.channels.c3.transactionCapacity=10000  
  
  
serv_1.sinks.k2.channel = c2
serv_1.sinks.k2. type  = file_roll
serv_1.sinks.k2.batchSize = 100000000
serv_1.sinks.k2.rollInterval = 1000000
serv_1.sinks.k2.serializer = TEXT
serv_1.sinks.k2.sink.directory =  /var/log/flume/
  
  
serv_1.sinks.k3.channel = c3 
serv_1.sinks.k3. type  = logger

本文转自   tianshuai369   51CTO博客,原文链接:http://blog.51cto.com/kkkkkk/1722390

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
1月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
1月前
|
存储 运维 监控
【Flume】flume 日志管理中的应用
【4月更文挑战第4天】【Flume】flume 日志管理中的应用
|
1月前
|
SQL Oracle 关系型数据库
oracle11g SAP测试机归档日志暴增排查(二)
oracle11g SAP测试机归档日志暴增排查(二)
96 1
|
1月前
|
Oracle 关系型数据库 Shell
oracle11g SAP测试机归档日志暴增排查(一)
oracle11g SAP测试机归档日志暴增排查(一)
28 1
|
1月前
|
SQL Java 数据库连接
Mybatis之Mybatis简介、搭建Mybatis相关步骤(开发环境、maven、核心配置文件、mapper接口、映射文件、junit测试、log4j日志)
【1月更文挑战第2天】 MyBatis最初是Apache的一个开源项目iBatis, 2010年6月这个项目由Apache Software Foundation迁移到了Google Code。随着开发团队转投Google Code旗下,iBatis3.x正式更名为MyBatis。代码于2013年11月迁移到Github iBatis一词来源于“internet”和“abatis”的组合,是一个基于Java的持久层框架。iBatis提供的持久层框架包括SQL Maps和Data Access Objects(DAO)
224 3
Mybatis之Mybatis简介、搭建Mybatis相关步骤(开发环境、maven、核心配置文件、mapper接口、映射文件、junit测试、log4j日志)
|
1月前
|
关系型数据库 MySQL 数据库
测试部署PolarDB-X 分布式与集中式
在本文中,作者详述了在CentOS 7.9上部署测试PolarDB-X分布式与集中式数据库的过程。PolarDB-X作为阿里云优化的分布式数据库,提供高稳定性和与MySQL的兼容性,是应对单体数据库扩展性和性能瓶颈的解决方案,同时也符合国产化需求。文章介绍了部署环境准备,包括关闭防火墙和SELinux,设置系统参数,安装Python3和Docker,以及配置MySQL客户端。接着,通过PXD工具部署了PolarDB-X的集中式和分布式版,遇到的问题包括阿里云镜像源异常导致的部署失败以及指定版本安装的困扰。最后,作者进行了初步的压力测试,并对文档完善、生态工具建设以及提供更多使用案例提出了建议。
47748 10
测试部署PolarDB-X 分布式与集中式
|
9天前
|
SQL 监控 中间件
【应急响应】拒绝服务&钓鱼指南&DDOS压力测试&邮件反制分析&应用日志
【应急响应】拒绝服务&钓鱼指南&DDOS压力测试&邮件反制分析&应用日志
|
30天前
|
算法 安全 程序员
揭秘分布式系统:日志复制如何保障数据一致性?
本文介绍了分布式系统中的日志复制技术,这是保证高可用性和数据一致性的重要手段。以Raft算法为例,文章阐述了Leader如何将客户端请求复制到Follower的日志中:Leader首先记录请求,然后通过RPC发送给Follower,等待ACK确认,必要时进行重试。当多数Follower确认后,Leader提交日志并通知Follower。文中还提到了网络分区和日志一致性等挑战,以及应对策略,如超时机制、领导选举、日志匹配和压缩。最后,强调了日志复制在面对故障时确保系统一致性和可用性的作用。
212 4
|
7天前
|
存储 分布式计算 监控
分布式系统详解--框架(Hadoop-HDFS的HA搭建及测试)
分布式系统详解--框架(Hadoop-HDFS的HA搭建及测试)
20 0
|
1月前
|
存储 测试技术 C++
P2P网络下分布式文件共享场景的测试
P2P网络下分布式文件共享场景的测试
229 6

热门文章

最新文章