为 hadoop103 上的 Flume4 配置一个 avro source 和一个 logger sink。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = hadoop103 a1.sources.r1.port = 4141 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1
为 hadoop104 上的 Flume3 配置一个 avro source 和一个 logger sink。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = hadoop104 a1.sources.r1.port = 4242 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1
(4)分别在 hadoop102,hadoop103,hadoop104 上启动 flume 进程,注意先后顺序。
(5)在 hadoop102 使用 netcat 向 localhost:44444 发送字母和数字。
(6)观察 hadoop103 和 hadoop104 打印的日志。
3.6 自定义 Source
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。
官方也提供了自定义 source 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#source 根据官方说明自定义
MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。
getBackOffSleepIncrement() //backoff 步长
getMaxBackOffSleepInterval()//backoff 最长时间
configure(Context context)//初始化 context(读取配置文件内容)
process()//获取数据封装成 event 并写入 channel,这个方法将被循环调用。
使用场景:读取 MySQL 数据或者其他文件系统。
使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置。
(1)导入 pom 依赖
<dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency>
package com.atguigu; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource; import java.util.HashMap; public class MySource extends AbstractSource implements Configurable, PollableSource { //定义配置文件将来要读取的字段 private Long delay; private String field; //初始化配置信息 @Override public void configure(Context context) { delay = context.getLong("delay"); field = context.getString("field", "Hello!"); } @Override public Status process() throws EventDeliveryException { try { //创建事件头信息 HashMap<String, String> hearderMap = new HashMap<>(); //创建事件 SimpleEvent event = new SimpleEvent(); //循环封装事件 for (int i = 0; i < 5; i++) { //给事件设置头信息 event.setHeaders(hearderMap); //给事件设置内容 event.setBody((field + i).getBytes()); //将事件写入 channel getChannelProcessor().processEvent(event); Thread.sleep(delay); } } catch (Exception e) { e.printStackTrace(); return Status.BACKOFF; } return Status.READY; } @Override public long getBackOffSleepIncrement() { return 0; } @Override public long getMaxBackOffSleepInterval() { return 0; } }
将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下。
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = com.atguigu.MySource a1.sources.r1.delay = 1000 #a1.sources.r1.field = atguigu # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
[atguigu@hadoop102 flume]$ pwd /opt/module/flume [atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
3.7 自定义 Sink
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。
官方也提供了自定义 sink 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义MySink 需要继承 AbstractSink 类并实现 Configurable 接口。
configure(Context context)//初始化 context(读取配置文件内容)
process()//从 Channel 读取获取数据(event),这个方法将被循环调用。
使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。
使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。
package com.atguigu; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MySink extends AbstractSink implements Configurable { //创建 Logger 对象 private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class); private String prefix; private String suffix; @Override public Status process() throws EventDeliveryException { //声明返回值状态信息 Status status; //获取当前 Sink 绑定的 Channel Channel ch = getChannel(); //获取事务 Transaction txn = ch.getTransaction(); //声明事件 Event event; //开启事务 txn.begin(); //读取 Channel 中的事件,直到读取到事件结束循环 while (true) { event = ch.take(); if (event != null) { break; } } try { //处理事件(打印) LOG.info(prefix + new String(event.getBody()) + suffix); //事务提交 txn.commit(); status = Status.READY; } catch (Exception e) { //遇到异常,事务回滚 txn.rollback(); status = Status.BACKOFF; } finally { //关闭事务 txn.close(); } return status; } @Override public void configure(Context context) { //读取配置文件内容,有默认值 prefix = context.getString("prefix", "hello:"); //读取配置文件内容,无默认值 suffix = context.getString("suffix"); } }
将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下。
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = com.atguigu.MySink #a1.sinks.k1.prefix = atguigu: a1.sinks.k1.suffix = :atguigu # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console [atguigu@hadoop102 ~]$ nc localhost 44444 hello OK atguigu OK
3.8 Flume 数据流监控
3.8.1 Ganglia 的安装与部署
Ganglia 由 gmond、gmetad 和 gweb 三部分组成。gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用 gmond,你可以很容易收集很多系统指标数据,如 CPU、内存、磁盘、网络和活跃进程的数据等。
gmetad(Ganglia Meta Daemon)整合所有信息,并将其以 RRD 格式存储至磁盘的服务。gweb(Ganglia Web)Ganglia 可视化工具,gweb 是一种利用浏览器显示 gmetad 所存储数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据。
1)安装 ganglia
hadoop102: web gmetad gmod hadoop103: gmod hadoop104: gmod
(2)在 102 103 104 分别安装 epel-release
[atguigu@hadoop102 flume]$ sudo yum -y install epel-release
(3)在 102 安装
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmetad [atguigu@hadoop102 flume]$ sudo yum -y install ganglia-web [atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmond
(4)在 103 和 104 安装
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmond
2)在 102 修改配置文件/etc/httpd/conf.d/ganglia.conf
[atguigu@hadoop102 flume]$ sudo vim
# Ganglia monitoring system php web frontend # Alias /ganglia /usr/share/ganglia <Location /ganglia> # Require local # 通过 windows 访问 ganglia,需要配置 Linux 对应的主机(windows)ip 地址 Require ip # Require ip # Require host example.org </Location>
5)在 102 修改配置文件/etc/ganglia/gmetad.conf
[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmetad.conf
修改为:data_source "my cluster" hadoop102
6)在 102 103 104 修改配置文件/etc/ganglia/gmond.conf
[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmond.conf
cluster { name = "my cluster" owner = "unspecified" latlong = "unspecified" url = "unspecified" } udp_send_channel { #bind_hostname = yes # Highly recommended, soon to be default. # This option tells gmond to use a source address # that resolves to the machine's hostname. Without # this, the metrics may appear to come from any # interface and the DNS names associated with # those IPs will be used to create the RRDs. # mcast_join = # 数据发送给 hadoop102 host = hadoop102 port = 8649 ttl = 1 } udp_recv_channel { # mcast_join = port = 8649 # 接收来自任意连接的数据 bind = retry_bind = true # Size of the UDP buffer. If you are handling lots of metrics you really # should bump it up to e.g. 10MB or even higher. # buffer = 10485760 }
7)在 102 修改配置文件/etc/selinux/config
[atguigu@hadoop102 flume]$ sudo vim /etc/selinux/config
# This file controls the state of SELinux on the system. # SELINUX= can take one of these three values: # enforcing - SELinux security policy is enforced. # permissive - SELinux prints warnings instead of enforcing. # disabled - No SELinux policy is loaded. SELINUX=disabled # SELINUXTYPE= can take one of these two values: # targeted - Targeted processes are protected, # mls - Multi Level Security protection. SELINUXTYPE=targeted
尖叫提示:selinux 生效需要重启,如果此时不想重启,可以临时生效之:
[atguigu@hadoop102 flume]$ sudo setenforce 0
8)启动 ganglia
(1)在 102 103 104 启动
[atguigu@hadoop102 flume]$ sudo systemctl start gmond
(2)在 102 启动
[atguigu@hadoop102 flume]$ sudo systemctl start httpd [atguigu@hadoop102 flume]$ sudo systemctl start gmetad
9)打开网页浏览 ganglia 页面
尖叫提示:如果完成以上操作依然出现权限不足错误,请修改/var/lib/ganglia 目录
[atguigu@hadoop102 flume]$ sudo chmod -R 777 /var/lib/ganglia
3.8.2 操作 Flume 测试监控
1)启动 Flume 任务
[atguigu@hadoop102 flume]$ bin/flume-ng agent \ -c conf/ \ -n a1 \ -f job/flume-netcat-logger.conf \ -Dflume.root.logger=INFO,console \ -Dflume.monitoring.type=ganglia \ -Dflume.monitoring.hosts=hadoop102:8649
2)发送数据观察 ganglia 监测图
[atguigu@hadoop102 flume]$ nc localhost 44444
第 4 章 企业真实面试题(重点)
4.1 你是如何实现 Flume 数据传输的监控的
使用第三方框架 Ganglia 实时监控 Flume。
4.2 Flume 的 Source,Sink,Channel 的作用?你们 Source 是什么类
(1)Source 组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy
(2)Channel 组件对采集到的数据进行缓存,可以存放在 Memory 或 File 中。
(3)Sink 组件是用于把数据发送到目的地的组件,目的地包括 Hdfs、Logger、avro、thrift、ipc、file、Hbase、solr、自定义。
2)我公司采用的 Source 类型为:
4.3 Flume 的 Channel Selectors
Flume Channel Selectors
4.4 Flume 参数调优
增加 Source 个(使用Tair Dir Source 时可增加 FileGroups 个数)可以增大 Source的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个 Source 以保证 Source 有足够的能力获取到新产生的数据。
batchSize 参数决定 Source 一次批量运输到 Channel 的 event 条数,适当调大这个参数可以提高 Source 搬运 Event 到 Channel 时的性能。
type 选择 memory 时 Channel 的性能最好,但是如果 Flume 进程意外挂掉可能会丢失数据。type 选择 file 时 Channel 的容错性更好,但是性能上会比 memory channel 差。
使用 file Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。
Capacity 参数决定 Channel 可容纳最大的 event 条数。transactionCapacity 参数决定每次 Source 往 channel 里面写的最大 event 条数和每次 Sink 从 channel 里面读的最大
event 条数。transactionCapacity 需要大于 Source 和 Sink 的 batchSize 参数。
增加 Sink 的个数可以增加 Sink 消费 event 的能力。Sink 也不是越多越好够用就行, 过多的 Sink 会占用系统资源,造成系统资源不必要的浪费。batchSize 参数决定 Sink 一次批量从 Channel 读取的 event 条数,适当调大这个参数可以提高 Sink 从 Channel 搬出 event 的性能。
4.5 Flume 的事务机制
Flume 的事务机制(类似数据库的事务机制):Flume 使用两个独立的事务分别负责从Soucrce 到 Channel,以及从 Channel 到 Sink 的事件传递。
比如 spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到 Channel 且提交成功,那么 Soucrce 就将该文件标记为完成。同理,事务以类似的方式处理从 Channel 到 Sink 的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到 Channel 中,等待重新传递。
4.6 Flume 采集数据会丢失吗?
根据 Flume 的架构原理,Flume 是不可能丢失数据的,其内部有完善的事务机制,Source 到 Channel 是事务性的,Channel 到 Sink 是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是 Channel 采用 memoryChannel,agent 宕机导致数据丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失。
Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由 Sink 发出,但是没有接收到响应,Sink 会再次发送数据,此时可能会导致数据的重复