《Storm分布式实时计算模式》——1.3 实现单词计数topology

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

本节书摘来自华章计算机《Storm分布式实时计算模式》一书中的第1章,第1.3节,作者:(美)P. Taylor Goetz Brian O’Neill 更多章节内容可以访问云栖社区“华章计算机”公众号查看。

1.3 实现单词计数topology

前面介绍了Storm的基础概念,我们已经准备好实现一个简单的应用。现在开始着手开发一个Storm topology,并且在本地模式执行。Storm本地模式会在一个JVM实例中模拟出一个Storm集群。大大简化了用户在开发环境或者IDE中进行开发和调试。后续章节将会演示如何将本地模式下开发好的topology部署到真实的Storm集群环境。
1.3.1 配置开发环境
新建一个Storm项目其实就是将Storm及其依赖的类库添加到Java classpath中。在第2章中,你将了解到,将Storm topology发布到集群环境中,需要将编译好的类和相关依赖打包在一起。基于这个原因,我们强烈建议使用构建管理工具来管理项目,比如Apache Maven、Gradle或者Leinengen。在单词计数这个例子中,我们使用Maven。
首先,建立一个Maven项目:


<a href=https://yqfile.alicdn.com/1548115a59a6387821373f6453d1ffe663ecce78.png
" >

然后,编辑配置文件pom.xml,添加Storm依赖

<a href=https://yqfile.alicdn.com/f669783888dff24ca5e17697e3cac2f5fb325204.png
" >


<a href=https://yqfile.alicdn.com/9536aec55eab80583717ac6a3af5b68a4cb0f95b.png
" >

之后,通过执行下述命令编译项目,来测试配置Maven是否正确。

1e9c1ae45edcd1afe6d578489c86782f14aae7d0

1.3.2 实现SentenceSpout
为简化起见,SentenceSpout的实现通过重复静态语句列表来模拟数据源。每句话作为一个单值的tuple向后循环发射。完整实现如例1.1所示。
例1.1 SentenceSpout.java


<a href=https://yqfile.alicdn.com/4ecf2bf819c4d284269ba398c569b1fc80ea3c32.png
" >


<a href=https://yqfile.alicdn.com/1bb937f8b8240992dedbe50c764bb7add94a0e3d.png
" >

BaseRichSpout类是ISpout接口和IComponent接口的一个简便的实现。接口对本例中用不到的方法提供了默认实现。使用这个类,我们可以专注在所需要的方法上。方法declareOutputFields()是在IComponent接口中定义的,所有Storm的组件(spout和bolt)都必须实现这个接口。Storm的组件通过这个方法告诉Storm该组件会发射哪些数据流,每个数据流的tuple中包含哪些字段。本例中,我们声明了spout会发射一个数据流,其中的tuple包含一个字段(sentence)
Open()方法在ISpout接口中定义,所有Spout组件在初始化时调用这个方法。Open()方法接收三个参数,一个包含了Storm配置信息的map,TopologyContext对象提供了topology中组件的信息,SpoutOutputCollector对象提供了发射tuple的方法。本例中,初始化时不需要做额外操作,因此open()方法实现仅仅是简单将SpoutOutputCollector对象的引用保存在变量中。
nextTuple()方法是所有spout实现的核心所在,Storm通过调用这个方法向输出的collector发射tuple。这个例子中,我们发射当前索引对应的语句,并且递增索引指向下一个语句。
1.3.3 实现语句分割bolt
例1.2列出了SplitSentenceBolt类的实现。
例1.2 SplitSentenceBolt.java

<a href=https://yqfile.alicdn.com/273763604f9b0ec2639fef525dca562500955a1e.png
" >


8fb63578ba729e64a26cedb216c340dede4ead26

