Storm概念学习系列之Topology拓扑

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
云原生网关 MSE Higress,422元/月
简介:

  Hadoop 上运行的是 MapReduce 作业,而在 Storm 上运行的是拓扑 Topology,这两者之间是非常不同的。一个关键的区别是:一个MapReduce 作业最终会结束,而一个 Topology 拓扑会永远运行(除非手动杀掉)

 

 

 

Topology拓扑

  从字面上解释Topology,就是网络拓扑,是指用传输介质互连各种设备的物理布局,是构成网络的成员间特定的物理的(即真实的),或者逻辑的,即虚拟的排列方式。拓扑是一种不考虑物体的大小、形状等物理属性,而只使用点或者线描述多个物体实际位置与关系的抽象表示方法。拓扑不关心事物的细节,也不在乎相互的比例关系,只是以图的形式表示一定范围内多个物体之间的相互关系。从Storm角度考虑,它不是网络拓扑,但是又类似于网络拓扑的结构,所以取名Topology。
那么Storm的Topology指的是类似于网络拓扑图的一种虚拟结构。Storm的拓扑Topology类似于MapReduce任务,一个关键的区别是MapReduce任务运行一段时间后最终会完成,而Storm拓扑一直运行(直到杀掉它)。一个拓扑是由Spout和Bolt组成的图,Spout和Bolt之间通过流分组连接起来。图1形象地描述了Topology中的Spout和Bolt之间的关系。

               

                      图1    Spout和Bolt的关系图

 

 

 

  通过对图1的理解可以看出,Topology是由Spout、Bolt、数据载体Tuple等构成的一定规则的网络拓扑图。Storm提供了TopologyBuilder类来创建Topology。打个比方,TopologyBuilder是Topology的骨架,Spout、Bolt是Topology的肉和血液。TopologyBuilder类的主要方法如图2所示。

            

                    图 2    TopologyBuilder类的主要方法

 

   TopologyBuilder实际上是封装了Topology的Thrift接口,也就是说Topology实际上是通过Thrift定义的一个结构,TopologyBuilder将这个对象建立起来,然后Nimbus实际上运行一个Thrift服务器,用于接收用户提交的结构。由于采用Thrift实现,所以用户可以用其他语言建立Topology,这样就提供了比较方便的多语言操作支持。

 

 

 

 

 

 Topology实例
下面从一个简单的例子开始介绍Topology的构建和定义,通过此案例能够基本理解Storm,并且能够构建一个简单的Topology。本实例使用Topology来统计一个句子中单词出现的频率。下面详细介绍如何设计和运行Topology,以及一些注意事项。


 1. 设计Topology结构
在编写代码之前,首先要设计Topology。在理清数据处理逻辑之后,创建Topology就非常简单了。统计单词词频的Topology的大致结构如图3所示。可以将Topology分成3个部分:一是数据源KafkaSpout,负责发送语句;二是数据处理者SplitSentenceBolt,负责切分语句;三是数据再处理者WordCountBolt,负责累加单词的频率

             

                         图 3      Topology的结构

 

 

                 

         2. 设计数据流
设计的Topology是从KafkaSpout中读取句子,并把句子划分成单词,然后汇总每个单词出现的次数,一个Bolt负责获取句子后划分成单词,一个Bolt分别对应计算每一个单词出现的次数,然后Tuple在Spout和Bolt之间传递,如图3-15所示。

          

                       图4    Topology内部数据流图

  

          

   3. 代码实现
(1)构建Maven环境
为了开发Topology,需要把Storm相关的JAR包添加到CLASSPATH中,要么手动添加所有相关的JAR包,要么使用Maven来管理所有的依赖。Storm的JAR包发布在Clojars(一个Maven库),如果使用Maven,需要把下面的配置代码添加在项目的pom.xml中。

复制代码
<repository>
   <id>clojars.org</id>
   <url>http:// clojars.org/repo</url>
</repository>
<dependency>
    <groupId>storm</groupId>
   <artifactId>storm</artifactId>
    <version>0.8.2</version>
    <scope>test</scope>
</dependency>
复制代码

 

 

(2)定义Topology
定义Topology的内部逻辑,代码如下:

  

SpoutConf?ig kafkaConf?ig = new SpoutConf?ig(brokerHosts, "storm-sentence", "", "storm");
kafkaConf?ig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1new KafkaSpout(kafkaConf?ig), 10);// id, spout, parallelism_hint
builder.setBolt(2, new SplitSentence(), 10) .shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 20) .f?ieldsGrouping(2, new Fields("word"));

 

 

  声明的Topology的Spout是从Kafka中读取句子,Spout用setSpout方法插入一个独特的ID到Topology中。Topology中的每个节点必须给予一个ID,ID是由其他Bolt用于订阅该节点的输出流,KafkaSpout在Topology中的ID为1。
