一小时完成基于阿里云流计算的实时计算系统搭建

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 目前,实时计算越来越被广泛应用,比如 实时ETL、实时报表、实时大屏展示等一些监控预警和在线系统的场景。企业对计算速度和消息更新速度要求越来越高。开源框架中,Storm,Sparks,Flink等在企业生产中大量投入使用,但是开发相对复杂,需要对接各种框架api、sdk等,另外人力成本相对较高。

目前,实时计算越来越被广泛应用,比如 实时ETL、实时报表、实时大屏展示等一些监控预警和在线系统的场景。企业对计算速度和消息更新速度要求越来越高。开源框架中,Storm,Sparks,Flink等在企业生产中大量投入使用,但是开发相对复杂,需要对接各种框架api、sdk等,另外人力成本相对较高。那么有没有一种高效的实时计算平台,只要会写SQL并且可视化的操作就可以快速完成实时计算的业务开发呢

阿里云目前推出产品 阿里云流计算(公测中,预计18年3月份商业化)(StreamCompute)传送门:https://help.aliyun.com/document_detail/62437.html?spm=a2c4g.11186623.6.544.Ed7XzG

阿里云流计算全链路示意图
image.png
可以简单快速的实现仅用SQL就完成流计算的业务链路,下面我们就使用流计算给大家示范。

架构:

以阿里云流计算为核心,从数据流向上我们可以分为数据从哪里,到哪里去。
本次实验架构为:
Logstash+DataHub+阿里云流计算+RDS-mysql
其中,
Logstash :开源框架,用于采集数据
DataHub:阿里云自主研发大型缓存队列(可以理解为类似 开源Kafka )
https://help.aliyun.com/document_detail/47439.html?spm=a2c4g.11186623.6.539.hri7Gy
RDS-mysql:阿里云关系型数据库 mysql版

搭建流程:

第一步:创建DataHub 项目和Topic

具体参考
https://help.aliyun.com/document_detail/47448.html?spm=a2c4g.11186623.6.546.eJLHOm
如图是本次实验创建的:
image.png

第二步:搭建Logstash 参考

https://help.aliyun.com/document_detail/47451.html?spm=5176.product27797.6.588.iFTE4i
配置文件如下

input {
    file {
        path => "/Users/yang/test/stream.csv"
        start_position => "beginning"
    }
}
filter{
    csv {
        columns => ['name', 'age']
    }
}
output {
    datahub {
        access_id => "LTAIu****Ouj87b"
        access_key => "MfY8ONjK6******7OEdyXw4T"
        endpoint => "https://dh-cn-hangzhou.aliyuncs.com"
        project_name => "M_shangdantest"
        topic_name => "to_stream_topic"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/yang/test/dirty.data"
        dirty_data_file_max_size => 1000
    }
}
第三步:创建流计算任务

1,可视化注册刚才创建的DataHub数据源,接入数据。如图
image.png
2,可视化注册数据输出的数据库RDS-mysql
image.png
3,编写从数据来源(DataHub)取数据经过计算写入数据目的源(RDS-mysql)的业务SQL。
如图
image.png
代码附上,具体业务逻辑可以自己通过SQL实现:

CREATE TABLE to_stream_topic (
    `name`             VARCHAR,
    age                BIGINT
) WITH (
    type = 'datahub',
    endPoint = 'http://dh-cn-hangzhou.aliyun-inc.com',
    roleArn='acs:ram::xxxxx:role/aliyunstreamdefaultrole',
    project = 'shangdantest',
    topic = 'to_stream_topic'
);
INSERT INTO resoult_stream
SELECT name,age from to_stream_topic;
CREATE TABLE resoult_stream (
    `name`             VARCHAR,
    age                BIGINT
) WITH (
    type= 'rds',
    url = 'jdbc:mysql://rm-xxxxxxx.mysql.rds.aliyuncs.com:3306/lptest',
    userName = 'xxxxx',
    password = 'xxxx',
    tableName = 'resoult_stream'
);

更多操作其他数据源参考:
https://help.aliyun.com/document_detail/62509.html?spm=a2c4g.11186623.6.633.DPannE

以上操作完成后可以
1,启动logstash
2,测试流计算,调试非常方便,对线上业务没有影响,自动从DataHub中抽取数据进行测试
image.png
3,启动流计算作业,如图
image.png
4,观察数据是否成功写入RDS-Mysql

到此 流计算一个实时链路搭建完毕,有没有发现很酷炫,只需要写写SQL,加一些数据源的配置即可。整个过程一小时就可以完成,相对开源省去了繁琐的各种环境搭建,代码编写,监控等等。

  
  
  有对大数据技术感兴趣的,可以加笔者的微信 wx4085116.目前笔者已经从阿里离职,博客不代表阿里立场。笔者开了一个大数据培训班。有兴趣的加我。
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
4天前
|
SQL 存储 API
阿里云实时计算Flink的产品化思考与实践【下】
本文整理自阿里云高级产品专家黄鹏程和阿里云技术专家陈婧敏在 FFA 2023 平台建设专场中的分享。
111278 154
阿里云实时计算Flink的产品化思考与实践【下】
|
4天前
|
存储 消息中间件 运维
友盟+|如何通过阿里云Flink+Paimon实现流式湖仓落地方案
本文主要分享友盟+ U-App 整体的技术架构,以及在实时和离线计算上面的优化方案。
357 1
友盟+|如何通过阿里云Flink+Paimon实现流式湖仓落地方案
|
4天前
|
SQL 运维 DataWorks
Flink CDC在阿里云DataWorks数据集成应用实践
本文整理自阿里云 DataWorks 数据集成团队的高级技术专家 王明亚(云时)老师在 Flink Forward Asia 2023 中数据集成专场的分享。
546 2
Flink CDC在阿里云DataWorks数据集成应用实践
|
4天前
|
消息中间件 SQL Java
阿里云Flink-自定义kafka format实践及踩坑记录(以protobuf为例)
阿里云Flink-自定义kafka format实践及踩坑记录(以protobuf为例)
|
4天前
|
SQL 存储 数据处理
阿里云实时计算Flink的产品化思考与实践【上】
本文整理自阿里云高级产品专家黄鹏程和阿里云技术专家陈婧敏在 FFA 2023 平台建设专场中的分享。
3414 4
阿里云实时计算Flink的产品化思考与实践【上】
|
4天前
|
分布式计算 关系型数据库 OLAP
阿里云AnalyticDB基于Flink CDC+Hudi实现多表全增量入湖实践
阿里云AnalyticDB基于Flink CDC+Hudi实现多表全增量入湖实践
97 0
|
4天前
|
存储 SQL Java
阿里Flink云服务提供了CDC(Change Data Capture)功能
【2月更文挑战第10天】阿里Flink云服务提供了CDC(Change Data Capture)功能
40 1
|
4天前
|
流计算
Flink CDC里假设我做widow计算使用ProcessTime计算
【1月更文挑战第23天】【1月更文挑战第113篇】Flink CDC里假设我做widow计算使用ProcessTime计算
200 45
|
4天前
|
存储 NoSQL MongoDB
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
本文整理自阿里云 Flink 团队归源老师关于阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference 的研究。
46955 2
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
|
4天前
|
存储 测试技术 数据处理
阿里云实时计算企业级状态存储引擎 Gemini 技术解读
阿里云实时计算企业级状态存储引擎 Gemini 技术解读
123 0