经验大分享:Storm概念学习系列之Blot消息处理者

简介: 经验大分享:Storm概念学习系列之Blot消息处理者

  不多说,直接上干货!

Bolt消息处理者

  认识了消息源Spout和消息的数据存储元组Tuple,接下来了解消息的处理者Bolt。Bolt是接收Spout发出元组Tuple后处理数据的组件,所有的消息处理逻辑被封装在Bolt中,Bolt负责处理输入的数据流并产生输出的新数据流。

1. Bolt介绍

  消息处理者Bolt在Storm中是一个被动的角色。Bolt把元组作为输入,然后产生新的元组作为输出。

  1.1 Bolt的功能

  Bolt可以执行过滤、函数操作、合并、写数据库等操作。Bolt还可以简单地 传递消息流,复杂的消息流处理往往需要很多步骤,因此也就需要很多Bolt来处理。

  Bolt可以发出超过一个的流。为此,使用OutputFieldsDeclarer类的declareStream()方法声明多个流,并使用OutputCollector类的emit()方法指定发射的流。

  1.2 Bolt的生命周期

  首先,客户端机器创建Bolt,然后将其序列化为拓扑,并提交给集群中的主机。之后集群启动Worker进程,反序列化Bolt,调用prepare方法开始处理元组。

  接下来,Bolt处理Tuple,Bolt处理一个输入Tuple,发射0个或者多个Tuple。

然后,调用ack通知Storm自己已经处理过这个Tuple了。Storm提供了一个IBasicBolt自动调用ack。

Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。Bolt的生命周期如图1所示。

                         图1 Bolt的生命周期

在创建Bolt对象时,通过构造方法初始化成员变量,当Bolt被提交到集群时,这些成员变量也会被序列化,所以通过反序列化,可以获取到这些成员变量。

  1.3 Bolt的组件

  IComponent顾名思义,是所有组件的接口:IBasicBolt、IRichBolt、IBatchBolt都继承自IComponent;

IBolt接口是IRichBolt要继承的接口;

还有一些以Base开头的Bolt类,如BaseBasicBolt、BaseBatchBolt、BaseRichBolt、BaseTransactionalBolt等,在这些类中需要注意的是所实现的方法都为空,或者返回值为null,其中,还有一个接口BaseComponent,是Storm提供的一个比较方便的抽象类,这个抽象类及其子类都或多或少实现了其接口定义的部分方法。从图1中,可以从整体上看到这些类的关系图,从而理清这些类之间的关系及结构。

                      图2 Bolt相关组件的继承关系图

1.4 Bolt常用类

  Bolt比较常用的类是BaseRichBolt、BaseBasicBolt等。这两个类继承的父类如图3和图4所示,它们的共同之处是父类中都有BaseComponent和ICompont。不同之处是BaseRichBolt的父接口中有IBolt和IRichBolt,而BaseBasicBolt只有IBasicBolt。

                        图3 BaseRichBolt类图

                         图4 BaseBasicBolt类图

  比较完了父类,还没有真正从使用的本质上区别这两者。下面就比较这两个类的方法。图5为IBolt接口的方法,这是BaseRichBolt继承的父接口或者类之一,IBolt具备的方法与IBasicBolt的方法结构类似,但是有本质区别,那就是方法的作用不同。IBasicBolt接口的方法如图6所示。

                         图5 IBolt接口的主要方法

                          图6 IBasicBolt接口的主要方法

   IBolt继承了java.io.Serializable,在Nimbus上提交Topology以后,创建出来的Bolt在序列化后被发送到具体执行的Worker上,Worker在执行该Bolt时,先调用prepare方法传入当前执行的上下文,然后调用execute方法,对Tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail方法(表示失败)来反馈处理结果。而IBasicBolt接口在执行execute方法时,自动调用ack方法,其目的就是实现该接口的Bolt时,不用在代码中提供反馈结果,Storm内部会自动反馈成功。

Bolt实例

  下面的ClassifyBolt实现了BaseRichBolt接口,该类需要实现的主要方法如图7所示。

                          图7 ClassifyBolt的主要方法

  1、prepare方法

  prepare方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。在Bolt中载入新的线程进行异步处理。OutputCollector是线程安全的,并且随时都可以调用它。

  在Bolt中,Tuple的发送可以在prepare、execute、cleanup等方法中进行,但一般都是在execute中进行。

  示例代码如下:

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

_collector = collector;

}

  2、declareOutputFields方法

  用于声明当前Bolt发送的Tuple中包含的字段,和Spout中的类似。当前Bolt类发送的Tuple包含了两个字段:gt和lt。

  示例代码如下:

public void declareOutputFields(OutputFieldsDeclarer declarer) {

// 在geThan流中声明为gt

declarer.declareStream("geThan", new Fields("gt"));

// 在lessThan流中声明为lt

declarer.declareStream("lessThan", new Fields("lt"));

}

  Bolt可以发射多条消息流,使用OutputFieldsDeclarer.declareStream方法来定义流,之后使用OutputCollector.emit来选择要发射的流。

  3、getComponentConf?iguration方法

  和Spout类一样,在Bolt中也可以有getComponentConf?iguration方法。示例代码如下:

public Map getComponentConf?iguration() {

Map conf = new HashMap();

conf.put(Conf?ig.TOPOLOGY_TICK_TUPLE_FREQ_SECS,

emitFrequencyInSeconds);

return conf;

}

  此例定义了从系统组件“_system”的“_tick”流中发送Tuple到当前Bolt的频率,当系统需要每隔一段时间执行特定的处理时,就可以利用该系统组件的特性来完成。

  4、execute方法

  Bolt的主要方法是execute,它以一个Tuple作为输入,Bolt使用OutputCollector来发射Tuple,Bolt必须为它处理的每一个Tuple调用OutputCollector的ack方法,以通知Storm该Tuple被处理完成了,从而通知该Tuple的发射者Spout。

public void execute(Tuple input) {

int randomInt = input.getIntegerByField("randomInt");

// 大于等于50的放在一起

if(randomInt >= CLASSIFY_FLAG){

collector.emit("geThan", new Values(randomInt));

}else{

// 小于50的放在一起

collector.emit("lessThan",new Values(randomInt));

}

collector.ack(input);

}

  execute是Bolt中最关键的一个方法,对Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的。此时,emit方法有两种情况,一种是方法中只有一个参数,另一种是方法中有两个参数。

  1)emit有一个参数:该参数是发送到下游Bolt的Tuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple不再属于同一棵Tuple树。新的Tuple另起一棵新的Tuple树。

  2)emit有两个参数:第一个参数是旧的Tuple的输入流,第二个参数是发往下游Bolt的新的Tuple流。此时,新的Tuple和旧的Tuple仍然属于同一棵Tuple树,即如果下游的Bolt处理Tuple失败,则向上传递到当前Bolt,当前Bolt根据旧的Tuple继续往上游传递,申请重发失败的Tuple,保证Tuple处理的可靠性。

  这两种情况都要根据用户的场景来确定。示例代码如下:

public void execute(Tuple tuple) {

_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));

_collector.ack(tuple);

}

public void execute(Tuple tuple) {

_collector.emit(new Values(tuple.getString(0) + "!!!"));

}

   此外还有ack、fail、cleanup等方法,其中cleanup方法和Spout中的close方法类似,都是在当前组件关闭时调用,//代码效果参考:http://www.zidongmutanji.com/zsjx/175917.html

但是针对实时计算来说,除非一些特殊的场景要求以外,这两个方法一般都很少用到。

  如下面,

作者:大数据和人工智能躺过的坑

出处:

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接,否则保留追究法律责任的权利。

如果您认为这篇文章还不错或者有所收获,您可以通过右边的“打赏”功能 打赏我一杯咖啡【物质支持】,也可以点击右下角的【好文要顶】按钮【精神支持】,因为这两种支持都是我继续写作,分享的最大动力!

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
7月前
|
存储 人工智能 安全
经验大分享:Storm概念学习系列之Blot消息处理者
经验大分享:Storm概念学习系列之Blot消息处理者
46 0
|
5月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
131 0
|
5月前
|
存储 Java 流计算
Flink 分布式快照,神秘机制背后究竟隐藏着怎样的惊人奥秘?快来一探究竟!
【8月更文挑战第26天】Flink是一款开源框架,支持有状态流处理与批处理任务。其核心功能之一为分布式快照,通过“检查点(Checkpoint)”机制确保系统能在故障发生时从最近的一致性状态恢复,实现可靠容错。Flink通过JobManager触发检查点,各节点暂停接收新数据并保存当前状态至稳定存储(如HDFS)。采用“异步屏障快照(Asynchronous Barrier Snapshotting)”技术,插入特殊标记“屏障(Barrier)”随数据流传播,在不影响整体流程的同时高效完成状态保存。例如可在Flink中设置每1000毫秒进行一次检查点并指定存储位置。
107 0
|
8月前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
535 0
|
存储 NoSQL 算法
【Storm】Storm实战之频繁二项集挖掘(附源码)
针对大叔据实时处理的入门,除了使用WordCount示例之外,还需要相对更深入点的示例来理解Storm,因此,本篇博文利用Storm实现了频繁项集挖掘的案例,以方便更好的入门Storm。
125 0
【Storm】Storm实战之频繁二项集挖掘(附源码)
|
jstorm 监控 测试技术
性能、稳定性、反压、Exactly Once,Jstorm开源最佳实践全解析
在2017年在线技术峰会——阿里开源项目最佳实践上,来自阿里巴巴中间件的技术专家卫乐分享了Jstorm开源最佳实践。他主要介绍了Jstorm的架构,从性能、稳定性、监控系统、大规模部署、反压、灰度发布&热升级、Exactly-Once、新的窗口机制等方面详细扥想了Jstorm做的优化。
9608 0
|
流计算
一决高下,分布式流处理框架孰优孰劣
本文PPT来自技术专家毛玮于10月16日在2016年杭州云栖大会上发表的《分布式流处理框架--功能对比和性能评估》。
6029 1
|
存储 分布式计算 大数据
探寻流式计算
流计算的出现拓宽了应对复杂实时计算需求能力。Storm作为流计算的利器,极大方便了应用。
|
前端开发 调度 安全
《Akka应用模式:分布式应用程序设计实践指南》读书笔记4
优秀的Actor设计   我看到这一章节真是如获至宝,毕竟它阐释了如何使用Akka,以及优秀的设计模式。 大系统小做   其实作者想告诉我们的是要尽量提前关注actor级别的设计,而不是仅仅考虑大的架构。
1705 0
|
设计模式 消息中间件 程序员
《Akka应用模式:分布式应用程序设计实践指南》读书笔记1
  作者属于Scala、Akka技术爱好者,但苦于Akka没有关于设计模式的文章,偶尔搜到《Akka应用模式》一书,如获至宝。现整理一些读书笔记和自己的感悟,以供参考。 Actor模型 Actor模型感觉还是很给力的,要是按我以前学习actor模型,绝对会对他嗤之以鼻,这玩儿意能干啥。
1870 0