互联网、游戏等行业实时数仓构建最佳实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
实时数仓Hologres,5000CU*H 100GB 3个月
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
简介: 互联网、游戏等行业客户基于Flink构建实时数仓,通过Logtail采集日志,使用Flink实现对日志的拆分、处理。

分享人:弦望  解决方案架构师

        元风  Flink产品专家

正文:本文从三方面来介绍互联网、游戏等行业实时数仓构建最佳实践。

Ÿ 最佳实践原理讲解

Ÿ 核心产品实时计算Flink讲解

Ÿ 最佳实践系统搭建


一、最佳实践原理讲解


1)场景描述


本方案的核心场景是互联网、游戏等行业客户,对业务数据的实时反馈尤为重要,比如游戏的实时战斗数据、跟AI结合的实时反馈数据等,往往会基于Flink构建实时数仓。本方案通过Logtail采集服务端日志,使用Flink实现对日志的拆分、处理,根据业务需求写入不同的数据存储,实时分析数据使用ADB存储,离线分析数据 使用HDFS存储,构建高效、稳健的实时数据服务。这也是行业内构建实时和离线数仓、实时离线分离最通用的方案之一。以下几个场景非常适合使用本方案:

Ÿ 日志的采集、处理及分析、日志投递到SLS日志服务

Ÿ 日志使用Flink拆分到不同Logstore,使用Flink对日志数据进行ETL

Ÿ 日志使用Flink实时写入HDFS,ETL后写入离线数仓。

Ÿ 日志数据实时ETL构建实时数仓。

Ÿ 日志实时写入ADB进行实时查询分析等场景。


2)系统架构图


image.png

这是来自客户的一个架构,客户的业务日志存放在ECS上,并且多种日志是汇聚在一起的,如广告日志、自定义事件、交易日志、登录日志等,需要把不同的日志拆分到不同的Logstore,客户有80多种日志类型,如果通过Flink内置的connectore组件实现需要写80多组SQL,而且不好维护。为了解决这个痛点,本方案通过自定义Sink实现日志拆分,一组FlinkSQL,业务逻辑全部通过自定义代码完成。

客户的业务日志数据需要进行实时ETL,这里为了结合业务和维表的缓存能力,我们使用RDS、Hbase、ADB等维表实现日志实时ETL

实时数据需要处理完后实时写入ADB,供业务系统做后续数据实时分析使用。

离线数据通过Flink写入EMRHDFS。Flink组件未包含写入HDFS的组件并且开源的组件逻辑也不能满足业务需求,这里需要通过我肯定义Sink实现。

测试从日志落地到写入ADB的总体延时,即测试Flink在全链路处理数据的时效性验证。


3)方案优势


image.png

Ÿ 性能优越:Flink集群拥有高吞吐和高扩展性,是业内公认的实时计算最新一代的引擎。

Ÿ 安全稳定:支持Exactly-Once保障数据的唯一性和可恢复性,支持故障自动恢复,资源隔离。

Ÿ 简单易用:支持标准的SQL语言,可实时在线开发,全面支持UDX实现复杂的业务逻辑。

Ÿ 功能强大:支持SQL进行实时及离线数据清洗、,数据分析、数据同步、异构数据源计算等数据湖相关功能,以及各种流式及静态数据源关联查询。


4)实时计算产品架构


image.png

左侧数据源可以来自ECS、RDS及IOT等设备产生的数据,数据源通过消息队列如SLS、MQ、Datahub等产品进入实时计算引擎。在实时计算中对数据进行实时ETL写入下游数据存储,如Hologress、MaxCompute、Hbase、ES等产品。Flink支持 SQL和DataStream API编写满足简单和复杂应用场景,支持自定义connector,支持CEP Rule engine、Flink ML、Gelly等引擎库,同时支持Web IDE开发和强大的数据监控能力。


二、核心产品Flink讲解


image.png


1)实时计算-技术架构


在实时计算Flink板全托管模式中一共分成两个主要的组成部分:第一层是开发平台,第二层是执行引擎。

在开发平台中,我们提供Verveica Platform。Verveica是德国的Flink创始团队所成立的公司,阿里巴巴在2009年收购了这个公司,并在去上提供商业板产品,在执行引擎上我们提供开源版的Flink内核,在开源版内核的基础上提供远程Shuffle,包括高业版的状态存储,以及对SQL作业算子的深度优化。