setBolt用于在Topology中插入Bolt。在Topology中定义的第一个Bolt是切割句子的Bolt,该Bolt(即SplitSentence)将句子流转成单词流;setBolt的最后一个参数是Bolt的并行量,因为SplitSentence是10个并发,所以在Storm集群中有10个线程并行执行。当Topology遇到性能瓶颈时,可以通过增加Bolt并行数量来解决。setBolt方法返回一个对象,用来定义Bolt的输入。例如,SplitSentence约定使用组件ID为1的输出流,1是指已经定义的KafkaSpout。SplitSentence会消耗KafkaSpout发出的每一个元组。
SplitSentence的关键方法是execute,它将句子拆分成单词,并发出每个单词作为新的元组。另一个重要的方法是declareOutputFields,其中声明了Bolt输出元组的架构,这个方法声明它发出一个域为“word”的元组。
SplitSentence对句子中的每个单词发射一个新的Tuple,WordCount在内存中维护每个单词出现次数的映射,WordCount每收到一个单词,都会更新内存中的统计状态。

 

 

  SplitSentence的实现代码如下:

复制代码
public class SplitSentence implements IBasicBolt{
      public void prepare(Map conf, TopologyContext context) {
       }
     public void execute(Tuple tuple, BasicOutputCollector collector) {
          String sentence = tuple.getString(0);
           for(String word: sentence.split(" ")) {
                  collector.emit(new Values(word));
             }
          }
       public void cleanup() {
      }
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
         }
 }
复制代码

 

 

 

  WordCount的实现代码如下:

 

复制代码
public class WordCount implements IBasicBolt {
      private Map<String, Integer> _counts = new HashMap<String, Integer>();
      public void prepare(Map conf, TopologyContext context) {
      }
     public void execute(Tuple tuple, BasicOutputCollector collector) {
          String word = tuple.getString(0);
          int count;
          if(_counts.containsKey(word)) {
               count = _counts.get(word);
          } else {
               count = 0;
 }
          count++;
          _counts.put(word, count);
          collector.emit(new Values(word, count));
     }
     public void cleanup() {
     }
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("word", "count"));
     }
}
复制代码

 

 

 

  4. Topology运行
Topology运行有两种模式:本地模式和分布式模式。这两种模式的接口区别很大,使用场景也不相同。另外,下面还将介绍Topology的运行流程、方法调用过程以及并行度等。
1. Topology运行模式
Topology的运行模式可以分为本地模式和分布式模式,模式可以在配置文件中和代码中设置。
(1)本地模式
Storm用一个进程中的线程来模拟所有的Spout和Bolt。本地模式对开发和测试来说比较有用。storm-starter中的Topology是以本地模式运行的,可以看到Topology中的每一个组件发射的消息。示例代码如下:

 

复制代码
Config conf = new Conf?ig();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
复制代码

  首先,这段代码通过定义一个LocalCluster对象来定义一个进程内的集群。提交Topology给这个虚拟的集群和提交Topology给分布式集群相同。通过调用submitTopology方法来提交Topology,共有3个参数:要运行的Topology的名称、一个配置对象,以及要运行的Topology本身。
Topology是以名称来唯一区别的,可以用这个名称来杀掉该Topology,而且必须显式地杀掉,否则它会一直运行。

 


conf对象可以配置内容很多,下面两个是最常见的:
TOPOLOGY_WORKERS (setNumWorkers):定义希望集群分配多少个工作进程来执行这个Topology。Topology中的每个组件都需要线程来执行。每个组件到底用多少个线程是通过setBolt和setSpout来指定的。这些线程都运行在工作进程中。每一个工作进程包含一些节点的一些工作线程。例如,指定300个线程,60个进程,那么每个工作进程中要执行6个线程,而这6个线程可能属于不同的组件(Spout或Bolt)。可以调整每个组件的并行度以及这些线程所在的进程数量来调整Topology的性能。
TOPOLOGY_DEBUG (setDebug):当它设置为true时,Storm会记录下每个组件发射的每条消息。这在本地环境调试Topology时很有用,但是在生产环境中如果这么做,则会影响性能。

 

 

 (2)分布式模式
Storm由若干节点组成。提交Topology给Nimbus时,也会提交Topology代码。Nimbus负责分发代码和给Topolgoy分配工作进程。如果一个工作进程挂掉了,Nimbus节点会将其重新分配到其他节点。分布式模式提交拓扑的代码如下:

StormSubmitter.submitTopology(topologyName, topologyConf?ig,  builder.createTopology());

 

   在Storm代码编写完成之后,需要打包成JAR包放到Nimbus中运行。在打包时,不需要把依赖的JAR都打进去,否则运行时会出现重复的配置文件错误导致Topology无法运行,因为在Topology运行之前,会加载本地的storm.yaml配置文件。
