阿里云大数据利器之-使用sql实现流计算做实时展现业务( flume故障转移版 )

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 实时业务处理的需求越来越多,也有各种处理方案,比如storm,spark等都可以。那以数据流的方向可以总结成数据源-数据搜集-缓存队列-实时处理计算-数据展现。本文就用阿里云产品简单实现了一个实时处理的方案。

实时业务处理的需求越来越多,也有各种处理方案,比如storm,spark等都可以。那以数据流的方向可以总结成数据源-数据搜集-缓存队列-实时处理计算-数据展现。本文就用阿里云产品简单实现了一个实时处理的方案。

一,总体架构
1

按照数据流向
数据采集:flume(配置故障转移)
缓存队列:datahub
https://help.aliyun.com/product/53345.html?spm=5176.7618386.3.4.cigK2v
数据计算:阿里流计算(StreamCompute)
https://help.aliyun.com/video_list/54212.html?spm=5176.7618386.3.2.COgP6l
数据落地:rds(mysql)
https://help.aliyun.com/document_detail/26092.html?spm=5176.7841871.6.539.9FTjxU
数据展现:Quick-BI
https://data.aliyun.com/product/bi?spm=5176.8142029.388261.284.spvIS0
或者大屏显示 DATA-V
https://data.aliyun.com/visual/datav?spm=5176.8142029.388261.283.spvIS0

二,搭建过程

1,flume配置搭建
flume在数据采集的开源框架中还是比较常用的,但是在采集输送到datahub中有可能网络断了或者服务器挂了。那这里配置了故障转移,如图,其中sink1和sink2为上面架构中的agentA和agentB.把agentA和agentB分别部署在两台服务器上。
2

在搭建flume时需要安装DatahubSink插件,参考https://help.aliyun.com/knowledge_detail/42843.html
那看下配置文件


# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source这里监控一个文件变化,写了一个定时脚本每秒插入一条
a1.sources.r1.type = exec
a1.sources.r1.channels=c1
a1.sources.r1.command=tail -F /usr/local/shangdan/test.txt

#define sinkgroups,在这里配置故障转移的sink组
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=failover
a1.sinkgroups.g1.processor.priority.k1=10//这里设置sink的优先级,优先发送到级别高的sink里
a1.sinkgroups.g1.processor.priority.k2=5
a1.sinkgroups.g1.processor.maxpenalty=10000

#define the sink 1,发送到agentA
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=agentA的ip
a1.sinks.k1.port=5555

#define the sink 2 ,发送到agentB
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=agentB的ip
a1.sinks.k2.port=5555


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel=c1
~

agentA和agentB的配置文件除了IP地址不一样,其他完全一致,这里贴其中一个

A single-node Flume configuration for Datahub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels=c1
a1.sources.r1.bind= agentA的ip
a1.sources.r1.port= 5555
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessID = ******
a1.sinks.k1.datahub.accessKey = **********
a1.sinks.k1.datahub.endPoint = http://dh-cn-hangzhou.aliyun-inc.com
a1.sinks.k1.datahub.project = shangdantest
a1.sinks.k1.datahub.topic = databubtest
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,//这里配置数据的分隔符
a1.sinks.k1.serializer.fieldnames = line//配置数据的字段
a1.sinks.k1.batchSize = 1
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

三台服务配置完成后启动flume(先启动agentA和agentB)预期结果是agent1发送数据到agentA(优先级高的),如果停止agentA服务,会自动转换发送到agentB。重启agegtA的服务后,再次切回到agentA。
如图:正常启动数据正常传输经过agent1-agentB-datahub
3

此时,停掉agentA服务,日志报错,故障转移。
4

重启agentA服务,恢复到之前状态,切回到sink1
5

2,datahub创建,
在datahub控制台创建项目和topic,
设置分片和生命周期,具体方法见链接
https://help.aliyun.com/document_detail/47448.html?spm=5176.doc47443.6.584.UrSX1A
datahub中看到有flume传过来的数据
6

3,配置阿里流计算

登录阿里流计算控制台

注册数据源datahub/rds(也支持阿里其他类型数据源)-编写流计算脚本-调试-上线-启动

如图先注册数据源供脚本使用。必须要有数据来源表和数据结果表。
8

在编写脚本时,可以直接引用表,会自动插入表结构和配置信息,非常方便
9