在开发平台上面,我们做了可视化的SQL编辑器,提供SQL Debug功能,提供全链路的商业版Connectors,扩展积极学习库,并且在Operator级别做了状态管理,为大家提供AotuPilot智能高优,可以根据作业的规模来增减作业资源使用量,在云上提供Java与Python的作业支持,为大家提供99.9%的SLA企业级安全以及全链路控制。


2)实时计算与开源Flink对比


image.png

可以看到对比Flink,我们一共有这些主要的功能 ,在SQL、部署、优化、状态存储、资源管理、机器学习包括监控报警上,我们领先开源。我们产品主打的从开发到运行,主打的几个核心功能。


3)在线调试


image.png

首先是SQL作业的在线调度,对于流作业的开发SQL往往是我们的第一选择,但是SQL在开发的过程中,由于它语意与最后执行引擎是不统一,所以经常会造成代码与逻辑的混乱,所以在云上我们为大家提供给予SQL的调试能力,这样的话我们可以直接看到SQL的执行结果 。


4)Session Cluster


image.png

在执行层,我们为云上用户提供小规模的Session集群,在Session集群中,默认多个作业是共享相同资源,这样可以最大化地节省资源的使用,同时提升资源的利用率,并且对于小规模的作业,由于Session集群是提前创建完毕的,可以节省状态的下载时间。当作业负载达到一定程度时,可以手动将作业从Session集群迁到Per-Job集群里。


5)AutoPilot


image.png

分配算力时,我们尽可能的将计算能力与真实的流量进行匹配,我们就可以在流作业的峰谷对作业进行资源的扩充以及资源的消减,可以最大化地节省资源的利用,保证作业平衡运行。


三、最佳实践系统搭建


image.png

使用云架构设计工具CADT来拉起今天要演示的环境,搜索实时数仓,互联网游戏等行业实时数仓,基于模版新建。

image.png

在这里要对一些参数进行修改,改一下OSS配置Bucket的名称。

image.png

Ecs的密码需要设置一下。

image.png

日志服务Project的名称设置一下,名字要是唯一的。

image.png

Emr也要设置一下密码。

image.png

RDS数据库也设置一下密码。

image.png

保存。输入一个自定义的名称。

image.png

保存后开始应用部署。

image.png

资源校验成功了。

image.png

看一下部署需要的相关价格,这里列出来免费和后付费的价格,这里有详细的报告可以看。

image.png

勾选协议,创建资源。

image.png

可以看到资源已经成功创建了。

image.png

接下来创建数据库,在这里进入到rds的控制台。

image.png

进入rds控制台,这是我们的帐号。

image.png

创建数据库。

image.png

接下来设置白名单,目前已经在白名单里面了。

image.png

我们把emr、ecs的安全组添加到白名单。

image.png

进入adb控制台,创建帐号。

image.png

创建完成。

image.png

登陆数据库。

image.png

添加ems的白名单,再登陆就可以了。

image.png

image.png

登陆实例,创建数据库。

image.png

回到adb控制台,找到数据安全,添加白名单。

image.png

把vpc的网段也添加进来。

image.png

网段可以在CADT里看到。

image.png

接下来创建实时计算,我们这里Blink独享集群,目前我们推荐使用Flink全托管产品,现在已经把Blink的特性和开源版社区的能力融合在一起了,形成了现在的Flink全托管。今天的演示使用Blink独享集群来做。

image.png

我们master选1个就可以了,Slave选择3个,购买。

image.png

接下来创建集群。

image.png

OSS刚才也跟随创建了,这里选一下,VPC是VPC FlinkZonee可用区,创建。

image.png

接下来等待集群创建,现在集群已经创建好了,我们开始创建项目。

image.png 

创建项目。

image.png

我们在日志服务台看一下这个项目。这是我们创建的Project

image.png

这是我们创建的logstore

image.png

这里选择sls的服务器。

image.png

执行安装成功,确认安装完毕。

image.png

输入名称。

image.png

看一下核心组,是成功了。

image.png

配置名称和日志路径。

image.png

手动输入日志样例,填写日志抽取内容。

image.png

上传原始日志,进入下一步。

image.png

我们的logstore就创建成功了。

image.png

从这里找hbase控制台。

image.png

修改分组白名单,把vpc的网段加进去。

我们所有的基础环境的配置就完成了。

image.png

这个是我们的公网IP

image.png

登陆一下。

image.png

