twitter storm源码走读(一)

简介: 本文详细介绍了twitter storm中的nimbus节点的启动场景,分析nimbus是如何一步步实现定义于storm.thrift中的service,以及如何利用curator来和zookeeper server建立通讯。然后尝试分析tuple发送时的两个问题,一是消息在线程间的传递过程及利用

nimbus启动场景分析

本文详细介绍了twitter storm中的nimbus节点的启动场景,分析nimbus是如何一步步实现定义于storm.thrift中的service,以及如何利用curator来和zookeeper server建立通讯。

对于storm client来说,nimbus是storm cluster与外部的唯一接口,是总的接口人,在这个接口上使用thrift定义的各种service。但是nimbus光接单并不干活,具体的脏活累活,这哥们都是分配到各个slots上的,让nimbus来具体管理各个slots也就是worker,似乎还是太累了,中层干部supervisor同学适时参与了。

nimbus并不知道到底有哪些supervisor会加入到自己的团队中,它啥时规定了每个supervisor最多能带几个worker。对于supervisor的加入与退出,是通过zookeeper server来告知的。好了,在下面的分析中,每个接口上的初始化工作具体有哪些将一一呈现。

tuple消息发送场景分析

worker进程内消息接收与处理全景图

先上幅图简要勾勒出worker进程接收到tuple消息之后的处理全过程

IConnection的建立与使用

话说在mk-threads :bolt函数的实现中有这么一段代码,其主要功能是实现tuple的emit功能

复制代码
bolt-emit (fn [stream anchors values task]
          (let [out-tasks (if task
                  (tasks-fn task stream values)
                (tasks-fn stream values))]
        (fast-list-iter [t out-tasks]
                (let [anchors-to-ids (HashMap.)]
                  (fast-list-iter [^TupleImpl a anchors]
                          (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
                            (when (pos? (count root-ids))
                              (let [edge-id (MessageId/generateId rand)]
                            (.updateAckVal a edge-id)
                            (fast-list-iter [root-id root-ids]
                                    (put-xor! anchors-to-ids root-id edge-id))
                            ))))
                  (transfer-fn t
                           (TupleImpl. worker-context
                               values
                               task-id
                               stream
                               (MessageId/makeId anchors-to-ids)))))
        (or out-tasks [])))
复制代码

 

加亮为蓝色的部分实现的功能是另外发送tuple,那么transfer-fn函数的定义在哪呢?见mk-threads的let部分,能见到下述一行代码

:transfer-fn (mk-executor-transfer-fn batch-transfer->worker)

在继续往下看每个函数实现之前,先确定一下这节代码阅读的目的。storm在线程之间使用disruptor进行通讯,在进程之间进行消息通讯使用的是zeromq或netty, 所以需要从transfer-fn追踪到使用zeromq或netty api的位置。

再看mk-executor-transfer-fn函数实现

复制代码
(defn mk-executor-transfer-fn [batch-transfer->worker]
  (fn this
      ([task tuple block? ^List overflow-buffer]
       (if (and overflow-buffer (not (.isEmpty overflow-buffer)))
       (.add overflow-buffer [task tuple])
     (try-cause
      (disruptor/publish batch-transfer->worker [task tuple] block?)
      (catch InsufficientCapacityException e
         (if overflow-buffer
             (.add overflow-buffer [task tuple])
           (throw e))
         ))))
      ([task tuple overflow-buffer]
       (this task tuple (nil? overflow-buffer) overflow-buffer))
      ([task tuple]
       (this task tuple nil)
       )))
复制代码

disruptor/publish表示将消息从本线程发送出去,至于谁是该消息的接收者,请继续往下看。

worker进程中,有一个receiver-thread是用来专门接收来自外部进程的消息,那么与之相对的是有一个transfer-thread用来将本进程的消息发送给外部进程。所以刚才的disruptor/publish发送出来的消息应该被transfer-thread接收到。

在transfer-thread中,能找到这行下述一行代码

transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)

对于接收到来自本进程中其它线程发送过来的消息利用transfer-tuples进行处理,transfer-tuples使用mk-transfer-tuples-handler来创建,所以需要看看mk-transfer-tuples-handler能否与zeromq或netty联系上呢?

