Flink实战:全局TopN分析与实现

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 全局TopN分析与实现

在上一篇Flink实战: 窗口TopN分析与实现中实现了在一个窗口内的分组topN,但是在实际中也会遇到没有窗口期的topN,例如在一些实时大屏监控展示中,展示历史到现在所有的TopN数据,将这个称之为全局topN,仍然以计算区域维度销售额topN的商品为例,看一下全局TopN的实现方法。
先将需求分解为以下几步:

  • 按照区域areaId+商品gdsId分组,计算每个分组的累计销售额
  • 将得到的区域areaId+商品gdsId维度的销售额按照区域areaId分组,然后求得TopN的销售额商品,并且定时更新输出

与窗口TopN不同,全局TopN没有时间窗口的概念,也就没有时间的概念,因此使用ProcessingTime语义即可,并且也不能再使用Window算子来操作,但是在这个过程中需要完成数据累加操作与定时输出功能,选择ProcessFunction函数来完成,使用State保存中间结果数据,保证数据一致性语义,使用定时器来完成定时输出功能。

销售额统计

对数据流按照区域areaId+商品gdsId分组,不断累加销售额保存起来,然后输出到下游。

1.  `val env =StreamExecutionEnvironment.getExecutionEnvironment`

2.  `env.setParallelism(1)`

3.  `val kafkaConfig =newProperties();`

4.  `kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");`

5.  `kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");`

7.  `val consumer =newFlinkKafkaConsumer011[String]("topic1",newSimpleStringSchema(), kafkaConfig)`

8.  `val orderStream = env.addSource(consumer)`

9.  `.map(x =>{`

10.  `val a = x.split(",")`

11.  `Order(a(0), a(1).toLong, a(2), a(3).toDouble, a(4))`

12.  `})`

14.  `val salesStream=orderStream.keyBy(x =>{`

15.  `x.areaId +"_"+ x.gdsId`

16.  `}).process(newKeyedProcessFunction[String,Order,GdsSales](){`

18.  `var orderState:ValueState[Double]= _`

19.  `var orderStateDesc:ValueStateDescriptor[Double]= _`

21.  `override def open(parameters:Configuration):Unit={`

22.  `orderStateDesc =newValueStateDescriptor[Double]("order-state",TypeInformation.of(classOf[Double]))`

23.  `orderState = getRuntimeContext.getState(orderStateDesc)`

24.  `}`

26.  `override def processElement(value:Order, ctx:KeyedProcessFunction[String,Order,GdsSales]#Context,out:Collector[GdsSales]):Unit={`

28.  `val currV = orderState.value()`

29.  `if(currV ==null){`

30.  `orderState.update(value.amount)`

31.  `}else{`

32.  `val newV = currV + value.amount`

33.  `orderState.update(newV)`

34.  `}`

35.  `out.collect(GdsSales.of(value.areaId, value.gdsId, orderState.value(), value.orderTime))`

36.  `}`

37.  `})`

使用keyBy按照areaId+gdsId来分组,然后使用KeyedProcessFunction来完成累加操作。在KeyedProcessFunction里面定义了一个ValueState来保存每个分组的销售额,processElement完成销售额累加操作,并且不断更新ValueState与collect输出。
说明:这里使用ValueState来完成累加过程显得比较繁琐,可以使用ReducingState来替代,这里只是为了表现出累加这个过程。

区域TopN计算

上一步得到的salesStream是一个按照区域areaId+商品gdsId维度的销售额,并且是不断更新输出到下游的,接下来就需要完成TopN的计算,在Flink实战: 窗口TopN分析与实现中分析到TopN的计算不需要保存所有的结果数据,使用红黑树来模拟类似优先级队列功能即可,但是与其不同在于:窗口TopN每次计算TopN是一个全量的窗口结果,而全局TopN其销售额是会不断变动的,因此需要做以下逻辑判断:

  1. 如果TreeSet[GdsSales]包含该商品的销售额数据,则需要更新该商品销售额,这个过程包含判断商品gdsId是否存在与移除该GdsSales对象功能,但是TreeSet不具备直接判断gdsId是否存在功能,那么可以使用一种额外的数据结构Map, key为商品gdsId, value为商品销售额数据GdsSales,该value对应TreeSet[GdsSales]中数据
  2. 如果TreeSet[GdsSales]包含该商品的销售额数据,当TreeSet里面的数据到达N, 就获取第一个节点数据(最小值)与当前需要插入的数据进行比较,如果比其大,则直接舍弃,如果比其小,那么就将TreeSet中第一个节点数据删除,插入新的数据
    实现代码如下:
1.  `salesStream.keyBy(_.getAreaId)`

2.  `.process(newKeyedProcessFunction[String,GdsSales,Void]{`