下载原始日志,首先进入目录,接下来创建目录。

image.png

使用命令去生成一个模拟的自主文件。

image.png

接下来登陆日志服务的控制台,看一下日志生成的情况,可以看到日志已经成功上传到日志服务了,说明我们的采集已经生效了。

 

接下来把日志拆分到不同的logstore,可以看到evenkey有很多种类型,原始日志是汇集到一个文件里面的,接下来把不同的日志文件分别拆分到不同的logstore里面。

image.png

我们通过给大家提供的源码来进行创建,大家可以导入这个源码去创建logstore,这里需要把AK替换成自己的,日志服务也替换一下,这里使用新创建的logstore

image.png

接下来运行一下,可以看到成功了,可以看到预处理成了Create案例。

image.png

我们再回到日志服务的控制台,看一下创建完成了。

image.png

可以看到刚刚这些通过代码创建的logstore都已经创建了,这里需要注意给所有logstore开启索引。

image.png

接下来在rds控制台登陆数据库。

image.png

image.png

添加上白名单,登陆。

image.png

登陆成功后,先创建尾表,点poc的数据库,点执行,创建尾表,成功创建了。

image.png

接下来往尾表里传数据,刷新一下,执行可查询的数据,可以看到类型和logstore已经做了一一映射的关系。

image.png

接下来对源码进行编译,创建一个新的编译clean install,开始编译源码。

image.png

可以看到build成功,编译完成了。

image.png

回到Flink的项目,已经创建好了。

image.png

image.png

进入项目,进到Flink的开发,首先上传资源,新建资源。

image.png

上传一个刚刚打包出来的价包,选择这个价包。

image.png

上传。

image.png

上传完成之后,我们去新建一个作业,这个作业用于拆分到不同的logstore

image.png

从Demo库里把Flink SQL拷贝过来,这时注意要修改一下AK的信息、project、logstore相关的信息。

image.png

替换一下rds尾表的地址,去拷贝rds的内网地址。

image.png

数据库是poc,用户名改一下,把输出的logstore也改一下,project也替换一下。

image.png

作业已经自动保存了,在这个作业里,我们使用了自定义的价包,资源还要去引用一下。

image.png

上线作业。

image.png

通过后,自动调优,上线。

image.png

上线后,进到运维的界面。

image.png

启动作业,时间选择较早的时间,保证当前是有数据的。

image.png

进入作业详情,看一下作业的拓扑图。

image.png

任务失败,排查一下原因,进入Jobmanager里面。

image.png

可以看到Jobmanager.log

image.png

搜索一下看有没有错误,这个是连接失败了。

image.png

我们排查一下网络连接的问题,原来是刚才帐号输错了。

image.png

image.png

把原来任务停止,重新启动一下作业。

image.png

再进来看一下,现在数据能正常输入输出了。

image.png

刚刚也给大家介绍了如何在Flink里面如何排错。大家在Jobmanager里看到Flink日志的。刚才Jobmanager.log是认证失败的,排查数据库的帐号输错了,重新上线启动了作业,现在好了。

image.png

如果要看当前Flink集群的IP地址是什么,哪台机器在处理这个作业,可以在日志中心里看到IP

image.png

可以在日志服务的控制台里看看,可以看到日志已经被拆分到不同的logstore里面了。这是根据rds尾表里的映射关系,把原始不同类型的日志拆分到不同类型的logstore

image.png

接下来处理日志实时的etl,进入到ecs,我们把hbase的客户端部署到ecs上,这里在实验,为了节约资源的关系,直接把hbase客户端部署到日志上,生产的时候可以独立地去做部署,部署在这台机器上,开始安装。

image.png

安装完成后,去编辑下hbase的配置。

image.png

需要去hbase控制台,找到ZK连接地址,复制过去。

image.png

接下来进入到hbase客户端的命令端,用shell启动客户端。

image.png

启动之后创建一个hbase的尾表,比如创建一个item的尾表。

image.png

成功创建了,可以看到发表已经成功创建。

image.png

往表里插一组数据,可以看下数据是不是成功插入。

image.png

可以看到数据已经成功插入了。

image.png

接下来在hbase里添加Flink安全组。

image.png

确认一下Flink集群当前的安全组是哪一个,勾选添加。

image.png

创建logstore,用来去做数据etl之后的存储,在这里创建logstore,永久保存,创建。

image.png

