大数据技术之 Flume4

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 大数据技术之 Flume4

为 hadoop103 上的 Flume4 配置一个 avro source 和一个 logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro 
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

为 hadoop104 上的 Flume3 配置一个 avro source 和一个 logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

(4)分别在 hadoop102,hadoop103,hadoop104 上启动 flume 进程,注意先后顺序。


(5)在 hadoop102 使用 netcat 向 localhost:44444 发送字母和数字。


(6)观察 hadoop103 和 hadoop104 打印的日志。


3.6 自定义 Source

1)介绍

Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。


官方也提供了自定义 source 的接口:


https://flume.apache.org/FlumeDeveloperGuide.html#source 根据官方说明自定义


MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。


实现相应方法:


getBackOffSleepIncrement() //backoff 步长


getMaxBackOffSleepInterval()//backoff 最长时间


configure(Context context)//初始化 context(读取配置文件内容)


process()//获取数据封装成 event 并写入 channel,这个方法将被循环调用。


使用场景:读取 MySQL 数据或者其他文件系统。


2)需求


使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置。


自定义Source需求

3)分析

自定义Source需求分析


4)编码

(1)导入 pom 依赖

<dependencies>
 <dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-core</artifactId>
 <version>1.9.0</version>
</dependency>

(2)编写代码

package com.atguigu;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
public class MySource extends AbstractSource implements 
Configurable, PollableSource {
     //定义配置文件将来要读取的字段
     private Long delay;
     private String field;
     //初始化配置信息
     @Override
     public void configure(Context context) {
         delay = context.getLong("delay");
         field = context.getString("field", "Hello!");
     }
     @Override
     public Status process() throws EventDeliveryException {
         try {
             //创建事件头信息
             HashMap<String, String> hearderMap = new HashMap<>();
             //创建事件
             SimpleEvent event = new SimpleEvent();
             //循环封装事件
             for (int i = 0; i < 5; i++) {
                 //给事件设置头信息
                 event.setHeaders(hearderMap);
                 //给事件设置内容
                 event.setBody((field + i).getBytes());
                 //将事件写入 channel
                 getChannelProcessor().processEvent(event);
                 Thread.sleep(delay);
             }
         } catch (Exception e) {
             e.printStackTrace(); 
             return Status.BACKOFF;
         }
         return Status.READY;
     }
     @Override
     public long getBackOffSleepIncrement() {
         return 0;
     }
     @Override
     public long getMaxBackOffSleepInterval() {
         return 0;
     }
}

5)测试

(1)打包

将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下。

(2)配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.atguigu.MySource
a1.sources.r1.delay = 1000
#a1.sources.r1.field = atguigu
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


(3)开启任务

[atguigu@hadoop102 flume]$ pwd
/opt/module/flume
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f 
job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

(4)结果展示

3.7 自定义 Sink

1)介绍

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。


Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。


Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。


官方也提供了自定义 sink 的接口:


https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义MySink 需要继承 AbstractSink 类并实现 Configurable 接口。


实现相应方法:


configure(Context context)//初始化 context(读取配置文件内容)


process()//从 Channel 读取获取数据(event),这个方法将被循环调用。


使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。


2)需求

使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。

流程分析:


3)编码

package com.atguigu;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable {
     //创建 Logger 对象
     private static final Logger LOG = 
     LoggerFactory.getLogger(AbstractSink.class);
     private String prefix;
     private String suffix;
     @Override
     public Status process() throws EventDeliveryException {
         //声明返回值状态信息
         Status status;
         //获取当前 Sink 绑定的 Channel
         Channel ch = getChannel();
         //获取事务
         Transaction txn = ch.getTransaction();
         //声明事件
         Event event;
         //开启事务
         txn.begin(); 
         //读取 Channel 中的事件,直到读取到事件结束循环
         while (true) {
         event = ch.take();
         if (event != null) {
         break;
       }
     }
     try {
         //处理事件(打印)
         LOG.info(prefix + new String(event.getBody()) + suffix);
         //事务提交
         txn.commit();
         status = Status.READY;
     } catch (Exception e) {
         //遇到异常,事务回滚
         txn.rollback();
         status = Status.BACKOFF;
     } finally {
         //关闭事务
         txn.close();
     }
         return status;
     }
     @Override
     public void configure(Context context) {
         //读取配置文件内容,有默认值
         prefix = context.getString("prefix", "hello:");
         //读取配置文件内容,无默认值
         suffix = context.getString("suffix");
      }
}

4)测试

(1)打包

将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下。

(2)配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444 
# Describe the sink
a1.sinks.k1.type = com.atguigu.MySink
#a1.sinks.k1.prefix = atguigu:
a1.sinks.k1.suffix = :atguigu
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


(3)开启任务

