数据湖实操讲解【OSS 访问加速】第八讲:Flume 高效写入 OSS

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 数据湖 JindoFS+OSS 实操干货 36讲 每周二16点准时直播!扫文章底部二维码入钉群,线上准时观看~ Github链接: https://github.com/aliyun/alibabacloud-jindofs

本期导读 :【OSS 访问加速】第八讲


主题:Flume 高效写入 OSS


讲师:焱冰,阿里巴巴计算平台事业部 EMR 技术专家


内容框架:

  • Flume 简介
  • Flume 常用组件
  • Flume 使用 JindoFS SDK
  • Flume 实战 JindoFS SDK


直播回放链接:(7/8讲)

https://developer.aliyun.com/live/246851



一、Flume 简介

Apache Flume 简介

  • Apache Flume 是 Apache 基金会的一个顶级项目,以下简称 Flume。
  • Flume 是一个分布式、可靠、高可用的系统,支持从不同数据源高效地收集、聚合、迁移大量日志数据,聚合到中心化的数据存储服务。
  • Flume 使用最多的场景是日志收集,也可以通过定制 Source 来传输其他不同类型的数据。
  • E-MapReduce 从 3.16.0 版本开始支持 Apache Flume。


image.png


Flume 中的概念及术语

image.png


一个 Flume Agent 由 Source、Channel、Sink 组成。


Event

  • 数据流通过 Flume Agent 的基本单位。
  • Event 由一个装载字节数组负载(Payload)和一个可选的字符串属性集合组成。

image.png


Source

  • 数据源收集器,从外部数据源收集数据,并发送到 Channel。


Channel

  • Source 和 Sink 之间的缓冲队列。

Sink

  • 从 Channel 中获取 Event ,并将以事务的形式 commit 到外部存储中。一旦事务 commit 成功,该 Event 会从 Channel 中移除。



二、Flume 常用组件


常用组件介绍

常见 Source

  • Avro Source:通过监听 Avro 端口获取 Avro Client 发送的事件。Avro 是 Hadoop 提供的一种协议,用于序列化反序列化数据。
  • Exec Source:通过监听命令行输出获取数据,如 tail -f /var/log/messages。
  • NetCat TCP Source: 监听指定 tcp 端口获取数据。类似的还有 Netcat UDP Source。
  • Taildir Source: 监控目录下的多个文件,会记录偏移量,不会丢失数据,最为常用。


常见 Channel

  • Memory Channel: 缓存到内存中,性能高,最为常用。
  • File Channel: 缓存到文件中,会记录 checkpoint 和 data 文件,可靠性高,但性能较差。
  • JDBC Channel: 缓存到关系型数据库中。
  • Kakfa Channel:通过 Kafka 来缓存数据。


常见 Sink

  • Logger Sink: 用于测试
  • Avro Sink: 转换成 Avro Event,主要用于连接多个 Flume Agent。
  • HDFS Sink: 写入 HDFS,最为常用。
  • Hive sink: 写入 Hive 表或分区,使用 Hive 事务写 events。
  • Kafka sink: 写入 Kafka。


文档

  • 官方文档:
    https://flume.apache.org/documentation.html
  • 中文文档:
    https://flume.liyifeng.org/


三、Flume 使用 JindoFS SDK


Flume 使用 JindoFS SDK 写入 OSS

环境要求

在集群上已经部署 Flume,已部署 JindoSDK 3.4 以上版本。


为什么需要使用 JindoFS SDK 写入 OSS

Flume 通过 flush() 调用保证事务性写入,OSS 本身不支持 Flush 功能,通过 JindoFS SDK 写入 OSS,虽然不能让 flush 后的数据立刻可见,但是可以保证 flush() 后的数据不丢失,Flume 作业失败后,可以使用 JindoFS 命令恢复 flush 过的数据。


配置示例

xxx.sinks.oss_sink.hdfs.path = oss://${your_bucket}/flume_dir/%Y-%m-%d/%H 
xxx.sinks.oss_sink.hdfs.batchSize = 100000 
xxx.sinks.oss_sink.hdfs.round = true
xxx.sinks.oss_sink.hdfs.roundValue = 15
xxx.sinks.oss_sink.hdfs.Unit = minute
xxx.sinks.oss_sink.hdfs.filePrefix = your_topic
xxx.sinks.oss_sink.rollSize = 3600
xxx.sinks.oss_sink.threadsPoolSize = 30

- 文档链接⭐

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/flume/jindofs_sdk_on_flume_for_oss.md


在 EMR 集群内对 Flush 文件恢复

