大数据下的实时热点功能实现讨论(实时流的TopN)

简介: 我司内部有个基于jstorm的实时流编程框架,文档里有提到实时Topn,但是还没有实现。。。。这是一个挺常见挺重要的功能,但仔细想想实现起来确实有难度。实时流的TopN其实离大家很近,比如下图百度和微博的实时热搜榜,还有各种资讯类的实时热点,他们具体实现方式不清楚,甚至有可能是半小时离线跑出来的。今天不管他们怎么实现的,我们讨论下实时该怎么实现(基于storm)。

我司内部有个基于jstorm的实时流编程框架,文档里有提到实时Topn,但是还没有实现。。。。这是一个挺常见挺重要的功能,但仔细想想实现起来确实有难度。实时流的TopN其实离大家很近,比如下图百度和微博的实时热搜榜,还有各种资讯类的实时热点,他们具体实现方式不清楚,甚至有可能是半小时离线跑出来的。今天不管他们怎么实现的,我们讨论下实时该怎么实现(基于storm)。

857d7cec3f8cab472cb92203086d0142_70.png

5e4ce0cf63fbfa365bb5c5b0da7d3f36_70.png

 假如我们现在有一些微博搜索记录,求某个时间窗口(以5分钟为例)Topn其实就是对微博搜索记录做word count,然后排序截取前N个。离线情况下可以这么简单的解决了,但在实时流数据下,你每个时刻都会有新数据流进来,当前时刻你拿到数据里的topn在下一时刻就不一定对了。

 一个时间窗口的TopN结果必须是建立在该时间窗口的全量数据上的才能保证100%的正确性,然而在实时流情况下,由于各种不确定性的因素,你很难在一个时间窗口内拿到上个时间窗口的数据。以5分钟窗口为例,你想这个5分钟内就求出上个5分钟的热点Topn,但是很可能正常情况下5分钟只能拿到99%的数据,10分钟拿到99.9%的数据,甚至30分钟才能几乎拿到100%的数据。如果数据来源异常,很可能几个小时你都不大可能拿到某个5分钟窗口内的全部数据。百度热搜和微博热搜都没有说这些结果是哪个时间段的,就是想给自己留条后路,万一内部出故障了,好几个小时数据不更新,用户也看不出,哈哈。

 在现实情况下,这个5分钟的热点数据可能下个5分钟就得展示给用户,当然异常情况除外,那我们就可以退一步,稍微牺牲一点数据准确性来给出一个堪用的结果。这里有个非常简单可行的方案,实时流计算只做word count,然后把计算结果存储起来后有个旁路程序扫结果数据,排序后截取TopN,我估计好多人就是怎么做的,架构如下。

fd44105728745701f31c0b26911ffff9_70.jpg

 这个解决方案优点很明显,因为前面storm就是一个word count,简单可靠。但是如果面对大量的TopN调用,旁路Topn程序就会成为性能瓶颈,因为要涉及到大量的数据查询和排序,而且多一个系统,维护成本也变高了。如果我们想通过纯storm的方式去解决Topn应该怎么做?

 在实时热点topn计算过程中,整个计算包含word count和TopN两部分。另外有几个需要特别注意的地方,首先Topn是建立在全量数据上的,最后肯定只能由一个节点输出TopN值,所以需要在前面的计算节点尽可能的减少对后面传送的数据量。其次,像这种业务数据统计基本上都会出现数据热点的问题。


Word count

先来看下word count的部分如何做,如何解决数据热点,如何减少对后面的压力,我直接上topo图。

fecf04c76788d952c27f975655da3a1b_70.jpg

 其实在bolt中加cache就可以大大减少发出去的消息量,这里我还有个step2的bolt,是因为我们在实践中发现,如果多个bolt对hbase同可以key写入,虽然可以通过hbase的Increment来保证数据的一直性,但在其过程中要对行加锁,高并发的情况下写入性能会受影响。所以可以先数据流随机shuffle到step1,然后对流量做泄压,然后按key fieldsGrouping到Step2,由step2中的bolt对hbase的数据做get add put的三步操作,没有hbase的加锁操作,grouping后也没并发写一个rowkey的问题。


TopN

如果我们已经有count好的<词,词频>数据,其实我们并不需要全局排序后截取前n个来实现topn,其实用最小堆就可以大幅度减少TopN的时间复杂度。假设数据量为M,排序的时间复杂度可能到O(MlogM),用最小堆求TopN时间复杂度为O(MlogN),实际情况下N远小于M,这个优化还是非常大的。在实时流TopN中我们也可以用最小堆做性能优化,topo图如下。

603101d34917f8a6e8b5a9b0d0e0e597_70.jpg

 在spout方法数据的时候做fieldsGrouping,然后step1中的每个bolt就会维护一部分数据的TopN最小堆,缓冲一段时间后把minHeap里的数据全量发给finalTopN,finalTopn拿到数据后和自己的minHeap已合并就可以拿到正真的topn了。


最终实现

要实现实时热点功能,其实讲上面两个 word count和topn的topo合并起来后就好了,最终的topo如下。