[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f 
job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
[atguigu@hadoop102 ~]$ nc localhost 44444
hello
OK
atguigu
OK

(4)结果展示

3.8 Flume 数据流监控

3.8.1 Ganglia 的安装与部署

Ganglia 由 gmond、gmetad 和 gweb 三部分组成。gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用 gmond,你可以很容易收集很多系统指标数据,如 CPU、内存、磁盘、网络和活跃进程的数据等。


gmetad(Ganglia Meta Daemon)整合所有信息,并将其以 RRD 格式存储至磁盘的服务。gweb(Ganglia Web)Ganglia 可视化工具,gweb 是一种利用浏览器显示 gmetad 所存储数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据。


1)安装 ganglia

(1)规划

hadoop102: web gmetad gmod
hadoop103: gmod
hadoop104: gmod

(2)在 102 103 104 分别安装 epel-release

[atguigu@hadoop102 flume]$ sudo yum -y install epel-release

(3)在 102 安装

[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmetad 
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-web
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmond


(4)在 103 和 104 安装

[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmond

2)在 102 修改配置文件/etc/httpd/conf.d/ganglia.conf

[atguigu@hadoop102 flume]$ sudo vim

/etc/httpd/conf.d/ganglia.conf

修改为红颜色的配置:

# Ganglia monitoring system php web frontend
#
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
 # Require local
# 通过 windows 访问 ganglia,需要配置 Linux 对应的主机(windows)ip 地址
 Require ip 192.168.9.1
 # Require ip 10.1.2.3
 # Require host example.org
</Location>

5)在 102 修改配置文件/etc/ganglia/gmetad.conf


[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmetad.conf


修改为:data_source "my cluster" hadoop102


6)在 102 103 104 修改配置文件/etc/ganglia/gmond.conf


[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmond.conf


修改为:

cluster {
 name = "my cluster"
 owner = "unspecified"
 latlong = "unspecified"
 url = "unspecified"
}
udp_send_channel {
 #bind_hostname = yes # Highly recommended, soon to be default.
 # This option tells gmond to use a source 
address
 # that resolves to the machine's hostname. 
Without
 # this, the metrics may appear to come from 
any
 # interface and the DNS names associated with
 # those IPs will be used to create the RRDs.
 # mcast_join = 239.2.11.71
 # 数据发送给 hadoop102
 host = hadoop102
 port = 8649
 ttl = 1
} 
udp_recv_channel {
 # mcast_join = 239.2.11.71
 port = 8649
# 接收来自任意连接的数据
 bind = 0.0.0.0
 retry_bind = true
 # Size of the UDP buffer. If you are handling lots of metrics 
you really
 # should bump it up to e.g. 10MB or even higher.
 # buffer = 10485760
}

7)在 102 修改配置文件/etc/selinux/config

[atguigu@hadoop102 flume]$ sudo vim /etc/selinux/config

修改为:

# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
# targeted - Targeted processes are protected,
# mls - Multi Level Security protection.
SELINUXTYPE=targeted

尖叫提示:selinux 生效需要重启,如果此时不想重启,可以临时生效之:

[atguigu@hadoop102 flume]$ sudo setenforce 0

8)启动 ganglia

(1)在 102 103 104 启动

[atguigu@hadoop102 flume]$ sudo systemctl start gmond

(2)在 102 启动

[atguigu@hadoop102 flume]$ sudo systemctl start httpd
[atguigu@hadoop102 flume]$ sudo systemctl start gmetad

9)打开网页浏览 ganglia 页面


http://hadoop102/ganglia


尖叫提示:如果完成以上操作依然出现权限不足错误,请修改/var/lib/ganglia 目录


的权限:


[atguigu@hadoop102 flume]$ sudo chmod -R 777 /var/lib/ganglia


3.8.2 操作 Flume 测试监控

1)启动 Flume 任务

[atguigu@hadoop102 flume]$ bin/flume-ng agent \
-c conf/ \
-n a1 \
-f job/flume-netcat-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=hadoop102:8649 

2)发送数据观察 ganglia 监测图

[atguigu@hadoop102 flume]$ nc localhost 44444

样式如图:

图例说明:


第 4 章 企业真实面试题(重点)

4.1 你是如何实现 Flume 数据传输的监控的

使用第三方框架 Ganglia 实时监控 Flume。

4.2 Flume 的 Source,Sink,Channel 的作用?你们 Source 是什么类

型?

1)作用

(1)Source 组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy


(2)Channel 组件对采集到的数据进行缓存,可以存放在 Memory 或 File 中。


(3)Sink 组件是用于把数据发送到目的地的组件,目的地包括 Hdfs、Logger、avro、thrift、ipc、file、Hbase、solr、自定义。


2)我公司采用的 Source 类型为:


(1)监控后台日志:exec


