大数据技术之 -- flume

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据技术之 -- flume

1.Flume简介


Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集、聚集、移动的服务,Flume只能在Unix环境下运行。


Flume基于流式架构,容错性强,也很灵活简单。


Flume、Kafka用来实时进行数据收集,Spark、Flink用来实时处理数据,impala用来实时查询。


2.Flume角色


1dc618a0ed9580ce8bfa6facb208c08f.png


2.1、Source


用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel,这个有点类似于Java IO部分的Channel。


2.2、Channel


用于桥接Sources和Sinks,类似于一个队列。


2.3、Sink


从Channel收集数据,将数据写到目标源(可以是下一个Source,也可以是HDFS或者HBase)。


2.4、Event


传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送至目的地。


3.Flume传输过程


source监控某个文件或数据流,数据源产生新的数据,拿到该数据后,将数据封装在一个Event中,并put到channel后commit提交,channel队列先进先出,sink去channel队列中拉取数据,然后写入到HDFS中。


4.Flume部署及使用


4.1、文件配置


查询JAVA_HOME: echo $JAVA_HOME


显示/opt/module/jdk1.8.0_144 /opt/module/jdk1.8.0_144


安装Flume


[itstar@bigdata113 software]$ tar -zxvf apache-flume1.8.0-bin.tar.gz -C /opt/module/


改名:


[itstar@bigdata113 conf]$ mv flume-env.sh.template flume-env.sh

flume-env.sh涉及修改项:

export JAVA_HOME=/opt/module/jdk1.8.0_144


Linux中文件上传命令是:rz,下载:sz 如果没有这两个命令使用yum下载:yum install lrzsz


案例


案例一:监控端口数据


标:Flume监控一端Console,另一端Console发送消息,使被监控端实时显示。

分步实现:


安装telnet工具

yum -y install telnet

5d4c6812c8535adbb050f4ddf2e1bce8.png

创建Flume Agent配置文件flume-telnet.conf

#定义Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#定义netcatsource
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata111
a1.sources.r1.port = 44445
# 定义sink
a1.sinks.k1.type = logger
# 定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 双向链接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


判断44444端口是否被占用

$netstat -tunlp | grep 44445


启动flume配置文件


/opt/module/flume-1.8.0/bin/flume-ng agent \ --conf /opt/module/flume1.8.0/conf/ \ --name a1 \ --conf-file /opt/module/flume-1.8.0/jobconf/flume-telnet.conf \ -Dflume.root.logger==INFO,console
使用telnet工具向本机的44444端口发送内容
$ telnet bigdata111 44445


案例二:实时读取本地文件到HDFS


创建flume-hdfs.conf文件
# 1 agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# 2 source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/plus
a2.sources.r2.shell = /bin/bash -c
# 3 sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://bigdata111:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 600
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
#最小副本数
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2


3)执行监控配置


/opt/module/flume1.8.0/bin/flume-ng agent \ --conf /opt/module/flume1.8.0/conf/ \ --name a2 \ --conf-file /opt/module/flume1.8.0/jobconf/flume-hdfs.conf


案例三:实时读取目录文件到HDFS


目标:使用flume监听整个目录的文件

分步实现:


1)创建配置文件flume-dir.conf


#1 Agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
#2 source
#监控目录的类型
a3.sources.r3.type = spooldir
#监控目录的路径
a3.sources.r3.spoolDir = /opt/module/flume1.8.0/upload
#哪个文件上传hdfs,然后给这个文件添加一个后缀
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传(可选)
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# 3 sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata111:9000/flume/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
#最小副本数
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3


执行测试:执行如下脚本后,请向upload文件夹中添加文件试试

/opt/module/flume1.8.0/bin/flume-ng agent \ --conf /opt/module/flume1.8.0/conf/ \ --name a3 \ --conf-file /opt/module/flume1.8.0/jobconf/flume-dir.conf

尖叫提示:

在使用Spooling Directory Source时


不要在监控目录中创建并持续修改文件

上传完成的文件会以.COMPLETED结尾

被监控文件夹每500毫秒扫描一次文件变动


案例四:Flume与Flume之间数据传递:单Flume多Channel、Sink


1dc618a0ed9580ce8bfa6facb208c08f.png

目标:使用flume1监控文件变动,flume1将变动内容传递给flume-2,flume-2负责存储到HDFS。同时flume1将变动内容传递给flume-3,flume-3负责输出到local

分步实现:


1)创建flume1.conf,用于监控某文件的变动,同时产生两个channel和两个sink分别输送给flume2和flume3:


# 1.agent     source->channel对应关系1/n    sink->channel对应关系1/1
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给多个channel
a1.sources.r1.selector.type = replicating
# 2.source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/plus
a1.sources.r1.shell = /bin/bash -c
# 3.sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata112
a1.sinks.k1.port = 4141
# sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata113
a1.sinks.k2.port = 4141
# 4.channel—1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4.channel—2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2


2)创建flume-2.conf,用于接收flume1的event,同时产生1个channel和1个sink,将数据输送给hdfs:


# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# 2 source
a2.sources.r1.type = avro
a2.sources.r1.bind = bigdata112
a2.sources.r1.port = 4141
# 3 sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume2/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0
#最小副本数
a2.sinks.k1.hdfs.minBlockReplicas = 1
# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
#5 Bind 
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1