c20a82ab0fb60685f830567304b42a05_70.jpg

  spout收到消息后随机shuffle,step1中的bolt讲一部分统计结果写入cache,待cache失效的时候按key fieldsGrouping到step2,这样可以减少对发出的消息量。 step2中的bolt不仅有个cache还有个minheap,cache中存的是每个key的wordcount,minheap其实是维护的改bolt拿到部分数据的topn。step2中cache数据会失效,失效的时候需要数据更新到hbase中,同时也更新minheap。step2中的minheap超时后,全量数据丢到finalTopn中,再由finalTopn汇总。

  在最后一步finalTopn中,同一个key可能由step2多次传下来,所以finalTopn更新其minheap的时候不能只是简单的和根节点做对比,heap中有的话要更新其值。


其它问题

1. 各过程中cache和minheap的失效机制什么设?

cache失效机制我有遇到过相关的例子,一般是数据量或是cache大小触发的,其实这个是做成参数配置的,在不同的业务环境下可能有不同的适合配置。minheap只能以超时时间为触发条件,超时事件设多少得看具体情况了。感觉都是超参,需要调。


2. 如何保证数据不丢?

storm的acker机制就可以保证数据at least once,就是保证数据不丢,但不保证数据不重复,如果真的需要exactly once,还是放弃jstorm吧,可以用flink试试。但我觉得其实在这种非数据强一致性的情况下,ack机制都不需要开,比较storm的ack还是要消耗一定性能的(有看过别人的数据,开启acker要消耗10%以上性能,参考下)。 在我上图topo设计下,如果集群中有一台机器宕了,cache里的数据就全丢了,其实可能损失也不小,但是机器宕机毕竟还是个小概率事件。


3. 最终数据如何尽可能准确?

对一个时间窗口的维护时间越长,越可能拿到全量数据,结果就越准确。这个肯定的最后那个finalTopn的bolt来做了,数据量越大,finalTopnbolt 的挑战也就越大,我觉得可以起多个bolt,按时间窗口Grouping,然后还得对minheap所有数据做持久化,还得支持持久化后的更新,可以对minheap序列化后放到hbase里。

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
5月前
|
分布式计算 DataWorks 关系型数据库
MaxCompute支持通过DataWorks数据集成功能将其他数据源数据同步至MaxCompute
MaxCompute支持通过DataWorks数据集成功能将其他数据源数据同步至MaxCompute
37 1
|
8月前
|
分布式计算 运维 大数据
MaxCompute资源管理——使用成本优化功能实现包年包月计算资源降本增效
MaxCompute提供成本优化(计算资源优化推荐)功能,可基于实际作业请求量和资源配置期望,对包年包月一级Quota类型的计算资源生成更优的资源配置方案,帮助进一步提升计算资源利用率,优化计算成本。本文我们一起通过典型场景案例来看看如何通过成本优化(计算资源优化推荐)功能提供降本增效的参考建议。
208 0
|
9月前
|
Cloud Native 大数据
阿里云最新产品手册——阿里云核心产品——云原生大数据计算服务——产品功能
阿里云最新产品手册——阿里云核心产品——云原生大数据计算服务——产品功能自制脑图
77 1
|
10月前
|
数据采集 Java 大数据
大数据数据采集的数据采集(收集/聚合)的Logstash之强大的插件功能
在大数据领域中,Logstash是一款非常流行的数据采集工具。它具有丰富的插件功能,可以完成各种不同数据来源的数据采集任务。本文将介绍Logstash的插件功能,并为大家介绍几款强大的插件。
140 1
|
11月前
|
分布式计算 资源调度 运维
【大数据运维】Hadoop开启Yarn的日志监控功能
【大数据运维】Hadoop开启Yarn的日志监控功能
【大数据运维】Hadoop开启Yarn的日志监控功能
|
SQL 消息中间件 存储
大数据生态圈常用组件(二):概括介绍、功能特性、适用场景
大数据生态圈常用组件(二):概括介绍、功能特性、适用场景
|
存储 机器学习/深度学习 分布式计算
【MaxCompute】核心功能
统一丰富的计算和存储能力MaxCompute 支持多种计算模型和丰富的 UDF。 采用列压缩存储格式,通常情况下具备 5 倍压缩能力,可以大幅节省存储成本。
【MaxCompute】核心功能
|
存储 分布式计算 Oracle
Hbase迎接电信TB级大数据洗礼之热点网站功能实践
在今年年初的时候联通王志军院长就Hadoop在电信行业的大数据应用谈了自己的经验,随着3G网络的发展中国联通目前运营着世界上最大的CDMA网络,流量运营是中国联通一个重要特点。中国联通3G套餐当中流量占比非常非常大,中国联通3G用户流量使用情况也是非常可观的。那么在3G网络功能中上网冲浪占了很大的比例,去研究用户感兴趣的热点网站成为了行为分析中很有特点的一项功能,联通就可以根据这些网站信息推出增值服务,古人云:大浪淘沙始到金啊!
197 0
|
SQL 存储 分布式计算
MaxCompute Information Schema功能详解
阿里云的技术专家为大家带来MaxCompute新功能Information Schema的详细介绍。内容包括Information Schema的简介,安装,使用场景,以及对此新功能的使用建议。
1822 1
MaxCompute Information Schema功能详解
|
存储 分布式计算 MaxCompute
maxCompute的UDAF demo,实现累加功能
maxCompute的UDAF demo,实现累加功能

热门文章

最新文章