Twitter Storm中Bolt消息传递路径之源码解读

简介: Bolt作为task被executor执行,而executor是一个个的线程,所以executor必须存在于具体的process之中,而这个process就是worker。至于worker是如何被supervisor创建,尔后worker又如何创建executor线程,这些暂且按下不表。

Bolt作为task被executor执行,而executor是一个个的线程,所以executor必须存在于具体的process之中,而这个process就是worker。至于worker是如何被supervisor创建,尔后worker又如何创建executor线程,这些暂且按下不表。

 
假设同属于一个Topology的Spout与Bolt分别处于不同的JVM,即不同的worker中,不同的JVM可能处于同一台物理机器,也可能处于不同的物理机器中。为了让情景简单,认为JVM处于不同的物理机器中。
 
Spout的输出消息到达Bolt,作为Bolt的输入会经过这么几个阶段。
 
1. spout的输出通过该spout所处worker的消息输出线程,将tuple输入到Bolt所属的worker。它们之间的通路是socket连接,用ZeroMQ实现。
2. bolt所处的worker有一个专门处理socket消息的receive thread 接收到spout发送来的tuple
3. receive thread将接收到的消息传送给对应的bolt所在的executor。 在worker内部(即同一process内部),消息传递使用的是Lmax Disruptor pattern.
4. executor接收到tuple之后,由event-handler进行处理
 
下面是具体的源码
1. worker创建消息接收线程 
 
worker.clj
 
(defn launch-receive-thread [worker]
  (log-message "Launching receive-thread for " (:assignment-id worker) ":" (:port worker))
  (msg-loader/launch-receive-thread!
    (:mq-context worker)
    (:storm-id worker)
    (:port worker)
    (:transfer-local-fn worker)
    (-> worker :storm-conf (get TOPOLOGY-RECEIVER-BUFFER-SIZE))
    :kill-fn (fn [t] (halt-process! 11))))
 
注意加亮的行会将storm.yaml中配置使用ZMQ或其它
storm.messaging.transport:"backtype.storm.messaging.zmq"
 
2. worker从socket接收到新消息
vthread (async-loop
                 (fn []
                   (let [socket (.bind ^IContext context storm-id port)]
                     (fn []
                       (let [batched (ArrayList.)
                             init (.recv ^IConnection socket 0)]
                         (loop [packet init]
                           (let [task (if packet (.task ^TaskMessage packet))
                                 message (if packet (.message ^TaskMessage packet))]
                             (if (= task -1)
                               (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
                                 (.close socket)
                                 nil )
                               (do
                                 (when packet (.add batched [task message]))
                                 (if (and packet (< (.size batched) max-buffer-size))
                                   (recur (.recv ^IConnection socket 1))
                                   (do (transfer-local-fn batched)
                                     0 ))))))))))
 
加亮行使用的transfer-local-fn会将接收的TaskMessage传递给相应的executor
 
3.  transfer-local-fn
 
(defn mk-transfer-local-fn [worker]
  (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
        task->short-executor (:task->short-executor worker)
        task-getter (comp #(get task->short-executor %) fast-first)]
    (fn [tuple-batch]
      (let [grouped (fast-group-by task-getter tuple-batch)]
        (fast-map-iter [[short-executor pairs] grouped]
          (let [q (short-executor-receive-queue-map short-executor)]
            (if q
              (disruptor/publish q pairs)
              (log-warn "Received invalid messages for unknown tasks. Dropping... ")
              )))))))
 
用disruptor在线程之间进行消息传递。
 
多费一句话,mk-transfer-local-fn表示将外部世界的消息传递给本进程内的线程。而mk-transfer-fn则刚好在方向上反过来。
 
4. 消息被executor处理
 
executor.clj
==========================================================
(defn mk-task-receiver [executor-data tuple-action-fn]
  (let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
        task-ids (:task-ids executor-data)
        debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
        ]
    (disruptor/clojure-handler
      (fn [tuple-batch sequence-id end-of-batch?]
        (fast-list-iter [[task-id msg] tuple-batch]
          (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
            (when debug? (log-message "Processing received message " tuple))
            (if task-id
              (tuple-action-fn task-id tuple)
              ;; null task ids are broadcast tuples
              (fast-list-iter [task-id task-ids]
                (tuple-action-fn task-id tuple)
                ))
            ))))))
 
加亮行中tuple-action-fn定义于mk-threads(源文件executor.clj)中。因为当前以Bolt为例,所以会调用的tuple-action-fn定义于 defmethod mk-threads :bolt [executor-data task-datas]
 
那么mk-task-receiver是如何与disruptor关联起来的呢,可以见定义于mk-threads中的下述代码
(let [receive-queue (:receive-queue executor-data)
              event-handler (mk-task-receiver executor-data tuple-action-fn)]
          (disruptor/consumer-started! receive-queue)
          (fn []            
            (disruptor/consume-batch-when-available receive-queue event-handler)
            0)))
 
