Hadoop-Flume基础实战(2)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Hadoop-Flume基础实战(2)

一. Flume安装与配置


  1. Flume官网: http://flume.apache.org


  1. JDK版本要求1.7及以上


  1. 此次下载与安装使用的Flume版本为: apache-flume-1.6.0-bin.tar.gz
    <1> 解压命令:tar -zxvf apache-flume-1.6.0-bin.tar.gz
    <2> 安装目录: /usr/local/src/apache-flume-1.6.0-bin
    <3> 配置环境变量vi ~/.bashrc如下配置:

# new add FLUME_HOME
export FLUME_HOME=/usr/local/src/apache-flume-1.6.0-bin
# new add FLUME_HOME into PATH
export PATH=$FLUME_HOME/bin:$PATH


<4> 完整的~/.bashrc环境变量配置为:

# .bashrc
# User specific aliases and functions
alias rm='rm -i'
alias cp='cp -i'
alias mv='mv -i'
# Source global definitions
if [ -f /etc/bashrc ]; then
        . /etc/bashrc
fi
iptables -F
setenforce 0
hostname master
export JAVA_HOME=/usr/local/src/jdk1.7.0_80
export HADOOP_HOME=/usr/local/src/hadoop-2.6.1
# new add FLUME_HOME
export FLUME_HOME=/usr/local/src/apache-flume-1.6.0-bin
# added by Anaconda3
#export PATH =/root/anaconda3/bin:$PATH
export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib
export PATH=$HADOOP_HOME/bin:$JAVA_HOME/bin:$PATH
# new add FLUME_HOME into PATH
export PATH=$FLUME_HOME/bin:$PATH


<5> 重新加载环境变量: source ~/.bashrc


<6> 检查$FLUME_HOME配置是否生效,执行命令echo $FLUME_HOME,并观察:

[root@master ~]# echo $FLUME_HOME
/usr/local/src/apache-flume-1.6.0-bin


二.Flume实战小项目


Flume配置文件存放路径: /usr/local/src/apache-flume-1.6.0-bin/conf

配置说明:


a) 配置source

b) 配置channel

c) 配置sink

d) 把以上三个组件串起来


2.1  NetCat方式


需求: 监听一个ip端口,并将收到的信息输出到console控制台中


<1> 在conf/目录下新增配置文件netcat_console.conf,配置内容如下:

mple.conf: A single-node Flume configuration
# Name the components on this agent
## agent的名称: a1
## a1的source名称: r1
## a1的sink名称: k1
## a1的channel名称为:c1
## 复数表示可以配置多个
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# 配置agent a1的source r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
# 配置agent a1的sink k1
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
# 配置agent a1的channel c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# 一个source可以对应多个channel,一个sink只能对应一个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


<2> 运行flume-ng

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/netcat_console.conf --name a1 -Dflume.root.logger=INFO,console


说明:

flume-ng agent   \
--conf $FLUME_HOME/conf   \   #指定配置文件存放的文件夹
--conf-file $FLUME_HOME/conf/netcat_console.conf  \    #指定配置文件
--name a1   \   #指定agent名称
-Dflume.root.logger=INFO,console


<3> Telnet对应host和端口:

[root@master badou]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
111
OK
222
OK
333
OK


观察flume logger


2.2 Exec方式


需求:监听一个日志文件的变化,并实时将文件新增内容,输出到console控制台中

<1> 在conf/目录下新增配置文件exec_console.conf,配置内容如下:

mple.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/src/flume_test.txt
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


<2> 运行flume-ng


执行命令:

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec_console.conf --name a1 -Dflume.root.logger=INFO,console


<3> 向对应文件尾部追加内容:

echo 111 >> /usr/local/src/flume_test.txt


观察flume logger.


2.3 HDFS


**需求: **通过flume将指定的文件,上传到hdfs中,并指定位置与命名规则

<1> 在conf/目录下新增配置文件avro_hdfs.conf,配置内容如下:

mple.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume_data_pool
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.roundSize = 0
a1.sinks.k1.hdfs.roundCount = 600000
a1.sinks.k1.hdfs.roundInterval = 600
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


<2> 运行flume-ng


执行命令:

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro_hdfs.conf --name a1 -Dflume.root.logger=INFO,console


<3> 验证

flume-ng avro-client --conf conf -H master -p 41414 -F /usr/local/src/flume_test.txt -Dflume.root.logger=DEBUG,console


执行hdfs命令查看文件是否存在:

hadoop fs -ls /
# 查看文件内容是否一致:
hadoop fs -text /flume_data_pool/events-.1524279392273


2.4 模拟使用Flume监听日志变化,并且把增量日志文件写入到hdfs中


<1> 在conf/目录下新增配置文件exec_hdfs.conf,配置内容如下:

mple.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
##
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/src/flume_test/monitor_source/1.log
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/tailout/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=1
a1.sinks.k1.hdfs.rountUnit=minute
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.roundSize = 20
a1.sinks.k1.hdfs.roundCount = 5
a1.sinks.k1.hdfs.roundInterval = 3
a1.sinks.k1.hdfs.bathchSize=10
a1.sinks.k1.hdfs.useLocalTimeStamp=true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


<2> 运行flume-ng


执行命令:

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec_hdfs.conf --name a1 -Dflume.root.logger=INFO,console


<3> 验证

echo 111 >> /usr/local/src/flume_test/monitor_source/1.log


根据日志查看比对内容:

hadoop fs -text /flume/tailout/18-04-21/1104/events-.1524279852216


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
7月前
|
数据采集 消息中间件 监控
Flume数据采集系统设计与配置实战:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入探讨Apache Flume的数据采集系统设计,涵盖Flume Agent、Source、Channel、Sink的核心概念及其配置实战。通过实例展示了文件日志收集、网络数据接收、命令行实时数据捕获等场景。此外,还讨论了Flume与同类工具的对比、实际项目挑战及解决方案,以及未来发展趋势。提供配置示例帮助理解Flume在数据集成、日志收集中的应用,为面试准备提供扎实的理论与实践支持。
322 1
|
7月前
|
存储 SQL Shell
bigdata-13-Flume实战
bigdata-13-Flume实战
50 0
|
SQL 分布式计算 Hadoop
大数据行业部署实战1:Hadoop伪分布式部署
大数据行业部署实战1:Hadoop伪分布式部署
494 0
|
7月前
|
分布式计算 Java 大数据
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
747 0
|
7月前
|
分布式计算 大数据 Scala
【大数据技术Hadoop+Spark】Spark RDD创建、操作及词频统计、倒排索引实战(超详细 附源码)
【大数据技术Hadoop+Spark】Spark RDD创建、操作及词频统计、倒排索引实战(超详细 附源码)
352 1
|
6月前
|
存储 分布式计算 Hadoop
Hadoop Distributed File System (HDFS): 概念、功能点及实战
【6月更文挑战第12天】Hadoop Distributed File System (HDFS) 是 Hadoop 生态系统中的核心组件之一。它设计用于在大规模集群环境中存储和管理海量数据,提供高吞吐量的数据访问和容错能力。
700 4
|
2月前
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
49 3
|
2月前
|
分布式计算 Java Hadoop
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
47 2
|
3月前
|
分布式计算 Hadoop Devops
Hadoop集群配置https实战案例
本文提供了一个实战案例,详细介绍了如何在Hadoop集群中配置HTTPS,包括生成私钥和证书文件、配置keystore和truststore、修改hdfs-site.xml和ssl-client.xml文件,以及重启Hadoop集群的步骤,并提供了一些常见问题的故障排除方法。
94 3
|
3月前
|
分布式计算 监控 Hadoop
监控Hadoop集群实战篇
介绍了监控Hadoop集群的方法,包括监控Linux服务器、Hadoop指标、使用Ganglia监控Hadoop集群、Hadoop日志记录、通过Hadoop的Web UI进行监控以及其他Hadoop组件的监控,并提供了相关监控工具和资源的推荐阅读链接。
104 2