3) 创建flume-3.conf,用于接收flume1的event,同时产生1个channel和1个sink,将数据输送给本地目录:


#1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata113
a3.sources.r1.port = 4141
#3 sink
a3.sinks.k1.type = file_roll
#备注:此处的文件夹需要先创建好
a3.sinks.k1.sink.directory = /opt/flume3
# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1


尖叫提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。


4) 执行测试:分别开启对应flume-job(依次启动flume1,flume-2,flume-3),同时产生文件变动并观察结果:


$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume1.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume2.conf
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume3.conf


案例五:Flume与Flume之间数据传递,多Flume汇总数据到单Flume


5d4c6812c8535adbb050f4ddf2e1bce8.png

目标:flume11监控文件hive.log,flume-22监控某一个端口的数据流,flume11与flume-22将数据发送给flume-33,flume33将最终数据写入到HDFS。

分步实现:


1 )创建flume11.conf,用于监控hive.log文件,同时sink数据到flume-33:


# 1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/plus
a1.sources.r1.shell = /bin/bash -c
# 3 sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata113
a1.sinks.k1.port = 4141
# 4 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 5. Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


2)创建flume-22.conf,用于监控端口44444数据流,同时sink数据到flume-33:


# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# 2 source
a2.sources.r1.type = netcat
a2.sources.r1.bind = bigdata112
a2.sources.r1.port = 44444
#3 sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = bigdata113
a2.sinks.k1.port = 4141
# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# 5 Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1


3)创建flume33.conf,用于接收flume11与flume22发送过来的数据流,最终合并后sink到HDFS:

# 1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata113
a3.sources.r1.port = 4141
# 3 sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume3/%H
#上传文件的前缀
a3.sinks.k1.hdfs.filePrefix = flume3-
#是否按照时间滚动文件夹
a3.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a3.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k1.hdfs.rollCount = 0
#最小冗余数
a3.sinks.k1.hdfs.minBlockReplicas = 1
# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1


4) 执行测试:分别开启对应flume-job(依次启动flume-33,flume-22,flume11),同时产生文件变动并观察结果:


$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume33.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume22.conf
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume11.conf


数据发送


telnet bigdata111 44444 打开后发送5555555
在/opt/plus 中追加666666


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
3月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
2月前
|
存储 机器学习/深度学习 SQL
大数据处理与分析技术
大数据处理与分析技术
161 2
|
3月前
|
存储 分布式计算 数据可视化
大数据常用技术与工具
【10月更文挑战第16天】
205 4
|
2月前
|
存储 分布式计算 NoSQL
【赵渝强老师】大数据技术的理论基础
本文介绍了大数据平台的核心思想,包括Google的三篇重要论文:Google文件系统(GFS)、MapReduce分布式计算模型和BigTable大表。这些论文奠定了大数据生态圈的技术基础,进而发展出了Hadoop、Spark和Flink等生态系统。文章详细解释了GFS的架构、MapReduce的计算过程以及BigTable的思想和HBase的实现。
119 0
|
3月前
|
存储 数据采集 监控
大数据技术:开启智能决策与创新服务的新纪元
【10月更文挑战第5天】大数据技术:开启智能决策与创新服务的新纪元
|
19天前
|
存储 分布式计算 Java
踏上大数据第一步:flume
Flume 是一个分布式、可靠且高效的系统,用于收集、聚合和移动大量日志数据。它是 Apache 顶级项目,广泛应用于 Hadoop 生态系统中。Flume 支持从多种数据源(如 Web 服务器、应用服务器)收集日志,并将其传输到中央存储(如 HDFS、HBase)。其核心组件包括 Source、Channel 和 Sink,分别负责数据获取、临时存储和最终存储。本文还介绍了在 Ubuntu 20.04 上安装 Flume 1.9.0 的步骤,涵盖 JDK 安装、Flume 下载、解压、配置环境变量及验证安装等详细过程。
63 10
|
19天前
|
存储 分布式计算 大数据
Flume+Hadoop:打造你的大数据处理流水线
本文介绍了如何使用Apache Flume采集日志数据并上传至Hadoop分布式文件系统(HDFS)。Flume是一个高可用、可靠的分布式系统,适用于大规模日志数据的采集和传输。文章详细描述了Flume的安装、配置及启动过程,并通过具体示例展示了如何将本地日志数据实时传输到HDFS中。同时,还提供了验证步骤,确保数据成功上传。最后,补充说明了使用文件模式作为channel以避免数据丢失的方法。
56 4
|
20天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
57 2
|
1月前
|
SQL 运维 大数据
轻量级的大数据处理技术
现代大数据应用架构中,数据中心作为核心,连接数据源与应用,承担着数据处理与服务的重要角色。然而,随着数据量的激增,数据中心面临运维复杂、体系封闭及应用间耦合性高等挑战。为缓解这些问题,一种轻量级的解决方案——esProc SPL应运而生。esProc SPL通过集成性、开放性、高性能、数据路由和敏捷性等特性,有效解决了现有架构的不足,实现了灵活高效的数据处理,特别适用于应用端的前置计算,降低了整体成本和复杂度。
|
2月前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
104 4
下一篇
开通oss服务