开源最大的特征就是开放性,云生态则让开源技术更具开放性与创造性,Elastic 与阿里云的合作正是开源与云生态共生共荣的典范。值此合作三周年之际,我们邀请业界资深人士相聚云端,共话云上Elasticsearch生态与技术的未来。
本篇内容是力萌信息数据技术专家李猛带来的Elasticsearch基于Pipeline窗口函数实现实时聚合计算
分享人:力萌信息数据技术专家李猛
视频地址:https://developer.aliyun.com/live/246153
本文将通过三个部分展开介绍Elasticsearch基于Pipeline窗口函数如何实现实时聚合计算
- Pipeline实时计算模型
- ES-Pipeline实时计算能力和工作特点
- ES+X实时计算畅想
一、 Pipeline实时计算模型
首先,我们来探讨一下Pipeline实时计算模型是什么。
Pipeline翻译过来是管道的意思,上图大家可以看到有三根管子,我们在大数据领域或者在做应用系统的时候,其实编程抽象来说就是这三点:输入数据,process处理,输出。如果业务程序或者大数据逻辑比较复杂,那么输出就会成为下一个管道的输入,所以就会到第二根管子;第二个管子处理完之后又会到下一根管子,也就是不停地在写很多复杂的数据处理逻辑。其实早期很多同学去做大数据开发,Map-reduce如果把中间的数据描述出来会发现也是这三步。Map- reduce-map-reduce的循环,中间的数据一直在,也是按照这种管道的模型不断地在变化。其实最早Pipeline的思维是来自于早期接触到Elastic公司推出的ELK三件套,其中采集数据的编程的模型配置里面有三个步骤其实就应对了这个模型,在编程里面可以input,然后可以接受上一次处理的逻辑,这就是管道。
Streaming计算模型
我们在大数据领域经常会谈到一个概念叫流式计算,而Pipeline这个模型无论是做实时计算还是离线计算,其实思维都是Streaming的计算模型。
如图,DStream数据从上游进行到下游,是按照追加这种方式的一种流计算,我们要去从中提取数据进行一些模型和处理。而在SPARK领域,它目前的流计算只支持微批处理这个概念,但这并不影响我们管道实施的思维模型。
Streaming窗口计算模型
在Spark中我们又可以做一个窗口计算模型,创建一个window,设定一个1-2分钟的时间窗口,在时序的数据处理场景确实很合适,比如使用spark streaming等等。当然有很多数据场景上它并不是时序的,这个时候我们要基于别的逻辑去做这种数据的窗口计算,我最早的窗口思维也是来自于Spark,大概四五年前系统地学习Spark的时候,第一次感受到window其实就是我们传统地做数据统计的时候设定的一个时间范围,只是换了一个计算引擎。以前我们是在数据库里选定一个时间窗口,在Spark里我们是基于一个内存模式把数据丢进去,然后在这个范围内计算。
当前在实时计算领域,Flink绝对是领先的。大家可以看到,在Flink计算体系里,中间是Flink的计算引擎,它的上游可以支持很多日志,IOT,Clicks,real-time,离线型的Base事件,中间它会通过一些管道处理,这些管道本身就可以支持互相的穿插,通过管道把一些数据处理完后输出到另外一个管道,做下一步的数据。如果数据最终没有再继续处理,就把这个数据交换出去,存到某一个系统里面。
图中大家可以看到,用Flink做实时计算至少需要三点:
第一是上游的数据输入源input,中间就需要用flink,当然也可以选择自己写入程序,如果简单的话是没有任何问题的,或者是用logstash等等。下游的输出端也有很多,可以输到下一个应用程序里面,可能是另外一组的Flink或其他的的系统,也可以输入到Event Log,也可以输出到最终的数据库里面。但我们想讲的Pipeline并不是这个意思,这只是我的一个思维起源,Flink这个框架产品帮我解决了很多调度的问题。
Q:现有流计算的问题是什么?
在标准的实时计算的领域,至少需要三个以上的步骤。
第一,现在基本上大部分企业都会选择Kafka ,中间选择Flink,下游根据应用程序逻辑有可能还需要下一步的处理,会放到Kafka,也可能最终不需要处理,需要给Elasticsearch做最终查询。目前来说Elasticsearch在数据查询方面是领先的,在这个领域里是最好用的。我们在做一个标准的实时流计算的时候,基于Pipeline会需要输入-处理-输出,也就是去搭建一个哪怕是轻量级的实时数据处理计算都需要融入至少三个产品,会带来什么问题?大家可以想象,在IT里做系统架构等等,每增加一个处理的环节,增加一个节点,或者每增加一个大型的数据系统融入进来,这个系统的复杂性就会增加好几倍,并且数据架构的可靠性也会降低。同时,这三个产品对于研发人员和对架构师的考验非常大,不能保证很快就可以完成。
举个例子,我们一般数据计算之后,会用Elasticsearch去存储,而Elasticsearch也有多方面的配置,索引的创建,集群的搭建等等又回到一个现实的问题,就是数据库是否需要一个DBA,上面有什么业务在并行,所以每一次当我们融入新的技术都会遇到问题。所以今天探讨的是,虽然Pipeline计算模型非常好,但是现在的实时计算的思维是有问题的,一些产品并不能简化我们的操作。
二、 ES Pipeline实时计算能力
ES基于它已有的特性,为了避免上面三件套的问题,在Pipeline上做了一些工作。
1、ES Ingest Pipeline
Elasticsearch推出了Ingest功能,即数据处理。图中大家可以看到,上游数据用任何输入源都是可以的,然后经过Elasticsearch里Ingest这个具备处理能力的节点,让你的数据经过Pipeline管道的处理,可能需要拆分数据做一些复杂的计算。比如本来传入一个性别,需要把性别转换成矩阵,把男和女转化为零和一这样的区别,然后就可以在这里面去编程,完成之后数据直接输出到一个索引里。图中大家可以看到,相比之前的基于标准三件套的实时处理要简化了很多,用一个产品可以替代之前的几个产品。可以把Kafka,Flink的逻辑合并掉,全部用Elasticsearch来替代,就更加便利。所以如果你对ES有兴趣或是熟知的话,可以大规模地使用它,它也有很多缺点,但是它非常的实用,用一个技术替代了三个技术,从图中可以看到它其实也是完全符合Pipeline聚合模型的。
Ingest里提供了很多pipeline 的函数,其中有一个功能叫 enrich。当数据输入后,ES的索引可以做一些反查比对,然后再把数据丰富一下。相当于数据进来后要去查一下数据库,然后把一些附属的属性绑定,最终塞到另外一个索引里,其实这也是一个标准的管道。所以在现实世界里管道编程的这种思维其实早以深入人心,并广泛应用。右图是来自于一个标准的函数,这个函数里还有一个用来给原数据增加一些字段的processor,这个是ES的一个特性。 对于这些特性不用去刻意地记录,当你认可我们这种编制思维模型的时候就可以去学习一下。
2、ES Rollup Pipeline
Rollup,即数据上卷,图中左边大家看到数据有三种颜色,这些数据表示的是一个点击事件,记录某一些用户点击的事件,性别。在原始的数据里,我们可以想象,假设做一个网站或系统,pv、uv的统计,每天可能有上10亿的数据量,但是我们分析需求的时候实际上只需要分析今天什么时候哪个域名点击了多少次,这个我们可以通过Rollup这个概念把数据做一次转换,压缩。到右边,我们的数据就已经精确到天或者按小时。对于ES来说,它其实也是遵循三个关键步骤:第一,原始数据输入到Elasticsearch里存起来,中间开启一个Rollup实时计算的能力,然后把数据经过一定的折叠之后,输出到另外一个索引,这样就完全满足了Pipeline的思维,也完全满足了实时计算。所以在这个领域, ES算做了一个伟大的创新,只需要一套去处理,就可以把管道模型深切地融入到自己的数据里。如果基于Flink做数据统计,上游会先用Kafka输入数据,然后中间用Flink计算,比如每1000条,每1万条数据就Rollup一下,然后输出到另外一个下游又会放到ES里来存储,这就会带来技术的成本,实施的代价。
我们用颜色解决了海量明细数据的查询,但是现在又有一些隔夜的统计需求,比如定期的,该如何去做?常规来说,有些大数据的专家给你建议可能会需要把数据取出来,这个其实并不是很好,所以这个时候如果你用的是Elasticsearch这一套件,可以自己创建一个Rollup节点输到另外一个就完成了。带来的经济效益不言而喻,而且功能特性也非常简单。
3、ES Transform Pipeline
Rollup其实是基于时间的维度,那么如果数据并没有时间,想要做Pipeline转换的处理也是一样的。在ES里推出了功能叫Transform Pipeline的模型,同样可以解决这种问题,只是统计方式略有不同。在Transform里用户也可以编辑很多自己的函数,也可以写一些复杂的脚本。
4、ES Aggregations Pipeline
接下来讲到ES强大的聚合能力,聚合其实是做统计,比如1000条数据要做一个SUM的值,或者AVG,Min ,Max等等,这些就叫Aggregation。在Elasticsearch 里其实是会根据现实世界融入自己的一些逻辑。图中我们可以看到,聚合一共分成了两次,初次聚合基于原始索引输入做一次聚合,聚合之后压缩,比如原来有一亿条数据,聚合出来大概的结果就只有1万,然后再基于这1万左右的数据又要做二次聚合。在ES里这个思维就叫Pipeline,可以在官方网站搜索到叫Elastic Pipeline的聚合。二次聚合之后,ES把第一次和第二次聚合的结果一起推给应用端,这个特性是其他很多数据产品没有的。比如原本数据库里的SQL是做不到的,其他的一些大型产品可能会用到,但是复杂程度极高,所以比较推荐ES这个功能。
在Elastic里,Pipeline的窗口函数其实提供了很多方式,比如Aggregation目前提供了至少图中这三种方式。比如可以选择moving avg计算移动的平均值,通过自定义函数moving fn写自定义脚本等等。
以上就是基于Pipeline实现窗口计算的一些现实的问题和逻辑,还有ES的一些特性。
三、 ES+X实时计算畅谈
(一)数据编程本质
现实世界中,写一个函数,做一个系统,会发现这个系统提供对外一些API输入数据,然后系统做一些逻辑,逻辑之后又输出,可能输出到下一个处理环节,如果是数据库就存在数据库里面。输入的地方也是一样,假设是一个rest API,接受外围的请求,处理,输出,返回数据,假如是一个大数据里面的离线计算批处理,也是写一个批处理逻辑输入数据,比如Spark读取文件,然后做计算,输出,可能又存到输入上去了,或者是输到ES。如果这一步完成了,我们在大数据里面经常还要做调度编排,一般会用Airflow去完成,在编写Airflow流程的时候,实际上是编写一个一个流的方式,然后把它挪到下面去,再输入到下一个逻辑。
(二)流式实时计算数据库
虽然ES也具备一些实时的处理能力,但它有很明显的局限性。比如它不支持非常精确毫秒级的处理。抛开这个限制,我希望有一种数据产品既可以充当Kafka的角色,也能支持流式数据进来。我可以编写简单的SQL处理函数在上面做一些计算,经过Pipeline可以支持很多函数,同时这些数据又可以再回到数据库里去消化掉,这个数据库同时又可以对外提供查询或者其他的能力。其实我想探讨的就是这个思维,all in one,我们需要一个叫做Streaming Database的概念,希望有一个流式数据库,把数据的in和process,还有数据的output全部融在一起,让整个编程模型变得简单高效。相比之下之前所提的三件套就太过于标准化,过于陈旧,虽然它很吸引人,很优秀。
(三)Elasticsearch + X畅谈
我希望ES可以支持这三种门类,其实它已经支持了两种,就是数据的process里有in,transformer,还有aggregation,还有一些其他的函数能力。如果能把上游的流式解决,我相信未来它应该可以占领更多的流式的处理市场。
其实ES本身是支持索引的,如果把这个索引充当一个Buffer流就会面临,数据可以写进来但是读数据就比较麻烦,因为它的数据在顺序上没有得到很好的控制,数据可以从头写到尾,但我们无法像Kafka一样从尾读到头,这就是ES还需要改进的地方。所以希望未来市场上有更多Streaming产品,因为现在它已经具备了Bash批处理模拟,如果它未来加入了,我相信在ES的市场上绝对能够很好地发展。比如把Kafka的节点融进来;把Kafka的特性、具备的能力加入到ES里面来;创建索引的时候直接就可以创建一个队列的索引;写数据的时候可以从头写,读的时候可以从底部读,不用按照标准的索引search去查了。然后经过process节点的时候,processor具备了Flink的处理能力和逻辑能力,最终输出到索引。
这就是我想和大家探讨的一个全新思维,虽然标题讲的是基于窗口的,但其实窗口计算在整个大数据里只是占了很小的一点,从high level这个层次来看,实时计算流法还是本质。
【阿里云Elastic Stack】100%兼容开源ES,独有9大能力,提供免费 X-pack服务(单节点价值$6000)
相关活动
更多折扣活动,请访问阿里云 Elasticsearch 官网
阿里云 Elasticsearch 商业通用版,1核2G ,SSD 20G首月免费
阿里云 Logstash 2核4G首月免费
下载白皮书:Elasticsearch 八大经典场景应用