在Nimbus运行的命令如下。

storm  jar  StormTopology.jar  mainclass  args

 

 

 

  2. Topology运行流程
在Topology的运行流程中,有几点需要特别说明。
1)提交Topology后,Storm会把代码先存放到Nimbus节点的inbox目录下;之后,把当前Storm运行的配置生成一个stormconf.ser文件放到Nimbus节点的stormdist目录中,此目录中同时还有序列化之后的Topology代码文件。
2)在设定Topology关联的Spout和Bolt时,可以同时设置当前Spout和Bolt的Executor和Task数量。在默认情况下,一个Topology的Task总和与Executor的总和一致。之后,系统根据Worker的数量,尽量将这些Task平均分配到不同的Worker上执行。Worker在哪个Supervisor节点上运行是由Storm本身决定的。
3)在任务分配好之后,Nimbus节点将任务的信息提交到ZooKeeper集群,同时在ZooKeeper集群中有Workerbeats,这里存储了当前Topology所有Worker进程的心跳信息。
4)Supervisor节点不断轮询ZooKeeper集群,在ZooKeeper的assignments中保存了所有Topology的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor通过轮询此节点的内容来领取自己的任务,启动Worker进程运行。
5)一个Topology运行之后,不断通过Spout来发送流,通过Bolt来不断处理接收到的流,流是无界的。最后一步会不间断地执行,除非手动结束该Topology。

 


 3. Topology的方法调用流程
Topology中的流处理时,调用方法的过程如图3-16所示。
Topology方法调用的过程有如下一些要点:
1)每个组件(Spout或者Bolt)的构造方法和declareOutputFields方法都只被调用一次。
2)open方法和prepare方法被调用多次。在入口函数中设定的setSpout或者setBolt中的并行度参数是指Executor的数量,是负责运行组件中的Task的线程数量,此数量是多少,上述两个方法就会被调用多少次,在每个Executor运行时调用一次。
3)nextTuple方法和execute方法是一直运行的,nextTuple方法不断发射Tuple,Bolt的execute不断接收Tuple进行处理。只有这样不断地运行,才会产生无界的Tuple流,体现实时性。这类似于Java线程的run方法。
4)提交一个Topology之后,Storm创建Spout/Bolt实例并进行序列化。之后,将序列化的组件发送给所有任务所在的节点(即Supervisor节点),在每一个任务上反序列化组件。
5)Spout和Bolt之间、Bolt和Bolt之间的通信,通过ZeroMQ的消息队列实现。
6)图3-16没有列出ack和fail方法,在一个Tuple成功处理之后,需要调用ack方法来标记成功,否则调用fail方法标记失败,重新处理该Tuple。

             

                         图5    Topology流处理过程图      

     

 

  4. Topology并行度
在Topology的执行单元中,有几个和并行度相关的概念。
(1)Worker
每个Worker都属于一个特定的Topology,每个Supervisor节点的Worker可以有多个,每个Worker使用一个单独的端口,Worker对Topology中的每个组件运行一个或者多个Executor线程来提供Task的执行服务。
(2)Executor
Executor是产生于Worker进程内部的线程,会执行同一个组件的一个或者多个Task。
(3)Task
实际的数据处理由Task完成。在Topology的生命周期中,每个组件的Task数量不会变化,而Executor的数量却不一定。Executor数量小于等于Task的数量,在默认情况下,二者是相等的。
在运行一个Topology时,可以根据具体的情况来设置不同数量的Worker、Task、Executor,设置的位置也可以在多个地方。
1)Worker设置:可以设置yaml中的topology.workers属性。在代码中通过Conf?ig的setNumWorkers方法设定。
2)Executor设置:通过Topology的入口类中的setBolt、setSpout方法的最后一个参数指定,如果不指定,则使用默认值1。
3)Task设置:在默认情况下,和executor数量一致。在代码中通过TopologyBuilder的setNumTasks方法设定具体某个组件的Task数量。

 

  5. 终止Topology
在Nimbus启动的节点上,使用下面的命令来终止一个Topology的运行。
storm kill topologyName
执行kill之后,通过UI界面查看Topology状态,其先变成KILLED状态,清理完本地目录和ZooKeeper集群中与当前Topology相关的信息之后,此Topology将彻底消失。

 

  6.Topology跟踪
提交Topology后,可以在Storm UI界面查看整个Topology运行的过程。

 

 

 

 

   

         

         

 

 

  如下



本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5989714.html,如需转载请自行联系原作者

