Storm-源码分析-acker (backtype.storm.daemon.acker)

简介:

backtype.storm.daemon.acker 
设计的巧妙在于, 不用分别记录和track, stream过程中所有的tuple, 而只需要track root tuple, 而所有中间过程都通过异或更新track entry

acker-init, 在spout发送一个tuple时触发, 初始化这个root tuple的track entry  
acker-ack, 在blot ack一个tuple的时候触发, 会对该tuple的anchors-to-ids中记录的每个(root, edge)进行ack, 并出于优化还会附带登记新的edge(对acker透明, 在发送前已经完成) 
acker-fail, 任一个过程中的tuple fail, 都会导致这个root tuple失败

 

(defn mk-acker-bolt []
  (let [output-collector (MutableObject.)
        pending (MutableObject.)]
    (reify IBolt
      (^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
               (.setObject output-collector collector)
               (.setObject pending (RotatingMap. 2)) ;;用RotatingMap来缓存每个tuple的track信息
               )
      (^void execute [this ^Tuple tuple]
             (let [^RotatingMap pending (.getObject pending)
                   stream-id (.getSourceStreamId tuple)]  ;;从ack tuple中取出streamid
               (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID) ;;收到system_tick_stream, rotate pending, spout的pending和acker的pending超期时间是一样的, 都取决于system-tick
                 (.rotate pending)
                 (let [id (.getValue tuple 0) ;;else,其他的stream,取出tuple id
                       ^OutputCollector output-collector (.getObject output-collector)
                       curr (.get pending id) ;;取出相应tuple的track entry
                       curr (condp = stream-id
                                ACKER-INIT-STREAM-ID (-> curr  ;;初始化tuple的track entry
                                                         (update-ack (.getValue tuple 1)) ;;更新entry中的track value
                                                         (assoc :spout-task (.getValue tuple 2))) ;;记录该tuple和spout-task的关系, 这样在ack或fail的时候才知道通知谁
                                ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1));;ack, 用val和原来的entry value做异或
                                ACKER-FAIL-STREAM-ID (assoc curr :failed true))] ;;fail, 直接把entry的:failed设true
                   (.put pending id curr)
                   (when (and curr (:spout-task curr))
                     (cond (= 0 (:val curr)) ;;val为0, 表示该tuple的所有edge都被成功ack
                           (do
                             (.remove pending id) ;;从pending中删除track entry, 并向相应的spout-task发送ack消息
                             (acker-emit-direct output-collector
                                                (:spout-task curr)
                                                ACKER-ACK-STREAM-ID
                                                [id]
                                                ))
                           (:failed curr) ;;:failed为true, 表示该tuple失败
                           (do
                             (.remove pending id) ;;从pending中删除track entry, 并向相应的spout-task发送fail消息 
                             (acker-emit-direct output-collector
                                                (:spout-task curr)
                                                ACKER-FAIL-STREAM-ID
                                                [id]
                                                ))
                           ))
                   (.ack output-collector tuple) ;;acker bolt也是bolt, 所以最后完成对该ack tuple的ack
                   ))))
      (^void cleanup [this]
        )
      )))

 

(defn- update-ack [curr-entry val]
  (let [old (get curr-entry :val 0)] ;;取出entry中的value值,默认设为0
    (assoc curr-entry :val (bit-xor old val)) ;;将old和新val异或, 赋给entry的value
    ))

本文章摘自博客园,原文发布日期:2013-08-06
相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。     相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
|
人工智能
AI 古籍修复的意义
AI 古籍修复的意义
409 0
|
人工智能 数据挖掘 数据库
拥抱Data+AI|破解电商7大挑战,DMS+AnalyticDB助力企业智能决策
本文为数据库「拥抱Data+AI」系列连载第1篇,该系列是阿里云瑶池数据库面向各行业Data+AI应用场景,基于真实客户案例&最佳实践,展示Data+AI行业解决方案的连载文章。本篇内容针对电商行业痛点,将深入探讨如何利用数据与AI技术以及数据分析方法论,为电商行业注入新的活力与效能。
拥抱Data+AI|破解电商7大挑战,DMS+AnalyticDB助力企业智能决策
|
JSON BI API
商城上货API接口的实战案例
在商城上货过程中,API接口扮演着至关重要的角色。以下是对商城上货API接口的实战分析,涵盖其主要功能、类型、安全性以及实战案例等方面。
|
编解码 算法 IDE
Python实现数据加密-解密
Python实现数据加密-解密
269 0
|
人工智能 数据处理 Python
🔍数据侦探的AI助手:Prompt技巧大公开,洞察商业先机不手软
【8月更文挑战第1天】在数据驱动时代,AI助手作为数据侦探的强大伙伴,通过精心设计的AI Prompt技巧帮助解析复杂市场。案例中,一电商平台欲进入新兴市场,面临数据挑战。初始Prompt聚焦消费者偏好及影响因素分析。为进一步深化洞察,Prompt加入节假日购物模式、商品类别偏好及社交媒体影响等细节。结合领域知识,优化Prompt关注价格敏感度与定制化营销策略。最终,AI助手生成的报告揭示了消费者行为模式,并提出市场策略建议,助力电商成功布局新兴市场。此过程展示了AI Prompt在商业洞察中的关键作用,预示着其在未来洞察之旅中的广阔前景。
447 2
|
网络协议 安全 网络性能优化
IPv6的这几个优点,你不会一个都不知道吧?
IPv6的这几个优点,你不会一个都不知道吧?
759 0
|
存储 机器学习/深度学习 人工智能
香橙派——雕琢智能时代的瑰宝为AI而生(一)
香橙派——雕琢智能时代的瑰宝为AI而生(一)
461 0
|
安全 Java Linux
Python特点及优势
Python优势及其特点
385 0
PDF工具Adobe Arcrobat Pro DC下载安装教程
Acrobat是一款PDF(Portable Document Format,便携式文档格式)编辑软件。借助它,您可以以PDF格式制作和保存你的文档 ,以便于浏览和打印,或使用更高级的功能。