BaseRichBolt类是IComponent和IBolt接口的一个简便实现。继承这个类,就不用去实现本例不关心的方法,将注意力放在实现我们需要的功能上。
prepare()方法在IBolt中定义,类同与ISpout接口中定义的open()方法。这个方法在bolt初始化时调用,可以用来准备bolt用到的资源,如数据库连接。和SentenceSpout类一样,SplitSentenceBolt类在初始化时没有额外操作,因此prepare()方法仅仅保存OutputCollector对象的引用。
在declareOutputFields()方法中,SplitSentenceBolt声明了一个输出流,每个tuple包含一个字段“word”。
SplitSentenceBolt类的核心功能在execute()方法中实现,这个方法是IBolt接口定义的。每当从订阅的数据流中接收一个tuple,都会调用这个方法。本例中,execute()方法按照字符串读取“sentence”字段的值,然后将其拆分为单词,每个单词向后面的输出流发射一个tuple。
1.3.4 实现单词计数bolt
WordCountBolt类(见例1.3)是topology中实际进行单词计数的组件。该bolt的prepare()方法中,实例化了一个HashMap的实例,用来存储单词和对应的计数。大部分实例变量通常是在prepare()方法中进行实例化,这个设计模式是由topology的部署方式决定的。当topology发布时,所有的bolt和spout组件首先会进行序列化,然后通过网络发送到集群中。如果spout或者bolt在序列化之前(比如说在构造函数中生成)实例化了任何无法序列化的实例变量,在进行序列化时会抛出NotSerializableException异常,topology就会部署失败。本例中,因为HashMap是可序列化的,所以在构造函数中进行实例化也是安全的。但是,通常情况下最好是在构造函数中对基本数据类型和可序列化的对象进行赋值和实例化,在prepare()方法中对不可序列化的对象进行实例化。
在declareOutputFields()方法中,类WordCountBolt声明了一个输出流,其中的tuple包括了单词和对应的计数。execute()方法中,当接收到一个单词时,首先查找这个单词对应的计数(如果单词没有出现过则计数初始化为0),递增并存储计数,然后将单词和最新计数作为tuple向后发射。将单词计数作为数据流发射,topology中的其他bolt就可以订阅这个数据流进行进一步的处理。
例1.3 WordCountBolt.java

00809a31887115afa94bd8cf6a9f32d8ab7c16aa

1.3.5 实现上报bolt
ReportBolt类的作用是对所有单词的计数生成一份报告。和WordCountBolt类似,ReportBolt使用一个HashMap对象来保存单词和对应计数。本例中,它的功能是简单的存储接收到计数bolt发射出的计数tuple。
上报bolt和上述其他bolt的一个区别是,它是一个位于数据流末端的bolt,只接收tuple。因为它不发射任何数据流,所以declareOutputFields()方法是空的。
上报bolt中初次引入了cleanup()方法,这个方法在IBolt接口中定义。Storm在终止一个bolt之前会调用这个方法。本例中我们利用cleanup()方法在topology关闭时输出最终的计数结果。通常情况下,cleanup()方法用来释放bolt占用的资源,如打开的文件句柄或者数据库连接。
开发bolt时需要谨记的是,当topology在Storm集群上运行时,IBolt.cleanup()方法是不可靠的,不能保证会执行。下一章讲到Storm的容错机制时,会讨论其中的原因。但这个例子我们是运行在开发模式中的,可以保证cleanup()被调用。
类ReportBolt的完整代码见示例1.4。
例1.4 ReportBolt.java

05cc9370e43b833080a83a695bf0a0d25bd02961

1.3.6 实现单词计数topology
我们已经定义了计算所需要的spout和bolt。下面将它们整合为一个可运行的topology(见例1.5)
例1.5 WordCountTopology.java

<a href=https://yqfile.alicdn.com/f344eb67ae3a4718239f126f41ffafff592b95b9.png
" >

Storm topology通常由Java的main()函数进行定义,运行或者提交(部署到集群的操作)。在本例中,我们首先定义了一系列字符串常量,作为Storm组件的唯一标识符。main()方法中,首先实例化了spout和bolt,并生成一个TopologyBuilder实例。TopologyBuilder类提供了流式接口风格的API来定义topology组件之间的数据流。首先注册一个sentence spout并且赋值给其唯一的ID:

