阿里云大数据利器之-使用flume+sql实现流计算做实时展现业务(归档Maxcompute)

简介: 实时业务处理的需求越来越多,也有各种处理方案,比如storm,spark等都可以。那以数据流的方向可以总结成数据源-数据搜集-缓存队列-实时处理计算-数据展现。本文就用阿里云产品简单实现了一个实时处理的方案。

实时业务处理的需求越来越多,也有各种处理方案,比如storm,spark等都可以。那以数据流的方向可以总结成数据源-数据搜集-缓存队列-实时处理计算-数据展现。本文就用阿里云产品简单实现了一个实时处理的方案。

一,总体架构
1

按照数据流向
数据采集:flume(配置故障转移)
缓存队列:datahub
https://help.aliyun.com/product/53345.html?spm=5176.7618386.3.4.cigK2v
数据落地:rds(mysql)
https://help.aliyun.com/document_detail/26092.html?spm=5176.7841871.6.539.9FTjxU
数据展现:Quick-BI
https://data.aliyun.com/product/bi?spm=5176.8142029.388261.284.spvIS0
或者大屏显示 DATA-V
https://data.aliyun.com/visual/datav?spm=5176.8142029.388261.283.spvIS0

二,搭建过程

1,flume配置搭建
flume在数据采集的开源框架中还是比较常用的,但是在采集输送到datahub中有可能网络断了或者服务器挂了。那这里配置了故障转移,如图,其中sink1和sink2为上面架构中的agentA和agentB.把agentA和agentB分别部署在两台服务器上。
2

在搭建flume时需要安装DatahubSink插件,
那看下配置文件


# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source这里监控一个文件变化,写了一个定时脚本每秒插入一条
a1.sources.r1.type = exec
a1.sources.r1.channels=c1
a1.sources.r1.command=tail -F /usr/local/shangdan/test.txt

#define sinkgroups,在这里配置故障转移的sink组
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=failover
a1.sinkgroups.g1.processor.priority.k1=10//这里设置sink的优先级,优先发送到级别高的sink里
a1.sinkgroups.g1.processor.priority.k2=5
a1.sinkgroups.g1.processor.maxpenalty=10000

#define the sink 1,发送到agentA
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=agentA的ip
a1.sinks.k1.port=5555

#define the sink 2 ,发送到agentB
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=agentB的ip
a1.sinks.k2.port=5555


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel=c1
~

agentA和agentB的配置文件出了ip地址不一样,其他完全一致,这里贴其中一个

A single-node Flume configuration for Datahub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels=c1
a1.sources.r1.bind= agentA的ip
a1.sources.r1.port= 5555
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessID = ******
a1.sinks.k1.datahub.accessKey = **********
a1.sinks.k1.datahub.endPoint = http://dh-cn-hangzhou.aliyun-inc.com
a1.sinks.k1.datahub.project = shangdantest
a1.sinks.k1.datahub.topic = databubtest
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,//这里配置数据的分隔符
a1.sinks.k1.serializer.fieldnames = line//配置数据的字段
a1.sinks.k1.batchSize = 1
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

三台服务配置完成后启动flume(先启动agentA和agentB)预期结果是agent1发送数据到agentA(优先级高的),如果停止agentA服务,会自动转换发送到agentB。重启agegtA的服务后,再次切回到agentA。
如图:正常启动数据正常传输经过agent1-agentB-datahub
3

此时,停掉agentA服务,日志报错,故障转移。
4

重启agentA服务,恢复到之前状态,切回到sink1
5

2,datahub创建,
在datahub控制台创建项目和topic,
设置分片和生命周期
datahub中看到有flume传过来的数据
6

3,配置阿里流计算
登录阿里流计算控制台
注册数据源datahub/rds(也支持阿里其他类型数据源)-编写流计算脚本-调试-上线-启动

如图先注册数据源供脚本使用。必须要有数据来源表和数据结果表。
8

在编写脚本时,可以直接引用表,会自动插入表结构和配置信息,非常方便
9

那开始编写脚本必须包括三部分
1,创建数据来源表,这里是datahub表
2,创建数据结果表,这里是rds表
3,将来源表数据写入结果表,并进行计算

如图
10
三、测试
脚本编写完毕,点击上方【调试】,可以自己先准备一些数据上传测试。也可以直接线上测试,点击上面【上线】,上线成功后在【运维】中能看到项目,点击启动,项目启动几秒就工作了如图:
11

 然后可以看到监控状态,计算延迟,数据是否倾斜等指标,也有更详细的链路可以查看

12

最后,我们把整个流程全部启动,到rds中看结果如图
13

当然,如果希望源源不断的流数据保存下来称为静态的数据,作为后续业务分析统计等用途,在datahub控制台可以直接配置归档到大数据计算服务(Maxcompute)中,直接入库为表数据。
如图
14
需要在Maxcompute中创建好对应表即可自动归档存储。

