【Flume中间件】(8)channel选择器副本机制

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【Flume中间件】(8)channel选择器副本机制

channel选择器副本机制

我们现在的需求就是实时的监听一个文件的变化,然后将变化的数据上传到HDFS,另外需要在本地文件系统保存副本,所以此次的sink去向有两个,一个是HDFS,另外一个就是本地文件系统。

其实我们可以用一个flume就可以实现将第一个flume的sink分别设置为两个去向,为什么下图要多用两个flume呢?是因为有时HDFSsink端写数据较慢,生产数据较快,那就有可能生产大于消费,刚开始还可以在channel进行缓存,如果channel也满了就会崩掉,多两个flume也是为了增加缓冲。

上图我们可以看到第一个flume从source监听数据,然后交给两个channel,这就需要两个channel收到的数据是一致的,所以channel选择器需要配置成副本,然后两个channel分别发往对应的sink,这里sink采用的是avro,它是用于两个flume之间联系的组件,如果两个flume需要联系,上一个flume的sink就要配置avro,下端的flume的source也要配置相应的avro。

avro的sink就是将数据发往指定IP的某个端口,avro的source就是从某个IP地址的指定端口进行获取数据。

Flume2配置的就是avrosource,然后sink端就是HDFS。

Flume3同样配置avro,但是sink端配置的是file_roll,用于传输到本地文件系统

Flume1

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.r1.selector.type=replicating
a1.sources.r1.type = taildir
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /opt/module/flume/position/position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/data3/data
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=hadoop102
a1.sinks.k1.port=4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname=hadoop102
a1.sinks.k2.port=4142
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1      
a1.sinks.k2.channel = c2

Flume2

a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind=hadoop102
a2.sources.r1.port=4141
a2.sinks.k1.type = hdfs
# 配置hdfs存储的路径,这里我设置的是动态路径,按照时间命名的文件夹
a2.sinks.k1.hdfs.path=hdfs://hadoop102:9000/flume/%Y%m%d/%H
# 给存储到HDFS的文件加个前缀标明该数据为日志信息
a2.sinks.k1.hdfs.filePrefix=logs-
# 是否按照时间进行滚动文件夹,对应上面配置的动态文件夹
a2.sinks.k1.hdfs.round=true
a2.sinks.k1.hdfs.roundValue=1
# 设置时间量级为秒、分钟还是小时
a2.sinks.k1.hdfs.roundUnit=hour
# 是否使用本地时间戳,我测试了一下,如果这里设置成false,HDFS目录不会发生变化,可能是hdfs配置的动态时间路径就是按照时间戳配置的
a2.sinks.k1.hdfs.useLocalTimeStamp=true
# 积攒多少个Event才会flush到HDFS
a2.sinks.k1.hdfs.batchSize=1000
# 文件类型
a2.sinks.k1.hdfs.fileType=DataStream
# 多久会生成一个新的文件,如果监听的文件没有变化,尽管到了时间也不会产生新文件,产生新文件需要事件去触发
a2.sinks.k1.hdfs.rollInterval=30
# 每个文件大小达到多少会进行滚动
a2.sinks.k1.hdfs.rollSize=134217700
# 设置文件的滚动与事件无关
a2.sinks.k1.hdfs.rollCount=0
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1      

Flume3

a3.sources = r1
a3.sinks = k1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.bind=hadoop102
a3.sources.r1.port=4142
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /home/hadoop/data4/flume
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1      

这样当我们向data文件中追加数据,整个flume流程就将我们追加的数据以副本机制分别传输到HDFS和本地文件系统。


目录
相关文章
|
消息中间件 数据采集 存储
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Channel的Kafka Channel
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Channel模块是实现数据缓存和传输的核心模块之一。本文将介绍Flume中的Kafka Channel,讲解其数据采集流程。
146 0
|
数据采集 存储 缓存
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Channel的File Channel
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Channel模块是实现数据缓存和传输的核心模块之一。本文将介绍Flume中的File Channel,讲解其数据采集流程。
87 0
|
数据采集 存储 缓存
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Channel的JDBC Channel
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Channel模块是实现数据缓存和传输的核心模块之一。本文将介绍Flume中的JDBC Channel,讲解其数据采集流程。
193 0
|
存储 数据采集 大数据
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Channel的Memory Channel
在Flume中,Channel是用于存储从Source采集的数据并传输至Sink的组件。Memory Channel是其中一种常见的Channel类型。它将事件存储在内存中,并提供快速的读写和处理能力。本文将介绍Memory Channel的配置和数据传输流程。
118 0
|
存储 数据采集 消息中间件
大数据数据采集的数据采集(收集/聚合)的Flume之基本组件的Sink:从Channel中取数据
在Flume中,Sink是数据采集和传输过程中的最终组件。它负责从Channel缓冲区中获取数据并将其存储到目标存储系统中。
219 0
|
存储 数据采集 缓存
大数据数据采集的数据采集(收集/聚合)的Flume之基本组件的Channel:临时存储数据的管道
在Flume中,Channel是数据采集和传输过程中的一个重要组件。它负责存储从Source获取的数据,并将其转发给Sink进行处理和存储。
111 0
|
中间件
【Flume中间件】(14)自定义Sink
【Flume中间件】(14)自定义Sink
74 0
|
中间件 Java 数据库连接
【Flume中间件】(13)自定义Source
【Flume中间件】(13)自定义Source
121 0
|
中间件
【Flume中间件】(12)自定义拦截器
【Flume中间件】(12)自定义拦截器
77 0
|
监控 中间件
【Flume中间件】(11)聚合组
【Flume中间件】(11)聚合组
54 0