大数据编程技术基础实验八:Flume实验——文件数据Flume至HDFS

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
日志服务 SLS,月写入数据量 50GB 1个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据技术基础实验八,学习安装部署Flume并将写入Flume的文件数据上传至HDFS。

一、前言

距离上次大数据编程技术基础实验已经过去二十天了,我们的课程并没有结束,是因为学校服务器关闭了一段时间,所以就一直没有做实验,今天我们就继续进行有关大数据的实验。

二、实验目的

  1. 掌握Flume的安装部署
  2. .掌握一个agent中source、sink、channel组件之间的关系
  3. 加深对Flume结构和概念的理解
  4. 掌握Flume的编码方法及启动任务方法

三、实验要求

  1. 在一台机器上(本例以master为例)部署Flume
  2. 实时收集本地hadoop的日志的最新信息然后将收集到日志信息以一分钟一个文件的形式写入HDFS目录中

四、实验原理

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力 Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力。

当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。

Flume-og采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。Flume Master间使用gossip协议同步数据。

Flume-ng最明显的改动就是取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。Flume-ng另一个主要的不同点是读入数据和写出数据现在由不同的工作线程处理(称为 Runner)。 在 Flume-og 中,读入线程同样做写出工作(除了故障重试)。如果写出慢的话(不是完全失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。

Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成,如下图所示。

image-20221028100649379.png

值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,这也正是NB之处。如下图所示:

image-20221028100728485.png

1、flume的特点

flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。

2、flume的可靠性

当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Besteffort(数据发送到接收方后,不会进行确认)。

五、实验步骤

1、启动Hadoop集群

因为本实验主要演示Flume安装以及启动一个Flume手机日志信息的例子,而且学校集群能一键搭建,所以在这里就不重复的演示Hadoop集群的部署和启动了,如果有不清楚的朋友可以移步到我之前的博客,里面有详细的步骤:

大数据技术基础实验三:HDFS实验——部署HDFS

在这里我就放出输入jps查看进程的截图:

image-20221028101359078.png

image-20221028101408999.png

image-20221028101418329.png

2、安装并配置Flume

首先学校虚拟机上面有Flume的压缩包,我们只需要将压缩包解压到/usr/cstor目录下,并将flume目录所属用户改成root:root。

tar -zxvf flume-1.5.2.tar.gz -c /usr/cstor

这是解压命令,但学校的虚拟机已经帮我们解压好了所以就不用输这个命令了,我们使用查看命令看看:

ls /usr/cstor/flume/
[2020122145 root@master ~]# ls /usr/cstor/flume/bin        conf      docs  LICENSE  README         tools
CHANGELOG  DEVNOTES  lib   NOTICE   RELEASE-NOTES

接下来我们将所属用户改成root:root。

chown-R root:root /usr/cstor/flume/

然后我们进入解压目录,在conf目录下新建test.conf文件。

cd /usr/cstor/flume/conf
vim test.conf

并添加以下配置内容:

#定义agent中各组件名称agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1
# source1组件的配置参数agent1.sources.source1.type=exec
#此处的文件/home/source.log需要手动生成,见后续说明agent1.sources.source1.command=tail -n+0-F /home/source.log
# channel1的配置参数agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=1000agent1.channels.channel1.transactionCapactiy=100# sink1的配置参数agent1.sinks.sink1.type=hdfs
agent1.sinks.sink1.hdfs.path=hdfs://master:8020/flume/data
agent1.sinks.sink1.hdfs.fileType=DataStream
#时间类型agent1.sinks.sink1.hdfs.useLocalTimeStamp=trueagent1.sinks.sink1.hdfs.writeFormat=TEXT
#文件前缀agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d-%H-%M
#60秒滚动生成一个文件agent1.sinks.sink1.hdfs.rollInterval=60#HDFS块副本数agent1.sinks.sink1.hdfs.minBlockReplicas=1#不根据文件大小滚动文件agent1.sinks.sink1.hdfs.rollSize=0#不根据消息条数滚动文件agent1.sinks.sink1.hdfs.rollCount=0#不根据多长时间未收到消息滚动文件agent1.sinks.sink1.hdfs.idleTimeout=0# 将source和sink 绑定到channelagent1.sources.source1.channels=channel1
agent1.sinks.sink1.channel=channel1

image-20221028102328779.png

3、启动Flume并上传文件数据到HDFS

添加成功之后我们在HDFS上创建/flume/data目录:

cd /usr/cstor/hadoop/bin
./hdfs dfs -mkdir /flume
./hdfs dfs -mkdir /flume/data

最后进入Flume安装的bin目录下面:

cd /usr/cstor/flume/bin

启动Flume并开始收集日志信息。:

./flume-ng agent --conf conf --conf-file /usr/cstor/flume/conf/test.conf --name agent1 -Dflume.root.logger=DEBUG,console

image-20221028102858229.png

出现如上图所示结果就代表启动成功,接下来我们再创建一个master节点,然后我们去手动生成消息源,也就是配置文件中的/home/source.log,使用如下命令去不断的写入信息到该文件中:

vi /home/source.log

然后不断重复如下命令写入信息:

echo aa >> /home/source.log
echo aa >> /home/source.log
echo aa >> /home/source.log
echo aa >> /home/source.log
echo aa >> /home/source.log
echo aa >> /home/source.log

最后我们查看这个文件的内容:

cat /home/source.log

image-20221028103442345.png

可以看见我们写入信息成功。

4、查看实验结果

我们进入到hadoop的bin目录下面,然后查看我们创建的文件中是否有我们传入的文件:

cd /usr/cstor/hadoop/bin/
hadoop fs -ls /flume/data

image-20221028103805966.png

可以看出成功出现,证明上传成功。

六、最后我想说

到这里本次实验就结束了,实验内容比较少和简单,大家按步骤来进行就不会出现错误了,有关大数据的实验大家都可以多加练习。

另外想说的是,Flume 是一种分布式、可靠且可用的服务,用于高效收集、聚合和移动大量日志数据。它具有基于流数据流的简单灵活的架构。它具有可调整的可靠性机制以及许多故障转移和恢复机制,具有健壮性和容错性。它使用允许在线分析应用程序的简单可扩展数据模型。

这个组件也非常重要,学校学习的东西都比较基础,如果大家还想深入了解该组件的运用可以上网去查阅一下。

最后,谢谢大家能看完,感谢支持!

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
2月前
|
存储 分布式计算 Java
踏上大数据第一步:flume
Flume 是一个分布式、可靠且高效的系统,用于收集、聚合和移动大量日志数据。它是 Apache 顶级项目,广泛应用于 Hadoop 生态系统中。Flume 支持从多种数据源(如 Web 服务器、应用服务器)收集日志,并将其传输到中央存储(如 HDFS、HBase)。其核心组件包括 Source、Channel 和 Sink,分别负责数据获取、临时存储和最终存储。本文还介绍了在 Ubuntu 20.04 上安装 Flume 1.9.0 的步骤,涵盖 JDK 安装、Flume 下载、解压、配置环境变量及验证安装等详细过程。
76 10
|
2月前
|
存储 分布式计算 大数据
Flume+Hadoop:打造你的大数据处理流水线
本文介绍了如何使用Apache Flume采集日志数据并上传至Hadoop分布式文件系统(HDFS)。Flume是一个高可用、可靠的分布式系统,适用于大规模日志数据的采集和传输。文章详细描述了Flume的安装、配置及启动过程,并通过具体示例展示了如何将本地日志数据实时传输到HDFS中。同时,还提供了验证步骤,确保数据成功上传。最后,补充说明了使用文件模式作为channel以避免数据丢失的方法。
89 4
|
2月前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
115 2
|
2月前
|
SQL 运维 大数据
轻量级的大数据处理技术
现代大数据应用架构中,数据中心作为核心,连接数据源与应用,承担着数据处理与服务的重要角色。然而,随着数据量的激增,数据中心面临运维复杂、体系封闭及应用间耦合性高等挑战。为缓解这些问题,一种轻量级的解决方案——esProc SPL应运而生。esProc SPL通过集成性、开放性、高性能、数据路由和敏捷性等特性,有效解决了现有架构的不足,实现了灵活高效的数据处理,特别适用于应用端的前置计算,降低了整体成本和复杂度。
|
3月前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
175 4
|
3月前
|
机器学习/深度学习 存储 大数据
云计算与大数据技术的融合应用
云计算与大数据技术的融合应用
|
4月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
242 6
|
4月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
165 0
|
4月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
71 0
|
4月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
93 0