好神奇,几句sql数据就源源不断的流过来,那么前端或者其他业务层可以过来拿数据展示了,数据还可以界面化配置归档入库,十分方便。如果有复杂逻辑计算的,可以申请开通流计算的udf功能,这样看来,学好sql和java,走遍天下都不怕。
数据可视化部分可以参考使用阿里云产品dataV,实现类似双十一大屏效果,也可以使用产品Quick-BI做实时报表。

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
5月前
|
SQL 存储 分布式计算
【万字长文,建议收藏】《高性能ODPS SQL章法》——用古人智慧驾驭大数据战场
本文旨在帮助非专业数据研发但是有高频ODPS使用需求的同学们(如数分、算法、产品等)能够快速上手ODPS查询优化,实现高性能查数看数,避免日常工作中因SQL任务卡壳、失败等情况造成的工作产出delay甚至集群资源稳定性问题。
1273 36
【万字长文,建议收藏】《高性能ODPS SQL章法》——用古人智慧驾驭大数据战场
|
4月前
|
数据采集 缓存 大数据
【赵渝强老师】大数据日志采集引擎Flume
Apache Flume 是一个分布式、可靠的数据采集系统,支持从多种数据源收集日志信息,并传输至指定目的地。其核心架构由Source、Channel、Sink三组件构成,通过Event封装数据,保障高效与可靠传输。
303 1
|
6月前
|
SQL 分布式计算 大数据
SparkSQL 入门指南:小白也能懂的大数据 SQL 处理神器
在大数据处理的领域,SparkSQL 是一种非常强大的工具,它可以让开发人员以 SQL 的方式处理和查询大规模数据集。SparkSQL 集成了 SQL 查询引擎和 Spark 的分布式计算引擎,使得我们可以在分布式环境下执行 SQL 查询,并能利用 Spark 的强大计算能力进行数据分析。
|
8月前
|
SQL 人工智能 分布式计算
别再只会写SQL了!这五个大数据趋势正在悄悄改变行业格局
别再只会写SQL了!这五个大数据趋势正在悄悄改变行业格局
163 0
|
10月前
|
SQL 关系型数据库 MySQL
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL 数据库 SQL 语句调优方法详解(2-1)
本文深入介绍 MySQL 数据库 SQL 语句调优方法。涵盖分析查询执行计划,如使用 EXPLAIN 命令及理解关键指标;优化查询语句结构,包括避免子查询、减少函数使用、合理用索引列及避免 “OR”。还介绍了索引类型知识,如 B 树索引、哈希索引等。结合与 MySQL 数据库课程设计相关文章,强调 SQL 语句调优重要性。为提升数据库性能提供实用方法,适合数据库管理员和开发人员。
|
11月前
|
SQL 大数据 数据挖掘
玩转大数据:从零开始掌握SQL查询基础
玩转大数据:从零开始掌握SQL查询基础
395 35
|
10月前
|
关系型数据库 MySQL 大数据
大数据新视界--大数据大厂之MySQL 数据库课程设计:MySQL 数据库 SQL 语句调优的进阶策略与实际案例(2-2)
本文延续前篇,深入探讨 MySQL 数据库 SQL 语句调优进阶策略。包括优化索引使用,介绍多种索引类型及避免索引失效等;调整数据库参数,如缓冲池、连接数和日志参数;还有分区表、垂直拆分等其他优化方法。通过实际案例分析展示调优效果。回顾与数据库课程设计相关文章,强调全面认识 MySQL 数据库重要性。为读者提供综合调优指导,确保数据库高效运行。
|
11月前
|
SQL 容灾 关系型数据库
阿里云DTS踩坑经验分享系列|DTS打通SQL Server数据通道能力介绍
SQL Server 以其卓越的易用性和丰富的软件生态系统,在数据库行业中占据了显著的市场份额。作为一款商业数据库,外部厂商在通过解析原生日志实现增量数据捕获上面临很大的挑战,DTS 在 SQL Sever 数据通道上深研多年,提供了多种模式以实现 SQL Server 增量数据捕获。用户可以通过 DTS 数据传输服务,一键打破自建 SQL Server、RDS SQL Server、Azure、AWS等他云 SQL Server 数据孤岛,实现 SQL Server 数据源的流动。
686 0
阿里云DTS踩坑经验分享系列|DTS打通SQL Server数据通道能力介绍
|
存储 分布式计算 Java
踏上大数据第一步:flume
Flume 是一个分布式、可靠且高效的系统,用于收集、聚合和移动大量日志数据。它是 Apache 顶级项目,广泛应用于 Hadoop 生态系统中。Flume 支持从多种数据源(如 Web 服务器、应用服务器)收集日志,并将其传输到中央存储(如 HDFS、HBase)。其核心组件包括 Source、Channel 和 Sink,分别负责数据获取、临时存储和最终存储。本文还介绍了在 Ubuntu 20.04 上安装 Flume 1.9.0 的步骤,涵盖 JDK 安装、Flume 下载、解压、配置环境变量及验证安装等详细过程。
350 10

热门文章

最新文章

相关产品

  • 云原生大数据计算服务 MaxCompute