相关文章
|
机器学习/深度学习 运维
Moment:又一个开源的时间序列基础模型
MOMENT团队推出Time-series Pile,一个大型公共时间序列数据集,用于预训练首个开源时间序列模型家族。模型基于Transformer,采用遮蔽预训练技术,适用于预测、分类、异常检测和输入任务。研究发现,随机初始化比使用语言模型权重更有效,且直接预训练的模型表现出色。MOMENT改进了Transformer架构,调整了Layer norm并引入关系位置嵌入。模型在长期预测和异常检测中表现优异,但对于数值预测的效果尚不明朗。论文贡献包括开源方法、数据集创建和资源有限情况下的性能评估框架。
1000 0
汇编语言之常见的汇编指令
汇编语言之常见的汇编指令
2076 0
汇编语言之常见的汇编指令
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
329 2
|
7月前
|
机器学习/深度学习 测试技术
ChronosX: 可使用外生变量的时间序列预测基础模型
时间序列预测中,基础模型虽在单变量任务中表现出色,但引入协变量支持仍面临挑战。Chronos研究团队提出ChronosX架构,通过适配器层有效整合历史与未来协变量信息,适用于任何单变量模型。实验表明,ChronosX显著提升预测性能,尤其在复杂数据集上优势明显。消融研究进一步验证了协变量模块的重要性。尽管需要轻量训练,但其灵活性和通用性为时间序列建模提供了新思路,未来或可通过类似LLM提示机制实现更高效的协变量处理。
410 16
ChronosX: 可使用外生变量的时间序列预测基础模型
|
7月前
|
存储 数据管理 网络虚拟化
特殊网络类型分类
本文介绍了网络技术中的关键概念,包括虚拟局域网(VLAN)、存储区域网络(SAN)、网络桥接、接入网以及按拓扑结构和交换方式分类的网络类型。VLAN通过逻辑分隔提高性能与安全性;SAN提供高性能的数据存储解决方案;网络桥接实现不同网络间的互联互通;接入网解决“最后一千米”的连接问题。此外,文章详细对比了总线型、星型、树型、环型和网状型等网络拓扑结构的特点,并分析了电路交换、报文交换和分组交换的优缺点,为网络设计与应用提供了全面参考。
247 8
|
分布式计算 数据库
Mapreduce中的Mapper&reducer
【9月更文挑战第19天】在 MapReduce 框架中,Mapper 和 Reducer 是处理大规模数据集的关键组件。Mapper 负责将输入数据分割成键值对,而 Reducer 则对这些键值对进行汇总处理,生成最终结果。两者通过并行处理和分布式计算协同工作,Mapper 将数据转换为键值对,Reducer 对相同键的值进行聚合。开发人员需实现相应接口并编写定制逻辑,以充分利用框架优势,处理大规模数据集并获得有价值的结果。
570 7
|
运维 监控 NoSQL
【Redis】哨兵(Sentinel)原理与实战全解~炒鸡简单啊
Redis 的哨兵模式(Sentinel)是一种用于实现高可用性的机制。它通过监控主节点和从节点,并在主节点故障时自动进行切换,确保集群持续提供服务。哨兵模式包括主节点、从节点和哨兵实例,具备监控、通知、自动故障转移等功能,能显著提高系统的稳定性和可靠性。本文详细介绍了哨兵模式的组成、功能、工作机制以及其优势和局限性,并提供了单实例的安装和配置步骤,包括系统优化、安装、配置、启停管理和性能监控等。此外,还介绍了如何配置主从复制和哨兵,确保在故障时能够自动切换并恢复服务。
|
Web App开发 数据采集 Python
让ChromeDriver 125顺利运行:解决找不到chromedriver.exe的技巧
本文介绍了如何解决Selenium使用ChromeDriver时遇到的版本不匹配问题,以及如何设置环境变量。同时,文章提供了示例代码,展示如何在Python中配置Selenium使用代理IP、设定User-Agent和Cookie进行网页抓取,以提高爬虫的效率和成功率。确保ChromeDriver与Chrome浏览器版本一致,将`chromedriver.exe`添加到环境变量,然后使用`Options`和`Proxy`类配置代理和浏览器选项,最后通过`webdriver.Chrome()`启动浏览器并执行抓取任务。
1129 0
让ChromeDriver 125顺利运行:解决找不到chromedriver.exe的技巧
|
机器学习/深度学习 人工智能 自动驾驶
实战案例分析:AI在特定行业的深度应用
【7月更文第20天】随着人工智能技术的飞速发展,其在各行各业的应用日益广泛且深入,不仅推动了产业创新,也极大地提升了服务效率与质量。本文将聚焦于金融、教育、和交通三大领域,通过具体案例与技术解析,展现AI如何在这三个行业中发挥着革命性的作用。
2199 0
|
机器学习/深度学习 人工智能 自然语言处理
Python转换Excel到Markdown
Python转换Excel到Markdown
432 0

热门文章

最新文章