开发者学堂课程【基于 Flink 的实时大数据应用 Demo:基于 Flink 的实时大数据应用 Demo】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/839/detail/14005
基于 Flink 的实时大数据应用 Demo
内容介绍:
一、 实时应用日志分析
二、 车辆引擎实时预测维护
为了更好的体现应用的价值以及它所代表的一个场景,这次会分享两个 Demo 接近的一个应用案例。第一个是如何去做实时 API 应用日志分析,第二个是采用模拟分析车辆引擎实时预测维护,以达到做预测维护的目的。
一、 实时应用日志分析
这个需求非常普遍,我们为这个场景搭建了一个车辆隐私保护的 API,这个 API 可以对用户上传的车辆照片进行一个隐私的保护的处理,是一个深度学习的模型,把这个模型封装成一个 API 放在阿里云的公共云上供全世界各地的用户访问,对这个 API 首先要去分析,有多少人在访问,它的访客频度来自哪个国家或地区以及访问的特征是否为攻击或者是正常的应用,为了做实时的分析需要对分散在各个地方的 API 运用日志进行海量的收集,不仅能够去收集还要能够对问题进行及时,实时的处理和报告,处理包括维度表查询以及窗口的聚合等等对流式计算的一些常见的操作,之后把这些操作处理完的结果要放在一个高吞吐低延迟的一个环境里面,使得下游的分析系统对数据进行一个实时的访问,整个链路不复杂,但是代表了一个重要的能力,与其等到另一天知道我的日志里面存了什么内容,通过使用 Flink为代表的实施计算和处理能够在描记的单位里面给业务决策人员一个数据驱动决策的赋能,下面来看下这个 Demo 是怎么来实现的。
这个架构里面有几个非常重要的几点,首先在右上方的搭建了一个环境,搭用的是flash python,结合比较主流的NGX及unicorn,把它制成了一个API,我们把API变成一个容器镜像,并且通过镜像将它部署到阿里云的各个上面,为了高并发低延迟,而且还装了第七层的负载均衡,以及前面套了一个APIK网关,去帮助用户去调用背后的这个API的能力。
作为这个 demo,我们也提供了一个web APP,使得用户不仅能通过代码去调用 API,也可以使用途径化的界面去访问API 本身所提供的服务的能力,当前端的用户去调用我们的这个 API 的时候,我们会使用 sos,简单日志服务是从API。去从 AP I 本 身的服务器当中收集,实时地收集 API 的应用日志,并且让他做简单的处理,之后投递到reailtime computer Flink,也就是实时计算浮力。Flink 有个很好的一个特征,就是它可以去订阅来自简单的日志服务的日志的投递,并且以流式计算的方式,对这个日志进行窗口聚合,维度表的查询结合等等这些操作。
Flink分析还有一个好处是,它可以用我们习以为惯的 SQL 去做比较复杂的业务逻辑的定制,当我们把这些数据都处理完了之后Flink就会把留数字啊,以我们习惯的结构化表的方式写到 Hollywood 这边呢,Lock 在这边的话,不仅作为我们数据的一个存储,也同时作为一个给下游 BR 数据展现提供动力的这么一个类似 Olap 引擎的性质,我们把这些东西串起来,形成了本次的大数据实时日志采集分析的一个架构,首先是这个 APP,可以允许用户非常简单的去上传自己的一些,还有车辆的照片 API 会他对他进行一个模糊化的处理,在 API 处理之后背景被虚化了,并且车牌的部分还有隐私信息的部分也被遮挡了。
API 做这么两个动作,那么,当我们有用户去访问这个API的时候呢,背后的简单日志服务就会对他进行一个实时的采集采后,我们会用 roxel 里面那些转换的数据加工的能力,对原始的日志去进行一些一定程度的解析和转换,其中就包括将IP地址解析为这个,例如国家、城市纬度、经度等这样的地理信息,也方便后续做下游的分析的时候,可以调度这些信息,而不是直接用 IP。
简单的是服务,还提供了一个非常强大的图形化的数据分析的能力,在这边可以作为一个初级的数据分析的,或者数据勘查的,一个能力可以去看到我们的这个数据的日志的原始日志的转换。是否满足我们对于下游业务支撑的一个需求,那么当这个日志被采集转换,处理完到一定程度了之后,我们会通过 lock Hart 将这个日志投递给我们的流流处理中心,也就是实时计算费,那其实用投递这个词并不是特别的精确,因为实际上是 Flink 主动去订阅,在local里边存储的 locked door 的这些处理过的。类似的信息,那么 Flink 有个非常好的地方,就是我之前有提到,我们可以用非常习以为常的 SQ 去编写那些业务逻辑,包括转换处理一些这个逻辑条件,然后当我们把这个 SQ 写 完了之后只要点个钮上线。
包装成一个 Frank 的 job,并且托管在 flink 的 class 集群里边我们通过这个控制台可以非常方便地反问,到底现在我的 cluster 集群的使用程度,频度如何,Cpu 如何有,有没有异常,有没有报错,包括查看整个 job 的情况,这也是Frank 托管的一个非常大的一个优势,我们几乎是不用去为运为操心,然后当 flink 处理完了这个流数据呢。所以给提供了一个很好的接口,可以使得我们处理完的流数据,以一种类似于表格结构化的方式,直接写入到我们的存储系统。Hologres 有一个特别大的特征,就是它既是 ol,也是 o lap,我们既可以把它拿做 o 去快速的写。
写入同时,也可以对被写入的数据,同时进行一个高并发的低延迟的查询分析,也就是常常说的 o pi 引擎的这个能力,她把两者合并为一块儿,所以 girls 也被称为
hybridservingundergoaprocesshcp。
在本 demo 当中,本次的价格当中,他就主要用来把这个处理完的数据展现给我们的下游。
也就是终端用户终端的业务决策人员可以消费的实时的大屏,那这个实时的大屏会随着 API 被访问,以秒级的延迟,把最新的信息处理完的信息给反映,在这个 DataV Dashboard 的这个实时大屏上。那么这样的话可以很大程度上减少决策人员看到数据当中产生的延迟,如果我们采用的是传统的那种皮处理。
这个方式的话,那每次处理可能要上 TB 的数据,而且他每次处理可能需要几个小时,我们如果采用以 Flink为核心的端到端的实时计算的方案的,这个延迟就能从几个小时被压缩在几秒,甚至是一秒以内。来具体看一个刚才的demo,在这个阶段,AR them a gallery 的页面当中,我们的 team 发布了很多。
有关大数据和 AI 的解决方案,以及一些 demo,那么其中这次的 demo,就会采用到这个叫 people privacy protection 车辆隐私保护的东西,点一下之后可以把这个 web APP,一个 demo 的应用程序打开,我们可以自己上传自己的私家车的照片,我在这里,我就选用一张这个 sample,对他进行点击,那这个时候就会调用背后的API对这张照片进行这个隐私保护的处理。
我们可以看到这个除了车辆之外的部分都被打上了说话,然后车牌的部分也精准地被定位到,并且把那个隐私的信息给模糊掉了。
那我们前端的用户调用了 API 之后,我们来到这个简单律师服务当中,我们就可以看到这个日志。被实时的从服务器当中被调取被解析,我们可以看到这里产生了一个 POS 的行为,来自于这个 IP 地址,然后对这个端口 mpon进行了一次,这个呼叫调用的操作,但是单单只看进行一些数据的话,其实它意义并不大,所以我们在简单的知服务当中呢,加了一个 transformation。
要转换,具体来讲就是把进来的原始日志的数据,对它进行一定程度的解析,其中包括把 IP 地址转换成这个国家城市地理、进度、纬度等这些信息。有了这些信息之后,我们的实时计算全多宽 Frank 计算,在这边我们建了一个给我用的项目空间,点进去的话,就可以看到我们写了一些 SQL 的简单的脚本。
去订阅来自简单日志服务的这个实时的日志,对它进行一定程度的操作,操作完了之后,点一个上线,它就会变成一个作业,被托管在 flink 的集群里边,我们通过这个控制台可以非常简单易懂的去把握整个 flip 集讯现在的状态。他处理的任务至今有多少,是否有报错哪些事件,包括可以看到这个 keymetric 的一个变化,所实现的一个变化情况,还有大量的用于 debug 条约的这些信息、日志都在这边,所以 flink 的一个特别大的优点就是我作为一个开发人员,我只需要包括在专注 SQL 实现我们业务该实现的逻辑,不需要花太多的精力去做运维,包括工程部署这些,基本上都是全部作为一个托管的服务。
当我们把这个流式数据在福利里边处理完了之后,我们需要有个保存的地方,那这个保存的地方,在本电脑当中我们就是使用了 Hologres,在里边我们建了一些表,去存储这个已经被处理完的最终的数字,那这个数字以流的方式再被写入到 Hologres 当中,同时,我们对它同一张表格,对它同时进行这个查询分析一些的操作。
不诉我们可以看到,我刚刚试用了一下那个 API,我们可以看到这个时间,这个时间是中国的时间,我现在是在日本,所以比中国早了一个小时。
我们可以看到来自中国的这么一个 IP 地址,有了一个 POS 的访问,做了这些事情,我们已经可以看到实时的被写入到 hosts 中,那么利用这个 Hollywood 当中,被处理完的数据,我们就可以作为数据源去给下游的得力。
提供实时的信息,现在有从我这个 demo 大线玩到现在,已经有22000多次的这个 API 的调用,几乎是可以说来自世界各地,五湖四海,其中肯定也来自国内的,中国的这个访问会比较多一点,其中我们对于这个访问的属性,做了一定的这个甄别,有些可能是错误的访问,或是恶意的访问,有些绿色的就代表是一个正常的访问,这样的话,就可以帮助决策者。
在非常短的延迟的情况下,可以看到当下实时的 API 的调动的情况,如果出现一些认为被认为不是很正常的情况的话,我们可以及时进行干预。这样的实时的分析能力,其实今天在这个节目当中,我们只是展现了数据数据分析的一个结果,但其实在实际的业务当中,还可以被用作异常的侦测。
比如说在游戏行业的话,可以被用来做这个检测,有没有作弊的行为,有没有外挂的行为等等,那这些对于实时性要求非常高的业务的需求的话,就会需要用到我们在这个本个 demo 当中给各位所展示过的,类似于这个架构,去实现端到端秒级的实时分析,实时掌控。实时处理的一个能力,那这是我们这次准备的第一个 demo 场景。
二、 车辆引擎实时预测维护
接下来是第二个业务场景,我们希望通过这个模拟的摇摇测数据去分析判断在行走在马路上的车,他的车的引擎是否展现一些这个异常的征兆,使得我们可以去提前判断。
这车可能现在也许有点问题,如果放任不管的话,三个月之后可能某个部件也就坏了,这也是一个在实际应用场景当中经常会提到的一个需求,我们称之为 predictive maintenance 就是预测性的维护,预测性的维护在实现的应用场景当中,可以帮客户方省下大量的金钱,因为等你东西坏了,你再去修,肯定不如在他坏之前你就已经知道他要换,把它给替换了。
为了实现这么一个比较接近真实世界的场景,我们做了一些调研,其中就包括了解在车展设备当中有个叫 obdi 的。这个诊断系统,它里边经常包含哪些精品的数据,我们把这些数据采集了一部分过来,然后对它进行加工,进行模拟,写了一个程序呢,是实际上模拟一个比较真实的,在现实环境当中运行的车的引擎的一个数据,那当然本次,因为这是一个 demo,我们是不太可能真的让一辆车在马路上开,所以就写了这么一个程序,利用了各种各样的统计分析的手法去模拟生成这样的行车数据,尽可能达到真实的效果。
这个程序会把模拟的新车引擎遥测数据把它个投递到 Kafla MQ,然后通过我们的实时计算 think 去消费 kafla 的topic,然后根据每个topic进行不同的流失计算,把流失计算的结果呢,我们一部分让他。归档在也就是冷存储,Co pass,把它存储下来之后,我们就有了历史数据,那一部分我们作为热流,热数据源就直接投递给我们开发的一个异常政策的模型,把它部署在 Pai EAS 上面,通过 Frank 可以直接去掉用,然后做了这个机器学习的判断了之后,再去看,我们现在当下的这个领先的数据有没有异常的征兆,那再把这个结果写入到数据库里边,给后边的同样也是对ab进行一个nearrealtown 近视时的一个消费,那么刚才我有提到,这个 realtimecomputer,它做了实时的处理之后,一部分的数据把它归档到了 VS 里边,那这部分数据直接用来干嘛,用来作为历史数据去建模,甚至是重新训练模型,因为每隔一段时间,可能行车的这个特征万一发生了一些变化是底层的 drifting,那么我们都可以用新的产生的历史数据去对模型进行重新的训练,重新训练完的模型,就可以把它作为 web service,把它部署到帕上,供阿里云去用这样的话就完成了一个俗称 lambda 架构的一个大数据的一个解决方案。
我们跟刚才一样,也具体要看一下它里边的各个细节,首先我有提到,我们需要做一些这个模拟数据生成的。工作去把引擎的遥测数据,OBD2的数据把它给模拟出来,投递到这个云上去做分析。那么这边呢,我们采用的是函数计算,函数计算非常的方便,它首先是一个托管服务,它是一个 service 的服务,其次,我们可以把 python 的脚本,从本地开发好的脚本直接照搬。
我们可以把拍摄的脚本从本地开发好的脚本直接照搬靠边配置到这个函数计算里边啊,利用他的这个托管的计算去执行这个模拟数据生成的这么一个程序脚本,非常的方便,我们在本次的模当中采用了每一分钟执行一次函数计算,也就是生成一部分的一个八次的一个批次的这个遥测数据,然后每次生成,间隔三秒投递一个数据,到 Kafal 里面去作为消费,那尽可能去模拟一个真实环境当中的这个数据产生的一个频度。kafal 也是一个非常非常常用的大数据的 pop 的一个系统,它它非常的灵活,扩容性非常的棒,那么在阿里云上的 kafal,我们非常可以在比如夜猫里边自建一个kafal资讯,也可以使用叫 kafal em m q 的一个托管服务来搭建一个完全 service 的 kafal 系统。
本次 demo 的为了方便,我们就采用了 kafal 高 温去搭建一个托管式的这个 pop 的 system,这个 system 其实只是用来囤积前方生产的,也就是我们的车辆投递过来的,这个引擎的数据,那么在在实际的生产环境当中,你车不可能是一辆,甚至肯定是几万辆,几十万辆都有可能,那么采用 kafal 的话就可以非常方便的去扩容,我不管前端的车有十辆还是10万辆,我的架构都不需要做太大的变故,就可以从容地应对这些扩容的弹性的需求,到了实时计算的部分,我们采用的仍然是这个 Frank 的这个实时计算系统啊,只不过在本次的节目当中,我们用的是一个叫 blink 的独享集群,也就是所谓的半托管式的,这个实时计算的平台,其实跟刚才在上一个场景当中,我给诸位展示了这个全托管的。几乎使用方法是几乎是一模一样的,只不过当时在制作这个 demo 的时候,一部分的区域还未上线,flink 全托管,所以我们选择了一个叫 blink 独享集群的每个服务,同样也是报在实时计算的这个家族当中,用起来的方法,几乎跟全托管是一模一样的,开发人员也只需要 photoshop 写这个脚本去做业务。逻辑的处理,然后点一个上线,那剩下的基本上就是完全由 Frank 代为管理,我们只需要去 monitor 监控,有没有异常的出现,包括做一些调优等的工作,非常的方便啊。在这边值得一提的是,我们把 PAI S 的这个模型调用的接口嵌入到了 flink 里边,那么使得福利在实时处理流数据的时候,同时,也可以把一部分的数据扔给 PAI 去做模型的这个推论,那么把结果再跟这个实时流数据合并起来之后,一并写入到我们的下游的存储系统里边,这部分体现了 flink 计算平台,它的一个延展性和扩容性。非常的好用,这部分是展现我们如何用派 studio 一个图形化的机器学习的平台去设置开发一个非常简单的二元分类模型,这个二元分类模型主要就是从过去引擎的历史数据当中去学习哪些特征会被用来判断为引擎有问题,那哪些是是比较属于正常的这个数值,那么我们通过建了这个模型之后,我们就有依据可以用来对未来新产生的引擎数据去进行同一个判断,那么这样的话有助于业务人员提早去预知,现在的这个引擎的数据看起来好像有一点点问题,是因为我们从过去当中已经学习到了相关的特征,以及这个 data 的 pattern 这个模型。
模型的开发,整个过程我们用的 cpu 完全是拓展,几乎是一条大脑门写过,我可能在里边写了一段简单的 python 的脚本,但也没必要,完全可以通过 local 来实现一个模型的开发。那么更好的一点在于,当我今天模型开发完了之后,通过 PAI 我们可以一键部署,把它包装成一个 rest API,一个 web service,放在 pad 平台上去供用户去调用,这个画面就显示了。
一键部署之后,我们去对这个部署王的模型的服务进行一个测试调用的这么一个减免,非常的方便啊。那么当模型完成了,部署完了,我们通过 flink 让他判断完有没有异常,这个流数据进行完实时的处理之后,最后把它写到了一个比较传统意义上用的比较多的 ,SQL 的一个数据库里边,那这个数据库,就会作为数据源去给下游的实时大屏提供一个数据的支撑,那这样的话,业务人员就可以以进食时,也就是隔几秒的这个金额就能看到我现在目前在路上跑的这个车到底有没有问题。在这边放了一个 link,它会把这个实时的大屏给打开,可以看到,
我们整个 demo 架构端到端跑的一个结果,这个 DV 的大屏,我是把它设为每五秒更新一次,也就是说每五秒,我会从数据库当中把最新的预遥测数据,包括这个判断,有没有异常的这个数据啊,把它给拿出来展示在大屏上我们的。
在这边可以看到红色代表这个时间点采集上来的数据,有可能代表是有问题的,那么蓝色就代表什么,我们就是比较正常的数据,这个,为什么是正常不正常,完全是由我们之前产生模拟数据的那个方生控制在控制,因为我们在放生控制的逻辑里边,人为加的,故意的,加了一些会让引擎看起来出错的这种数据,使得这个能把这个不正常的部分能不能体现的更多一点。
之前提到我们用函数计算去生成以前的遥测数据,在这里边,我们换了一些 python 的代码,使得它能在这个全托管的服务的环境当中去运行。那么在这个控制台当中,不仅我们可以去编写代码,管控代码,我们甚至还可以。对它进行一定的调优,这是原代码,我们当时使用了 mark can 作为模拟引擎的特征的一种方式,里边还采用了一些其他的统计的分布的手法,使得这个引擎产生这个异常的这个数据看起来会比较真实一点。
在这边函数计算里面,我们也可以看到,他每次被调用的时候,他的一个 log 输出的情况是否有异常,包括他的一些指标,运行的指标是否也超过了我的一些限制,或者说是有没有异常报警之类的情况,都可以非常方便的去进行一个跟踪和管控。这个数据,传到了卡夫卡之后,同样我们会到 Frank 实时计算平台的,他的一个叫 blink 多项几圈。边去调用它,那么这边用的也是大家非常熟悉的 S QL 去进行一个数据的读取、处理、结合和操作。那值得一提的,是这边有一个叫资源引用的部分,我们网上投放了一个自己编写的 Java 的一个库,这个 Java 的库,就是用来调用 PAI S5 部署在 PAI 上的那个模型的端口。其实里边是一个非常简单的一个 restapi 的 request,那么我们把它用加法编写好了之后,把它包装成一个加的文件,可以上传到 Frank,Frank,就可以在 SQL 里边通过增加这么一段话,去调用我们在外不写的程序,这样的话就等于是无限的扩大的 Frank 里边能进行的操作。那么今天我们是调用模型派的模型的,明天可能我需要调一个别的一个什么东西。你同样可以用同样的方法把它作为一个插线,追加到 Frank 里边,然后在 Frank sq 里面去作为一个函数去调用它,非常的方便了。
最后这个处理的结果,点运维他可以看到在跑,他的跑的一个当下的一个情况,有没有出错,资源消耗是多少,延时是多少等等。
包括里面的一些日志,交给一个输出的情况啊,有没有这个集群条约的可能,包括有没有一些这个废物的情况,我们都可以非常的方便,在这个控制集群上去,注意把握好,那么这个 PAI 的部分也是具体我就不展开了,我们是用可视化建模的这么一个工具,用历史的引擎数据把它装在进来,然后做了一个非常简单的二元分类的模型,那么这个模型开发本身已经是非常非常简单的,连代码都几乎都不用写,那么更好的地方在于它可以实现一键部署。
那么选完了之后,我们只要点一下模型,再建部署他就会帮你把模型封装了一个 CPI,部署到排上,那么我们可以通过这个 pie 模型在线服务的东西去掉,用这个模型,包括对它进行一定的这个 unit test 啊,我们可以在这里看到调研的方式,以及可以进行在线的调用。
在这边准备了一个简单的一个数据通过。UI 对它进行,对我们部署的模型进行一个调用,从返回的结果当中我们就可以谈到,如果我输入的数据是这些的话,那么模型判断它正常的情况,百分比以及不正常的情况来看,以这个数据来看这个引擎,目前是96%的,肯定是正常的。
所以非常快速方便的能把一个模型开发好,并且部署完投入到生产环境当中的使用。