jindo jfs -recover [-R]
                   [-flushStagingPath {flushStagingPath}]
                   [-accessKeyId ${accessKeyId}]
                   [-accessKeySecret ${accessKeySecret}]
                   <path>

注:如需递归恢复(-R),建议先停止 Flume 任务,避免 Flume 任务运行异常。


在 EMR 集群外对 Flush 文件恢复

JindoOssFileSystem jindoFileSystem = (JindoOssFileSystem) fs; 
boolean isFolder = true; 
jindoFileSystem.recover(path, isFolder);

- 文档链接

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/flume/jindofs_sdk_on_flume_for_oss.md


四、Flume 实战 JindoFS SDK


自建Flume 使用 JindoFS SDK 压缩写入 OSS

环境准备

Hadoop-2.8.5


下载

Flume-1.9.0:wgethttps://downloads.apache.org/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz


添加依赖

cd $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib
cp commons-configuration-1.6.jar $FLUME_HOME/lib
cp hadoop-auth-2.8.5.jar $FLUME_HOME/lib
cp hadoop-common-2.8.5.jar $FLUME_HOME/lib
cp hadoop-hdfs-2.8.5.jar $FLUME_HOME/lib
cp commons-io-2.4.jar $FLUME_HOME/lib
cp htrace-core4-4.0.1-incubating.jar $FLUME_HOME/lib
wget https://smartdata-binary.oss-cn-shanghai.aliyuncs.com/jindofs-sdk-3.5.0.jar -O 
$FLUME_HOME/lib/jindofs-sdk-3.5.0.jar

配置 JindoFS SDK

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_how_to_hadoop.md


配置

a1.sources = r1

a1.sinks = k1

a1.channels = c1


a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /tmp/test.log


a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 20


a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = oss://yanbin-hd2-test/%Y-%m-%d/%H

a1.sinks.k1.hdfs.filePrefix = test

a1.sinks.k1.hdfs.batchSize = 20

a1.sinks.k1.hdfs.codeC = gzip

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k1.rollCount = 20

a2.sinks.k1.hdfs.minBlockReplicas = 1


a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1


日志仿真

while true; do echo `date` >> /tmp/test.log; sleep 1; done

Flume 启动

bin/flume-ng agent --name a1 -c conf -f conf/flume-exec-oss.conf  -Dflume.root.logger=INFO,console

结果

image.png


直接观看第四课(7/8讲)视频回放,获取实例讲解~

https://developer.aliyun.com/live/246851





Github链接:

https://github.com/aliyun/alibabacloud-jindofs


不错过每次直播信息、探讨更多数据湖 JindoFS+OSS 相关技术问题,欢迎扫码加入钉钉交流群!

image.png



相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
相关文章
|
4天前
|
存储 应用服务中间件 开发工具
对象存储OSS-Python设置代理访问请求
通过 Python SDK 配置 nginx 代理地址请求阿里云 OSS 存储桶服务。示例代码展示了如何使用 RAM 账号进行身份验证,并通过代理下载指定对象到本地文件。
37 15
|
2月前
|
存储 人工智能 缓存
AI助理直击要害,从繁复中提炼精华——使用CDN加速访问OSS存储的图片
本案例介绍如何利用AI助理快速实现OSS存储的图片接入CDN,以加速图片访问。通过AI助理提炼关键操作步骤,避免在复杂文档中寻找解决方案。主要步骤包括开通CDN、添加加速域名、配置CNAME等。实测显示,接入CDN后图片加载时间显著缩短,验证了加速效果。此方法大幅提高了操作效率,降低了学习成本。
5467 16
|
2月前
|
存储 网络安全 对象存储
缺乏中间证书导致通过HTTPS协议访问OSS异常
【10月更文挑战第4天】缺乏中间证书导致通过HTTPS协议访问OSS异常
131 4
|
4月前
|
存储 机器学习/深度学习 弹性计算
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
|
5月前
|
分布式计算 DataWorks 数据处理
MaxCompute操作报错合集之UDF访问OSS,配置白名单后出现报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
域名解析 Serverless API
函数计算产品使用问题之如何配置自定义域名访问OSS中的内容
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
存储 运维 Serverless
Serverless 应用引擎产品使用合集之如何访问相同地域的OSS
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
6月前
|
机器学习/深度学习 人工智能 分布式计算
人工智能平台PAI操作报错合集之在ODPS的xxx_dev项目空间调用easyrec训练,需要访问yyy项目空间的OSS,出现报错,是什么导致的
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
7月前
|
存储 SQL 分布式计算
基于Apache Hudi + MinIO 构建流式数据湖
基于Apache Hudi + MinIO 构建流式数据湖
273 1
下一篇
DataWorks