复制代码
(defn mk-transfer-tuples-handler [worker]
  (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
            drainer (ArrayList.)
            node+port->socket (:cached-node+port->socket worker)
            task->node+port (:cached-task->node+port worker)
            endpoint-socket-lock (:endpoint-socket-lock worker)
            ]
    (disruptor/clojure-handler
     (fn [packets _ batch-end?]
     (.addAll drainer packets)
     (when batch-end?
       (read-locked endpoint-socket-lock
            (let [node+port->socket @node+port->socket
                        task->node+port @task->node+port]
              ;; consider doing some automatic batching here (would need to not be serialized at this point to remo
              ;; try using multipart messages ... first sort the tuples by the target node (without changing the lo
              17
              (fast-list-iter [[task ser-tuple] drainer]
                      ;; TODO: consider write a batch of tuples here to every target worker
                      ;; group by node+port, do multipart send
                      (let [node-port (get task->node+port task)]
                        (when node-port
                          (.send ^IConnection (get node+port->socket node-port) task ser-tuple))
                        ))))
       (.clear drainer))))))
复制代码

上述代码中出现了与zeromq可能有联系的部分了即加亮为红色的一行。

那凭什么说加亮的IConnection一行与zeromq有关系的,这话得慢慢说起,需要从配置文件开始。

在storm.yaml中有这么一行配置项,即

storm.messaging.transport: "backtype.storm.messaging.zmq"

这个配置项与worker中的mqcontext相对应,所以在worker中以mqcontext为线索,就能够一步步找到IConnection的实现。connections在函数mk-refresh-connections中建立

refresh-connections (mk-refresh-connections worker)

mk-refresh-connection函数中与mq-context相关联的一部分代码如下所示

复制代码
(swap! (:cached-node+port->socket worker)
       #(HashMap. (merge (into {} %1) %2))
       (into {}
         (dofor [endpoint-str new-connections
                  :let [[node port] (string->endpoint endpoint-str)]]
             [endpoint-str
               (.connect
            ^IContext (:mq-context worker)
            storm-id
            ((:node->host assignment) node)
            port)
               ]
            )))
复制代码

注意加亮部分,利用mq-conext中connect函数来创建IConnection. 当打开zmq.clj时候,就能验证我们的猜测。

复制代码
(^IConnection connect [this ^String storm-id ^String host ^int port]
          (require 'backtype.storm.messaging.zmq)
          (-> context
          (mq/socket mq/push)
          (mq/set-hwm hwm)
          (mq/set-linger linger-ms)
          (mq/connect (get-connect-zmq-url local? host port))
          mk-connection))
复制代码

代码走到这里,IConnection什么时候建立起来的谜底就揭开了,消息是如何从bolt或spout线程传递到transfer-thread,再由zeromq将tuple发送给下跳的路径打通了。

tuple的分发策略 grouping

从一个bolt中产生的tuple可以有多个bolt接收,到底发送给哪一个bolt呢?这牵扯到分发策略问题,其实在twitter storm中有两个层面的分发策略问题,一个是对于task level的,在讲topology submit的时候已经涉及到。另一个就是现在要讨论的针对tuple level的分发。

再次将视线拉回到bolt-emit中,这次将目光集中在变量t的前前后后。

复制代码
  (let [out-tasks (if task
(tasks-fn task stream values)
(tasks-fn stream values))]
(fast-list-iter [t out-tasks]
(let [anchors-to-ids (HashMap.)]
(fast-list-iter [^TupleImpl a anchors]
(let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
(when (pos? (count root-ids))
(let [edge-id (MessageId/generateId rand)]
(.updateAckVal a edge-id)
(fast-list-iter [root-id root-ids]
(put-xor! anchors-to-ids root-id edge-id))
))))
(transfer-fn t
(TupleImpl. worker-context
values
task-id
stream
(MessageId/makeId anchors-to-ids)))))
复制代码

上述代码显示t从out-tasks来,而out-tasks是tasks-fn的返回值

    tasks-fn (:tasks-fn task-data)

一谈tasks-fn,原来从未涉及的文件task.clj这次被挂上了,task-data与由task/mk-task创建。将中间环节跳过,调用关系如下所列。

  • mk-task
  • mk-task-data
  • mk-tasks-fn

tasks-fn中会使用到grouping,处理代码如下

复制代码
fn ([^Integer out-task-id ^String stream ^List values]
          (when debug?
            (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
          (let [target-component (.getComponentId worker-context out-task-id)
                component->grouping (get stream->component->grouper stream)
                grouping (get component->grouping target-component)
                out-task-id (if grouping out-task-id)]
            (when (and (not-nil? grouping) (not= :direct grouping))
              (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          
            (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
            (when (emit-sampler)
              (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
              (stats/emitted-tuple! executor-stats stream)
              (if out-task-id
                (stats/transferred-tuples! executor-stats stream 1)
                (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream 1)))
            (if out-task-id [out-task-id])
            ))
复制代码

而每个topology中的grouping策略又是如何被executor知道的呢,这从另一端executor-data说起。

在mk-executor-data中有下面一行代码 

:stream->component->grouper (outbound-components worker-context component-id)

outbound-components的定义如下

复制代码
(defn outbound-components
  "Returns map of stream id to component id to grouper"
  [^WorkerTopologyContext worker-context component-id]
  (->> (.getTargets worker-context component-id)
       clojurify-structure
       (map (fn [[stream-id component->grouping]]
        [stream-id
         (outbound-groupings
          worker-context
          component-id
          stream-id
          (.getComponentOutputFields worker-context component-id stream-id)
          component->grouping)]))
       (into {})
       (HashMap.)))
复制代码


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
11天前
|
物联网 虚拟化 Windows
Windows 10 version 22H2 中文版、英文版下载 (2025 年 9 月更新)
Windows 10 version 22H2 中文版、英文版下载 (2025 年 9 月更新)
686 2
Windows 10 version 22H2 中文版、英文版下载 (2025 年 9 月更新)
|
8月前
|
机器学习/深度学习 编解码 数据可视化
RT-DETR改进策略【Backbone/主干网络】| 替换骨干网络为2023-CVPR ConvNeXt V2 (附网络详解和完整配置步骤)
RT-DETR改进策略【Backbone/主干网络】| 替换骨干网络为2023-CVPR ConvNeXt V2 (附网络详解和完整配置步骤)
475 11
RT-DETR改进策略【Backbone/主干网络】| 替换骨干网络为2023-CVPR ConvNeXt V2 (附网络详解和完整配置步骤)
|
22天前
|
SQL 关系型数据库 API
如何开发工程项目部管理系统中的质量管理板块(附架构图+流程图+代码参考)
本文详解如何构建工程项目管理系统中的质量管理模块,涵盖质量检查计划、检查登记、问题清单、整改记录及问题看板五大核心功能。内容包括系统架构设计、业务流程、数据模型、API接口、开发技巧及上线建议,助力实现质量风险的数字化闭环管理,提升项目验收效率与合规性。
|
3月前
|
人工智能 自然语言处理 算法
2025 年 7 月境内深度合成服务算法备案情况分析报告
2025年7月,中央网信办发布第十二批深度合成算法备案信息,全国389款产品通过备案,服务提供者占比超七成。截至7月14日,全国累计备案达3834款,覆盖文本、图像、音视频等多模态场景,广泛应用于生活服务、医疗、金融等领域。广东以135款居首,数字人、AI客服等C端应用主导,民营企业成主力,国企聚焦公共服务。随着AI政策推动,备案已成为AI产品合规上线关键环节。
|
12月前
|
JavaScript 前端开发
原生js常见报错及其处理方案
原生js常见报错及其处理方案
225 0
|
11月前
|
Prometheus 监控 Java
深入探索:自制Agent监控API接口耗时实践
在微服务架构中,监控API接口的调用耗时对于性能优化至关重要。通过监控接口耗时,我们可以识别性能瓶颈,优化服务响应速度。本文将分享如何自己动手实现一个Agent来统计API接口的调用耗时,提供一种实用的技术解决方案。
362 3
|
12月前
|
弹性计算 安全 小程序
编程之美:Python让你领略浪漫星空下的流星雨奇观
这段代码使用 Python 的 `turtle` 库实现了一个流星雨动画。程序通过创建 `Meteor` 类来生成具有随机属性的流星,包括大小、颜色、位置和速度。在无限循环中,流星不断移动并重新绘制,营造出流星雨的效果。环境需求为 Python 3.11.4 和 PyCharm 2023.2.5。
336 9
|
12月前
|
并行计算 算法 前端开发
10GBase-T:解锁万兆以太网的新篇章
【10月更文挑战第18天】
403 3
|
12月前
|
监控 开发者
确保动态导入模块按正确顺序加载
【10月更文挑战第12天】 在复杂应用开发中,确保动态导入模块按正确顺序加载至关重要,直接影响应用性能、功能和稳定性。本文深入探讨了动态模块加载顺序的影响因素、解决方案及实践案例,提供了详细的策略和方法,帮助开发者有效管理模块加载顺序,提升应用质量。
|
11月前
|
网络协议 算法 数据库
OSPF 与 BGP 的互操作性:构建复杂网络的通信桥梁
OSPF 与 BGP 的互操作性:构建复杂网络的通信桥梁
340 0