《Storm分布式实时计算模式》——1.3 实现单词计数topology

简介:

本节书摘来自华章计算机《Storm分布式实时计算模式》一书中的第1章,第1.3节,作者:(美)P. Taylor Goetz Brian O’Neill 更多章节内容可以访问云栖社区“华章计算机”公众号查看。

1.3 实现单词计数topology

前面介绍了Storm的基础概念,我们已经准备好实现一个简单的应用。现在开始着手开发一个Storm topology,并且在本地模式执行。Storm本地模式会在一个JVM实例中模拟出一个Storm集群。大大简化了用户在开发环境或者IDE中进行开发和调试。后续章节将会演示如何将本地模式下开发好的topology部署到真实的Storm集群环境。
1.3.1 配置开发环境
新建一个Storm项目其实就是将Storm及其依赖的类库添加到Java classpath中。在第2章中,你将了解到,将Storm topology发布到集群环境中,需要将编译好的类和相关依赖打包在一起。基于这个原因,我们强烈建议使用构建管理工具来管理项目,比如Apache Maven、Gradle或者Leinengen。在单词计数这个例子中,我们使用Maven。
首先,建立一个Maven项目:


<a href=https://yqfile.alicdn.com/1548115a59a6387821373f6453d1ffe663ecce78.png
" >

然后,编辑配置文件pom.xml,添加Storm依赖

<a href=https://yqfile.alicdn.com/f669783888dff24ca5e17697e3cac2f5fb325204.png
" >


<a href=https://yqfile.alicdn.com/9536aec55eab80583717ac6a3af5b68a4cb0f95b.png
" >

之后,通过执行下述命令编译项目,来测试配置Maven是否正确。

1e9c1ae45edcd1afe6d578489c86782f14aae7d0

1.3.2 实现SentenceSpout
为简化起见,SentenceSpout的实现通过重复静态语句列表来模拟数据源。每句话作为一个单值的tuple向后循环发射。完整实现如例1.1所示。
例1.1 SentenceSpout.java


<a href=https://yqfile.alicdn.com/4ecf2bf819c4d284269ba398c569b1fc80ea3c32.png
" >


<a href=https://yqfile.alicdn.com/1bb937f8b8240992dedbe50c764bb7add94a0e3d.png
" >

BaseRichSpout类是ISpout接口和IComponent接口的一个简便的实现。接口对本例中用不到的方法提供了默认实现。使用这个类,我们可以专注在所需要的方法上。方法declareOutputFields()是在IComponent接口中定义的,所有Storm的组件(spout和bolt)都必须实现这个接口。Storm的组件通过这个方法告诉Storm该组件会发射哪些数据流,每个数据流的tuple中包含哪些字段。本例中,我们声明了spout会发射一个数据流,其中的tuple包含一个字段(sentence)
Open()方法在ISpout接口中定义,所有Spout组件在初始化时调用这个方法。Open()方法接收三个参数,一个包含了Storm配置信息的map,TopologyContext对象提供了topology中组件的信息,SpoutOutputCollector对象提供了发射tuple的方法。本例中,初始化时不需要做额外操作,因此open()方法实现仅仅是简单将SpoutOutputCollector对象的引用保存在变量中。
nextTuple()方法是所有spout实现的核心所在,Storm通过调用这个方法向输出的collector发射tuple。这个例子中,我们发射当前索引对应的语句,并且递增索引指向下一个语句。
1.3.3 实现语句分割bolt
例1.2列出了SplitSentenceBolt类的实现。
例1.2 SplitSentenceBolt.java

<a href=https://yqfile.alicdn.com/273763604f9b0ec2639fef525dca562500955a1e.png
" >


8fb63578ba729e64a26cedb216c340dede4ead26