3.  `var topState:ValueState[java.util.TreeSet[GdsSales]]= _`

4.  `var topStateDesc:ValueStateDescriptor[java.util.TreeSet[GdsSales]]= _`

6.  `var mappingState:MapState[String,GdsSales]= _`

7.  `var mappingStateDesc:MapStateDescriptor[String,GdsSales]= _`

8.  `val interval:Long=60000`

9.  `val N:Int=3`

10.  `override def open(parameters:Configuration):Unit={`

11.  `topStateDesc =newValueStateDescriptor[java.util.TreeSet[GdsSales]]("top-state",TypeInformation.of(classOf[java.util.TreeSet[GdsSales]]))`

12.  `topState = getRuntimeContext.getState(topStateDesc)`

14.  `mappingStateDesc =newMapStateDescriptor[String,GdsSales]("mapping-state",TypeInformation.of(classOf[String]),TypeInformation.of(classOf[GdsSales]))`

15.  `mappingState = getRuntimeContext.getMapState(mappingStateDesc)`

16.  `}`

17.  `override def processElement(value:GdsSales, ctx:KeyedProcessFunction[String,GdsSales,Void]#Context,out:Collector[Void]):Unit={`

19.  `val top = topState.value()`

20.  `if(top ==null){`

21.  `val topMap: util.TreeSet[GdsSales]=new util.TreeSet[GdsSales](newComparator[GdsSales]{`

22.  `override def compare(o1:GdsSales, o2:GdsSales):Int=(o1.getAmount - o2.getAmount).toInt`

23.  `})`

24.  `topMap.add(value)`

25.  `topState.update(topMap)`

26.  `mappingState.put(value.getGdsId, value)`

27.  `}else{`

28.  `mappingState.contains(value.getGdsId) match {` 

29.  `case true=>{//已经存在该商品的销售数据`

30.  `val oldV = mappingState.get(value.getGdsId)`

31.  `mappingState.put(value.getGdsId, value)`

32.  `val values = topState.value()`

33.  `values.remove(oldV)`

34.  `values.add(value)//更新旧的商品销售数据`

35.  `topState.update(values)`

36.  `}`

37.  `case false=>{//不存在该商品销售数据`

38.  `if(top.size()>= N){//已经达到N 则判断更新`

39.  `val min = top.first()`

40.  `if(value.getAmount > min.getAmount){`

41.  `top.pollFirst()`

42.  `top.add(value)`

43.  `mappingState.put(value.getGdsId, value)`

44.  `topState.update(top)`

45.  `}`

46.  `}else{//还未到达N则直接插入`

47.  `top.add(value)`

48.  `mappingState.put(value.getGdsId, value)`

49.  `topState.update(top)`

50.  `}`

51.  `}}}}`

52.  `})`

在open中定义个两个state:ValueState与MapState, ValueState保存该区域下的TopN商品销售数据GdsSales,MapState保存了商品gdsId与商品销售数据GdsSale的对应关系。
在processElement中,首先会判断ValueState是否为空,如果为空则定义按照销售额比较升序排序的Comparator 的TreeSet,则走更新逻辑判断。

定时输出

到这里我们已经计算出了每个时刻的TopN数据,存储在ValueState[java.util.TreeSet[GdsSales]] 中,现在希望每隔1min将TopN的数据输出,可以使用在时间系统系列里面提供较为底层的直接获取到InternalTimeService来完成,由于ProcessFunction本身提供了定时调用功能,我们就按照在窗口实用触发器:ContinuousEventTimeTrigger中讲到的持续触发器的原理来实现,

1.  `var fireState:ValueState[Long]= _`

2.  `var fireStateDesc:ValueStateDescriptor[Long]= _`

3.  `//放在open方法中`

4.  `fireStateDesc =newValueStateDescriptor[Long]("fire-time",TypeInformation.of(classOf[Long]))`

5.  `fireState = getRuntimeContext.getState(fireStateDesc)`

定义了一个ValueState,保存每一次触发的时间,不使用ReducingState是因为没有Window里面在使用SessionWindow的合并机制。

1.  `//放在processElement里面`

2.  `val currTime = ctx.timerService().currentProcessingTime()`

3.  `//1min输出一次`

4.  `if(fireState.value()==null){`

5.  `val start = currTime -(currTime % interval)`

6.  `val nextFireTimestamp = start + interval`

7.  `ctx.timerService().registerProcessingTimeTimer(nextFireTimestamp)`

8.  `fireState.update(nextFireTimestamp)`

9.  `}`

对于每一个区域areaId(key)在processElement只需要注册一次即可。

1.  `override def onTimer(timestamp:Long, ctx:KeyedProcessFunction[String,GdsSales,Void]#OnTimerContext,out:Collector[Void]):Unit={`

