Storm概念学习系列之Blot消息处理者

简介:

Bolt消息处理者

  认识了消息源Spout和消息的数据存储元组Tuple,接下来了解消息的处理者Bolt。Bolt是接收Spout发出元组Tuple后处理数据的组件,所有的消息处理逻辑被封装在Bolt中,Bolt负责处理输入的数据流并产生输出的新数据流

 

  1. Bolt介绍
消息处理者Bolt在Storm中是一个被动的角色。Bolt把元组作为输入,然后产生新的元组作为输出


1.1 Bolt的功能
Bolt可以执行过滤、函数操作、合并、写数据库等操作。Bolt还可以简单地传递消息流,复杂的消息流处理往往需要很多步骤,因此也就需要很多Bolt来处理

  Bolt可以发出超过一个的流。为此,使用OutputFieldsDeclarer类的declareStream()方法声明多个流,并使用OutputCollector类的emit()方法指定发射的流

 

 


1.2 Bolt的生命周期
首先,客户端机器创建Bolt,然后将其序列化为拓扑,并提交给集群中的主机。之后集群启动Worker进程,反序列化Bolt,调用prepare方法开始处理元组。
接下来,Bolt处理Tuple,Bolt处理一个输入Tuple,发射0个或者多个Tuple。

       然后,调用ack通知Storm自己已经处理过这个Tuple了。Storm提供了一个IBasicBolt自动调用ack。

       Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。Bolt的生命周期如图1所示。

 

                         图1    Bolt的生命周期

                     

         在创建Bolt对象时,通过构造方法初始化成员变量,当Bolt被提交到集群时,这些成员变量也会被序列化,所以通过反序列化,可以获取到这些成员变量。

 

      

  1.3 Bolt的组件
IComponent顾名思义,是所有组件的接口:IBasicBolt、IRichBolt、IBatchBolt都继承自IComponent;

       IBolt接口是IRichBolt要继承的接口;

       还有一些以Base开头的Bolt类,如BaseBasicBolt、BaseBatchBolt、BaseRichBolt、BaseTransactionalBolt等,在这些类中需要注意的是所实现的方法都为空,或者返回值为null,其中,还有一个接口BaseComponent,是Storm提供的一个比较方便的抽象类,这个抽象类及其子类都或多或少实现了其接口定义的部分方法。从图1中,可以从整体上看到这些类的关系图,从而理清这些类之间的关系及结构。

         

                      图2   Bolt相关组件的继承关系图

 

 

         1.4 Bolt常用类
Bolt比较常用的类是BaseRichBolt、BaseBasicBolt等。这两个类继承的父类如图3和图4所示,它们的共同之处是父类中都有BaseComponent和ICompont。不同之处是BaseRichBolt的父接口中有IBolt和IRichBolt,而BaseBasicBolt只有IBasicBolt。

                           

 

                         图3    BaseRichBolt类图

 

 

              

                         图4    BaseBasicBolt类图

       

   比较完了父类,还没有真正从使用的本质上区别这两者。下面就比较这两个类的方法。图5为IBolt接口的方法,这是BaseRichBolt继承的父接口或者类之一,IBolt具备的方法与IBasicBolt的方法结构类似,但是有本质区别,那就是方法的作用不同。IBasicBolt接口的方法如图6所示。

              

                          图5    IBolt接口的主要方法

 

 

             

                           图6    IBasicBolt接口的主要方法

 

   IBolt继承了java.io.Serializable,在Nimbus上提交Topology以后,创建出来的Bolt在序列化后被发送到具体执行的Worker上,Worker在执行该Bolt时,先调用prepare方法传入当前执行的上下文,然后调用execute方法,对Tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail方法(表示失败)来反馈处理结果。而IBasicBolt接口在执行execute方法时,自动调用ack方法,其目的就是实现该接口的Bolt时,不用在代码中提供反馈结果,Storm内部会自动反馈成功。

 

 

 

 

Bolt实例
下面的ClassifyBolt实现了BaseRichBolt接口,该类需要实现的主要方法如图7所示。

                  

                          图7    ClassifyBolt的主要方法

             

   1、prepare方法
prepare方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。在Bolt中载入新的线程进行异步处理。OutputCollector是线程安全的,并且随时都可以调用它。
在Bolt中,Tuple的发送可以在prepare、execute、cleanup等方法中进行,但一般都是在execute中进行。
示例代码如下:

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {      
      _collector = collector;
 }

 

 

 

   2、declareOutputFields方法
用于声明当前Bolt发送的Tuple中包含的字段,和Spout中的类似。当前Bolt类发送的Tuple包含了两个字段:gt和lt。
示例代码如下:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
      // 在geThan流中声明为gt
      declarer.declareStream("geThan", new Fields("gt"));
      // 在lessThan流中声明为lt
      declarer.declareStream("lessThan", new Fields("lt"));
}

  Bolt可以发射多条消息流,使用OutputFieldsDeclarer.declareStream方法来定义流,之后使用OutputCollector.emit来选择要发射的流。

 

 


3、getComponentConf?iguration方法
和Spout类一样,在Bolt中也可以有getComponentConf?iguration方法。示例代码如下:

 

