Logstash + DataHub + MaxCompute/StreamCompute 进行实时数据分析-阿里云开发者社区

开发者社区> wangzhuo.wz> 正文

Logstash + DataHub + MaxCompute/StreamCompute 进行实时数据分析

简介: Logstash是一款开源日志收集处理框架,logstash-output-datahub插件,实现将数据导入DataHub的功能。通过简单的配置即可完成数据采集和向DataHub的传输,结合MaxCompute/StreamCompute可以轻松构建流式数据从采集到分析的一站式解决方案。
+关注继续查看

Logstash是一款开源日志收集处理框架,有各种不同的input、filter、output插件,用户使用这些插件可以将各种数据源导入到其他系统。
logstash-output-datahub插件,实现将数据导入DataHub的功能,通过简单的配置即可完成数据采集和向DataHub的传输任务。
结合StreamCompute(Galaxy)用户可以方便的完成流式数据从采集,传输,开发到结果数据存储与展示的整套解决方案。
同时,还可以通过创建Collector同步任务将数据同步到MaxCompute(ODPS),之后在MaxCompute上进行完备的数据开发工作。

接下来,会将各个流程步骤在文章中作详细描述,以帮助用户使用Logstash+DataHub+StreamCompute/MaxCompute快速构建起自己的流式数据应用。

数据通道

DataHub服务是阿里云的基于飞天开发的pubsub服务;
创建用于数据采集与传输的DataHub Topic是我们的第一步。

Endpoint列表

公共云DataHub服务Endpoint列表:

公有网络 经典网络ECS Endpoint VPC ECS Endpoint
http://dh-cn-hangzhou.aliyuncs.com http://dh-cn-hangzhou.aliyun-inc.com http://dh-cn-hangzhou-vpc.aliyuncs.com

基本概念

首先,明确DataHub中的几个概念,具体可参见DataHub基本概念

  • Shard: Shard表示对一个Topic进行数据传输的并发通道,每个Shard会有对应的ID。每个Shard会有多种状态 : "Opening" - 启动中,"Active" - 启动完成可服务
  • Lifecycle: 表示一个Topic中写入数据可以保存的时间,以天为单位
  • Record: 用户数据和DataHub服务端交互的基本单位
  • Schema: 描述Record必须遵守的格式,以及每个字段的类型,包括:bigint、string、boolean、double和timestamp

创建Topic

目前DataHub提供的工具包括Datahub Java SDK和DataHub webconsole,另外console还处于试用阶段,若有需要可联系我们提供。

  • Webconsole

用户可在webconsole上完成对所属资源的基本操作,包括创建、查看、删除Topic以及数据抽样等。在webconsole中创建Topic如下所示:

创建Topic

  • SDK

依次调用以下接口来完成Project和Topic的创建,SDK的一些基本接口可参考SDK基本说明

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.3.0-public</version>
</dependency>
 
public class DatahubClient {
    /**
     * 初始化DatahubClient,
     * @param conf Datahub的配置信息,包括用户的账号信息和datahub endpoint
     */
    public DatahubClient(DatahubConfiguration conf);
    
    /**
     * 创建Datahub topic
     * @param projectName 该topic所属的project
     * @param topicName 要创建的topic名字
     * @param shardCount 指定该topic的shard数量
     * @param lifeCycle 数据回收时间
     * @param recordType 该topic的record类型,包括TUPLE和BLOB
     * @param recordSchema 当recordType为TUPLE时,需要指定schema
     * @param desc topic的描述信息
     */
    public createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, String desc);
}

数据采集

由于DataHub提供创建具有schema的Topic的功能,所以用户在使用logstash将数据采集到datahub时,可同时完成对原始数据清洗工作。这样在后续的数据分析工作中,用户能更加方便的进行数据开发。

安装

$ {LOG_STASH_HOME}/bin/plugin install --local logstash-output-datahub-1.0.0.gem
  • 直接下载免安装版logstash(下载地址)。 解压即可使用。
$ tar -xzvf logstash-with-datahub-2.3.0.tar.gz
$ cd logstash-with-datahub-2.3.0

配置信息

我们以一条典型的日志为例,说明如何配置logstash和datahub topic.

示例日志为:

20:04:30.359 [qtp1453606810-20] INFO  AuditInterceptor - [13pn9kdr5tl84stzkmaa8vmg] end /web/v1/project/fhp4clxfbu0w3ym2n7ee6ynh/statistics?executionName=bayes_poc_test GET, 187 ms

对应的Datahub Topic的schema定义为:

字段名称 字段类型
request_time STRING
thread_id STRING
log_level STRING
class_name STRING
request_id STRING
detail STRING

Logstash配置文件为:

input {
    file {
        path => "${APP_HOME}/log/bayes.log"
        start_position => "beginning"
    }
}

filter{
    # 对每一条日志message进行分割,并将各分片指定对应的tag
    # 若将整条日志作为Topic的一个字段,可创建只包含(message string)字段的Topic,从而不用配置grok filter
    grok {
        match => {
           "message" => "(?<request_time>\d\d:\d\d:\d\d\.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class_name>\w+)\s+\-(?<detail>.+)"
        }
    }
}

output {
    datahub {
        access_id => ""
        access_key => ""
        endpoint => ""
        project_name => "project"
        topic_name => "topic"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/u1/trash/dirty.data"
        dirty_data_file_max_size => 1000
    }
}

启动logstash数据采集

使用命令启动logstash开始数据采集

logstash -f 上述配置文件地址

