Storm - Guaranteeing message processing

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介:

https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

http://xumingming.sinaapp.com/127/twitter-storm%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1/

 

这章讨论Storm's reliability capabilities, 如何保证从spout emit出来的所有tuple都被正确的执行(fully processed)?

What does it mean for a message to be "fully processed"?

首先的问题是, 什么叫tuple或message被fully processed? 因为tuple被emit出去后, 可能会被多级bolt处理, 并且bolt也有可能由该tuple生成多组tuples, 所以情况还是比较复杂的 
最终由一个tuple trigger(触发)的所有tuples会形成一个树或DAG(有向无环图)

只有当tuple tree上的所有节点都被成功处理的时候, storm才认为该tuple被fully processed 
如果tuple tree上任一节点失败或者超时, 都被看作该tuple fail, 失败的tuple会被重发 
Storm considers a tuple coming off a spout "fully processed" when the tuple tree has been exhausted and every message in the tree has been processed
A tuple is considered failed when its tree of messages fails to be fully processed within a specified timeout. 
This timeout can be configured on a topology-specific basis using the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECSconfiguration and defaults to 30 seconds.

 

What happens if a message is fully processed or fails to be fully processed?

该机制是如何实现的? 
首先, 所有tuple都有一个唯一标识msgId, 当tuple被emit的时候确定

_collector.emit(new Values("field1", "field2", 3) , msgId);

其次, 看看下面的ISpout接口, 除了获取tuple的nextTuple 
还有ack和fail, 当Storm detect到tuple被fully processed, 会调用ack, 如果超时或detect fail, 则调用fail 
此处需要注意的是, tuple只有在被产生的那个spout task上可以被ack或fail, 具体原因看后面的实现解释就理解了

a tuple will be acked or failed by the exact same Spout task that created it. So if a Spout is executing as many tasks across the cluster, a tuple won't be acked or failed by a different task than the one that created it.

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

最后, 在spout怎么实现的, 其实比较简单. 
对于Spout queue, get message只是open而不是pop, 并且把tuple状态改为pending, 防止该tuple被多次发送. 
一直等到该tuple被ack, 才真正的pop该tuple, 当然该tuple如果fail, 就重新把状态改回初始状态 
这也解释, 为什么tuple只能在被emit的spout task被ack或fail, 因为只有这个task的queue里面有该tuple

When KestrelSpout takes a message off the Kestrel queue, it "opens" the message. 
This means the message is not actually taken off the queue yet, but instead placed in a "pending" statewaiting for acknowledgement that the message is completed. 
While in the pending state, a message will not be sent to other consumers of the queue. Additionally, if a client disconnects all pending messages for that client are put back on the queue.

 

What is Storm's reliability API?

前面一直没有说明的一个问题是, storm本身通过什么机制来判断tuple是否成功被fully processed?

要解决这个问题, 可以分为两个问题, 
1. 如何知道tuple tree的结构? 
2. 如何知道tuple tree上每个节点的运行情况, success或fail?

答案很简单, 你必须告诉它, 如何告诉它? 
1. 对于tuple tree的结构, 需要知道每个tuple由哪些tuple产生, 即tree节点间的link 
   tree节点间的link称为anchoring. 当每次emit新tuple的时候, 必须显式的通过API建立anchoring

Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple. 
Each word tuple is anchored by specifying the input tuple as the first argument to emit.

看下面的代码例子,

_collector.emit(tuple, new Values(word)); 

emit的第一个参数是tuple, 这就是用于建anchoring 
当然你也可以直接调用unanchoring的emit版本, 如果不需要保证reliable的话, 这样效率会比较高

_collector.emit(new Values(word));

同时前面说了, 可能一个tuple依赖于多个输入,

An output tuple can be anchored to more than one input tuple. 
This is useful when doing streaming joins or aggregations. A multi-anchored tuple failing to be processed will cause multiple tuples to be replayed from the spouts.

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

对于Multi-anchoring的情况会导致tuple tree变为tuple DGA, 当前storm的版本已经可以很好的支持DAG 
Multi-anchoring adds the output tuple into multiple tuple trees. 
Note that it's also possible for multi-anchoring to break the tree structure and create tuple DAGs,

image

2. 对于tuple tree上每个节点的运行情况, 你需要在每个bolt的逻辑处理完后, 显式的调用OutputCollector的ack和fail来汇报  

This is done by using the ack and fail methods on the OutputCollector
You can use the fail method on the OutputCollector to immediately fail the spout tuple at the root of the tuple tree.

看下面的例子, 在execute函数的最后会调用, 
_collector.ack(tuple);

我比较迷惑, 为啥ack是OutputCollector的function, 而不是tuple的function? 
而且就算ack也是应该对bolt的input进行ack, 为啥是output, 可能因为所有input都是其他bolt的output产生...这个设计的比较不合理

public class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;
        
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

storm为了保证reliable, 必然是要牺牲效率的, 此处storm会在task memory里面去记录你汇报的tuple tree的结构和运行情况. 
而只有当某tuple节点被ack或fail后才会被从内存中删除, 所以如果你总是不去ack或fail, 那么会导致task的out of memory

Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don't ack/fail every tuple, the task will eventually run out of memory.

 

