本节书摘来自华章计算机《Storm分布式实时计算模式》一书中的第1章,第1.3节,作者:(美)P. Taylor Goetz Brian O’Neill 更多章节内容可以访问云栖社区“华章计算机”公众号查看。
1.3 实现单词计数topology
前面介绍了Storm的基础概念,我们已经准备好实现一个简单的应用。现在开始着手开发一个Storm topology,并且在本地模式执行。Storm本地模式会在一个JVM实例中模拟出一个Storm集群。大大简化了用户在开发环境或者IDE中进行开发和调试。后续章节将会演示如何将本地模式下开发好的topology部署到真实的Storm集群环境。
1.3.1 配置开发环境
新建一个Storm项目其实就是将Storm及其依赖的类库添加到Java classpath中。在第2章中,你将了解到,将Storm topology发布到集群环境中,需要将编译好的类和相关依赖打包在一起。基于这个原因,我们强烈建议使用构建管理工具来管理项目,比如Apache Maven、Gradle或者Leinengen。在单词计数这个例子中,我们使用Maven。
首先,建立一个Maven项目:
https://yqfile.alicdn.com/1548115a59a6387821373f6453d1ffe663ecce78.png
" >
然后,编辑配置文件pom.xml,添加Storm依赖
https://yqfile.alicdn.com/f669783888dff24ca5e17697e3cac2f5fb325204.png
" >
https://yqfile.alicdn.com/9536aec55eab80583717ac6a3af5b68a4cb0f95b.png
" >
之后,通过执行下述命令编译项目,来测试配置Maven是否正确。
1.3.2 实现SentenceSpout
为简化起见,SentenceSpout的实现通过重复静态语句列表来模拟数据源。每句话作为一个单值的tuple向后循环发射。完整实现如例1.1所示。
例1.1 SentenceSpout.java
https://yqfile.alicdn.com/4ecf2bf819c4d284269ba398c569b1fc80ea3c32.png
" >
https://yqfile.alicdn.com/1bb937f8b8240992dedbe50c764bb7add94a0e3d.png
" >
BaseRichSpout类是ISpout接口和IComponent接口的一个简便的实现。接口对本例中用不到的方法提供了默认实现。使用这个类,我们可以专注在所需要的方法上。方法declareOutputFields()是在IComponent接口中定义的,所有Storm的组件(spout和bolt)都必须实现这个接口。Storm的组件通过这个方法告诉Storm该组件会发射哪些数据流,每个数据流的tuple中包含哪些字段。本例中,我们声明了spout会发射一个数据流,其中的tuple包含一个字段(sentence)
Open()方法在ISpout接口中定义,所有Spout组件在初始化时调用这个方法。Open()方法接收三个参数,一个包含了Storm配置信息的map,TopologyContext对象提供了topology中组件的信息,SpoutOutputCollector对象提供了发射tuple的方法。本例中,初始化时不需要做额外操作,因此open()方法实现仅仅是简单将SpoutOutputCollector对象的引用保存在变量中。
nextTuple()方法是所有spout实现的核心所在,Storm通过调用这个方法向输出的collector发射tuple。这个例子中,我们发射当前索引对应的语句,并且递增索引指向下一个语句。
1.3.3 实现语句分割bolt
例1.2列出了SplitSentenceBolt类的实现。
例1.2 SplitSentenceBolt.java
https://yqfile.alicdn.com/273763604f9b0ec2639fef525dca562500955a1e.png
" >
BaseRichBolt类是IComponent和IBolt接口的一个简便实现。继承这个类,就不用去实现本例不关心的方法,将注意力放在实现我们需要的功能上。
prepare()方法在IBolt中定义,类同与ISpout接口中定义的open()方法。这个方法在bolt初始化时调用,可以用来准备bolt用到的资源,如数据库连接。和SentenceSpout类一样,SplitSentenceBolt类在初始化时没有额外操作,因此prepare()方法仅仅保存OutputCollector对象的引用。
在declareOutputFields()方法中,SplitSentenceBolt声明了一个输出流,每个tuple包含一个字段“word”。
SplitSentenceBolt类的核心功能在execute()方法中实现,这个方法是IBolt接口定义的。每当从订阅的数据流中接收一个tuple,都会调用这个方法。本例中,execute()方法按照字符串读取“sentence”字段的值,然后将其拆分为单词,每个单词向后面的输出流发射一个tuple。
1.3.4 实现单词计数bolt
WordCountBolt类(见例1.3)是topology中实际进行单词计数的组件。该bolt的prepare()方法中,实例化了一个HashMap的实例,用来存储单词和对应的计数。大部分实例变量通常是在prepare()方法中进行实例化,这个设计模式是由topology的部署方式决定的。当topology发布时,所有的bolt和spout组件首先会进行序列化,然后通过网络发送到集群中。如果spout或者bolt在序列化之前(比如说在构造函数中生成)实例化了任何无法序列化的实例变量,在进行序列化时会抛出NotSerializableException异常,topology就会部署失败。本例中,因为HashMap是可序列化的,所以在构造函数中进行实例化也是安全的。但是,通常情况下最好是在构造函数中对基本数据类型和可序列化的对象进行赋值和实例化,在prepare()方法中对不可序列化的对象进行实例化。
在declareOutputFields()方法中,类WordCountBolt声明了一个输出流,其中的tuple包括了单词和对应的计数。execute()方法中,当接收到一个单词时,首先查找这个单词对应的计数(如果单词没有出现过则计数初始化为0),递增并存储计数,然后将单词和最新计数作为tuple向后发射。将单词计数作为数据流发射,topology中的其他bolt就可以订阅这个数据流进行进一步的处理。
例1.3 WordCountBolt.java
1.3.5 实现上报bolt
ReportBolt类的作用是对所有单词的计数生成一份报告。和WordCountBolt类似,ReportBolt使用一个HashMap对象来保存单词和对应计数。本例中,它的功能是简单的存储接收到计数bolt发射出的计数tuple。
上报bolt和上述其他bolt的一个区别是,它是一个位于数据流末端的bolt,只接收tuple。因为它不发射任何数据流,所以declareOutputFields()方法是空的。
上报bolt中初次引入了cleanup()方法,这个方法在IBolt接口中定义。Storm在终止一个bolt之前会调用这个方法。本例中我们利用cleanup()方法在topology关闭时输出最终的计数结果。通常情况下,cleanup()方法用来释放bolt占用的资源,如打开的文件句柄或者数据库连接。
开发bolt时需要谨记的是,当topology在Storm集群上运行时,IBolt.cleanup()方法是不可靠的,不能保证会执行。下一章讲到Storm的容错机制时,会讨论其中的原因。但这个例子我们是运行在开发模式中的,可以保证cleanup()被调用。
类ReportBolt的完整代码见示例1.4。
例1.4 ReportBolt.java
1.3.6 实现单词计数topology
我们已经定义了计算所需要的spout和bolt。下面将它们整合为一个可运行的topology(见例1.5)
例1.5 WordCountTopology.java
https://yqfile.alicdn.com/f344eb67ae3a4718239f126f41ffafff592b95b9.png
" >
Storm topology通常由Java的main()函数进行定义,运行或者提交(部署到集群的操作)。在本例中,我们首先定义了一系列字符串常量,作为Storm组件的唯一标识符。main()方法中,首先实例化了spout和bolt,并生成一个TopologyBuilder实例。TopologyBuilder类提供了流式接口风格的API来定义topology组件之间的数据流。首先注册一个sentence spout并且赋值给其唯一的ID:
然后注册一个SplitSentenceBolt,这个bolt订阅SentenceSpout发射出来的数据流:
https://yqfile.alicdn.com/b4a75999a7729d021be270d53e20449395a26aa8.png
" >
类TopologyBuilder的setBolt()方法会注册一个bolt,并且返回BoltDeclarer的实例,可以定义bolt的数据源。这个例子中,我们将SentenceSpout的唯一ID赋值给shuffleGrouping()方法确立了这种订阅关系。shuffleGrouping()方法告诉Storm,要将类SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例。后续在讨论Storm的并发性时,会解释数据流分组的详情。代码下一行确立了类SplitSentenceBolt和类theWordCountBolt之间的连接关系:
你将了解到,有时候需要将含有特定数据的tuple路由到特殊的bolt实例中。在此我们使用类BoltDeclarer的fieldsGrouping()方法来保证所有“word”字段值相同的tuple会被路由到同一个WordCountBolt实例中。
定义数据流的最后一步是将WordCountBolt实例发射出的tuple流路由到类ReportBolt上。本例中,我们希望WordCountBolt发射的所有tuple路由到唯一的ReportBolt任务中。globalGrouping()方法提供了这种用法:
所有的数据流都已经定义好,运行单词计数计算的最后一步是编译并提交到集群上:
这里我们采用了Storm的本地模式,使用Storm的LocalCluster类在本地开发环境来模拟一个完整的Storm集群。本地模式是开发和测试的简便方式,省去了在分布式集群中反复部署的开销。本地模式还能够很方便地在IDE中执行Storm topology,设置断点,暂停运行,观察变量,分析程序性能。当topology发布到分布式集群后,这些事情会很耗时甚至难以做到。
Storm的Config类是一个HashMap的子类,并定义了一些Storm特有的常量和简便的方法,用来配置topology运行时行为。当一个topology提交时,Storm会将默认配置和Config实例中的配置合并后作为参数传递给submitTopology()方法。合并后的配置被分发给各个spout的bolt的open()、prepare()方法。从这个层面上讲,Config对象代表了对topology所有组件全局生效的配置参数集合。现在可以运行WordCountTopology类了,main()方法会提交topology,在执行10秒后,停止(卸载)该topology,最后关闭本地模式的集群。程序执行完毕后,在控制台可以看到类似以下的输出: