Storm的BaseBasicBolt源码解析ack机制

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 我们在学习ack机制的时候,我们知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。在BaseBasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack。

我们在学习ack机制的时候,我们知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。
在BaseBasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack。
在使用BaseRichBolt需要在emit数据的时候,显示指定该数据的源tuple要加上第二个参数anchor tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple);并且需要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);

那么我们来看看BasicBolt的源码是不是这样的,不能因为看到别人的帖子说是这样的,我们就这样任务,以讹传讹,我们要To see is to believe。

 

为了方便看源代码,我先上我们的继承类:

public class SplitSentenceBolt extends BaseBasicBolt {  public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }
    
  //5:执行我们自己的逻辑处理方法,接收传入的参数。
  
public void execute(Tuple input, BasicOutputCollector collector) { String sentence = (String)input.getValueByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { word = word.trim(); word = word.toLowerCase(); collector.emit(new Values(word,1));//这个地方就是调用OutputCollector的包装类,来发消息 } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","num")); } }

 



通过打断点,我们发现,bolt的task会创建这个类下面会标准执行顺序

public class BasicBoltExecutor implements IRichBolt {
    public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);    
    
    private IBasicBolt _bolt;
    private transient BasicOutputCollector _collector;
    //1:创建该对象,然后把我们写的SplitSentenceBolt对象赋给父类IBasicBolt。
    public BasicBoltExecutor(IBasicBolt bolt) {
        _bolt = bolt;
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        _bolt.declareOutputFields(declarer);//这里就是调用SplitSentenceBolt对象的方法了。
    }
   //2:给BasicOutputCollector _collector字段赋值,BasicOutputCollector就是对OutputCollector类的包装。
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _bolt.prepare(stormConf, context);
        _collector = new BasicOutputCollector(collector);
    }
  //3:然后程序执行该方法,input的值source: spout1:4, stream: default, id: {}, [+ - * % /]
    public void execute(Tuple input) {
        _collector.setContext(input);//把接收到的tuple值设置给BasicOutputCollector中inputTuple字段。
        try {
            _bolt.execute(input, _collector);//这个地方是调用我们实现类SplitSentenceBolt的ececute方法。
            _collector.getOutputter().ack(input);//这个地方就是响应
        } catch(FailedException e) {
            if(e instanceof ReportedFailedException) {
                _collector.reportError(e);
            }
            _collector.getOutputter().fail(input);//这个地方就是响应
        }
    }
    public void cleanup() {
        _bolt.cleanup();
    }
    public Map<String, Object> getComponentConfiguration() {
        return _bolt.getComponentConfiguration();
    }
}

 

 

public class BasicOutputCollector implements IBasicOutputCollector {
    private OutputCollector out;
    private Tuple inputTuple;
    public BasicOutputCollector(OutputCollector out) {
        this.out = out;
    }
//4:把收到的tuple数据赋值给inputTuple,这个时候BasicOutputCollector对象的字段都具有值了。
   public void setContext(Tuple inputTuple) {
        this.inputTuple = inputTuple; }
   //6:这里我们发送新的(转换后的)tuple数据,看他内部的调用,其实他也会发送一个anchor tuple保持tracker链路
而这个anchor tuple就是bolt接收到转换前的源tuple数据。

  public List<Integer> emit(List<Object> tuple) {
     return emit(Utils.DEFAULT_STREAM_ID, tuple);
   }
public List<Integer> emit(String streamId, List<Object> tuple) { return out.emit(streamId, inputTuple, tuple); } public void emitDirect(int taskId, String streamId, List<Object> tuple) { out.emitDirect(taskId, streamId, inputTuple, tuple); } public void emitDirect(int taskId, List<Object> tuple) { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple); } protected IOutputCollector getOutputter() { return out; } public void reportError(Throwable t) { out.reportError(t); } }

这里大家不要纠结bolt的启动时从哪里开始的,我后面会讲的,这里我们关注的是,BasicBoltExecutor对象创建后的执行过程,以这我们来看执行的过程。在BasicBoltExecutor的execute方法中,我们看到了ack和fail方法会被自动调用的,当我们的程序抛出异常则会执行fail方法的。

这个

 

作者: intsmaze(刘洋)
老铁,你的--->推荐,--->关注,--->评论--->是我继续写作的动力。
微信公众号号:Apache技术研究院
由于博主能力有限,文中可能存在描述不正确,欢迎指正、补充!
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
9天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
33 2
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
72 3
|
9天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
11天前
|
存储 消息中间件 算法
深入探索操作系统的心脏——内核机制解析
本文旨在揭示操作系统核心——内核的工作原理,通过剖析其关键组件与机制,为读者提供一个清晰的内核结构图景。不同于常规摘要的概述性内容,本文摘要将直接聚焦于内核的核心概念、主要功能以及其在系统管理中扮演的角色,旨在激发读者对操作系统深层次运作原理的兴趣与理解。
|
22天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
39 3
|
23天前
|
存储 缓存 安全
🌟Java零基础:深入解析Java序列化机制
【10月更文挑战第20天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
24 3
|
29天前
|
Java 开发者 UED
Java编程中的异常处理机制解析
在Java的世界里,异常处理是确保程序稳定性和可靠性的关键。本文将深入探讨Java的异常处理机制,包括异常的类型、如何捕获和处理异常以及自定义异常的创建和使用。通过理解这些概念,开发者可以编写更加健壮和易于维护的代码。
|
1月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
56 5
|
1月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
113 5
|
1月前
|
JavaScript 前端开发 开发者
原型链深入解析:JavaScript中的核心机制
【10月更文挑战第13天】原型链深入解析:JavaScript中的核心机制
32 0

推荐镜像

更多
下一篇
无影云桌面