那开始编写脚本必须包括三部分
1,创建数据来源表,这里是datahub表
2,创建数据结果表,这里是rds表
3,将来源表数据写入结果表,并进行计算

如图
10
三、测试

   脚本编写完毕,点击上方【调试】,可以自己先准备一些数据上传测试。也可以直接线上测试,点击上面【上线】,上线成功后在【运维】中能看到项目,点击启动,项目启动几秒就工作了如图:

11

 然后可以看到监控状态,计算延迟,数据是否倾斜等指标,也有更详细的链路可以查看

12

最后,我们把整个流程全部启动,到rds中看结果如图
13

当然,如果希望源源不断的流数据保存下来称为静态的数据,作为后续业务分析统计等用途,在datahub控制台可以直接配置归档到大数据计算服务(Maxcompute)中,直接入库为表数据。
如图
14
需要在Maxcompute中创建好对应表即可自动归档存储。详细配置如下
https://help.aliyun.com/document_detail/47453.html?spm=5176.doc47439.6.555.3GNrRs

好神奇,几句sql数据就源源不断的流过来,那么前端或者其他业务层可以过来拿数据展示了,数据还可以界面化配置归档入库,十分方便。如果有复杂逻辑计算的,可以申请开通流计算的udf功能,这样看来,学好sql和java,走遍天下都不怕。
数据可视化部分可以参考使用阿里云产品dataV,实现类似双十一大屏效果,也可以使用产品Quick-BI做实时报表。

有对大数据技术感兴趣的,可以加笔者的微信 wx4085116.目前笔者已经从阿里离职,博客不代表阿里立场。笔者开了一个大数据培训班。有兴趣的加我。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
4天前
|
存储 消息中间件 监控
【Flume】Flume在大数据分析领域的应用
【4月更文挑战第4天】【Flume】Flume在大数据分析领域的应用
|
4天前
|
SQL 存储 数据管理
阿里云视觉智能开放平台的逻辑数仓基于统一的SQL语法
【2月更文挑战第9天】阿里云视觉智能开放平台的逻辑数仓基于统一的SQL语法
67 2
|
4天前
|
分布式计算 大数据 BI
MaxCompute产品使用合集之MaxCompute项目的数据是否可以被接入到阿里云的Quick BI中
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
4天前
|
分布式计算 大数据 MaxCompute
MaxCompute产品使用合集之使用pyodps读取OSS(阿里云对象存储)中的文件的步骤是什么
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
4天前
|
分布式计算 DataWorks Java
DataWorks产品使用合集之阿里云DataWorks专有云环境下,上传MaxCompute的UDF(用户自定义函数)的JAR包的步骤如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
22 0
|
4天前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用合集之阿里云MaxCompute对SQL语句的长度的长度限制是多少
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
4天前
|
机器学习/深度学习 分布式计算 数据挖掘
阿里云 MaxCompute MaxFrame 开启免费邀测,统一 Python 开发生态
阿里云 MaxCompute MaxFrame 正式开启邀测,统一 Python 开发生态,打破大数据及 AI 开发使用边界。
757 1
|
4天前
|
SQL 数据可视化 Apache
阿里云数据库内核 Apache Doris 兼容 Presto、Trino、ClickHouse、Hive 等近十种 SQL 方言,助力业务平滑迁移
阿里云数据库 SelectDB 内核 Doris 的 SQL 方言转换工具, Doris SQL Convertor 致力于提供高效、稳定的 SQL 迁移解决方案,满足用户多样化的业务需求。兼容 Presto、Trino、ClickHouse、Hive 等近十种 SQL 方言,助力业务平滑迁移。
阿里云数据库内核 Apache Doris 兼容 Presto、Trino、ClickHouse、Hive 等近十种 SQL 方言,助力业务平滑迁移
|
4天前
|
人工智能 DataWorks 数据可视化
心动基于阿里云DataWorks构建游戏行业通用大数据模型
心动游戏在阿里云上构建云原生大数据平台,基于DataWorks构建行业通用大数据模型,如玩家、产品、SDK、事件、发行等,满足各种不同的分析型应用的要求,如AI场景、风控场景、数据分析场景等。
348 1
|
2天前
|
SQL 数据处理 API
实时计算 Flink版产品使用合集之遇到SQL Server锁表问题如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
8 0