基于Apache Flume Datahub插件将日志数据同步上云

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: ## 简介 Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统,支持多种Source和Sink插件。本文将介绍如何使用Apache Flume的[Datahub Sink插件](https://github.

本文用到的

阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps


简介

Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统,支持多种Source和Sink插件。本文将介绍如何使用Apache Flume的Datahub Sink插件将日志数据实时上传到Datahub。

环境要求

  • JDK (1.7及以上,推荐1.7)
  • Flume-NG 1.x
  • Apache Maven 3.x

插件部署

下载插件压缩包

$ wget http://odps-repo.oss-cn-hangzhou.aliyuncs.com/data-collectors%2Faliyun-flume-datahub-sink-2.0.2.tar.gz

解压插件压缩包

$ tar zxvf flume-datahub-sink-1.1.0.tar.gz
$ ls flume-datahub-sink
lib    libext

部署Datahub Sink插件

将解压后的插件文件夹flume-datahub-sink移动到Apache Flume安装目录下

$ mkdir {YOUR_FLUME_DIRECTORY}/plugins.d
$ mv flume-datahub-sink {YOUR_FLUME_DIRECTORY}/plugins.d/

移动后,核验Datahub Sink插件是否已经在相应目录:

$ ls { YOUR_APACHE_FLUME_DIR }/plugins.d
flume-datahub-sink

配置示例

Flume的原理、架构,以及核心组件的介绍请参考 Flume-ng的原理和使用。本文将构建一个使用Datahub Sink的Flume实例,对日志文件中的结构化数据进行解析,并上传到Datahub Topic中。

需要上传的日志文件格式如下(每行为一条记录,字段之间逗号分隔):

# test_basic.log
some,log,line1
some,log,line2
...

下面将创建Datahub Topic,并把每行日志的第一列和第二列作为一条记录写入Topic中。

创建Datahub Topic

使用Datahub WebConsole创建好Topic,schema为(string c1, string c2),下面假设建好的Topic名为test_topic。

Flume配置文件

在Flume安装目录的conf/文件夹下创建名为datahub_basic.conf的文件,并输入内容如下:

# A single-node Flume configuration for Datahub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log

# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessID = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_END_POINT}
a1.sinks.k1.datahub.project = test_project
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.batchSize = 1
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = c1,c2,
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这里serializer配置指定了以逗号分隔的形式将输入源解析成三个字段,并忽略第三个字段。

启动Flume

配置完成后,启动Flume并指定agent的名称和配置文件路径,添加-Dflume.root.logger=INFO,console选项可以将日志实时输出到控制台。

$ cd {YOUR_FLUME_DIRECTORY}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

写入成功,显示日志如下:

...
Write success. Event count: 2
...

数据使用

日志数据通过Flume上传到Datahub后,可以使用StreamCompute流计算来进行实时分析,例如对于一些Web网站的日志,可以实时统计各个页面的PV/UV等。另外,导入Datahub的数据也可以配置Connector将数据归档至MaxCompute中,方便后续的离线分析。

对于数据归档MaxCompute的场景,一般来说需要将数据进行分区。Datahub到MaxCompute的归档可以根据MaxCompute表的分区字段自动创建分区,前提是要求MaxCompute和Datahub的字段名以及类型可以完全对应上。如果需要根据日志的传输时间自动设置分区,则在上面的例子中需要指定MaxCompute的分区相应字段和时间格式,例如按小时自动创建分区,添加的配置如下:

a1.sinks.k1.maxcompute.partition.columns = pt
a1.sinks.k1.maxcompute.partition.values = %Y%m%d%H

注意:pt这个字段需要在Datahub Topic以及MaxCompute表中都存在,且是表的分区字段。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1月前
|
XML 开发框架 .NET
ASP.NET COR3.1 集成日志插件NLog
ASP.NET COR3.1 集成日志插件NLog
50 0
|
1月前
|
Shell
Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
【2月更文挑战第17天】Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
132 2
|
1月前
|
Java Linux
Flume【环境搭建 01】CentOS Linux release 7.5 安装配置 apache-flume-1.9.0 并验证
【2月更文挑战第16天】Flume【环境搭建 01】CentOS Linux release 7.5 安装配置 apache-flume-1.9.0 并验证
68 0
|
1天前
|
SQL Java 数据库连接
IDEA插件(MyBatis Log Free)
IDEA插件(MyBatis Log Free)
9 0
|
7天前
|
安全 Java 编译器
写个代码扫描插件,再也不怕 log4j 等问题
写个代码扫描插件,再也不怕 log4j 等问题
10 0
|
1月前
|
存储 监控 Apache
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
网易的灵犀办公和云信利用 Apache Doris 改进了大规模日志和时序数据处理,取代了 Elasticsearch 和 InfluxDB。Doris 实现了更低的服务器资源消耗和更高的查询性能,相比 Elasticsearch,查询速度提升至少 11 倍,存储资源节省达 70%。Doris 的列式存储、高压缩比和倒排索引等功能,优化了日志和时序数据的存储与分析,降低了存储成本并提高了查询效率。在灵犀办公和云信的实际应用中,Doris 显示出显著的性能优势,成功应对了数据增长带来的挑战。
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
|
11天前
|
测试技术 Apache
使用 Apache JMeter Flexible File Writer 插件的详细指南
Apache JMeter 是开源性能测试工具,用于负载测试。Flexible File Writer 是一个插件,用于自定义格式记录测试结果。安装该插件需通过 JMeter 的 Plugins Manager。配置时,添加监听器到测试计划,设置输出文件、文件格式及字段。执行测试后,结果将按指定格式写入 CSV 文件。此插件增强了数据记录的灵活性,便于分析和报告。
20 0
|
1月前
|
JSON JavaScript 编译器
同事写的console.log太多令人烦恼?来手撕一个vite插件去掉它
同事写的console.log太多令人烦恼?来手撕一个vite插件去掉它
|
1月前
|
Apache
web服务器(Apache)访问日志(access_log)详细解释
web服务器(Apache)访问日志(access_log)详细解释
|
1月前
|
存储 消息中间件 监控
Zoom 基于Apache Hudi 的流式日志处理实践
Zoom 基于Apache Hudi 的流式日志处理实践
54 1