简单的版本, BasicBolt

面的机制, 会给程序员造成负担, 尤其对于很多简单的case, 比如filter, 每次都要去显式的建立anchoring和ack…

所以storm提供简单的版本, 会自动的建立anchoring, 并在bolt执行完自动调用ack

A lot of bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the execute method. These bolts fall into the categories of filters and simple functions. Storm has an interface called BasicBolt that encapsulates this pattern for you.

public class SplitSentence extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

How do I make my applications work correctly given that tuples can be replayed?

问题是如何保证"fully fault-tolerant exactly-once messaging semantics”, 因为replay会导致一个message在bolt上多次出现, 这样对类似计数这样的应用会有很大影响. 
从Storm0.7开始, 给出的transactional topologies功能就比较好的解决这个问题

As always in software design, the answer is "it depends." Storm 0.7.0 introduced the "transactional topologies" feature, which enables you to get fully fault-tolerant exactly-once messaging semantics for most computations. Read more about transactional topologies here.

 

How does Storm implement reliability in an efficient way?

现在讨论的是Storm如何实现reliablility机制, Storm实现一组特殊的'acker’ task来track每一个spout tuple, 同时acker task的个数你可以根据tuple的数量来配置

A Storm topology has a set of special "acker" tasks that track the DAG of tuples for every spout tuple. 
When an acker sees that a DAG is complete, it sends a message to the spout task that created the spout tuple to ack the message. 
You can set the number of acker tasks for a topology in the topology configuration using Config.TOPOLOGY_ACKERS. Storm defaults TOPOLOGY_ACKERS to one task -- you will need to increase this number for topologies processing large amounts of messages.

所有被产生的tuple都会有一个随机的64bit的id用于被track 
tuple之间通过emit时的anchor形成tuple tree, 并且每个tuple都知道产生它的spout tuple的id (通过不断的copy传递)

当任何tuple被acked的时候, 都会send message到相应的acker, 具体例子如下图

When a tuple is created in a topology, whether in a spout or a bolt, it is given a random 64 bit id. These ids are used by ackers to track the tuple DAG for every spout tuple.

Every tuple knows the ids of all the spout tuples for which it exists in their tuple trees. When you emit a new tuple in a bolt, the spout tuple ids from the tuple's anchors are copied into the new tuple. When a tuple is acked, it sends a message to the appropriate acker tasks with information about how the tuple tree changed. In particular it tells the acker "I am now completed within the tree for this spout tuple, and here are the new tuples in the tree that were anchored to me".

For example, if tuples "D" and "E" were created based on tuple "C", here's how the tuple tree changes when "C" is acked:

image

 

当然storm具体怎样通过acker task来track所有的tuples, 还需要解决下面几个问题:

1. 当有多个acker的时候, 当一个tuple被acked的时候, 如果知道给哪一个acker发送message? 
因为每个tuple都知道产生它的spout tuple id, 所以使用mod hash(hash方法, m mod n)来分配spout tuple id, 以保证一个spout tuple id所产生的所有tuple tree都会被分配到一个acker上 
当某一个tuple被acked的时候, 只要通过hash找到相应的acker即可

You can have an arbitrary number of acker tasks in a topology. This leads to the following question: when a tuple is acked in the topology, how does it know to which acker task to send that information? Storm uses mod hashing to map a spout tuple id to an acker task. Since every tuple carries with it the spout tuple ids of all the trees they exist within, they know which acker tasks to communicate with.

2. 如果有多个spout task的时候, storm在最终ack spout tuple的时候, 如何知道对应于哪个spout task, 因为必须在产生tuple的那个spout task进行ack? 
答案很简单, spout task在emit一个新的tuple的时候, 会发message告诉相应的acker它的task id, 所以acker是知道tupleid和taskid的map的

How the acker tasks track which spout tasks are responsible for each spout tuple they're tracking?

When a spout task emits a new tuple, it simply sends a message to the appropriate acker telling it that its task id is responsible for that spout tuple. Then when an acker sees a tree has been completed, it knows to which task id to send the completion message.

3. 如果Acker在内存里面显式的监控所有的tuple tree, 会有扩展问题, 当面对海量tuple或复杂workflow的时候, 很有可能会爆内存, 怎么解决这个问题? 
Storm这里采用了一个特别的方法, 这个是storm的主要的突破之一, 该方法的好处就是对于每个spout tuple, 所需要的内存是固定的无论多复杂, 并且只有about 20 bytes 
Acker只需要为每个spout tuple存储spout tuple id, task id, ack val 
这个ack val, 64 bit number, 用于表示整个tuple tree的状况, 产生方法是tuple tree中所有created和acked的tuple的id进行异或(同为0, 异为1) 
当ack val值为0的时候, 即表示tuple tree被完成

这个思路非常巧妙, 两个相同的数去异或为0, 而created和acked时, 会进行两次异或, 所以所有created的tuple都被acked时, 异或值最终为0 
我考虑到不同的tupleid之间的位有重叠时, 是否会有干扰, 简单的试一下, 没有干扰