de7a51d5f9e04cbbf5d2b6ec85874bcebeb1166f

然后注册一个SplitSentenceBolt,这个bolt订阅SentenceSpout发射出来的数据流:

<a href=https://yqfile.alicdn.com/b4a75999a7729d021be270d53e20449395a26aa8.png
" >

类TopologyBuilder的setBolt()方法会注册一个bolt,并且返回BoltDeclarer的实例,可以定义bolt的数据源。这个例子中,我们将SentenceSpout的唯一ID赋值给shuffleGrouping()方法确立了这种订阅关系。shuffleGrouping()方法告诉Storm,要将类SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例。后续在讨论Storm的并发性时,会解释数据流分组的详情。代码下一行确立了类SplitSentenceBolt和类theWordCountBolt之间的连接关系:

eb5b7fe9cd49ab4999634ddeebc6eb504ee8a5a1

你将了解到,有时候需要将含有特定数据的tuple路由到特殊的bolt实例中。在此我们使用类BoltDeclarer的fieldsGrouping()方法来保证所有“word”字段值相同的tuple会被路由到同一个WordCountBolt实例中。
定义数据流的最后一步是将WordCountBolt实例发射出的tuple流路由到类ReportBolt上。本例中,我们希望WordCountBolt发射的所有tuple路由到唯一的ReportBolt任务中。globalGrouping()方法提供了这种用法:

27d99b4cdcdd64cdefa5d47dbd60b568dc30bcca

所有的数据流都已经定义好,运行单词计数计算的最后一步是编译并提交到集群上:

ce3577b2041dc07a674d125bc4918c0d593a456e

这里我们采用了Storm的本地模式,使用Storm的LocalCluster类在本地开发环境来模拟一个完整的Storm集群。本地模式是开发和测试的简便方式,省去了在分布式集群中反复部署的开销。本地模式还能够很方便地在IDE中执行Storm topology,设置断点,暂停运行,观察变量,分析程序性能。当topology发布到分布式集群后,这些事情会很耗时甚至难以做到。
Storm的Config类是一个HashMap的子类,并定义了一些Storm特有的常量和简便的方法,用来配置topology运行时行为。当一个topology提交时,Storm会将默认配置和Config实例中的配置合并后作为参数传递给submitTopology()方法。合并后的配置被分发给各个spout的bolt的open()、prepare()方法。从这个层面上讲,Config对象代表了对topology所有组件全局生效的配置参数集合。现在可以运行WordCountTopology类了,main()方法会提交topology,在执行10秒后,停止(卸载)该topology,最后关闭本地模式的集群。程序执行完毕后,在控制台可以看到类似以下的输出:

b2c15fe0f2a3a7e58056660984e65b332942870f
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
30天前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
61 3
|
1月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
48 0
|
1月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
65 0
|
13天前
|
监控
Saga模式在分布式系统中保证事务的隔离性
Saga模式在分布式系统中保证事务的隔离性
|
1月前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
55 4
|
2月前
Saga模式在分布式系统中如何保证事务的隔离性
Saga模式在分布式系统中如何保证事务的隔离性
|
1月前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
86 0
|
3月前
|
开发者 云计算 数据库
从桌面跃升至云端的华丽转身:深入解析如何运用WinForms与Azure的强大组合,解锁传统应用向现代化分布式系统演变的秘密,实现性能与安全性的双重飞跃——你不可不知的开发新模式
【8月更文挑战第31天】在数字化转型浪潮中,传统桌面应用面临新挑战。本文探讨如何融合Windows Forms(WinForms)与Microsoft Azure,助力应用向云端转型。通过Azure的虚拟机、容器及无服务器计算,可轻松解决性能瓶颈,满足全球用户需求。文中还提供了连接Azure数据库的示例代码,并介绍了集成Azure Storage和Functions的方法。尽管存在安全性、网络延迟及成本等问题,但合理设计架构可有效应对,帮助开发者构建高效可靠的现代应用。
31 0
|
3月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之mini-cluster模式下,怎么指定checkpoint的时间间隔
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
28天前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?