BaseRichBolt类是IComponent和IBolt接口的一个简便实现。继承这个类,就不用去实现本例不关心的方法,将注意力放在实现我们需要的功能上。
prepare()方法在IBolt中定义,类同与ISpout接口中定义的open()方法。这个方法在bolt初始化时调用,可以用来准备bolt用到的资源,如数据库连接。和SentenceSpout类一样,SplitSentenceBolt类在初始化时没有额外操作,因此prepare()方法仅仅保存OutputCollector对象的引用。
在declareOutputFields()方法中,SplitSentenceBolt声明了一个输出流,每个tuple包含一个字段“word”。
SplitSentenceBolt类的核心功能在execute()方法中实现,这个方法是IBolt接口定义的。每当从订阅的数据流中接收一个tuple,都会调用这个方法。本例中,execute()方法按照字符串读取“sentence”字段的值,然后将其拆分为单词,每个单词向后面的输出流发射一个tuple。
1.3.4 实现单词计数bolt
WordCountBolt类(见例1.3)是topology中实际进行单词计数的组件。该bolt的prepare()方法中,实例化了一个HashMap的实例,用来存储单词和对应的计数。大部分实例变量通常是在prepare()方法中进行实例化,这个设计模式是由topology的部署方式决定的。当topology发布时,所有的bolt和spout组件首先会进行序列化,然后通过网络发送到集群中。如果spout或者bolt在序列化之前(比如说在构造函数中生成)实例化了任何无法序列化的实例变量,在进行序列化时会抛出NotSerializableException异常,topology就会部署失败。本例中,因为HashMap是可序列化的,所以在构造函数中进行实例化也是安全的。但是,通常情况下最好是在构造函数中对基本数据类型和可序列化的对象进行赋值和实例化,在prepare()方法中对不可序列化的对象进行实例化。
在declareOutputFields()方法中,类WordCountBolt声明了一个输出流,其中的tuple包括了单词和对应的计数。execute()方法中,当接收到一个单词时,首先查找这个单词对应的计数(如果单词没有出现过则计数初始化为0),递增并存储计数,然后将单词和最新计数作为tuple向后发射。将单词计数作为数据流发射,topology中的其他bolt就可以订阅这个数据流进行进一步的处理。
例1.3 WordCountBolt.java

00809a31887115afa94bd8cf6a9f32d8ab7c16aa

1.3.5 实现上报bolt
ReportBolt类的作用是对所有单词的计数生成一份报告。和WordCountBolt类似,ReportBolt使用一个HashMap对象来保存单词和对应计数。本例中,它的功能是简单的存储接收到计数bolt发射出的计数tuple。
上报bolt和上述其他bolt的一个区别是,它是一个位于数据流末端的bolt,只接收tuple。因为它不发射任何数据流,所以declareOutputFields()方法是空的。
上报bolt中初次引入了cleanup()方法,这个方法在IBolt接口中定义。Storm在终止一个bolt之前会调用这个方法。本例中我们利用cleanup()方法在topology关闭时输出最终的计数结果。通常情况下,cleanup()方法用来释放bolt占用的资源,如打开的文件句柄或者数据库连接。
开发bolt时需要谨记的是,当topology在Storm集群上运行时,IBolt.cleanup()方法是不可靠的,不能保证会执行。下一章讲到Storm的容错机制时,会讨论其中的原因。但这个例子我们是运行在开发模式中的,可以保证cleanup()被调用。
类ReportBolt的完整代码见示例1.4。
例1.4 ReportBolt.java

05cc9370e43b833080a83a695bf0a0d25bd02961

1.3.6 实现单词计数topology
我们已经定义了计算所需要的spout和bolt。下面将它们整合为一个可运行的topology(见例1.5)
例1.5 WordCountTopology.java

<a href=https://yqfile.alicdn.com/f344eb67ae3a4718239f126f41ffafff592b95b9.png
" >

Storm topology通常由Java的main()函数进行定义,运行或者提交(部署到集群的操作)。在本例中,我们首先定义了一系列字符串常量,作为Storm组件的唯一标识符。main()方法中,首先实例化了spout和bolt,并生成一个TopologyBuilder实例。TopologyBuilder类提供了流式接口风格的API来定义topology组件之间的数据流。首先注册一个sentence spout并且赋值给其唯一的ID:

de7a51d5f9e04cbbf5d2b6ec85874bcebeb1166f

然后注册一个SplitSentenceBolt,这个bolt订阅SentenceSpout发射出来的数据流:

<a href=https://yqfile.alicdn.com/b4a75999a7729d021be270d53e20449395a26aa8.png
" >

