使用 Logstash 导入流式数据|学习笔记

简介: 快速学习使用 Logstash 导入流式数据

开发者学堂课程【阿里云 DataHub 使用教程使用 Logstash 导入流式数据】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/429/detail/5367


使用 Logstash 导入流式数据


 如何使用 Logstash 上传数据到 Datahub 中

1、首先,进入到 Datahub 的控制台,

(https://datahub.console.aliyun.com),可看到之前创建的 project。

2、进入后创建一个能上传数据的 topic ,Topic 名称为 longstash_test,准备了五个字段,因此 topic 也有五个字段,Shard 数量为1,备注 longstash。(如下图)

image.png

创建成功后,点击查看 topic,数据量为0。

3、进入迷你行终端,可先下载

logstash-with-datahub-2.3.0.tar.gz的包,包里包含 datahub 插件,可解压出来,进入已解压好的目录中,准备好的配置文件包含 input,表示上传数据文件的位置

Input {

fail {

path=>”/Users/wz/workspace/Logstash-with-datahub-2.3.0/sample_conf/datahub_type.data”

start_position=>”beginning”//从文件开始处上传

sincedb_path=>”/tmp/.sincedb_test”//表示文件上传到哪里

}

}

filter是 csv 的插件

filter {

csv {

## change to you own columns

columns => { ‘f_bigint”, ‘f_double’, ‘f_boolean’, ‘f_timestamp’, ‘f_string’ }//表示把数据文件分成五个字段,每个字段的名字即 topic 定义的五个字段的名字

}

}

Output 插件

output {

datahub {

acess_id=>”your acessId”

acess_key=>”your acesskey”

endpoint=>”http://dh_cn_hangzhou.aliyuncs.com

project_name=>”test_dh1”

topic_name=>”logstash_test”

}//表示输出到 datahub 哪一个 topic

继续上传数据,亦包含五个字段,每一行用逗号分隔的五段数据

例:

1001,1.23456789012E9,true,14321111111,test_string_filed

其中1001为bigint,1.23456789012E9为dubbo,true 为buling,14321111111为typestep,test_string_filed为string。

启动 logstash

./bin/logstash-f sample_conf/datahub-type-data.conf.ak-verbose

已经把数据文件的40行 put 到文件里。

4、查看控制台

数据为40条,点击数据抽样,数据已上传

5、在数据文件中加入数据

Logstash 已检测出数据变动,把数据也写到 datahub 中。查看控制台,有41条数据,进行数据抽样,最后的数据是1041。

相关文章
|
7月前
|
数据库连接 数据库
kettle开发篇-流查询
kettle开发篇-流查询
175 0
|
消息中间件 Kafka API
数据管道 Logstash 入门(上)
数据管道 Logstash 入门
117 0
|
6月前
|
DataWorks Java 调度
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
81 0
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
|
7月前
|
自然语言处理 测试技术 网络安全
ElasticSearch7最新实战文档-附带logstash同步方案
ElasticSearch7最新实战文档-附带logstash同步方案
103 0
|
7月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之采集选择增量(latest)读取模式,是否可以使用动态加载表功能
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
存储 分布式计算 关系型数据库
Elasticsearch 8.X 导出 CSV 多种方案,一网打尽!
Elasticsearch 8.X 导出 CSV 多种方案,一网打尽!
98 0
|
消息中间件 存储 NoSQL
数据管道 Logstash 入门(下)
数据管道 Logstash 入门
105 0
|
存储 消息中间件 编解码
Logstash收集多数据源数据神器
Logstash收集多数据源数据神器
370 0
Logstash收集多数据源数据神器
|
SQL 机器学习/深度学习 分布式计算
离线数仓之Kerberos基本使用及问题记录
离线数仓之Kerberos基本使用及问题记录
195 0
离线数仓之Kerberos基本使用及问题记录
|
canal 数据采集 关系型数据库
Elastic实战:通过pipeline实现mysql同步数据到es的数据预处理
首先canal是支持自定义客户端的,需要引入如下依赖,这种方式适合数据转换规则比较复杂,具有强定制性的场景,但是考虑到我这里还要做logstash的数据同步,因此需要一个比较通用的方式来实现数据转换处理,因此我用到了es的pipeline来做预处理
271 0
Elastic实战:通过pipeline实现mysql同步数据到es的数据预处理