阿里云数据总线(DataHub)使用Flume插件导入数据示例

简介: Flume NG是Cloudera提供的一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。本文主要演示如何使用Flume-DataHub插件导入数据到阿里云数据总线(DataHub)。

Step By Step

主要操作步骤

1、Java环境安装
2、Apache Maven安装
3、Flume-NG安装
4、配置导入数据


一、JAVA环境安装

1、更新软件包列表

sudo apt-get update

2、安装openjdk-8-jdk

sudo apt-get install openjdk-8-jdk

3、查看java版本,看看是否安装成功

java -version

图片.png

二、Apache Maven安装

1、安装

apt install maven

2、查看安装版本

mvn -v

图片.png

三、Flume-NG安装

1、flume下载
wget https://downloads.apache.org/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

2、解压

tar zxvf apache-flume-1.9.0-bin.tar.gz

图片.png

3、下载flume-datahub插件
wget https://aliyun-datahub.oss-cn-hangzhou.aliyuncs.com/tools/aliyun-flume-datahub-sink-2.0.4.tar.gz

4、解压flume插件并放在${FLUME_HOME}/plugins.d目录下(本示例${FLUME_HOME}值为:apache-flume-1.9.0-bin)

tar -zxvf aliyun-flume-datahub-sink-2.0.4.tar.gz
mkdir apache-flume-1.9.0-bin/plugins.d
mv aliyun-flume-datahub-sink apache-flume-1.9.0-bin/plugins.d

5、安装效果查看

apache-flume-1.9.0-bin/bin/flume-ng version

图片.png

四、配置导入数据

1、数据文件(demo.txt)

0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
1,hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V,false,1254275.1144637289,1573206062763,1254275.1144637289
2,vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l,true,1254275.1144637289,1573206062763,1254275.1144637289
3,t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs,false,1254275.1144637289,1573206062763,1254275.1144637289
4,MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI,true,1254275.1144637289,1573206062763,1254275.1144637289
5,bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV,false,1254275.1144637289,1573206062763,1254275.1144637289
6,wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX,true,1254275.1144637289,1573206062763,1254275.1144637289
7,whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8,false,1254275.1144637289,1573206062763,1254275.1144637289
8,OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ,true,1254275.1144637289,1573206062763,1254275.1144637289

2、DataHub Topic Schema

字段名称 字段类型
id BIGINT
name STRING
gender BOOLEAN
salary DOUBLE
my_time TIMESTAMP
decimal DECIMAL

图片.png

3、配置文件

# A single-node Flume configuration for Datahub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /root/flume/demo.txt
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = LTAIOZZ******
a1.sinks.k1.datahub.accessKey = v7CjUJCMk7j9aKdu************
a1.sinks.k1.datahub.endPoint = https://dh-cn-shanghai.aliyuncs.com
a1.sinks.k1.datahub.project = flume_project
a1.sinks.k1.datahub.topic = flume
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
a1.sinks.k1.datahub.enablePb = true
a1.sinks.k1.datahub.compressType = DEFLATE
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4、测试效果(本地测试按照自己实际文件路径配置即可)

apache-flume-1.9.0-bin/bin/flume-ng agent -n a1 -c conf -f datahub.conf -Dflume.root.logger=INFO,console

图片.png

图片.png

参考链接

Flume插件
Flume-ng 的原理和使用

相关文章
|
6月前
|
SQL 关系型数据库 Java
实时计算 Flink版操作报错之在阿里云DataHub平台上执行SQL查询GitHub新增star仓库Top 3时不显示结果,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
HH
|
Java 开发工具 关系型数据库
阿里云DataHub入门测试
阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布 (Publish),订阅 (Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用。DataHub服务可以对各种移动设备,应用软件,网站服务,传感器等产生的大量流式数据进行持续不断的采集,存储和处理。用户可以编写应用程序或者使用流计算引擎来处理写入到DataHub的流式数据比如实时web访问日志、应用日志、各种事件等,并产出各种实时的数据处理结果比如实时图表、报警信息、实时统计等。
HH
558 0
阿里云DataHub入门测试
|
Java 测试技术 Ruby
基于LogStash插件采集数据到阿里云Datahub
DataHub服务基于阿里云自研的飞天平台,具有高可用,低延迟,高可扩展,高吞吐的特点,原生支持对接阿里云的多项服务,相关功能特点与Kafka类似。本身主要介绍如何使用LogStash采集数据写入Datahub。
1002 0
基于LogStash插件采集数据到阿里云Datahub
|
SQL 分布式计算 DataWorks
阿里云物联网平台数据转发到DataHub示例
本文主要演示通过规则引擎将消息流转到DataHub,并通过Dataconnector 将消费流转到MaxCompute的表。
阿里云物联网平台数据转发到DataHub示例
|
大数据 测试技术 流计算
|
6月前
|
消息中间件 分布式计算 DataWorks
DataWorks常见问题之kafka数据导入datahub失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
SQL 关系型数据库 数据管理
Datahub实践——Sqllineage解析Sql实现端到端数据血缘
Datahub实践——Sqllineage解析Sql实现端到端数据血缘
1441 1
|
数据采集 JSON 关系型数据库
将 MySQL 数据抽取并写入 DataHub,您可以按照以下步骤进行
将 MySQL 数据抽取并写入 DataHub,您可以按照以下步骤进行
546 2
|
5月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之mysql-cdc读取数据写入到datahub中,datahub如何转换时区
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
60 1