类TopologyBuilder的setBolt()方法会注册一个bolt,并且返回BoltDeclarer的实例,可以定义bolt的数据源。这个例子中,我们将SentenceSpout的唯一ID赋值给shuffleGrouping()方法确立了这种订阅关系。shuffleGrouping()方法告诉Storm,要将类SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例。后续在讨论Storm的并发性时,会解释数据流分组的详情。代码下一行确立了类SplitSentenceBolt和类theWordCountBolt之间的连接关系:

eb5b7fe9cd49ab4999634ddeebc6eb504ee8a5a1

你将了解到,有时候需要将含有特定数据的tuple路由到特殊的bolt实例中。在此我们使用类BoltDeclarer的fieldsGrouping()方法来保证所有“word”字段值相同的tuple会被路由到同一个WordCountBolt实例中。
定义数据流的最后一步是将WordCountBolt实例发射出的tuple流路由到类ReportBolt上。本例中,我们希望WordCountBolt发射的所有tuple路由到唯一的ReportBolt任务中。globalGrouping()方法提供了这种用法:

27d99b4cdcdd64cdefa5d47dbd60b568dc30bcca

所有的数据流都已经定义好,运行单词计数计算的最后一步是编译并提交到集群上:

ce3577b2041dc07a674d125bc4918c0d593a456e

这里我们采用了Storm的本地模式,使用Storm的LocalCluster类在本地开发环境来模拟一个完整的Storm集群。本地模式是开发和测试的简便方式,省去了在分布式集群中反复部署的开销。本地模式还能够很方便地在IDE中执行Storm topology,设置断点,暂停运行,观察变量,分析程序性能。当topology发布到分布式集群后,这些事情会很耗时甚至难以做到。
Storm的Config类是一个HashMap的子类,并定义了一些Storm特有的常量和简便的方法,用来配置topology运行时行为。当一个topology提交时,Storm会将默认配置和Config实例中的配置合并后作为参数传递给submitTopology()方法。合并后的配置被分发给各个spout的bolt的open()、prepare()方法。从这个层面上讲,Config对象代表了对topology所有组件全局生效的配置参数集合。现在可以运行WordCountTopology类了,main()方法会提交topology,在执行10秒后,停止(卸载)该topology,最后关闭本地模式的集群。程序执行完毕后,在控制台可以看到类似以下的输出:

b2c15fe0f2a3a7e58056660984e65b332942870f
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
738 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
360 11
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
657 3
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1977 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
关系型数据库 MySQL 数据库
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
TIS 是一款基于Web-UI的开源大数据集成工具,通过与人大金仓Kingbase的深度整合,提供高效、灵活的实时数据集成方案。它支持增量数据监听和实时写入,兼容MySQL、PostgreSQL和Oracle模式,无需编写复杂脚本,操作简单直观,特别适合非专业开发人员使用。TIS率先实现了Kingbase CDC连接器的整合,成为业界首个开箱即用的Kingbase CDC数据同步解决方案,助力企业数字化转型。
3147 5
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
|
SQL 数据建模 BI
【YashanDB 知识库】用 yasldr 配置 Bulkload 模式作单线程迁移 300G 的业务数据到分布式数据库,迁移任务频繁出错
问题描述 详细版本:YashanDB Server Enterprise Edition Release 23.2.4.100 x86_64 6db1237 影响范围: 离线数据迁移场景,影响业务数据入库。 外场将部分 NewCIS 的报表业务放到分布式数据库,验证 SQL 性能水平。 操作系统环境配置: 125G 内存 32C CPU 2T 的 HDD 磁盘 问题出现的步骤/操作: 1、部署崖山分布式数据库 1mm 1cn 3dn 单线启动 yasldr 数据迁移任务,设置 32 线程的 bulk load 模式 2、观察 yasldr.log 是否出现如下错
|
监控
Saga模式在分布式系统中保证事务的隔离性
Saga模式在分布式系统中保证事务的隔离性
384 4
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
612 0
|
消息中间件 大数据 测试技术
流计算框架 Flink 与 Storm 的性能对比
分布式实时计算框架 Flink 与 Storm 进行性能对比,为实时计算平台和业务提供数据参考。
2127 0

热门文章

最新文章