Storm源码浅析之topology的提交

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
简介:
    最近一直在读twitter开源的这个分布式流计算框架——storm的源码,还是有必要记录下一些比较有意思的地方。我按照storm的主要概念进行组织,并且只分析我关注的东西,因此称之为浅析。       

一、介绍
    Storm的开发语言主要是Java和Clojure,其中Java定义骨架,而Clojure编写核心逻辑。源码统计结果:
<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–>     180 text files.
     
177 unique files.                                          
       
7 files ignored.

http:
//cloc.sourceforge.net v 1.55  T=1.0 s (171.0 files/s, 46869.0 lines/s)
——————————————————————————-
Language                     files          blank        comment           code
——————————————————————————-
Java                           
125           5010           2414          25661
Lisp                            
33            732            283           4871
Python                           
7            742            433           4675
CSS                              
1             12             45           1837
ruby                             
2             22              0            104
Bourne Shell                     
1              0              0              6
Javascript                       
2              1             15              6
——————————————————————————-
SUM:                           
171           6519           3190          37160
——————————————————————————-

    Java代码25000多行,而Clojure(Lisp)只有4871行,说语言不重要再次证明是扯淡。
        
二、Topology和Nimbus       
    Topology是storm的核心理念,将spout和bolt组织成一个topology,运行在storm集群里,完成实时分析和计算的任务。这里我主要想介绍下topology部署到storm集群的大概过程。提交一个topology任务到Storm集群是通过StormSubmitter.submitTopology方法提交:
<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–>StormSubmitter.submitTopology(name, conf, builder.createTopology());
    我们将topology打成jar包后,利用bin/storm这个python脚本,执行如下命令:
<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–>bin/storm jar xxxx.jar com.taobao.MyTopology args
    将jar包提交给storm集群。storm脚本会启动JVM执行Topology的main方法,执行submitTopology的过程。而submitTopology会将jar文件上传到nimbus,上传是通过socket传输。在storm这个python脚本的jar方法里可以看到:
<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–>def jar(jarfile, klass, args):                                                                                                                               
   exec_storm_class(                                                                                                                                          
        klass,                                                                                                                                                
        jvmtype
=-client,                                                                                                                                    
        extrajars
=[jarfile, CONF_DIR, STORM_DIR + /bin],                                                                                                    
        args
=args,                                                                                                                                            
        prefix
=export STORM_JAR= + jarfile + ;)
     将jar文件的地址设置为环境变量STORM_JAR,这个环境变量在执行submitTopology的时候用到:
<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–>//StormSubmitter.java 
private static void submitJar(Map conf) {
        
if(submittedJar==null) {
            LOG.info(
Jar not uploaded to master yet. Submitting jar);
            String localJar 
= System.getenv(STORM_JAR);
            submittedJar 
= submitJar(conf, localJar);
        } 
else {
            LOG.info(
Jar already uploaded to master. Not submitting jar.);
        }
    }
    通过环境变量找到jar包的地址,然后上传。利用环境变量传参是个小技巧。

    其次,nimbus在接收到jar文件后,存放到数据目录的inbox目录,nimbus数据目录的结构
<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–>-nimbus
     
-inbox
         
-stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar
         
-stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar

     
-stormdist
        
-storm-id
           
-stormjar.jar
           
-stormconf.ser
           
-stormcode.ser
     其中inbox用于存放提交的jar文件,每个jar文件都重命名为stormjar加上一个32位的UUID。而stormdist存放的是启动topology后生成的文件,每个topology都分配一个唯一的id,ID的规则是“name-计数-时间戳”。启动后的topology的jar文件名命名为storm.jar ,而它的配置经过java序列化后存放在stormconf.ser文件,而stormcode.ser是将topology本身序列化后存放的文件。这些文件在部署的时候,supervisor会从这个目录下载这些文件,然后在supervisor本地执行这些代码。
    进入重点,topology任务的分配过程(zookeeper路径说明忽略root):
