Storm之Collector-p1

简介:



IBasicOutputCollector.java   


List<Integer> emit(String streamId, List<Object> tuple);

提交一系列的tuple,返回接收到这些tuple的taskId


void emitDirect(int taskId, String streamId, List<Object> tuple);

直接向某个task提交一系列的tuple


BasicOutputCollector.java 

这个类里封装了一个OutputCollector的代理和一个inputTuple

OutputCollector是在构造函数里传入的,在Bolt处理完tuple之后调用此类的emit方法时,方法内部会调用封装的OutputCollector来进行emit,

最终的emit是OutputCollector的emit

此类还提供了两个emit方法的重载,目的是在没有指定streamId的时候提供一个默认名为“default”的streamId.

此类的emit方法没有提供anchors参数,每次bolt执行完之后进行emit时会自动将输入tuples和输出tuples关联,如果不需要关联,则可不用此类。


IOutputCollector.java


List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

提交一系列的tuple,返回接收到这些tuple的taskId,anchors参数是指接收到的tuples


void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);

直接向某个task提交一系列tuple,同样的也会附带上输入的tuple


void ack(Tuple input);

ack某个tuple


void fail(Tuple input);

fail某个tuple


OutputCollector.java

封装了一个IOutputCollector的代理,该代理在构造函数时传递进来被初始化。

提供了多个emit方法的重载,基本上包括有(单个anchor,无anchor,无指定streamId,只有输出的tuple)这些

emitDirect重载的方式也基本上一样,都是为了使用方便来做的。

当然,最终的emit,ack和fail都是通过代理来实现的。


CoordinatedOutputCollector.java

这个类比较奇葩,是定义在CoordinatedBolt的内部类,只有CoordinatedBolt这个类使用。

封装了一个IOutputCollector代理,该代理在构造函数时被初始化。

此类没有重载emit和emitDirect方法,但是在emit和emitDirect方法内部会调用一个名为updateTaskCounts的方法


private void updateTaskCounts(Object id, List<Integer> tasks) {
    synchronized(_tracked) {
        TrackingInfo track = _tracked.get(id);
        if (track != null) {
            Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
            for(Integer task: tasks) {
                int newCount = get(taskEmittedTuples, task, 0) + 1;
                taskEmittedTuples.put(task, newCount);
            }
        }
    }
}

这个方法主要是更新目标task和向其发送的tuple数量关系,其关系维护在_tracked变量里,关系链为 

tuple_id —> task_id —> num


public void ack(Tuple tuple) {
    Object id = tuple.getValue(0);
    synchronized(_tracked) {
        TrackingInfo track = _tracked.get(id);
        if (track != null)
            track.receivedTuples++;
    }
    boolean failed = checkFinishId(tuple, TupleType.REGULAR);
    if(failed) {
        _delegate.fail(tuple);
    } else {
        _delegate.ack(tuple);
    }
}


将收到的tupleID对应的跟踪信息中receivedTuples(已接收数量)+1 ,然后检查是否已经处理完该tupleID对应的任务,如果检查失败就fail回上一个bolt

TupleType.REGULAR是为了保证不是传递ID的也不是传递数量的流。


public void fail(Tuple tuple) {
    Object id = tuple.getValue(0);
    synchronized(_tracked) {
        TrackingInfo track = _tracked.get(id);
        if (track != null)
            track.failed = true;
    }
    checkFinishId(tuple, TupleType.REGULAR);
    _delegate.fail(tuple);
}


设置跟踪信息failed,然后checkFinishId方法中会fail所有应该ack的tuples,然后删除这个tupleID对应的跟踪信息

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
Flutter:创建一个自动调整大小的 TextField
在 Flutter 中,您可以通过以下方式之一根据需要创建自动调整大小的 TextField(或 TextFormField): 将 maxlines 参数设置为 null。如果输入大量文本,文本字段可以永远扩展。 将 minlinesz 设置为 1,MAXLINES 到 N(一个独到之处整数)。文本字段将根据输入文本的长度自动扩展或收缩。如果到达 N**行,它将停止扩展并开始滚动。
1306 0
Flutter:创建一个自动调整大小的 TextField
|
持续交付 虚拟化 Docker
Docker 架构解析:理解 Docker 引擎和容器运行时
Docker 架构解析:理解 Docker 引擎和容器运行时
1788 1
|
云安全 安全 小程序
无影-阿里云第一款云电脑,它拥有超越PC的完美体验
无影是一款面向数字经济时代的生产力工具,基于流式传输服务和容器化架构,可实现随时随地云上办公、海量算力触手可得、海量应用一网打尽,依托阿里云打造云管端一体化安全防护体系,全面保障企业业务和数据安全,拥有超越PC的便捷、流畅、安全、高效体验。
27351 0
无影-阿里云第一款云电脑,它拥有超越PC的完美体验
|
缓存 应用服务中间件 Go
Go打包和部署:从编译到运行的全指南
本文介绍了Go语言项目的打包和部署方法,包括使用`go run`、`go build`和`go install`命令进行编译,以及跨平台交叉编译。文章还提到了编译参数如`-x`、`-n`和`-race`等。此外,还讨论了如何利用第三方工具(如go-bindata)打包非Go文件,以及清理编译缓存和压缩二进制文件。最后,文中展示了如何使用supervisor和Docker进行部署,并提供了Nginx的反向代理配置示例。
4043 1
|
3月前
|
搜索推荐 算法 Java
2025 年互联网大厂校园招聘 JAVA 工程师笔试题及备考要点解析
本文针对互联网大厂校招Java工程师笔试题进行解析,涵盖基础知识、面向对象编程、数据结构与算法、异常处理及集合框架等核心内容。从数据类型、运算符到流程控制语句,从类与对象、继承多态到数组链表、排序算法,再到异常捕获与集合框架应用,结合实际案例深入剖析,助你系统掌握考点,提升应试能力。资源链接:[点此获取](https://pan.quark.cn/s/14fcf913bae6)。
160 9
支付系统43-----支付宝支付-统一收单退款,全额退款这里可以发起一笔或者两笔订单
支付系统43-----支付宝支付-统一收单退款,全额退款这里可以发起一笔或者两笔订单
|
SQL 分布式计算 大数据
MaxCompute产品使用问题之如何修改字段类型
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
220 2
|
JavaScript
原生js实现复选框(全选/全不选/反选)效果【含完整代码】
原生js实现复选框(全选/全不选/反选)效果【含完整代码】
661 0
|
网络协议 Linux
Linux如何查询端口被占用?
在Linux环境中,查询端口占用可使用`netstat`、`lsof`和`ss`命令。`netstat -tulnp | grep 80`显示TCP/UDP监听端口,`lsof -i:80`列出使用80端口的进程,而`ss -tuln | grep 80`是`netstat`的现代替代选项。若需解决端口占用问题,先找出占用进程的ID,然后用`kill -9`命令终止它,或调整服务配置以避免冲突。
897 2
|
网络虚拟化 网络架构 内存技术
vlan的作用及配置命令
vlan的作用及配置命令

热门文章

最新文章