开发者社区> 冶善> 正文

基于Apache Flume Datahub插件将日志数据同步上云

简介: ## 简介 Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统,支持多种Source和Sink插件。本文将介绍如何使用Apache Flume的[Datahub Sink插件](https://github.
+关注继续查看

本文用到的

阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps


简介

Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统,支持多种Source和Sink插件。本文将介绍如何使用Apache Flume的Datahub Sink插件将日志数据实时上传到Datahub。

环境要求

  • JDK (1.7及以上,推荐1.7)
  • Flume-NG 1.x
  • Apache Maven 3.x

插件部署

下载插件压缩包

$ wget http://odps-repo.oss-cn-hangzhou.aliyuncs.com/data-collectors%2Faliyun-flume-datahub-sink-2.0.2.tar.gz

解压插件压缩包

$ tar zxvf flume-datahub-sink-1.1.0.tar.gz
$ ls flume-datahub-sink
lib    libext

部署Datahub Sink插件

将解压后的插件文件夹flume-datahub-sink移动到Apache Flume安装目录下

$ mkdir {YOUR_FLUME_DIRECTORY}/plugins.d
$ mv flume-datahub-sink {YOUR_FLUME_DIRECTORY}/plugins.d/

移动后,核验Datahub Sink插件是否已经在相应目录:

$ ls { YOUR_APACHE_FLUME_DIR }/plugins.d
flume-datahub-sink

配置示例

Flume的原理、架构,以及核心组件的介绍请参考 Flume-ng的原理和使用。本文将构建一个使用Datahub Sink的Flume实例,对日志文件中的结构化数据进行解析,并上传到Datahub Topic中。

需要上传的日志文件格式如下(每行为一条记录,字段之间逗号分隔):

# test_basic.log
some,log,line1
some,log,line2
...

下面将创建Datahub Topic,并把每行日志的第一列和第二列作为一条记录写入Topic中。

创建Datahub Topic

使用Datahub WebConsole创建好Topic,schema为(string c1, string c2),下面假设建好的Topic名为test_topic。

Flume配置文件

在Flume安装目录的conf/文件夹下创建名为datahub_basic.conf的文件,并输入内容如下:

# A single-node Flume configuration for Datahub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log

# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessID = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_END_POINT}
a1.sinks.k1.datahub.project = test_project
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.batchSize = 1
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = c1,c2,
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这里serializer配置指定了以逗号分隔的形式将输入源解析成三个字段,并忽略第三个字段。

启动Flume

配置完成后,启动Flume并指定agent的名称和配置文件路径,添加-Dflume.root.logger=INFO,console选项可以将日志实时输出到控制台。

$ cd {YOUR_FLUME_DIRECTORY}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

写入成功,显示日志如下:

...
Write success. Event count: 2
...

数据使用

日志数据通过Flume上传到Datahub后,可以使用StreamCompute流计算来进行实时分析,例如对于一些Web网站的日志,可以实时统计各个页面的PV/UV等。另外,导入Datahub的数据也可以配置Connector将数据归档至MaxCompute中,方便后续的离线分析。

对于数据归档MaxCompute的场景,一般来说需要将数据进行分区。Datahub到MaxCompute的归档可以根据MaxCompute表的分区字段自动创建分区,前提是要求MaxCompute和Datahub的字段名以及类型可以完全对应上。如果需要根据日志的传输时间自动设置分区,则在上面的例子中需要指定MaxCompute的分区相应字段和时间格式,例如按小时自动创建分区,添加的配置如下:

a1.sinks.k1.maxcompute.partition.columns = pt
a1.sinks.k1.maxcompute.partition.values = %Y%m%d%H

注意:pt这个字段需要在Datahub Topic以及MaxCompute表中都存在,且是表的分区字段。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
SpringBoot2 配置多数据源,整合MybatisPlus增强插件
本文源码:GitHub·点这里 || GitEE·点这里 一、项目案例简介 1、多数据简介 实际的项目中,经常会用到不同的数据库以满足项目的实际需求。随着业务的并发量的不断增加,一个项目使用多个数据库:主从复制、读写分离、分布式数据库等方式,越来越常见。
2625 0
k8s与日志--采用golang实 现Fluent Bit的output插件
s" "time" "unsafe" "github.com/Shopify/sarama" "github.com/fluent/fluent-bit-go/output" "github.com/ugorji/go/codec" ) var ( brokers []string producer sarama.
2016 0
基于Tablestore 实现大规模订单系统海量订单/日志数据分类存储的实践
前言:从最早的互联网高速发展、到移动互联网的爆发式增长,再到今天的产业互联网、物联网的快速崛起,各种各样新应用、新系统产生了众多订单类型的需求,比如电商购物订单、银行流水、运营商话费账单、外卖订单、设备信息等,产生的数据种类和数据量越来越多;其中订单系统就是一个非常广泛、通用的系统。而随着数据规模的快速增长、大数据技术的发展、运营水平的不断提高,包括数据消费的能力要求越来越高,这对支撑订单系统的数据库设计、存储系统也提出了更多的要求。在新的需求下,传统的经典架构面临着诸多挑战,需要进一步思考架构优化,以更好支撑业务发展;
234 0
【DATAGUARD】手工恢复备库日志中断
1、在备库检查日志缺失 FAL[client]: Failed to request gap sequence  GAP - thread 1 sequence 53415-53434  DBID 424533136 branch 710350416 FAL[c...
593 0
【DataGuard】10GR 日志传输服务参数
日志的传输以及应用可以算作是Dataguard的核心所在.在我们搭建DG的过程中,如何配置优化日志传输服务,关系到整个DG体系的性能以及可用性.而且,不同的保护模式也需要不用的参数组合.
620 0
Visual Studio 2008中如何比较二个数据库的架构【Schema】和数据【Data】并同步
使用场景: 在团队开发中,每一个人都有可能随时更新数据库,这时候数据库中数据和架构等信息都会发生变化。如果更新不及时,就会发生数据错误或数据丢失的风险,影响团队的开发效率和 项目进度,这时候我们该怎么办呢?VS2008 Team System版本中就提供了解决这个问题的工具。
728 0
+关注
冶善
阿里巴巴
10
文章
3
问答
来源圈子
更多
MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。
+ 订阅
相关文档: MaxCompute
文章排行榜
最热
最新
相关电子书
更多
OceanBase 入门到实战教程
立即下载
阿里云图数据库GDB,加速开启“图智”未来.ppt
立即下载
实时数仓Hologres技术实战一本通2.0版(下)
立即下载