1.在zookeeper上创建/taskheartbeats/{storm id} 路径,用于任务的心跳检测。storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。task将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。
2.从topology中获取bolts,spouts设置的并行数目以及全局配置的最大并行数,然后产生task id列表,如[1 2 3 4]
3.在zookeeper上创建/tasks/{strom id}/{task id}路径,并存储task信息
4.开始分配任务(内部称为assignment), 具体步骤:
 (1)从zk上获得已有的assignment(新的toplogy当然没有了)
 (2)查找所有可用的slot,所谓slot就是可用的worker,在所有supervisor上配置的多个worker的端口。
 (3)将任务均匀地分配给可用的worker,这里有两种情况:
 (a)task数目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最终是这样分配
<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–>{1: [host1:port1] 2 : [host2:port1]
         
3 : [host1:port1] 4 : [host2:port1]}
,可以看到任务平均地分配在两个worker上。
(b)如果task数目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先会将woker排序,*将不同host间隔排列,保证task不会全部分配到同一个worker上,也就是将worker排列成
<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–>[host1:port1 host2:port1 host1:port2 host2:port2]
,然后分配任务为
<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–>{1: host1:port1 , 2 : host2:port2}

(4)记录启动时间
(5)判断现有的assignment是否跟重新分配的assignment相同,如果相同,不需要变更,否则更新assignment到zookeeper的/assignments/{storm id}上。
5.启动topology,所谓启动,只是将zookeeper上/storms/{storm id}对应的数据里的active设置为true。

6.nimbus会检查task的心跳,如果发现task心跳超过超时时间,那么会重新跳到第4步做re-assignment。


本文来源于"阿里中间件团队播客",原文发表时间" 2011-12-02 "
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
资源调度 Java 流计算
flink yarn-per-job提交报错
flink yarn-per-job提交这个是什么问题吗,主机端口都是正常的,就报了拒绝连接?image.png 一提交到yarn就报这个错,然后失败,提交命令 flink run -t yarn-per-job -c app.dwm.UVFilterDetail /root/gmall_flink/flink_app/gmall-start-try-self-do-1.0-SNAPSHOT-jar-with-dependencies.jar,yarn-session提交是正常的,就per-job有问题?
114 1
|
8月前
|
资源调度 关系型数据库 数据库
实时计算 Flink版产品使用合集之flink-cdc.sh xx.yaml提交到yarn 发现没有启动task manager的,怎么处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
容灾 流计算
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
|
6月前
|
数据处理 API 调度
深入理解Flink Flink Job提交和Flink Graph详解
Apache Flink通过其高效的作业提交流程及灵活的Graph表示,为处理大规模数据流提供了强大的能力。理解Flink Job的提交与任务调度,以及Flink Graph的构建和优化,是深入掌握Flink并高效利用其处理能力的关键。Flink的设计哲学和强大功能使其成为实时数据处理领域的重要选择之一。
234 3
|
6月前
|
消息中间件 Kubernetes Kafka
实时计算 Flink版操作报错合集之在Rancher K8s部署时,TaskManager无法正常连接到其他TaskManager,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
7月前
|
资源调度 分布式计算 Oracle
实时计算 Flink版操作报错合集之flink on yarn job manager 可以启动, 但不给分配slot ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
213 0
|
8月前
|
存储 监控 调度
【Flink】怎么提交的实时任务,有多少Job Manager?
【4月更文挑战第18天】【Flink】怎么提交的实时任务,有多少Job Manager?
|
8月前
|
监控 Java 流计算
读Flink源码谈设计:Metric
前阵子笔者涉及了些许监控相关的开发工作,在开发过程中也碰到过些许问题,便翻读了Flink相关部分的代码,在读代码的过程中发现了一些好的设计,因此也是写成文章整理上来。
408 0
读Flink源码谈设计:Metric
|
资源调度 程序员 网络安全
Flink on Yarn三部曲之三:提交Flink任务
Flink on Yarn在使用的时候分为两种模式,Job Mode和Session Mode,一起来体验这两种模式
379 2
Flink on Yarn三部曲之三:提交Flink任务
|
存储 资源调度 Kubernetes
Flink on Yarn_K8S 原理剖析及实践(二)| 学习笔记
快速学习 Flink on Yarn_K8S 原理剖析及实践。
Flink on Yarn_K8S 原理剖析及实践(二)| 学习笔记