Storm ack和fail机制再论

简介:

之前对这个的理解有些问题,今天用到有仔细梳理了一遍,记录一下

 

首先开启storm tracker机制的前提是,

1. 在spout emit tuple的时候,要加上第3个参数messageid 
2. 在配置中acker数目至少为1 
3. 在bolt emit的时候,要加上第二个参数anchor tuple,以保持tracker链路

 

流程,

1. 当tuple具有messageid时,spout会把该tuple加到pending list里面 
   并发消息给acker,通知acker开始tracker这条tuple

2. 然后再后续的bolt的处理逻辑中,你必须显式的ack或fail所有处理的tuple 
   如果这条tuple在整个DAG图上都成功执行了,那么acker会发现该tuple的track异或值为0 
   于是acker会发ack_message给spout 
   当然如果在DAG图上任意一个节点bolt上fail,那么acker会认为该tuple fail 
   于是acker会发fail_message给spout

3. 当spout收到ack或fail message如何处理, 
    首先是从pending list里面删掉这条tuple,因为无论ack或fail,只要得到结果,这条tuple就没有继续被cache的必要了 
    然后做的事是调用spout.ack或spout.fail 
    所以系统默认是不会做任何事的,甚至是fail后的重发,你也需要在fail里面自己实现 
    如何实现后面看

4. 如果一条tuple没有被ack或fail,最终是会超时的 
    Spout会根据system tick去rotate pending list,对于每个过时的tuple,都调用spout.fail

 

下面的问题就是如何做fail重发,

这个必须用户通过自己处理fail来做,系统是不会自己做的,

public void fail(Object msgId)

看看系统提供的接口,只有msgId这个参数,这里的设计不合理,其实在系统里是有cache整个msg的,只给用户一个messageid,用户如何取得原来的msg

貌似需要自己cache,然后用这个msgId去查询,太坑爹了

阿里自己的Jstorm会提供

public interface IFailValueSpout { void fail(Object msgId, List<object>values); }

这样更合理一些, 可以直接取得系统cache的msg values

本文章摘自博客园,原文发布日期:   2014-06-24 
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
1月前
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
188 0
|
3月前
|
Kubernetes 监控 Perl
在k8S中,自动扩容机制是什么?
在k8S中,自动扩容机制是什么?
|
3月前
|
存储 网络安全 API
【Azure Service Bus】 Service Bus如何确保消息发送成功,发送端是否有Ack机制 
【Azure Service Bus】 Service Bus如何确保消息发送成功,发送端是否有Ack机制 
|
3月前
|
Kubernetes Java 调度
在K8S中,Pod突然挂掉,K8S有什么机制或功能自动清除Pod?
在K8S中,Pod突然挂掉,K8S有什么机制或功能自动清除Pod?
|
3月前
|
Kubernetes 安全 Linux
在k8S中,PodSecurityPolicy 机制能实现哪些安全策略?
在k8S中,PodSecurityPolicy 机制能实现哪些安全策略?
|
3月前
|
Kubernetes 安全 调度
在k8S中, PodSecurityPolicy机制是什么?
在k8S中, PodSecurityPolicy机制是什么?
|
3月前
|
Kubernetes 监控 Perl
在K8S中,RC的机制是什么?
在K8S中,RC的机制是什么?
|
6月前
|
Prometheus 监控 Kubernetes
Kubernetes 集群的监控与日志管理实践深入理解PHP的命名空间与自动加载机制
【5月更文挑战第30天】 在容器化和微服务架构日益普及的背景下,Kubernetes 已成为众多企业的首选容器编排工具。然而,随之而来的挑战是集群的监控与日志管理。本文将深入探讨 Kubernetes 集群监控的最佳实践,包括节点资源使用情况、Pods 健康状态以及网络流量分析等关键指标的监控方法。同时,我们也将讨论日志聚合、存储和查询策略,以确保快速定位问题并优化系统性能。文中将介绍常用的开源工具如 Prometheus 和 Fluentd,并分享如何结合这些工具构建高效、可靠的监控和日志管理系统。
|
6月前
|
消息中间件 Java Spring
五、消息确认机制(ACK)
五、消息确认机制(ACK)
216 1
|
11月前
|
存储 Kubernetes Unix
k8s教程(Volume篇)-CSI存储机制详解
k8s教程(Volume篇)-CSI存储机制详解
1437 0
k8s教程(Volume篇)-CSI存储机制详解

热门文章

最新文章

下一篇
无影云桌面