twitter storm源码走读(五)

简介: 从用户层面来看TridentTopology,有两个重要的概念一是Stream,另一个是作用于Stream上的各种Operation。在实现层面来看,无论是stream,还是后续的operation都会转变成为各个Node,这些Node之间的关系通过重要的数据结构图来维护。具体到TridentTop

TridentTopology创建过程详解

从用户层面来看TridentTopology,有两个重要的概念一是Stream,另一个是作用于Stream上的各种Operation。在实现层面来看,无论是stream,还是后续的operation都会转变成为各个Node,这些Node之间的关系通过重要的数据结构来维护具体到TridentTopology,实现图的各种操作的组件是jgrapht。

说到图,两个基本的概念会闪现出来,一是结点,二是描述结点之间关系的边。要想很好的理解TridentTopology就需要紧盯图中结点和边的变化。

TridentTopology在转换成为普通的StormTopology时,需要将原始的图分成各个group,每个group将运行于一个独立的bolt中。TridentTopology又是如何知道哪些node应该在同一个group,哪些应该处在另一个group中的呢;如何来确定每个group的并发度(parallismHint)的呢。这些问题的解决都与jgrapht分不开。

关于jgrapht的更多信息,请参考其官方网站 http://jgrapht.org

概要

在TridentTopology中向图中添加结点的api有三种:

  1. addNode
  2. addSourcedNode
  3. addSourcedStateNode

其中addNode在创建stream是使用,addSourcedStateNode在partitionPersist时使用到,其它的operation使用到的是addSourcedNode.

addNode与其它两个方法的一个重要区别还在于,addNode是不需要添加边(Edge),而其它两个API需要往图中添加edge,以确定该node的源是哪个。

TridentTopology

1
2
3
4
public  TridentTopology() {
         _graph =  new  DefaultDirectedGraph( new  ErrorEdgeFactory());
         _gen =  new  UniqueIdGen();
     }

 在TridentTopology的构造函数中,创建了DAG(有向无环图)。利用这个_graph来作为容器以存储后续过程中创建的各个node及它们之间的关系。

newStream

 newStream会为DAG(有向无环图)中创建源结点,其调用关系如下所示。

  • newStream
    • addNode
      • registerNode
复制代码
 1 protected void registerNode(Node n) {
 2         _graph.addVertex(n);
 3         if(n.stateInfo!=null) {
 4             String id = n.stateInfo.id;
 5             if(!_colocate.containsKey(id)) {
 6                 _colocate.put(id, new ArrayList());
 7             }
 8             _colocate.get(id).add(n);
 9         }
10     }
复制代码

 

each

作用于stream上的Operation有很多,以each为例来看新的operation是如何转换成为node添加到_graph中的。

复制代码
//Stream.java
public Stream each(Fields inputFields, Function function, Fields functionFields) { projectionValidation(inputFields); return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, TridentUtils.fieldsConcat(getOutputFields(), functionFields), functionFields, new EachProcessor(inputFields, function))); }
复制代码

调用关系描述如下

  • Stream::each
  • TridentTopology::addSourcedNode
  • TridentTopology::registerSourcedNode

registerSourcedNode的实现如下

复制代码
protected void registerSourcedNode(List<Stream> sources, Node newNode) {
        registerNode(newNode);
        int streamIndex = 0;
        for(Stream s: sources) {
            _graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));
            streamIndex++;
        }        
    }
复制代码

注意此处添加edge是,是有索引的,这样可以区别处理的先后顺序。

在Stream中含有成员变量_node,表示stream最近停泊的node,有了该变量添加edge才成为了可能。

 

partitionPersist

复制代码
public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) {
        projectionValidation(inputFields);
        String id = _topology.getUniqueStateId();
        ProcessorNode n = new ProcessorNode(_topology.getUniqueStreamId(),
                    _name,
                    functionFields,
                    functionFields,
                    new PartitionPersistProcessor(id, inputFields, updater));
        n.committer = true;
        n.stateInfo = new NodeStateInfo(id, stateSpec);
        return _topology.addSourcedStateNode(this, n);
    }
复制代码

调用关系

  • Stream::partitionPersist
  • TridentTopology::addSourcedStateNode
  • TridentTopology::registerSourcedNode

与addNode及addSourcedNode不同的是,addSourcedStateNode返回的是TridentState而非Stream

既然谈到了TridentState就不得不谈到其另一面Stream::stateQuery,

复制代码
public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {
        projectionValidation(inputFields);
        String stateId = state._node.stateInfo.id;
        Node n = new ProcessorNode(_topology.getUniqueStreamId(),
                        _name,
                        TridentUtils.fieldsConcat(getOutputFields(), functionFields),
                        functionFields,
                        new StateQueryProcessor(stateId, inputFields, function));
        _topology._colocate.get(stateId).add(n);
        return _topology.addSourcedNode(this, n);
    }
复制代码

从此处可以看出stateQueryNode最起码有两个inputStream,一是从TridentState而来表示状态已经改变,另一个是处于drpcStream这个方面的上一跳结点。

build

TridentTopology::build是将TridentTopology转变为StormTopology的过程,这一过程中最重要的一环就是将_graph中含有的node进行分组。

grouping

算法逻辑概述

  • 将boltNodes中的每个boltNode作为一个group加入全部加入initialGroups
  • 以graph和initialGroups作为入参创建GraphGrouper
  • 分组的过程其实就是进行合并的过程,详见GraphGrouper::mergeFully()
    • 如果从当前group1的输出目的地都是属于group2,则将group1,group2合并
    • 如果当前group1的所有输入源都是来自于group2,则将group1,group2合并
    • 将需要合并的group1,group2作为入参创建新的group,同时将group1,group2从已有的集合出移除
