Storm ack和fail机制再论

简介:

之前对这个的理解有些问题,今天用到有仔细梳理了一遍,记录一下

 

首先开启storm tracker机制的前提是,

1. 在spout emit tuple的时候,要加上第3个参数messageid 
2. 在配置中acker数目至少为1 
3. 在bolt emit的时候,要加上第二个参数anchor tuple,以保持tracker链路

 

流程,

1. 当tuple具有messageid时,spout会把该tuple加到pending list里面 
   并发消息给acker,通知acker开始tracker这条tuple

2. 然后再后续的bolt的处理逻辑中,你必须显式的ack或fail所有处理的tuple 
   如果这条tuple在整个DAG图上都成功执行了,那么acker会发现该tuple的track异或值为0 
   于是acker会发ack_message给spout 
   当然如果在DAG图上任意一个节点bolt上fail,那么acker会认为该tuple fail 
   于是acker会发fail_message给spout

3. 当spout收到ack或fail message如何处理, 
    首先是从pending list里面删掉这条tuple,因为无论ack或fail,只要得到结果,这条tuple就没有继续被cache的必要了 
    然后做的事是调用spout.ack或spout.fail 
    所以系统默认是不会做任何事的,甚至是fail后的重发,你也需要在fail里面自己实现 
    如何实现后面看

4. 如果一条tuple没有被ack或fail,最终是会超时的 
    Spout会根据system tick去rotate pending list,对于每个过时的tuple,都调用spout.fail

 

下面的问题就是如何做fail重发,

这个必须用户通过自己处理fail来做,系统是不会自己做的,

public void fail(Object msgId)

看看系统提供的接口,只有msgId这个参数,这里的设计不合理,其实在系统里是有cache整个msg的,只给用户一个messageid,用户如何取得原来的msg

貌似需要自己cache,然后用这个msgId去查询,太坑爹了

阿里自己的Jstorm会提供

public interface IFailValueSpout { void fail(Object msgId, List<object>values); }

这样更合理一些, 可以直接取得系统cache的msg values

本文章摘自博客园,原文发布日期:   2014-06-24 
相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
|
11月前
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
891 0
|
Kubernetes 监控 Perl
在k8S中,自动扩容机制是什么?
在k8S中,自动扩容机制是什么?
|
存储 网络安全 API
【Azure Service Bus】 Service Bus如何确保消息发送成功,发送端是否有Ack机制 
【Azure Service Bus】 Service Bus如何确保消息发送成功,发送端是否有Ack机制 
|
Kubernetes Java 调度
在K8S中,Pod突然挂掉,K8S有什么机制或功能自动清除Pod?
在K8S中,Pod突然挂掉,K8S有什么机制或功能自动清除Pod?
|
Kubernetes 安全 Linux
在k8S中,PodSecurityPolicy 机制能实现哪些安全策略?
在k8S中,PodSecurityPolicy 机制能实现哪些安全策略?
|
Kubernetes 安全 调度
在k8S中, PodSecurityPolicy机制是什么?
在k8S中, PodSecurityPolicy机制是什么?
|
Kubernetes 监控 Perl
在K8S中,RC的机制是什么?
在K8S中,RC的机制是什么?
|
Prometheus 监控 Kubernetes
Kubernetes 集群的监控与日志管理实践深入理解PHP的命名空间与自动加载机制
【5月更文挑战第30天】 在容器化和微服务架构日益普及的背景下,Kubernetes 已成为众多企业的首选容器编排工具。然而,随之而来的挑战是集群的监控与日志管理。本文将深入探讨 Kubernetes 集群监控的最佳实践,包括节点资源使用情况、Pods 健康状态以及网络流量分析等关键指标的监控方法。同时,我们也将讨论日志聚合、存储和查询策略,以确保快速定位问题并优化系统性能。文中将介绍常用的开源工具如 Prometheus 和 Fluentd,并分享如何结合这些工具构建高效、可靠的监控和日志管理系统。
|
消息中间件 Java Spring
五、消息确认机制(ACK)
五、消息确认机制(ACK)
597 1
|
存储 Kubernetes Unix
k8s教程(Volume篇)-CSI存储机制详解
k8s教程(Volume篇)-CSI存储机制详解
2386 0
k8s教程(Volume篇)-CSI存储机制详解