复制代码
public Map<String, Object> getComponentConf?iguration() {
      Map<String, Object> conf = new HashMap<String, Object>();
          conf.put(Conf?ig.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 
  emitFrequencyInSeconds);           
  return conf; 
}
复制代码

  此例定义了从系统组件“_system”的“_tick”流中发送Tuple到当前Bolt的频率,当系统需要每隔一段时间执行特定的处理时,就可以利用该系统组件的特性来完成。

 

 

 

  4、execute方法
Bolt的主要方法是execute,它以一个Tuple作为输入,Bolt使用OutputCollector来发射Tuple,Bolt必须为它处理的每一个Tuple调用OutputCollector的ack方法,以通知Storm该Tuple被处理完成了,从而通知该Tuple的发射者Spout。

 

复制代码
public void execute(Tuple input) {
      int randomInt = input.getIntegerByField("randomInt");
// 大于等于50的放在一起
      if(randomInt >= CLASSIFY_FLAG){
           collector.emit("geThan", new Values(randomInt));
      }else{
// 小于50的放在一起
           collector.emit("lessThan",new Values(randomInt));
      }
      collector.ack(input);
   }
复制代码

  execute是Bolt中最关键的一个方法,对Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的。此时,emit方法有两种情况,一种是方法中只有一个参数,另一种是方法中有两个参数。
1)emit有一个参数:该参数是发送到下游Bolt的Tuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple不再属于同一棵Tuple树。新的Tuple另起一棵新的Tuple树。
2)emit有两个参数:第一个参数是旧的Tuple的输入流,第二个参数是发往下游Bolt的新的Tuple流。此时,新的Tuple和旧的Tuple仍然属于同一棵Tuple树,即如果下游的Bolt处理Tuple失败,则向上传递到当前Bolt,当前Bolt根据旧的Tuple继续往上游传递,申请重发失败的Tuple,保证Tuple处理的可靠性。

 


这两种情况都要根据用户的场景来确定。示例代码如下:

复制代码
public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); 
      _collector.ack(tuple); 
   } 
public void execute(Tuple tuple) {
      _collector.emit(new Values(tuple.getString(0) + "!!!"));
 }
复制代码

   此外还有ack、fail、cleanup等方法,其中cleanup方法和Spout中的close方法类似,都是在当前组件关闭时调用,但是针对实时计算来说,除非一些特殊的场景要求以外,这两个方法一般都很少用到。

 

 

 

 

 

 

 

  如下面,


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5989707.html,如需转载请自行联系原作者

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
Scala 流计算
Flink / Scala - 使用 CountWindow 实现按条数触发窗口
CountWindow 数量窗口分为滑动窗口与滚动窗口,类似于之前 TimeWindow 的滚动时间与滑动时间,这里滚动窗口不存在元素重复而滑动窗口存在元素重复的情况,下面 demo 场景为非重复场景,所以将采用滚动窗口。......
1088 0
Flink / Scala - 使用 CountWindow 实现按条数触发窗口
|
API 项目管理 数据安全/隐私保护
OpenStack创建和管理用户
【8月更文挑战第5天】
617 11
|
12月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
805 61
|
JavaScript
Vue3基础(四)___Vue-Router
本文介绍了在Vue 3中如何使用`vue-router@4`进行路由管理,包括安装路由库、定义路由配置、在组件中使用`useRouter`和`useRoute`钩子函数,以及如何在组件中进行路由跳转和获取路由参数。
191 1
Vue3基础(四)___Vue-Router
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
518 2
|
12月前
|
安全 测试技术 数据安全/隐私保护
原生鸿蒙的竞争力到底如何?
长期以来,移动操作系统市场被IOS和安卓所垄断,一直都难以推出完整的自主系统,面临诸多挑战,如推广困难、应用适配难度大,以及技术底座缺乏自主性。但原生鸿蒙操作系统展示其在突破这些瓶颈方面的努力,基于安全牢固的“鸿蒙内核”,上层应用的开发与创新得以实现,不再被卡脖子,更不牵制于外界。本身该系统在OS内核、框架、数据库等方面进行全面自研,实现真正的自主可控。
406 4
|
JavaScript Java 关系型数据库
Springboot+vue的网上鲜花商城管理系统。Javaee项目,springboot vue前后端分离项目。
Springboot+vue的网上鲜花商城管理系统。Javaee项目,springboot vue前后端分离项目。
|
监控 大数据 数据处理
大数据组件之Storm简介
【5月更文挑战第2天】Apache Storm是用于实时大数据处理的分布式系统,提供容错和高可用的实时计算。核心概念包括Topology(由Spouts和Bolts构成的DAG)、Spouts(数据源)和Bolts(数据处理器)。Storm通过acker机制确保数据完整性。常见问题包括数据丢失、性能瓶颈和容错理解不足。避免这些问题的方法包括深入学习架构、监控日志、性能调优和编写健壮逻辑。示例展示了实现单词计数的简单Topology。进阶话题涵盖数据延迟、倾斜的处理,以及Trident状态管理和高级实践,强调调试、性能优化和数据安全性。
766 4
|
Java Unix Linux
grep命令的使用方法及实用技巧详解
grep命令的使用方法及实用技巧详解
|
存储 NoSQL Java
Spring Boot与Cassandra数据库的集成应用
Spring Boot与Cassandra数据库的集成应用
下一篇
开通oss服务