复制代码
   public void mergeFully() {
        boolean somethingHappened = true;
        while(somethingHappened) {
            somethingHappened = false;
            for(Group g: currGroups) {
                Collection<Group> outgoingGroups = outgoingGroups(g);
                if(outgoingGroups.size()==1) {
                    Group out = outgoingGroups.iterator().next();
                    if(out!=null) {
                        merge(g, out);
                        somethingHappened = true;
                        break;
                    }
                }
                
                Collection<Group> incomingGroups = incomingGroups(g);
                if(incomingGroups.size()==1) {
                    Group in = incomingGroups.iterator().next();
                    if(in!=null) {
                        merge(g, in);
                        somethingHappened = true;
                        break;
                    }
                }                
            }
        }
    }
复制代码

GraphGrouper::merge()

复制代码
  private void merge(Group g1, Group g2) {
        Group newGroup = new Group(g1, g2);
        currGroups.remove(g1);
        currGroups.remove(g2);
        currGroups.add(newGroup);
        for(Node n: newGroup.nodes) {
            groupIndex.put(n, newGroup);
        }
    }
复制代码

在group之间添加partitionNode

复制代码
// add identity partitions between groups
        for(IndexedEdge<Node> e: new HashSet<IndexedEdge>(graph.edgeSet())) {
            if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {                
                Group g1 = grouper.nodeGroup(e.source);
                Group g2 = grouper.nodeGroup(e.target);
                // g1 being null means the source is a spout node
                if(g1==null && !(e.source instanceof SpoutNode))
                    throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
                if(g1==null || !g1.equals(g2)) {
                    graph.removeEdge(e);
                    PartitionNode pNode = makeIdentityPartition(e.source);
                    graph.addVertex(pNode);
                    graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));
                    graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));                    
                }
            }
        }
复制代码


_graph中所有的node在变换过后,变成两组元素,一是spoutNodes,另一个是合并后的mergedGroup.

spoutNodes中的每个元素作为spout添加到TridentTopologyBuilder的_spouts数组中,mergedGroup中的每个group添加到TridentTopologyBuilder的_bolt数组中。在TridentTopologyBuilder::build()中最主要的事情是为每个_spouts和_bolts数组中的成员添加grouping关系。

小结

到目前为止,通过两篇文章分析了TridentTopology的创建过程及其运行时在每个TridentBoltExecutor中的消息传递情况。接下来会分析TridentTopology提供的API实现及其作用场景。

目录
相关文章
|
Java 流计算
twitter storm源码走读(二)
storm cluster可以想像成为一个工厂,nimbus主要负责从外部接收订单和任务分配。除了从外部接单,nimbus还要将这些外部订单转换成为内部工作分配,这个时候nimbus充当了调度室的角色。supervisor作为中层干部,职责就是生产车间的主任,他的日常工作就是时刻等待着调度到给他下
2366 0
|
缓存 Java 流计算
twitter storm源码走读(三)
本文重点分析storm的worker进程在正常启动之后有哪些类型的线程,针对每种类型的线程,剖析其用途及消息的接收与发送流程。本文从外部消息在worker进程内部的转化,传递及处理过程入手,一步步分析在worker-data中的数据项存在的原因和意义。试图从代码实现的角度来回答,如果是从头开始实现w
1910 0
|
API 流计算 缓存
twitter storm源码走读(四)
TridentTopology是storm提供的高层使用接口,常见的一些SQL中的操作在tridenttopology提供的api中都有类似的影射。关于TridentTopology的使用及运行原理,当前进行详细分析的文章不多。 本文尝试TridentTopology是如何先一步步转换成普通的sto
2469 0
|
消息中间件 Java 流计算
twitter storm源码走读(一)
本文详细介绍了twitter storm中的nimbus节点的启动场景,分析nimbus是如何一步步实现定义于storm.thrift中的service,以及如何利用curator来和zookeeper server建立通讯。然后尝试分析tuple发送时的两个问题,一是消息在线程间的传递过程及利用
2047 0
|
分布式计算 Java Hadoop
一脸懵逼学习Storm的搭建--(一个开源的分布式实时计算系统)
Storm的官方网址:http://storm.apache.org/index.html 1:集群部署的基本流程(基本套路): 集群部署的流程:下载安装包、解压安装包、修改配置文件、分发安装包、启动集群;  1:安装一个zookeeper集群,之前已经部署过,这里省略,贴一下步骤; 安装配置zooekeeper集群:        1.
1574 0
|
Java 流计算 消息中间件
Twitter Storm中Bolt消息传递路径之源码解读
Bolt作为task被executor执行,而executor是一个个的线程,所以executor必须存在于具体的process之中,而这个process就是worker。至于worker是如何被supervisor创建,尔后worker又如何创建executor线程,这些暂且按下不表。
1987 0
|
流计算
|
Android开发 数据安全/隐私保护 开发者
Twitter模块开发
 Twitter模块开发 关于Twitter这一块,自发这篇博文之后有很多人问我,有的验证成功了不跳转,或者其它原因什么的 =======我看了一下,这篇博文里面有写呀,下面以红色粗体文字注明一下  Twitter和Facebook,就类似于国内的微博,或者分享功...
1109 0