Flume(二)【Flume 进阶使用】(3)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Flume(二)【Flume 进阶使用】

Flume(二)【Flume 进阶使用】(2)https://developer.aliyun.com/article/1532353

2)需求实现

flume-log-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
 
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
 
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume-nc-flume.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
 
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444
 
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
 
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-flume-log.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
 
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
 
# Describe the sink
a3.sinks.k1.type = logger
 
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

3)测试

向 group.log 文件中追加文本:

注意:hadoop103 这里不能写 nc localhost 44444 而要写 nc hadoop103 44444! 否则报错:Ncat: Connection refused.

5、自定义 Interceptor

前面我们的多路复用还没有实现,因为我们说多路复用必须配合拦截器来使用,因为我们必须知道每个 Channel 发往哪些 Sink,这需要拦截器往 Event Header 中写一些内容。

1)案例需求

使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

2)需求分析

在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。

在该案例中,我们以端口数据模拟日志,以是否包含”lyh”模拟不同类型的日志,我们需要自定义 interceptor 区分数据中是否包含”lyh”,将其分别发往不同的分析系统(Channel)。

3)需求实现

自定义拦截器

引入 flume 依赖

<dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-core</artifactId>
 <version>1.9.0</version>
</dependency>
package com.lyh.gmall.interceptor;
 
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
 
 
public class TypeInterceptor implements Interceptor {
 
    // 存放事件集合
    private List<Event> addHeaderEvents;
 
 
    @Override
    public void initialize() {
        // 初始化存放事件的集合
        addHeaderEvents = new ArrayList<>();
    }
 
    // 单个事件拦截
    @Override
    public Event intercept(Event event) {
 
        // 1. 获取事件中的 header 信息
        Map<String, String> headers = event.getHeaders();
 
        // 2. 获取事件中的 body 信息
        String body = new String(event.getBody());
 
        // 3. 根据 body 中是否包含 'lyh' 来决定发往哪个 sink
        if (body.contains("lyh"))
            headers.put("type","first");
        else
            headers.put("type","second");
 
        return event;
    }
 
    // 批量事件拦截
    @Override
    public List<Event> intercept(List<Event> events) {
 
        // 1. 清空集合
        addHeaderEvents.clear();
 
        // 2. 遍历 events
        for (Event event : events) {
            // 3. 给每个事件添加头信息
            addHeaderEvents.add(intercept(event));
        }
 
        return addHeaderEvents;
    }
 
    @Override
    public void close() {
    }
 
    public static class Builder implements Interceptor.Builder{
 
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }
 
        @Override
        public void configure(Context context) {
 
        }
    }
}

打包放到 flume 安装目录的 lib 目录下:

 

flume 作业配置

hadoop102:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
 
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.lyh.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.first = c1 # 包含 'lyh'
a1.sources.r1.selector.mapping.second = c2 # 不包含 'lyh'
 
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

hadoop103:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
 
a1.sinks.k1.type = logger
 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

hadoop104:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
 
a1.sinks.k1.type = logger
 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

4)需求实现

#hadoop103
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume2.conf -Dflume.root.logger=INFO,console
 
#hadoop104
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume3.conf -Dflume.root.logger=INFO,console
 
#hadoop102
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume1.conf
nc localhost 44444

hadoop102:

hadoop103:

hadoop104:

可以看到,从 hadoop102 发送的日志中,包含 "lyh" 的都被发往 hadoop103 的 4141 端口,其它日志则被发往 hadoop104 的 4242端口。

Flume(二)【Flume 进阶使用】(4)https://developer.aliyun.com/article/1532357

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
6月前
|
消息中间件 存储 关系型数据库
Flume(二)【Flume 进阶使用】(4)
Flume(二)【Flume 进阶使用】
|
6月前
|
监控 负载均衡
Flume(二)【Flume 进阶使用】(2)
Flume(二)【Flume 进阶使用】
|
6月前
|
SQL 存储 负载均衡
Flume(二)【Flume 进阶使用】(1)
Flume(二)【Flume 进阶使用】
|
SQL 存储 分布式计算
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合
|
监控 负载均衡
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(二)
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(二)
|
SQL 存储 分布式计算
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(一)
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(一)
|
7月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
7月前
|
存储 运维 监控
【Flume】flume 日志管理中的应用
【4月更文挑战第4天】【Flume】flume 日志管理中的应用
|
消息中间件 数据采集 SQL
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
|
4月前
|
存储 数据采集 数据处理
【Flume拓扑揭秘】掌握Flume的四大常用结构,构建强大的日志收集系统!
【8月更文挑战第24天】Apache Flume是一个强大的工具,专为大规模日志数据的收集、聚合及传输设计。其核心架构包括源(Source)、通道(Channel)与接收器(Sink)。Flume支持多样化的拓扑结构以适应不同需求,包括单层、扇入(Fan-in)、扇出(Fan-out)及复杂多层拓扑。单层拓扑简单直观,适用于单一数据流场景;扇入结构集中处理多源头数据;扇出结构则实现数据多目的地分发;复杂多层拓扑提供高度灵活性,适合多层次数据处理。通过灵活配置,Flume能够高效构建各种规模的数据收集系统。
89 0