基于LogStash插件采集数据到阿里云Datahub

简介: DataHub服务基于阿里云自研的飞天平台,具有高可用,低延迟,高可扩展,高吞吐的特点,原生支持对接阿里云的多项服务,相关功能特点与Kafka类似。本身主要介绍如何使用LogStash采集数据写入Datahub。

Step By Step

一、测试环境
Ubuntu16.04
二、Java JDK环境安装与配置

1、更新软件包列表:

sudo apt-get update

2、安装openjdk-8-jdk

sudo apt-get install openjdk-8-jdk

3、查看Java安装情况及版本号

java -version

图片.png

三、安装Ruby(安装插件的时候底层依赖需要)

1、安装

apt install ruby

2、查看版本

ruby -version

图片.png

四、LogStash安装

1、下载

wget https://mirrors.huaweicloud.com/logstash/7.5.0/logstash-7.5.0.tar.gz

注意:目前直接从ElasticSearch官网下载,因为资源在国外,下载会特别慢,建议使用:Logstash 国内加速下载

2、解压

tar -xzvf logstash-7.5.0.tar.gz
五、Datahub 插件安装

1、插件下载

2、修改Gemfile(路径 /logstash-7.5.0/Gemfile)

https://gems.ruby-china.com/

图片.png

3、插件安装

$ {LOG_STASH_HOME}/bin/logstash-plugin install logstash-output-datahub-1.0.8.gem

$ {LOG_STASH_HOME}/bin/logstash-plugin install logstash-input-datahub-1.0.8.gem

$ {LOG_STASH_HOME} 为LogStash的解压路径

六、Datahub控制台创建Topic

图片.png

图片.png

七、测试数据集配置文件准备

1、CSV数据(多行)

1111,1.23456789012E9,true,14321111111000000,string_dataxxx0
2222,2.23456789012E9,false,14321111111000000,string_dataxxx1
3333,1.23456789012E9,true,14321111111000000,string_dataxxx0
4444,2.23456789012E9,false,14321111111000000,string_dataxxx1

2、配置文件:csv-sample2.conf

input {
    file {
        path => "/root/logstash/data2.csv"
        start_position => "beginning"
    }
}
filter{
    csv {
        columns => ['col1', 'col2', 'col3', 'col4', 'col5']
    }
}
output {
    datahub {
        access_id => "LTAIOZZg********"
        access_key => "v7CjUJCMk7j9aK****************"
        endpoint => "https://dh-cn-shanghai.aliyuncs.com"
        project_name => "logstash_project"
        topic_name => "logstash"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/root/logstash/data/dirty.data"
        dirty_data_file_max_size => 1000
    }
}
八、运行与查看数据

1、运行

./logstash-7.5.0/bin/logstash -f csv-sample2.conf

2、查看数据
图片.png

参考链接

Logstash 国内加速下载

相关文章
|
4月前
|
SQL 关系型数据库 Java
实时计算 Flink版操作报错之在阿里云DataHub平台上执行SQL查询GitHub新增star仓库Top 3时不显示结果,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
HH
|
Java 开发工具 关系型数据库
阿里云DataHub入门测试
阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布 (Publish),订阅 (Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用。DataHub服务可以对各种移动设备,应用软件,网站服务,传感器等产生的大量流式数据进行持续不断的采集,存储和处理。用户可以编写应用程序或者使用流计算引擎来处理写入到DataHub的流式数据比如实时web访问日志、应用日志、各种事件等,并产出各种实时的数据处理结果比如实时图表、报警信息、实时统计等。
HH
541 0
阿里云DataHub入门测试
|
Java Apache Maven
阿里云数据总线(DataHub)使用Flume插件导入数据示例
Flume NG是Cloudera提供的一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。本文主要演示如何使用Flume-DataHub插件导入数据到阿里云数据总线(DataHub)。
2833 0
阿里云数据总线(DataHub)使用Flume插件导入数据示例
|
SQL 分布式计算 DataWorks
阿里云物联网平台数据转发到DataHub示例
本文主要演示通过规则引擎将消息流转到DataHub,并通过Dataconnector 将消费流转到MaxCompute的表。
阿里云物联网平台数据转发到DataHub示例
|
数据可视化 物联网 关系型数据库
阿里云物联网平台、DataHub、RDS和DataV集成样例
本文通过一个DEMO,演示了如何基于阿里云产品和服务实现设备数据在大屏上显示。在设备端模拟两个点位,通过MQTT协议向阿里云物联网平台设备(高级版)发送数据,物联网平台接收到数据后通过规则引擎转发至DataHub,接着在DataHub中通过DataConnector将数据同步到RDS MySQL数据库中,最终使用DataV将RDS MySQL中数据呈现在大屏上。
3009 0
|
4月前
|
消息中间件 分布式计算 DataWorks
DataWorks常见问题之kafka数据导入datahub失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
10月前
|
SQL 关系型数据库 数据管理
Datahub实践——Sqllineage解析Sql实现端到端数据血缘
Datahub实践——Sqllineage解析Sql实现端到端数据血缘
1208 1
|
数据采集 JSON 关系型数据库
将 MySQL 数据抽取并写入 DataHub,您可以按照以下步骤进行
将 MySQL 数据抽取并写入 DataHub,您可以按照以下步骤进行
480 2
|
3月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之mysql-cdc读取数据写入到datahub中,datahub如何转换时区
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
37 1
|
4月前
|
存储 监控 Apache
Flink整库同步 如何把数据丢入到 datahub 中
Flink整库同步 如何把数据丢入到 datahub 中