在实时计算里面创建开发任务,新建作业。

image.png

再从Demo库里拷贝Flink SQL,这里面的信息也要替换一下。

image.png

拷贝数据库的连接地址。

image.png

把AK、project、logstore都要替换。

image.png

image.png

保存后上线。image.png

我们再启动作业。

image.png

成功运行后,我们看一下拓扑图。

image.png

看一下有没有数据的产生。可以看到有数据输入和输出。

image.png

在日志服务控制台看一下,开启索引,如果查不到,可能是没有开索引。

image.png

重启下原来的作业。

image.png

重启后,可以查询到了。有些空的,是因为有些字段不完整。

image.png

把日志实时同步到adb里面。首先创建adb的结构表,进入在dms控制台,点poc,创建一个adb的表,已经成功创建了。

image.png

接下来在实时计算里面创建一个Flink SQL,去把刚刚处理完的日志数据往adb里面去写,新建一个作业。

image.png

拷贝一下Flink SQL,修改一些AK等信息,进行替换,logstore是一致的,都确认一下。

image.png

把adb的链接拷贝过来,看下用户名、密码。

image.png

上线。

image.png

进入运维,启动作业,选择较早的时间。

image.png

启动完后,看下数据的情况。

image.png

可以看到数据在输入和输出。

image.png

进到adb的控制台,查询下数据是否已经写入。

image.png

查看下数据延时的情况,这个表里面记录了原始日志的时间,可以分析从日志生产的时间到我们进行一系列上传、拆分,看下数据的平均延时是多少。

image.png

可以看到这个是刚才查出来的延时,可以测全链路,通过实时计算处理这个延时是多少。我们这个方案里还包含离线,我们把数据写到edf里面,这是一个实时的方案。

image.png我们看一下emr的安全组是不是对Flink已经放行了,进入emr集群,看下emr的安全组,添加一下端口,全部放开,授权给实时计算的安全组,保存。

image.png

在实时计算里新建作业。

image.png

再把Flink SQL写进去。

image.png

把日志里的project信息替换一下,找到emr的host,可以在emr控制台找到,直接使用内网地址。

image.png

这个是自定义的sink

image.png

这个是从日志服务通过customersink,,去把写到hdfs里面,这里面的基本逻辑是根据时间写不同的目录。

image.png

上线。

image.png

image.png

看一下是什么错误,看一下是否是资源引用的问题,把资源引用上。

image.png

image.png

上线好后,启动。

image.png

进入作业,看下拓扑图。

image.png 

可以看到数据已经处理了。

image.png

下面进入emr集群里看下数据是否真实写入。用emr的公网地址。

image.png

已经连接到emr的节点地址上了。

image.png

可以看到当前已经有文件写入了,是每小时写一个文件。

image.png

接下来看下这个文件里是否有内容。可以看到CustomEvent里面的时间。写进来的日志已经归类到文件了。这个就是我们完成的需要把离线存储的数据按照不同的时间存到不同的hdfs里,后面再做离线分析。

image.png

下面做延时的测试,原来的数据先进到adb的控制台,可以把原来的数据清空。

image.png

再回到ecs集群,这里先测试1000条数据,执行生成1000条日志。

image.png

生成之后可以测试一下延时,可以看到当前延时是5秒,这是平均的延时。

image.png

先把数据清掉。

image.png

可以看到是1.6秒的延时。

