阿里云数据总线(DataHub)使用Flume插件导入数据示例

简介: Flume NG是Cloudera提供的一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。本文主要演示如何使用Flume-DataHub插件导入数据到阿里云数据总线(DataHub)。

Step By Step

主要操作步骤

1、Java环境安装
2、Apache Maven安装
3、Flume-NG安装
4、配置导入数据


一、JAVA环境安装

1、更新软件包列表

sudo apt-get update

2、安装openjdk-8-jdk

sudo apt-get install openjdk-8-jdk

3、查看java版本,看看是否安装成功

java -version

图片.png

二、Apache Maven安装

1、安装

apt install maven

2、查看安装版本

mvn -v

图片.png

三、Flume-NG安装

1、flume下载
wget https://downloads.apache.org/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

2、解压

tar zxvf apache-flume-1.9.0-bin.tar.gz

图片.png

3、下载flume-datahub插件
wget https://aliyun-datahub.oss-cn-hangzhou.aliyuncs.com/tools/aliyun-flume-datahub-sink-2.0.4.tar.gz

4、解压flume插件并放在${FLUME_HOME}/plugins.d目录下(本示例${FLUME_HOME}值为:apache-flume-1.9.0-bin)

tar -zxvf aliyun-flume-datahub-sink-2.0.4.tar.gz
mkdir apache-flume-1.9.0-bin/plugins.d
mv aliyun-flume-datahub-sink apache-flume-1.9.0-bin/plugins.d

5、安装效果查看

apache-flume-1.9.0-bin/bin/flume-ng version

图片.png

四、配置导入数据

1、数据文件(demo.txt)

0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
1,hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V,false,1254275.1144637289,1573206062763,1254275.1144637289
2,vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l,true,1254275.1144637289,1573206062763,1254275.1144637289
3,t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs,false,1254275.1144637289,1573206062763,1254275.1144637289
4,MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI,true,1254275.1144637289,1573206062763,1254275.1144637289
5,bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV,false,1254275.1144637289,1573206062763,1254275.1144637289
6,wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX,true,1254275.1144637289,1573206062763,1254275.1144637289
7,whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8,false,1254275.1144637289,1573206062763,1254275.1144637289
8,OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ,true,1254275.1144637289,1573206062763,1254275.1144637289

2、DataHub Topic Schema

字段名称 字段类型
id BIGINT
name STRING
gender BOOLEAN
salary DOUBLE
my_time TIMESTAMP
decimal DECIMAL

图片.png

3、配置文件

# A single-node Flume configuration for Datahub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /root/flume/demo.txt
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = xxxxxx
a1.sinks.k1.datahub.accessKey = xxxxxx
a1.sinks.k1.datahub.endPoint = https://dh-cn-shanghai.aliyuncs.com
a1.sinks.k1.datahub.project = flume_project
a1.sinks.k1.datahub.topic = flume
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
a1.sinks.k1.datahub.enablePb = true
a1.sinks.k1.datahub.compressType = DEFLATE
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4、测试效果(本地测试按照自己实际文件路径配置即可)

apache-flume-1.9.0-bin/bin/flume-ng agent -n a1 -c conf -f datahub.conf -Dflume.root.logger=INFO,console

图片.png

图片.png

参考链接

Flume插件
Flume-ng 的原理和使用

相关文章
|
Java 关系型数据库 MySQL
兴奋!阿里巴巴首推“Java进阶必备宝典”,理论到实战,一键搞定
作为一名Java方向的程序员,打好夯实的基础是非常重要的,现在大厂面试对于程序员基础知识的掌握考察也越来越严格,虽然说现在技术更新比较快,但基础扎实才能够更深入的去理解每一个知识技术点。
|
11月前
|
数据采集 人工智能 JSON
Crawl4AI:为大语言模型打造的开源网页数据采集工具
随着大语言模型(LLMs)的快速发展,高质量数据成为智能系统的关键基础。**Crawl4AI**是一款专为LLMs设计的开源网页爬取工具,可高效提取并结构化处理网页数据,突破传统API限制,支持JSON、HTML或Markdown等格式输出。
943 3
Crawl4AI:为大语言模型打造的开源网页数据采集工具
|
6月前
|
机器学习/深度学习 XML 人工智能
怎能在国产规则引擎中使用PMML模型
本文介绍了如何在国产Together规则引擎中导入和处理PMML模型,详细演示了将机器学习模型集成到DMN决策流程中的步骤。通过图文教程,帮助用户快速掌握PMML模型的调用与应用。
|
存储 SQL 监控
Dynamic Table快速入门
本次分享的主题是Dynamic Table快速入门,由Hologres PD 梅酱分享。今天的分享分为三个部分。首先,第一部分为Table的基本概念;第二部分进行Table的实操;第三部分为一些使用Table的建议和最佳实践。
521 12
|
SQL 分布式计算 调度
实时数仓 Hologres操作报错合集之在与PostgreSOL数据库进行通信时出现报错,如何解决
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
Python
Pygame AttributeError no attribute ‘display‘问题及其解决方法
Pygame AttributeError no attribute ‘display‘问题及其解决方法
586 4
|
API vr&ar Android开发
一文搞懂ARKit,ARCore,RealityKit和Vuforia
AR框架的一个核心是含能够理解特征的场景和各种类型的锚点。锚点有很多种,可根据特定场景来摆放3D模型 ,甚至可以真人来做距离测量(ARBodyAnchor)。并且这些框架不容忽视的优势是具有进行场景重建和人形遮挡(occlusion)的32位尝试数据。
1987 4
|
编解码 文字识别 安全
印刷文字操作报错合集之出现“图片和服务类型不匹配”,该怎么解决
在使用印刷文字识别(OCR)技术过程中,可能会遇到各种错误或问题。以下是一些常见的报错情况及其可能的原因和解决建议。包括但不限于:1.识别率低,错误多、2.无法识别特定字符或字体、3.文件格式不支持、4.内存或资源不足、5.网络连接问题、6.API调用限制或授权问题、7.语言识别错误、8.安全与隐私问题。
1054 0
充值后为什么还显示停机?
充值后为什么还显示停机?
1655 1
|
安全 Java Shell
Docker实战 | 第四篇:Docker启用TLS加密解决暴露2375端口引发的安全漏洞,被黑掉三台云主机的教训总结
Docker实战 | 第四篇:Docker启用TLS加密解决暴露2375端口引发的安全漏洞,被黑掉三台云主机的教训总结

热门文章

最新文章