Storm-源码分析- Component ,Executor ,Task之间关系

简介:

Component包含Executor(threads)的个数 
在StormBase中的num-executors, 这对应于你写topology代码时, 为每个component指定的并发数(通过setBolt和setSpout)

 

Component和Task的对应关系, (storm-task-info) 
默认你可以不指定task数, 那么task和executor为1:1关系 
当然也可以通过ComponentConfigurationDeclarer#setNumTasks()去设置TOPOLOGY_TASKS 
这个函数, 首先读出所有components 
对每个component, 读出ComponentComm中的json_conf, 然后从里面读出上面设置的TOPOLOGY_TASKS 
最后用递增序列产生taskid, 并最终生成component和task的对应关系 
如果不设置, task数等于executor数, 后面分配就很容易, 否则就涉及task分配问题

(defn storm-task-info
  "Returns map from task -> component id"
  [^StormTopology user-topology storm-conf]
  (->> (system-topology! storm-conf user-topology)
       all-components
       (map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
       (sort-by first)
       (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
       (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
       (into {})
       ))

首先产生system-topology!, 因为system-topology!会增加系统components, acker, systemBolt, metricsBlot, 这些也都是topology中不可缺少的部分, 所以单纯使用用户定义的topology是不够的 
然后取出topology里面所有component

(defn all-components [^StormTopology topology]
  (apply merge {}
         (for [f thrift/STORM-TOPOLOGY-FIELDS]
           (.getFieldValue topology f)
           )))
使用thrift/STORM-TOPOLOGY-FIELDS从StormTopology的metadata里面读出每个fieldid, 并取出value进行merge 
所以结果就是下面3个map, merge在一起的集合 
struct StormTopology {
  //ids must be unique across maps
  // #workers to use is in conf
  1: required map<string, SpoutSpec> spouts;
  2: required map<string, Bolt> bolts;
  3: required map<string, StateSpoutSpec> state_spouts;
}

使用map-value对map中的component进行如下操作 
取出component里面的ComponentComm对象(.getcommon), 并读出json_conf, 最终读出conf中TOPOLOGY-TASKS

(defn component-conf [component]
  (->> component
      .get_common
      .get_json_conf
      from-json))
struct ComponentCommon {
  1: required map<GlobalStreamId, Grouping> inputs;
  2: required map<string, StreamInfo> streams; //key is stream id
  3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component
  // component specific configuration
  4: optional string json_conf;
}

输出{component-string:tasknum}, 按component-string排序, 再进行mapcat 
{c1 3, c2 2, c3 1} –> (c1,c1,c1,c2,c2,c3) 
再加上递增编号, into到map, {1 c1, 2 c1, 3 c1, 4 c2, 5 c2, 6 c3} 

Topology中, Task和Executor的分配关系, (compute-executors) 
上面已经产生, component->executors 和 component->task, 现在根据component对应的task和executor个数进行task分配(到executor) 
默认是1:1分配, 但如果设置了task数, 
比如对于c1, 2个executor, 3个tasks [1 2 3], 分配结果就是['(1 2) ‘(3)] 
最终to-executor-id, 列出每个executor中task id的范围([(first task-ids) (last task-ids)])

(defn- compute-executors [nimbus storm-id]
  (let [conf (:conf nimbus)
        storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
        component->executors (:component->executors storm-base) ;从storm-base中获取每个component配置的(executor)线程数
        storm-conf (read-storm-conf conf storm-id)
        topology (read-storm-topology conf storm-id)
        task->component (storm-task-info topology storm-conf)]
    (->> (storm-task-info topology storm-conf)
         reverse-map ;{“c1” [1,2,3], “c2” [4,5], “c3” 6}
         (map-val sort)
         (join-maps component->executors) ; {"c1" ‘(2 [1 2 3]), "c2" ‘(2 [4 5]), "c3" ‘(1 6)}
         (map-val (partial apply partition-fixed)) ; {"c1" ['(1 2) '(3)], "c2" ['(4) '(5)], "c3" ['(6)]} 
         (mapcat second) ;((1 2) (3) (4) (5) (6))
         (map to-executor-id) ;([1 2] [3 3] [4 4] [5 5] [6 6])
         )))

 

partition-fixed, 将aseq分成max-num-chunks份

思路, 
7整除3, 2余1 
所以, 分成3份, 每份2个, 还余一个 
把这个放到第一份里面, 
所以, 有1份的2+1个, 有(3-1)份的2个

这里使用integer-divided(7 3), ([3 1] [2 2]) , 刚开始比较难理解, 其实函数名起的不好, 这里不光除, 已经做了划分 
返回的结果的意思是, 1份3个, 2份2个

接着就是使用split-at, loop划分

(defn partition-fixed 
“(partition-fixed 3 '( 1 2 3 4 5 6 7)) [(1 2 3) (4 5) (6 7)]”
  [max-num-chunks aseq]
  (if (zero? max-num-chunks)
    []
    (let [chunks (->> (integer-divided (count aseq) max-num-chunks)
                      (#(dissoc % 0))
                      (sort-by (comp - first))
                      (mapcat (fn [[size amt]] (repeat amt size)))
                      )]
      (loop [result []
             [chunk & rest-chunks] chunks
             data aseq]
        (if (nil? chunk)
          result
          (let [[c rest-data] (split-at chunk data)]
            (recur (conj result c)
                   rest-chunks
                   rest-data)))))))

 

Topology中, Executor和component的关系, (compute-executor->component ), 根据(executor:task)关系和(task:component)关系join

(defn- compute-executor->component [nimbus storm-id]
  (let [conf (:conf nimbus)
        executors (compute-executors nimbus storm-id)
        topology (read-storm-topology conf storm-id)
        storm-conf (read-storm-conf conf storm-id)
        task->component (storm-task-info topology storm-conf)
        executor->component (into {} (for [executor executors
                                           :let [start-task (first executor)
                                                 component (task->component start-task)]]
                                       {executor component}))]
        executor->component)) ;{[1 2] “c1”, [3 3] “c1”, [4 4] “c2”, [5 5] “c2”, [6 6] “c3”} 
 

最终目的就是获得executor->component关系, 用于后面的assignment, 其中每个executor包含task范围[starttask, endtask]


本文章摘自博客园,原文发布日期:2013-06-18

目录
相关文章
|
1月前
|
Java 数据库
详解Task 和 ValueTask 的使用区别
详解Task 和 ValueTask 的使用区别
29 0
|
分布式计算 并行计算 数据处理
|
存储 分布式计算 大数据
Spark 原理_运行过程_stage 和 task 的关系 | 学习笔记
快速学习 Spark 原理_运行过程_stage 和 task 的关系
207 0
Spark 原理_运行过程_stage 和 task 的关系 | 学习笔记
|
分布式计算 大数据 调度
Spark 原理_运行过程_Job 和 Stage 的关系 | 学习笔记
快速学习 Spark 原理_运行过程_Job 和 Stage 的关系
162 0
Spark 原理_运行过程_Job 和 Stage 的关系 | 学习笔记
|
分布式计算 Spark C++
Spark的一个经典问题(1个Core5个Executor和5个Core1个Executor有什么区别)
Spark的一个经典问题(1个Core5个Executor和5个Core1个Executor有什么区别)
|
存储 算法 Unix
bthread源码剖析(四): 通过ParkingLot实现Worker间任务状态同步
通过之前的文章我们知道TaskGroup(以下简称TG)是在死循环等待任务,然后切换栈去执行任务。在当前TG没有任务的时候会进行“工作窃取”窃取其他TG的任务。在没有任务的时候TG会“休眠”,当任务出现的时候被唤醒然后消费。
318 0