基于LogStash插件采集数据到阿里云Datahub

简介: DataHub服务基于阿里云自研的飞天平台,具有高可用,低延迟,高可扩展,高吞吐的特点,原生支持对接阿里云的多项服务,相关功能特点与Kafka类似。本身主要介绍如何使用LogStash采集数据写入Datahub。

Step By Step

一、测试环境
Ubuntu16.04
二、Java JDK环境安装与配置

1、更新软件包列表:

sudo apt-get update

2、安装openjdk-8-jdk

sudo apt-get install openjdk-8-jdk

3、查看Java安装情况及版本号

java -version

图片.png

三、安装Ruby(安装插件的时候底层依赖需要)

1、安装

apt install ruby

2、查看版本

ruby -version

图片.png

四、LogStash安装

1、下载

wget https://mirrors.huaweicloud.com/logstash/7.5.0/logstash-7.5.0.tar.gz

注意:目前直接从ElasticSearch官网下载,因为资源在国外,下载会特别慢,建议使用:Logstash 国内加速下载

2、解压

tar -xzvf logstash-7.5.0.tar.gz
五、Datahub 插件安装

1、插件下载

2、修改Gemfile(路径 /logstash-7.5.0/Gemfile)

https://gems.ruby-china.com/

图片.png

3、插件安装

$ {LOG_STASH_HOME}/bin/logstash-plugin install logstash-output-datahub-1.0.8.gem

$ {LOG_STASH_HOME}/bin/logstash-plugin install logstash-input-datahub-1.0.8.gem

$ {LOG_STASH_HOME} 为LogStash的解压路径

六、Datahub控制台创建Topic

图片.png

图片.png

七、测试数据集配置文件准备

1、CSV数据(多行)

1111,1.23456789012E9,true,14321111111000000,string_dataxxx0
2222,2.23456789012E9,false,14321111111000000,string_dataxxx1
3333,1.23456789012E9,true,14321111111000000,string_dataxxx0
4444,2.23456789012E9,false,14321111111000000,string_dataxxx1

2、配置文件:csv-sample2.conf

input {
    file {
        path => "/root/logstash/data2.csv"
        start_position => "beginning"
    }
}
filter{
    csv {
        columns => ['col1', 'col2', 'col3', 'col4', 'col5']
    }
}
output {
    datahub {
        access_id => "LTAIOZZg********"
        access_key => "v7CjUJCMk7j9aK****************"
        endpoint => "https://dh-cn-shanghai.aliyuncs.com"
        project_name => "logstash_project"
        topic_name => "logstash"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/root/logstash/data/dirty.data"
        dirty_data_file_max_size => 1000
    }
}
八、运行与查看数据

1、运行

./logstash-7.5.0/bin/logstash -f csv-sample2.conf

2、查看数据
图片.png

参考链接

Logstash 国内加速下载

相关文章
|
7月前
|
消息中间件 分布式计算 DataWorks
DataWorks常见问题之kafka数据导入datahub失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
SQL 关系型数据库 数据管理
Datahub实践——Sqllineage解析Sql实现端到端数据血缘
Datahub实践——Sqllineage解析Sql实现端到端数据血缘
1550 1
|
数据采集 JSON 关系型数据库
将 MySQL 数据抽取并写入 DataHub,您可以按照以下步骤进行
将 MySQL 数据抽取并写入 DataHub,您可以按照以下步骤进行
579 2
|
6月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之mysql-cdc读取数据写入到datahub中,datahub如何转换时区
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
66 1
|
7月前
|
SQL 关系型数据库 Java
实时计算 Flink版操作报错之在阿里云DataHub平台上执行SQL查询GitHub新增star仓库Top 3时不显示结果,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
7月前
|
存储 监控 Apache
Flink整库同步 如何把数据丢入到 datahub 中
Flink整库同步 如何把数据丢入到 datahub 中
|
数据采集 大数据 数据挖掘
企业级数据治理工作怎么开展?Datahub这样做
企业级数据治理工作怎么开展?Datahub这样做
191 0
|
数据采集 JSON 关系型数据库
将 MySQL 数据抽取并写入 DataHub
将 MySQL 数据抽取并写入 DataHub
277 3
|
Java API Maven
Fink在处理DataHub数据源时无法正确识别RecordData类的字段
Fink在处理DataHub数据源时无法正确识别RecordData类的字段
113 1
|
JSON 物联网 数据格式
物联网平台数据流转到datahub时报错
记录一次物联网平台数据流转到datahub时的报错
555 0
物联网平台数据流转到datahub时报错

热门文章

最新文章