具体acker工作原理参考, Twitter Storm源代码分析之acker工作流程

Acker tasks do not track the tree of tuples explicitly. For large tuple trees with tens of thousands of nodes (or more), tracking all the tuple trees could overwhelm the memory used by the ackers. Instead, the ackers take a different strategy that only requires a fixed amount of space per spout tuple (about 20 bytes). This tracking algorithm is the key to how Storm works and is one of its major breakthroughs. An acker task stores a map from a spout tuple id to a pair of values. The first value is the task id that created the spout tuple which is used later on to send completion messages. The second value is a 64 bit number called the "ack val". The ack val is a representation of the state of the entire tuple tree, no matter how big or how small. It is simply the xor of all tuple ids that have been created and/or acked in the tree. When an acker task sees that an "ack val" has become 0, then it knows that the tuple tree is completed.

 

最后, 考虑task fail的情况, 
一般task fail, 导致超时, spout会replay 
Acker task fail, 会导致它跟踪的所有tuple无法被ack, 所以会全部超时被spout重发 
Spout task fail, 如果spout本身fail, 那么需要源头来负责replay, 比如RabbitMQ或Kafka

Now that you understand the reliability algorithm, let's go over all the failure cases and see how in each case Storm avoids data loss:

  • Task dies: In this case the spout tuple ids at the root of the trees for the failed tuple will time out and be replayed.
  • Acker task dies: In this case all the spout tuples the acker was tracking will time out and be replayed.
  • Spout task dies: In this case the source that the spout talks to is responsible for replaying the messages. For example, queues like Kestrel and RabbitMQ will place all pending messages back on the queue when a client disconnects.

As you have seen, Storm's reliability mechanisms are completely distributed, scalable, and fault-tolerant.

 

Tuning reliability

当然reliability必然会给系统带来较大的overload, 比如number of messages就会翻倍, 由于和acker之间的通信 
所以如果不需要reliability, 可以通过下面的方法将其关闭

Acker tasks are lightweight, so you don't need very many of them in a topology. You can track their performance through the Storm UI (component id "__acker"). If the throughput doesn't look right, you'll need to add more acker tasks.

If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations -- then you can improve performance by not tracking the tuple tree for spout tuples. Not tracking a tuple tree halves the number of messages transferred since normally there's an ack message for every tuple in the tuple tree. Additionally, it requires less ids to be kept in each downstream tuple, reducing bandwidth usage.

There are three ways to remove reliability.

1. The first is to set Config.TOPOLOGY_ACKERS to 0. In this case, Storm will call the ack method on the spout immediately after the spout emits a tuple. The tuple tree won't be tracked.

2. The second way is to omit a message id in the SpoutOutputCollector.emit method.

3. Finally, emit them as unanchored tuples


本文章摘自博客园,原文发布日期:2013-05-08

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
6月前
|
消息中间件 中间件 Kafka
Kafka - TimeoutException: Expiring 1 record(s) for art-0:120001 ms has passed since batch creation
Kafka - TimeoutException: Expiring 1 record(s) for art-0:120001 ms has passed since batch creation
899 0
|
消息中间件 Kafka 流计算
Flink读取Kafka报Error sending fetch request
实时计算Flink读取消息队列Kafka,flink日志中出现Error sending fetch request (sessionId=1510763375, epoch=12890978) to node 103: {}. org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.DisconnectException: null
11468 3
Flink读取Kafka报Error sending fetch request
|
6月前
|
消息中间件 Kubernetes Java
实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
463 0
|
6月前
|
SQL 关系型数据库 数据库
实时计算 Flink版操作报错合集之遇到报错:"An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency." ,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
139 0
|
存储 API 流计算
Flink - checkpoint Failure reason: Not all required tasks are currently running
Flink 程序增加 readFile 生成文件流后,最初运行期间 CheckPoint 存储没有问题,待文件流 Finished 后 CheckPoint 存储报错: checkpoint Failure reason: Not all required tasks are currently running,下面分析并解决下。
2766 0
Flink - checkpoint Failure reason: Not all required tasks are currently running
|
消息中间件 Kafka Apache
《Apache Kafka and the Rise of Stream Processing》电子版地址
Apache Kafka and the Rise of Stream Processing
69 0
《Apache Kafka and the Rise of Stream Processing》电子版地址
|
存储 SQL Java
Stream Processing with Apache Flink | 学习笔记(五)
快速学习 Stream Processing with Apache Flink
Stream Processing with Apache Flink | 学习笔记(五)
|
存储 Java Apache
Stream Processing with Apache Flink | 学习笔记(四)
快速学习 Stream Processing with Apache Flink
|
机器学习/深度学习 SQL 存储
Flink Next:Beyond Stream Processing
Apache Flink 中文社区发起人王峰在 FFA 2021 的分享
Flink Next:Beyond Stream Processing
SAP MM Error message - Customizing incorrectly maintained – in transaction code ML81N
SAP MM Error message - Customizing incorrectly maintained – in transaction code ML81N
SAP MM Error message - Customizing incorrectly maintained – in transaction code ML81N