Hadoop-Flume Basics in Practice (2)


1. Flume Installation and Configuration


  1. Flume official website: http://flume.apache.org


  2. JDK 1.7 or later is required.


  3. The Flume release downloaded and installed here is apache-flume-1.6.0-bin.tar.gz
    <1> Extract it: tar -zxvf apache-flume-1.6.0-bin.tar.gz
    <2> Installation directory: /usr/local/src/apache-flume-1.6.0-bin
    <3> Configure the environment variables with vi ~/.bashrc, adding:

# new add FLUME_HOME
export FLUME_HOME=/usr/local/src/apache-flume-1.6.0-bin
# new add FLUME_HOME into PATH
export PATH=$FLUME_HOME/bin:$PATH


<4> The complete ~/.bashrc environment-variable configuration is:

# .bashrc
# User specific aliases and functions
alias rm='rm -i'
alias cp='cp -i'
alias mv='mv -i'
# Source global definitions
if [ -f /etc/bashrc ]; then
        . /etc/bashrc
fi
iptables -F
setenforce 0
hostname master
export JAVA_HOME=/usr/local/src/jdk1.7.0_80
export HADOOP_HOME=/usr/local/src/hadoop-2.6.1
# new add FLUME_HOME
export FLUME_HOME=/usr/local/src/apache-flume-1.6.0-bin
# added by Anaconda3
#export PATH =/root/anaconda3/bin:$PATH
export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib
export PATH=$HADOOP_HOME/bin:$JAVA_HOME/bin:$PATH
# new add FLUME_HOME into PATH
export PATH=$FLUME_HOME/bin:$PATH


<5> Reload the environment variables: source ~/.bashrc


<6> Check that $FLUME_HOME took effect by running echo $FLUME_HOME and inspecting the output:

[root@master ~]# echo $FLUME_HOME
/usr/local/src/apache-flume-1.6.0-bin
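
As a further sanity check that the Flume binaries are on the PATH, flume-ng's version sub-command can be run (the exact output depends on the build):

flume-ng version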


2. Flume Hands-on Mini-Projects


Flume configuration files are stored under: /usr/local/src/apache-flume-1.6.0-bin/conf

Configuration outline:


a) Configure the source

b) Configure the channel

c) Configure the sink

d) Wire the three components together


2.1 NetCat Source


Requirement: listen on an IP and port, and print everything received to the console.


<1> Create a new configuration file netcat_console.conf under conf/ with the following content:

# example.conf: A single-node Flume configuration
# Name the components on this agent
## Agent name: a1
## Name of a1's source: r1
## Name of a1's sink: k1
## Name of a1's channel: c1
## The plural property names mean multiple components can be listed
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# Configure source r1 of agent a1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
# Configure sink k1 of agent a1
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
# Configure channel c1 of agent a1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# A source can feed multiple channels, but a sink reads from exactly one channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


<2> Run flume-ng

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/netcat_console.conf --name a1 -Dflume.root.logger=INFO,console


Explanation:

flume-ng agent   \
--conf $FLUME_HOME/conf   \   # directory containing the configuration files
--conf-file $FLUME_HOME/conf/netcat_console.conf  \    # the configuration file to use
--name a1   \   # the agent name
-Dflume.root.logger=INFO,console


<3> Telnet to the corresponding host and port:

[root@master badou]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
111
OK
222
OK
333
OK


Watch the Flume logger output.
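
To end the telnet session afterwards, press Ctrl+] (the escape character shown above) and type quit at the telnet prompt.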


2.2 Exec Source


Requirement: monitor a log file for changes and print newly appended lines to the console in real time.

<1> Create a new configuration file exec_console.conf under conf/ with the following content:

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/src/flume_test.txt
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


<2> Run flume-ng


Run:

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec_console.conf --name a1 -Dflume.root.logger=INFO,console


<3> Append content to the end of the monitored file:

echo 111 >> /usr/local/src/flume_test.txt


Watch the Flume logger output.
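
To drive a steadier stream of events for observation, a small shell loop (a sketch that assumes the same test file as above) can append timestamped lines once per second:

for i in $(seq 1 10); do
    echo "test line $i $(date)" >> /usr/local/src/flume_test.txt
    sleep 1
done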


2.3 HDFS


**Requirement:** upload a specified file to HDFS through Flume, with a given destination path and file-naming convention.

<1> Create a new configuration file avro_hdfs.conf under conf/ with the following content:

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume_data_pool
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 600000
a1.sinks.k1.hdfs.rollInterval = 600
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


<2> Run flume-ng


Run:

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro_hdfs.conf --name a1 -Dflume.root.logger=INFO,console


<3> Verification

flume-ng avro-client --conf conf -H master -p 41414 -F /usr/local/src/flume_test.txt -Dflume.root.logger=DEBUG,console
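
A breakdown of the avro-client options used above, in the same style as the flume-ng agent explanation earlier:

flume-ng avro-client   \
--conf conf   \   # directory holding the Flume configuration files
-H master   \   # host where the avro source is listening
-p 41414   \   # port of the avro source
-F /usr/local/src/flume_test.txt   \   # local file to send, one event per line
-Dflume.root.logger=DEBUG,console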


Run HDFS commands to check that the file exists:

hadoop fs -ls /
# Check that the file contents match:
hadoop fs -text /flume_data_pool/events-.1524279392273
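
To confirm the uploaded content matches the original, the HDFS output can be diffed against the local file (a sketch; the event file name is the one from this particular run and will differ on your cluster):

diff <(hadoop fs -text /flume_data_pool/events-.1524279392273) /usr/local/src/flume_test.txt && echo "contents match"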


2.4 Using Flume to Monitor Log Changes and Write Incremental Log Data to HDFS


<1> Create a new configuration file exec_hdfs.conf under conf/ with the following content:

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
##
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/src/flume_test/monitor_source/1.log
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/tailout/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=1
a1.sinks.k1.hdfs.roundUnit=minute
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.batchSize=10
a1.sinks.k1.hdfs.useLocalTimeStamp=true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
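
A note on the sink properties above: hdfs.round, hdfs.roundValue, and hdfs.roundUnit control how the timestamp escapes in hdfs.path (%H%M here) are rounded down when choosing the output directory, while hdfs.rollInterval, hdfs.rollSize, and hdfs.rollCount control when the current file is closed and a new one started (whichever threshold is reached first). hdfs.batchSize is the number of events written to HDFS per flush. With thresholds this small, expect many tiny files, which is convenient for a demo but not for production.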


<2> Run flume-ng


Run:

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec_hdfs.conf --name a1 -Dflume.root.logger=INFO,console


<3> Verification

echo 111 >> /usr/local/src/flume_test/monitor_source/1.log


Using the path reported in the agent log, inspect and compare the content:

hadoop fs -text /flume/tailout/18-04-21/1104/events-.1524279852216
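
Because the output path is bucketed by time (%y-%m-%d/%H%M), the exact directory and file name change from run to run; listing the tree first makes it easy to find the file to inspect:

hadoop fs -ls -R /flume/tailout/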