到了这里,消息的发送与接收处理路径打通。
目录
相关文章
|
JavaScript 前端开发
HTML VSCode 自用插件列表 (包含Vue)
HTML VSCode 自用插件列表 (包含Vue)
371 0
|
存储 运维 网络协议
如何实现 呼叫速率(caps) 值控制
首先,企业需要明确的是呼叫中心系统搭建的目的。搭建这个呼叫中心是想用来做什么呢? 是为了企业当做呼入型的客服使用? 还是用来当电话外呼使用? 是企业想做营销用呢还是政府单位办公使用? 是要做外包服务呢还是自己用? 是想挣钱用呢还是想做客户服务使用? 需求 呼叫中心的搭建肯定有需求,这就要把一份需求文档写出来。比如想要做什么,每一点写的清清楚楚: 需求包括现在有什么? 碰到了什么问题? 以后想整成什么样? 设置,扩容以及升级的快速,成本与灵活性 一个企业的业务、流程与规模有时候随着市场的快速成长会有很大的变化。这时候,企业的呼叫中心系统就要能够快速的适应市场,能让企业做出适当的调整。比如,呼叫中
|
4月前
|
人工智能 搜索推荐 小程序
分享技术---AI智能题库考试系统
本平台融合AI智能技术,打造高效试题库系统,支持PC、手机在线刷题,提供智能出题、自动解析、错题回顾等功能,提升学习效率。具备章节练习、背题模式、笔记收藏等多样化学习方式,支持全终端同步,助力学员精准突破薄弱环节,快速提分。
|
8月前
|
开发框架 前端开发 JavaScript
一文彻底搞清楚HarmonyOS中的ArkUI
本文介绍了华为推出的跨平台UI框架ArkUI,旨在简化多平台应用开发。ArkUI支持声明式和类Web两种开发范式,其中声明式开发范式因其高效简洁、性能优越和未来发展潜力而被推荐。ArkUI提供了丰富的组件、布局、动画和交互事件等功能,帮助开发者构建美观流畅的应用界面。其架构体系包括声明式UI前端、语言运行时、后端引擎、渲染引擎和平台适配层,确保高效开发和跨平台兼容性。
500 0
一文彻底搞清楚HarmonyOS中的ArkUI
|
安全 Java Serverless
Lambda表达式使用及详解
Java 8引入的Lambda表达式是一种重要的功能,它允许你以更简洁、更直接的方式传递方法。Lambda可以被用来代替只有单个抽象方法的接口的匿名实现类。这里有一些Lambda表达式的实际应用场景及其示例:
258 3
|
SQL 人工智能 大数据
首个大数据批流融合国家标准正式发布,阿里云为牵头起草单位!
近日,国家市场监督管理总局、国家标准化管理委员会正式发布大数据领域首个批流融合国家标准 GB/T 44216-2024《信息技术 大数据 批流融合计算技术要求》,该标准由阿里云牵头起草,并将于2025年2月1日起正式实施。
|
机器学习/深度学习 人工智能 自然语言处理
【人工智能】关键的人工智能领域专业术语及其简要解释
人工智能(Artificial Intelligence, AI)领域涉及众多专业术语,这些术语涵盖了从基础理论到具体应用的各个方面。以下是一些关键的人工智能领域专业术语及其简要解释
659 2
|
缓存 负载均衡 数据管理
深入探索微服务架构的核心要素与实践策略在当今软件开发领域,微服务架构以其独特的优势和灵活性,已成为众多企业和开发者的首选。本文将深入探讨微服务架构的核心要素,包括服务拆分、通信机制、数据管理等,并结合实际案例分析其在不同场景下的应用策略,旨在为读者提供一套全面、深入的微服务架构实践指南。**
**微服务架构作为软件开发领域的热门话题,正引领着一场技术革新。本文从微服务架构的核心要素出发,详细阐述了服务拆分的原则与方法、通信机制的选择与优化、数据管理的策略与挑战等内容。同时,结合具体案例,分析了微服务架构在不同场景下的应用策略,为读者提供了实用的指导和建议。
|
人工智能 机器人 Linux
超级炫酷的AI绘图工具—MidJourney入门使用教程
超级炫酷的AI绘图工具—MidJourney入门使用教程