这样我们可以汇总统计,处理多少条数据,在全链路上的延时是多少,因为我们每次启动作业是会有延时在里面。可以看到数据越小反而越快的。可见实时计算在全链路的实时处理上,它的时效性是非常强的。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
相关文章
|
1天前
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。
|
1天前
|
SQL 存储 分布式计算
Hologres+Paimon构建一体化实时湖仓
Hologres 3.0全新升级,面向未来的一体化实时湖仓。它支持多种Table Format,提供湖仓存储、多模式计算、分析服务和Data+AI一体的能力。Hologres与Paimon结合,实现统一元数据管理、极速查询性能、增量消费及ETL功能。Dynamic Table支持流式、增量和全量三种刷新模式,满足不同业务需求,实现一份数据、一份SQL、一份计算的多模式刷新。该架构适用于高时效性要求的场景,也可用于成本敏感的数据共享场景。
|
20天前
|
DataWorks 数据挖掘 大数据
方案实践测评 | DataWorks集成Hologres构建一站式高性能的OLAP数据分析
DataWorks在任务开发便捷性、任务运行速度、产品使用门槛等方面都表现出色。在数据处理场景方面仍有改进和扩展的空间,通过引入更多的智能技术、扩展数据源支持、优化任务调度和可视化功能以及提升团队协作效率,DataWorks将能够为企业提供更全面、更高效的数据处理解决方案。
|
2月前
|
消息中间件 人工智能 监控
Paimon x StarRocks 助力喜马拉雅直播实时湖仓构建
本文由喜马拉雅直播业务与仓库建设负责人王琛撰写,介绍了喜马拉雅直播业务的数据仓库架构迭代升级。文章重点分享了基于 Flink + Paimon + StarRocks 实现实时湖仓的架构及其成效,通过分钟级别的收入监控、实时榜单生成、流量监测和盈亏预警,大幅提升了运营效率与决策质量,并为未来的业务扩展和 AI 项目打下坚实基础。
228 5
Paimon x StarRocks 助力喜马拉雅直播实时湖仓构建
|
2月前
|
SQL 存储 数据挖掘
快速入门:利用AnalyticDB构建实时数据分析平台
【10月更文挑战第22天】在大数据时代,实时数据分析成为了企业和开发者们关注的焦点。传统的数据仓库和分析工具往往无法满足实时性要求,而AnalyticDB(ADB)作为阿里巴巴推出的一款实时数据仓库服务,凭借其强大的实时处理能力和易用性,成为了众多企业的首选。作为一名数据分析师,我将在本文中分享如何快速入门AnalyticDB,帮助初学者在短时间内掌握使用AnalyticDB进行简单数据分析的能力。
57 2
|
3月前
|
存储 SQL 分布式计算
湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
【10月更文挑战第7天】湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
162 1
|
2月前
|
缓存 监控 大数据
构建高可用AnalyticDB集群:最佳实践
【10月更文挑战第25天】在大数据时代,数据仓库和分析平台的高可用性变得尤为重要。作为阿里巴巴推出的一款完全托管的PB级实时数据仓库服务,AnalyticDB(ADB)凭借其高性能、易扩展和高可用的特点,成为众多企业的首选。本文将从我个人的角度出发,分享如何构建和维护高可用性的AnalyticDB集群,确保系统在各种情况下都能稳定运行。
42 0
|
2月前
|
数据采集 分布式计算 OLAP
最佳实践:AnalyticDB在企业级大数据分析中的应用案例
【10月更文挑战第22天】在数字化转型的大潮中,企业对数据的依赖程度越来越高。如何高效地处理和分析海量数据,从中提取有价值的洞察,成为企业竞争力的关键。作为阿里云推出的一款实时OLAP数据库服务,AnalyticDB(ADB)凭借其强大的数据处理能力和亚秒级的查询响应时间,已经在多个行业和业务场景中得到了广泛应用。本文将从个人的角度出发,分享多个成功案例,展示AnalyticDB如何助力企业在广告投放效果分析、用户行为追踪、财务报表生成等领域实现高效的数据处理与洞察发现。
93 0
|
6月前
|
SQL 关系型数据库 MySQL
如何在Dataphin中构建Flink+Paimon流式湖仓方案
当前大数据处理工业界非常重要的一个大趋势是一体化,尤其是湖仓一体架构。与过去分散的数据仓库和数据湖不同,湖仓一体架构通过将数据存储和处理融为一体,不仅提升了数据访问速度和处理效率,还简化了数据管理流程,降低了资源成本。企业可以更轻松地实现数据治理和分析,从而快速决策。paimon是国内开源的,也是最年轻的成员。 本文主要演示如何在 Dataphin 产品中构建 Flink+Paimon 的流式湖仓方案。
7891 10
如何在Dataphin中构建Flink+Paimon流式湖仓方案
|
4月前
|
存储 SQL 分布式计算
Hologres 与阿里云生态的集成:构建高效的数据处理解决方案
【9月更文第1天】随着大数据时代的到来,数据处理和分析的需求日益增长。阿里云作为国内领先的云计算平台之一,提供了多种数据存储和处理的服务,其中Hologres作为一款实时数仓产品,以其高性能、高可用性以及对标准SQL的支持而受到广泛关注。本文将探讨Hologres如何与阿里云上的其他服务如MaxCompute、DataHub等进行集成,以构建一个完整的数据处理解决方案。
105 2

热门文章

最新文章