可使用参数 -b 指定每次batch大小,即每次请求的记录条数,可进行性能调试

# 缓存1000条数据后发送,不指定时默认为125(logstash的默认配置)
logstash -f 上述配置文件地址 -b 1000

数据分析

目前DataHub和计算引擎StreamCompute(Galaxy)和MaxCompute(ODPS)已打通。

在StreamCompute中,可以通过配置DataHub数据源,直接进行数据开发,写入DataHub的数据会被StreamCompute订阅并进行实时计算。

同时,通过创建同步到MaxCompute的Collector,可以将DataHub数据同步到MaxCompute,从而在MaxCompute中进行数据开发。

StreamCompute

在StreamCompute中注册DataHub数据源(帮助文档)

在StreamCompute中查看或使用DataHub数据(帮助文档)

MaxCompute

可以通过创建Connector,将DataHub数据导入到MaxCompute(ODPS).
在Webconsole创建Connector是一件方便的事情,(webconsole地址)。如果有很多topic或者topic的field很多,不方便在页面上手动操作,也可以使用SDK。

创建Connector

创建Connector之前,用户必须已创建好MaxCompute的Table,并且所使用的账号必须具备该MaxCompute Project的CreateInstance权限和归档ODPS表的Desc、Alter、Update权限。
在webconsole创建Connector步骤可参考创建Connector.

欢迎加入MaxCompute钉钉群讨论
42559c7dde62e4d333c90e02efdf416257a4be27_jpeg

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
【译】Databricks使用Spark Streaming和Delta Lake对流式数据进行数据质量监控介绍
本文主要对Databricks如何使用Spark Streaming和Delta Lake对流式数据进行数据质量监控的方法和架构进行了介绍,本文探讨了一种数据管理架构,该架构可以在数据到达时,通过主动监控和分析来检测流式数据中损坏或不良的数据,并且不会造成瓶颈。
1086 0
保洁A/B test——用户数据分析的魔力
    在1990年代中期,宝洁公司的高管们开始一项研发全新除异味产品的秘密计划。宝洁公司花费了几百万美元开发了一种无色且成本低廉的液体,这种液体能够喷涂在有油烟味的衬衣、发臭的沙发、陈旧的夹克以及污损的汽车内饰上来除去异味。
1091 0
基于 MaxCompute 的实时数据处理实践
MaxCompute 通过流式数据高性能写入和秒级别查询能力(查询加速),提供EB级云原生数仓近实时分析能力;高效的实现对变化中的数据进行快速分析及决策辅助。当前Demo基于近实时交互式BI分析/决策辅助场景,实现指标卡近实时BI分析、近实时市场监测、近实时趋势分析、近实时销量拆分功能。
500 0
基于DataFlux进行养猪场实时数据模拟生成和分析实践
摘要:DataFlux是驻云科技的实时大数据分析平台。经过对养猪场的数据分析需求,使用DataMock数据模拟器模拟生成原始数据并上传至DataFlux,快速实现了对养猪场数据的分析全流程。 注:本次业务分析、模拟数据分析生成和实践主要为培训和演示用途,旨在快速了解DataMock和DataFlux进行实时数据分析的功能和流程。
755 0
MaxCompute(ODPS)上处理非结构化数据的Best Practice
随着MaxCompute(ODPS)2.0的上线,新增的非结构化数据处理框架也推出一系列的介绍文章,包括 MaxCompute上如何访问OSS数据, 基本功能用法和整体介绍,侧重介绍读取OSS数据进行计算处理; 本文:MaxCompute(ODPS)上处理非结构化数据的Best Practice。
4094 0
MaxCompute在电商场景中如何进行漏斗模型分析
本文以某电商案例为例,通过案例为您介绍如何使用离线计算并制作漏斗图。
3328 0
阿里首次披露中台战略:OneData的统一数据标准和实时数据分析是核心
近日,阿里巴巴公共数据平台负责人罗金鹏首次对外披露了在阿里中台战略下,如何推动数据中台落地的个中细节,其中OneData的统一数据标准和实时数据分析是核心。
20141 0
使用 Kafka + Spark Streaming + Cassandra 构建数据实时处理引擎
Apache Kafka 是一个可扩展,高性能,低延迟的平台,允许我们像消息系统一样读取和写入数据。我们可以很容易地在 Java 中使用 Kafka。 Spark Streaming 是 Apache Spark 的一部分,是一个可扩展、高吞吐、容错的实时流处理引擎。
2725 0
从 0 到 1 通过 Flink + Tablestore 进行大数据处理与分析
阿里云实时计算Flink版是一套基于 Apache Flink 构建的⼀站式实时大数据分析平台。在大数据场景下,实时计算 Flink 可提供端到端亚秒级实时数据流批处理能力。表格存储 Tablestore (又名 OTS)是阿里云自研的多模型结构化数据存储,可提供海量结构化数据的存储、查询分析服务。表格存储的双引擎架构支持千万TPS和毫秒级延迟的服务能力,可作为大数据计算的极佳上下游存储。
307 0
应用MaxCompute实现变压器局部放电相位分析
应用MaxCompute实现变压器局部放电相位分析 1 引言 随着智能电网建设的不断推进,智能化电力一次设备和常规电力设备的在线监测都得到了较大发展并成为趋势,监测数据日益庞大,电力设备在线监测系统在数据存储和处理方面面临巨大的技术挑战。
5937 0
+关注
wangzhuo.wz
离线与实时计算,消息中间件
1
文章
2
问答
来源圈子
更多
MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。
+ 订阅
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载