(2)监控后台产生日志的端口:netcat


4.3 Flume 的 Channel Selectors

Flume Channel Selectors

4.4 Flume 参数调优

1)Source

增加 Source 个(使用Tair Dir Source 时可增加 FileGroups 个数)可以增大 Source的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个 Source 以保证 Source 有足够的能力获取到新产生的数据。


batchSize 参数决定 Source 一次批量运输到 Channel 的 event 条数,适当调大这个参数可以提高 Source 搬运 Event 到 Channel 时的性能。


2)Channel


type 选择 memory 时 Channel 的性能最好,但是如果 Flume 进程意外挂掉可能会丢失数据。type 选择 file 时 Channel 的容错性更好,但是性能上会比 memory channel 差。


使用 file Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。


Capacity 参数决定 Channel 可容纳最大的 event 条数。transactionCapacity 参数决定每次 Source 往 channel 里面写的最大 event 条数和每次 Sink 从 channel 里面读的最大


event 条数。transactionCapacity 需要大于 Source 和 Sink 的 batchSize 参数。


3)Sink


增加 Sink 的个数可以增加 Sink 消费 event 的能力。Sink 也不是越多越好够用就行, 过多的 Sink 会占用系统资源,造成系统资源不必要的浪费。batchSize 参数决定 Sink 一次批量从 Channel 读取的 event 条数,适当调大这个参数可以提高 Sink 从 Channel 搬出 event 的性能。


4.5 Flume 的事务机制

Flume 的事务机制(类似数据库的事务机制):Flume 使用两个独立的事务分别负责从Soucrce 到 Channel,以及从 Channel 到 Sink 的事件传递。


比如 spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到 Channel 且提交成功,那么 Soucrce 就将该文件标记为完成。同理,事务以类似的方式处理从 Channel 到 Sink 的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到 Channel 中,等待重新传递。


4.6 Flume 采集数据会丢失吗?

根据 Flume 的架构原理,Flume 是不可能丢失数据的,其内部有完善的事务机制,Source 到 Channel 是事务性的,Channel 到 Sink 是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是 Channel 采用 memoryChannel,agent 宕机导致数据丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失。


Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由 Sink 发出,但是没有接收到响应,Sink 会再次发送数据,此时可能会导致数据的重复


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
10天前
|
存储 机器学习/深度学习 SQL
大数据处理与分析技术
大数据处理与分析技术
44 2
|
29天前
|
存储 分布式计算 数据可视化
大数据常用技术与工具
【10月更文挑战第16天】
101 4
|
12天前
|
存储 分布式计算 NoSQL
【赵渝强老师】大数据技术的理论基础
本文介绍了大数据平台的核心思想,包括Google的三篇重要论文:Google文件系统(GFS)、MapReduce分布式计算模型和BigTable大表。这些论文奠定了大数据生态圈的技术基础,进而发展出了Hadoop、Spark和Flink等生态系统。文章详细解释了GFS的架构、MapReduce的计算过程以及BigTable的思想和HBase的实现。
|
1月前
|
存储 数据采集 监控
大数据技术:开启智能决策与创新服务的新纪元
【10月更文挑战第5天】大数据技术:开启智能决策与创新服务的新纪元
|
6天前
|
机器学习/深度学习 存储 大数据
云计算与大数据技术的融合应用
云计算与大数据技术的融合应用
|
12天前
|
SQL 存储 算法
比 SQL 快出数量级的大数据计算技术
SQL 是大数据计算中最常用的工具,但在实际应用中,SQL 经常跑得很慢,浪费大量硬件资源。例如,某银行的反洗钱计算在 11 节点的 Vertica 集群上跑了 1.5 小时,而用 SPL 重写后,单机只需 26 秒。类似地,电商漏斗运算和时空碰撞任务在使用 SPL 后,性能也大幅提升。这是因为 SQL 无法写出低复杂度的算法,而 SPL 提供了更强大的数据类型和基础运算,能够实现高效计算。
|
15天前
|
存储 大数据 定位技术
大数据 数据索引技术
【10月更文挑战第26天】
37 3
|
15天前
|
存储 大数据 OLAP
大数据数据分区技术
【10月更文挑战第26天】
49 2
|
18天前
|
消息中间件 分布式计算 大数据
数据为王:大数据处理与分析技术在企业决策中的力量
【10月更文挑战第29天】在信息爆炸的时代,大数据处理与分析技术为企业提供了前所未有的洞察力和决策支持。本文探讨了大数据技术在企业决策中的重要性和实际应用,包括数据的力量、实时分析、数据驱动的决策以及数据安全与隐私保护。通过这些技术,企业能够从海量数据中提取有价值的信息,预测市场趋势,优化业务流程,从而在竞争中占据优势。
60 2
下一篇
无影云桌面