Storm - Guaranteeing message processing

Introduction:

https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

http://xumingming.sinaapp.com/127/twitter-storm%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1/

 

This chapter discusses Storm's reliability capabilities: how does Storm guarantee that every tuple emitted from a spout is fully processed?

What does it mean for a message to be "fully processed"?

The first question is: what does it mean for a tuple (message) to be fully processed? Once a tuple is emitted, it may be processed by several levels of bolts, and each bolt may generate multiple new tuples from it, so the situation is fairly involved.
All the tuples triggered by one spout tuple eventually form a tree, or more generally a DAG (directed acyclic graph).

Storm considers a spout tuple fully processed only when every node in its tuple tree has been processed successfully.
If any node in the tuple tree fails or times out, the spout tuple is considered failed, and failed tuples are replayed.

Storm considers a tuple coming off a spout "fully processed" when the tuple tree has been exhausted and every message in the tree has been processed.
A tuple is considered failed when its tree of messages fails to be fully processed within a specified timeout.
This timeout can be configured on a topology-specific basis using the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS configuration and defaults to 30 seconds.
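
For reference, a minimal sketch of setting this timeout at submission time (assuming the classic backtype.storm Config API, and a builder as in a typical topology main method):

Config conf = new Config();
conf.setMessageTimeoutSecs(60); // sets Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS; the default is 30
StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());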

 

What happens if a message is fully processed or fails to be fully processed?

How is this mechanism implemented?
First, every tuple has a unique identifier, its msgId, which is specified when the tuple is emitted:

_collector.emit(new Values("field1", "field2", 3) , msgId);

Next, look at the ISpout interface below. Besides nextTuple, which produces the next tuple, it has ack and fail: when Storm detects that a tuple has been fully processed it calls ack, and when the tuple times out or is detected as failed it calls fail.
Note that a tuple can only be acked or failed on the exact spout task that emitted it; the reason becomes clear from the implementation described later.

a tuple will be acked or failed by the exact same Spout task that created it. So if a Spout is executing as many tasks across the cluster, a tuple won't be acked or failed by a different task than the one that created it.

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);  // called when the tuple emitted with this msgId is fully processed
    void fail(Object msgId); // called when that tuple times out or fails
}

Finally, how is this implemented on the spout side? It is actually quite simple.
For a queue-backed spout, getting a message only "opens" it rather than popping it, and marks the tuple as pending so it cannot be sent more than once.
Only when the tuple is acked is it truly popped from the queue; if the tuple fails, its state is simply reset to the initial state.
This also explains why a tuple can only be acked or failed on the spout task that emitted it: only that task's queue holds the tuple.

When KestrelSpout takes a message off the Kestrel queue, it "opens" the message. 
This means the message is not actually taken off the queue yet, but instead placed in a "pending" state waiting for acknowledgement that the message is completed.
While in the pending state, a message will not be sent to other consumers of the queue. Additionally, if a client disconnects all pending messages for that client are put back on the queue.
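
To make this concrete, here is a minimal, hypothetical reliable spout in the same style: it keeps "opened" messages in a pending map keyed by msgId and only discards them on ack. The in-memory queue stands in for an external source such as Kestrel; apart from the base class, the names here are illustrative, not Storm API:

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// A sketch only: a real spout would read from an external queue (Kestrel, Kafka, ...).
public class ReliableSentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector _collector;
    private ConcurrentLinkedQueue<String> _queue;        // message source (illustrative)
    private ConcurrentHashMap<Object, String> _pending;  // msgId -> "opened" but not yet acked

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _queue = new ConcurrentLinkedQueue<String>();
        _pending = new ConcurrentHashMap<Object, String>();
    }

    public void nextTuple() {
        String sentence = _queue.poll();
        if (sentence == null) return;
        Object msgId = UUID.randomUUID();
        _pending.put(msgId, sentence);            // hold in "pending" until acked
        _collector.emit(new Values(sentence), msgId);
    }

    public void ack(Object msgId) {
        _pending.remove(msgId);                   // fully processed: really discard it
    }

    public void fail(Object msgId) {
        String sentence = _pending.remove(msgId);
        if (sentence != null) _queue.offer(sentence); // put back for replay
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}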

 

What is Storm's reliability API?

One question has not been addressed so far: what mechanism does Storm itself use to decide whether a tuple has been fully processed?

This breaks down into two sub-questions:
1. How does Storm learn the structure of the tuple tree?
2. How does Storm learn the status of each node in the tuple tree, success or failure?

The answer is simple: you must tell it. How?
1. For the structure of the tuple tree, Storm needs to know which tuples each tuple was produced from, i.e. the links between tree nodes.
   A link between tree nodes is called an anchoring. Every time you emit a new tuple, you must establish the anchoring explicitly through the API.

Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple. 
Each word tuple is anchored by specifying the input tuple as the first argument to emit.

See the code example below:

_collector.emit(tuple, new Values(word)); 

The first argument to emit is the input tuple; this is what establishes the anchoring.
You can also call the unanchored version of emit if you don't need reliability, which is more efficient:

_collector.emit(new Values(word));

Also, as mentioned earlier, a tuple may depend on more than one input:

An output tuple can be anchored to more than one input tuple. 
This is useful when doing streaming joins or aggregations. A multi-anchored tuple failing to be processed will cause multiple tuples to be replayed from the spouts.

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

Multi-anchoring turns the tuple tree into a tuple DAG; current versions of Storm support DAGs well.
Multi-anchoring adds the output tuple into multiple tuple trees. 
Note that it's also possible for multi-anchoring to break the tree structure and create tuple DAGs:

[figure: a tuple DAG produced by multi-anchoring]

2. For the status of each node in the tuple tree, you must explicitly call ack or fail on the OutputCollector after each bolt finishes its processing.

This is done by using the ack and fail methods on the OutputCollector.
You can use the fail method on the OutputCollector to immediately fail the spout tuple at the root of the tuple tree.

See the example below; at the end of the execute method it calls:
_collector.ack(tuple);

One thing puzzles me: why is ack a method of OutputCollector rather than of Tuple?
And even then, the ack applies to the bolt's input, so why is it on the output collector? Perhaps because every input is produced as some other bolt's output... this part of the design seems somewhat unintuitive to me.

public class SplitSentence extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            _collector.emit(tuple, new Values(word)); // anchored to the input tuple
        }
        _collector.ack(tuple); // report that this input has been fully handled by this bolt
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
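
A common follow-on pattern (a sketch, not from the original article) is to catch processing errors and fail the input explicitly, so the spout tuple at the root of the tree is replayed immediately instead of waiting out the timeout:

public void execute(Tuple tuple) {
    try {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            _collector.emit(tuple, new Values(word)); // anchored emit
        }
        _collector.ack(tuple);                        // report success
    } catch (Exception e) {
        _collector.fail(tuple);                       // trigger immediate replay of the spout tuple
    }
}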

To guarantee reliability, Storm inevitably sacrifices some efficiency: it records in task memory the tuple tree structure and status that you report.
A tuple node is removed from memory only after it has been acked or failed, so if you never ack or fail your tuples, the task will eventually run out of memory.

Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don't ack/fail every tuple, the task will eventually run out of memory.

 

The simpler version: BasicBolt

The mechanism above places a burden on the programmer, especially in simple cases such as filters, where you must explicitly establish the anchoring and ack every single time...

So Storm provides a simpler version that establishes the anchoring automatically and calls ack automatically when the bolt's execute method completes.

A lot of bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the execute method. These bolts fall into the categories of filters and simple functions. Storm has an interface called BasicBolt that encapsulates this pattern for you.

public class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

How do I make my applications work correctly given that tuples can be replayed?

The question is how to guarantee "fully fault-tolerant exactly-once messaging semantics": replay can cause a message to appear at a bolt more than once, which matters a great deal for applications such as counting.
Starting with Storm 0.7, the transactional topologies feature solves this problem quite well.

As always in software design, the answer is "it depends." Storm 0.7.0 introduced the "transactional topologies" feature, which enables you to get fully fault-tolerant exactly-once messaging semantics for most computations. Read more about transactional topologies here.

 

How does Storm implement reliability in an efficient way?

Now let's discuss how Storm implements its reliability mechanism. Storm runs a set of special 'acker' tasks to track every spout tuple, and you can configure the number of acker tasks according to your message volume.

A Storm topology has a set of special "acker" tasks that track the DAG of tuples for every spout tuple. 
When an acker sees that a DAG is complete, it sends a message to the spout task that created the spout tuple to ack the message. 
You can set the number of acker tasks for a topology in the topology configuration using Config.TOPOLOGY_ACKERS. Storm defaults TOPOLOGY_ACKERS to one task -- you will need to increase this number for topologies processing large amounts of messages.
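
For instance, a minimal sketch of raising the acker count at submission time (again assuming the classic Config API):

Config conf = new Config();
conf.setNumAckers(4); // sets Config.TOPOLOGY_ACKERS; the default is 1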

Every tuple that is created gets a random 64-bit id used for tracking.
Tuples are linked into a tuple tree via the anchors specified at emit time, and every tuple knows the ids of the spout tuples it descends from (the ids are copied along whenever a new tuple is emitted).

Whenever any tuple is acked, a message is sent to the appropriate acker; a concrete example is shown in the figure below.

When a tuple is created in a topology, whether in a spout or a bolt, it is given a random 64 bit id. These ids are used by ackers to track the tuple DAG for every spout tuple.

Every tuple knows the ids of all the spout tuples for which it exists in their tuple trees. When you emit a new tuple in a bolt, the spout tuple ids from the tuple's anchors are copied into the new tuple. When a tuple is acked, it sends a message to the appropriate acker tasks with information about how the tuple tree changed. In particular it tells the acker "I am now completed within the tree for this spout tuple, and here are the new tuples in the tree that were anchored to me".

For example, if tuples "D" and "E" were created based on tuple "C", here's how the tuple tree changes when "C" is acked:

[figure: how the tuple tree changes when tuple "C" is acked]

 

Of course, for Storm to actually track all tuples through acker tasks, a few more questions have to be answered:

1. When there are multiple ackers, how does a tuple that is being acked know which acker to send its message to?
Since every tuple knows the id of the spout tuple that produced it, Storm uses mod hashing (hash(id) mod n) to assign spout tuple ids to ackers, which guarantees that the entire tuple tree of a given spout tuple is tracked by a single acker.
When a tuple is acked, the appropriate acker is found simply by applying the hash.

You can have an arbitrary number of acker tasks in a topology. This leads to the following question: when a tuple is acked in the topology, how does it know to which acker task to send that information? Storm uses mod hashing to map a spout tuple id to an acker task. Since every tuple carries with it the spout tuple ids of all the trees they exist within, they know which acker tasks to communicate with.
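
The routing idea can be sketched in a few lines (the helper below is hypothetical; Storm's internal implementation differs in detail):

// Map a spout tuple id to one of n acker tasks via mod hashing.
static int ackerFor(long spoutTupleId, int numAckers) {
    // Math.floorMod keeps the result non-negative even for negative ids.
    return (int) Math.floorMod(spoutTupleId, (long) numAckers);
}

Because every descendant tuple carries the spout tuple id, every task in the topology computes the same acker for the same tree.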

2. When there are multiple spout tasks, how does Storm know which spout task to deliver the final ack to, given that the ack must happen on the task that produced the tuple?
The answer is simple: when a spout task emits a new tuple, it sends a message telling the appropriate acker its task id, so the acker holds the mapping from spout tuple id to task id.

How do the acker tasks track which spout tasks are responsible for each spout tuple they're tracking?

When a spout task emits a new tuple, it simply sends a message to the appropriate acker telling it that its task id is responsible for that spout tuple. Then when an acker sees a tree has been completed, it knows to which task id to send the completion message.

3. If ackers explicitly tracked every tuple tree in memory, there would be a scaling problem: with massive tuple volumes or complex workflows, memory could easily be exhausted. How is this solved?
Storm uses a special technique here, one of its major breakthroughs: for each spout tuple, the memory required is fixed no matter how complex the tree is, and it is only about 20 bytes.
The acker stores, for each spout tuple, just three things: the spout tuple id, the task id, and an "ack val".
The ack val is a 64-bit number representing the state of the entire tuple tree; it is produced by XORing together the ids of all tuples that have been created and acked in the tree (identical bits give 0, differing bits give 1).
When the ack val becomes 0, the tuple tree is complete.

The idea is very elegant: XORing two identical numbers gives 0, and each tuple id is XORed in twice, once at creation and once at ack, so once every created tuple has been acked, the accumulated XOR is 0.
I wondered whether overlapping bits between different tuple ids might interfere with each other; a quick experiment shows they don't.

For the details of how the acker works, see "Twitter Storm源代码分析之acker工作流程" (an analysis of the acker workflow in the Storm source code).

Acker tasks do not track the tree of tuples explicitly. For large tuple trees with tens of thousands of nodes (or more), tracking all the tuple trees could overwhelm the memory used by the ackers. Instead, the ackers take a different strategy that only requires a fixed amount of space per spout tuple (about 20 bytes). This tracking algorithm is the key to how Storm works and is one of its major breakthroughs. An acker task stores a map from a spout tuple id to a pair of values. The first value is the task id that created the spout tuple which is used later on to send completion messages. The second value is a 64 bit number called the "ack val". The ack val is a representation of the state of the entire tuple tree, no matter how big or how small. It is simply the xor of all tuple ids that have been created and/or acked in the tree. When an acker task sees that an "ack val" has become 0, then it knows that the tuple tree is completed.
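
Here is a minimal, self-contained demonstration of the ack-val algebra (the class and variable names are made up for illustration; this is only the XOR bookkeeping, not Storm's actual acker code):

import java.util.Random;

public class AckValDemo {
    public static void main(String[] args) {
        Random rand = new Random();
        long ackVal = 0;

        // The spout emits the root tuple with a random 64-bit id.
        long root = rand.nextLong();
        ackVal ^= root;                  // root created

        // A bolt acks the root and anchors two new tuples D and E to it:
        long d = rand.nextLong(), e = rand.nextLong();
        ackVal ^= root ^ d ^ e;          // root acked, d and e created

        // Downstream bolts ack D and E without emitting anything new:
        ackVal ^= d;
        ackVal ^= e;

        // Every id was XORed exactly twice (create + ack), so the tree is complete:
        System.out.println(ackVal == 0); // prints true
    }
}

Because XOR is commutative and associative, every id enters the ack val exactly twice regardless of the order in which the create and ack messages arrive.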

 

Finally, consider the task-failure cases:
If an ordinary task fails, the affected tuples time out and the spout replays them.
If an acker task fails, none of the tuples it was tracking can be acked, so they all time out and are replayed by the spout.
If a spout task itself fails, the source it reads from has to take responsibility for replaying the messages, e.g. RabbitMQ or Kafka.

Now that you understand the reliability algorithm, let's go over all the failure cases and see how in each case Storm avoids data loss:

  • Task dies: In this case the spout tuple ids at the root of the trees for the failed tuple will time out and be replayed.
  • Acker task dies: In this case all the spout tuples the acker was tracking will time out and be replayed.
  • Spout task dies: In this case the source that the spout talks to is responsible for replaying the messages. For example, queues like Kestrel and RabbitMQ will place all pending messages back on the queue when a client disconnects.

As you have seen, Storm's reliability mechanisms are completely distributed, scalable, and fault-tolerant.

 

Tuning reliability

Of course, reliability imposes a significant overhead on the system; for example, the number of messages roughly doubles because of the traffic to and from the ackers.
So if you don't need reliability, you can turn it off in the following ways.

Acker tasks are lightweight, so you don't need very many of them in a topology. You can track their performance through the Storm UI (component id "__acker"). If the throughput doesn't look right, you'll need to add more acker tasks.

If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations -- then you can improve performance by not tracking the tuple tree for spout tuples. Not tracking a tuple tree halves the number of messages transferred since normally there's an ack message for every tuple in the tuple tree. Additionally, it requires less ids to be kept in each downstream tuple, reducing bandwidth usage.

There are three ways to remove reliability.

1. The first is to set Config.TOPOLOGY_ACKERS to 0. In this case, Storm will call the ack method on the spout immediately after the spout emits a tuple. The tuple tree won't be tracked.

2. The second way is to remove reliability on a per-spout basis by omitting a message id in the SpoutOutputCollector.emit method.

3. Finally, if you don't care whether a particular subset of downstream tuples gets processed, you can emit them as unanchored tuples.
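
In code, the three options look roughly like this (a sketch assuming the collectors and variables from the earlier examples):

// 1. Disable tracking entirely: Storm acks spout tuples immediately after emit.
Config conf = new Config();
conf.setNumAckers(0); // Config.TOPOLOGY_ACKERS = 0

// 2. In a spout: emit without a message id, so this tuple is never tracked.
_collector.emit(new Values("field1", "field2", 3));

// 3. In a bolt: emit unanchored, so downstream failure won't replay the spout tuple.
_collector.emit(new Values(word));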


This article is reproduced from 博客园 (cnblogs); original publication date: 2013-05-08.