2.  `println(timestamp +"===")`

3.  `topState.value().foreach(x =>{`

4.  `println(x)`

5.  `})`

6.  `val fireTimestamp = fireState.value()`

7.  `if(fireTimestamp !=null&&(fireTimestamp == timestamp)){`

8.  `fireState.clear()`

9.  `fireState.update(timestamp + interval)`

10.  `ctx.timerService().registerProcessingTimeTimer(timestamp + interval)`

11.  `}`

12.  `}`

onTimer定时输出,并且注册下一个触发的时间点。

测试

准备数据

1.  `//2019-11-16 21:25:10`

2.  `orderId01,1573874530000,gdsId03,300,beijing`

3.  `orderId02,1573874540000,gdsId01,100,beijing`

4.  `orderId02,1573874540000,gdsId04,200,beijing`

5.  `orderId02,1573874540000,gdsId02,500,beijing`

6.  `orderId01,1573874530000,gdsId01,300,beijing`

等到2019-11-16 21:26:00得到结果

1.  `1573910760000===`

2.  `GdsSales{areaId='beijing', gdsId='gdsId03', amount=300.0}`

3.  `GdsSales{areaId='beijing', gdsId='gdsId01', amount=400.0}`

4.  `GdsSales{areaId='beijing', gdsId='gdsId02', amount=500.0}`

接着在生产一条数据

1.  `orderId02,1573874540000,gdsId04,500,beijing`

等到2019-11-16 21:27:00得到结果

1.  `1573910820000===`

2.  `GdsSales{areaId='beijing', gdsId='gdsId01', amount=400.0}`

3.  `GdsSales{areaId='beijing', gdsId='gdsId02', amount=500.0}`

4.  `GdsSales{areaId='beijing', gdsId='gdsId04', amount=700.0}`

至此完成全局topN的全部实现。

总结

全局TopN要求状态保存所有的聚合数据,对于key比较多的情况,不管是销售额数据还是定时器数据都会占用比较多的内存,可以选择RocksDb作为StateBackend。

                                         ----------------------------------- 

精彩推文:

1. Flink中延时调用设计与实现[](http://mp.weixin.qq.com/s?__biz=MzU5MTc1NDUyOA==&mid=2247483943&idx=1&sn=d3e1002255cbd6cfe16855639f82b80f&chksm=fe2b6668c95cef7e3639c575ac8cb899e7317f82c2b446a23335c8f33dc97a968ea7f345b914&scene=21#wechat_redirect)

2. Flink维表关联系列之Hbase维表关联:LRU策略

3. 你应该了解的Watermark

4. Flink exactly-once系列之事务性输出实现

5. Flink时间系统系列之实例讲解:如何做定时输出

关注回复Flink获取系列文章~

image

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
216 3
|
6月前
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
14天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
46 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
66 5
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
215 2
|
1月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
70 0
|
3月前
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
60 1
|
3月前
|
消息中间件 数据挖掘 Kafka
揭秘数据洪流中的救世主:Confluent与Flink的实时分析奇迹!
【8月更文挑战第9天】在现代数据处理中,实时数据分析至关重要。Confluent Platform与Apache Flink的组合提供了一套高效的数据流处理方案。Confluent Platform基于Apache Kafka构建,负责数据的收集、传输及管理;而Flink则擅长实时处理这些数据流,进行复杂分析。首先需配置Confluent Platform,包括设置Zookeeper、Kafka brokers及相关服务。
69 1
|
3月前
|
API C# Shell
WPF与Windows Shell完美融合:深入解析文件系统操作技巧——从基本文件管理到高级Shell功能调用,全面掌握WPF中的文件处理艺术
【8月更文挑战第31天】Windows Presentation Foundation (WPF) 是 .NET Framework 的关键组件,用于构建 Windows 桌面应用程序。WPF 提供了丰富的功能来创建美观且功能强大的用户界面。本文通过问题解答的形式,探讨了如何在 WPF 应用中集成 Windows Shell 功能,并通过具体示例代码展示了文件系统的操作方法,包括列出目录下的所有文件、创建和删除文件、移动和复制文件以及打开文件夹或文件等。
76 0
|
5月前
|
消息中间件 分布式计算 Kafka
深度分析:Apache Flink及其在大数据处理中的应用
Apache Flink是低延迟、高吞吐量的流处理框架,以其状态管理和事件时间处理能力脱颖而出。与Apache Spark Streaming相比,Flink在实时性上更强,但Spark生态系统更丰富。Apache Storm在低延迟上有优势,而Kafka Streams适合轻量级流处理。选型考虑延迟、状态管理、生态系统和运维成本。Flink适用于实时数据分析、复杂事件处理等场景,使用时注意资源配置、状态管理和窗口操作的优化。