twitter storm源码走读(四)

简介: TridentTopology是storm提供的高层使用接口,常见的一些SQL中的操作在tridenttopology提供的api中都有类似的影射。关于TridentTopology的使用及运行原理,当前进行详细分析的文章不多。 本文尝试TridentTopology是如何先一步步转换成普通的sto

Trident Topology执行过程分析

TridentTopology是storm提供的高层使用接口,常见的一些SQL中的操作在tridenttopology提供的api中都有类似的影射。关于TridentTopology的使用及运行原理,当前进行详细分析的文章不多。

从TridentTopology到vanilla topology(普通的topology)由三个层次组成:

  1. 面向最终用户的概念stream, operation
  2. 利用planner将tridenttopology转换成vanilla topology
  3. 执行vanilla topology

本文尝试TridentTopology是如何先一步步转换成普通的storm Topology(即vanila topology), 转换后的topology的执行中有哪些区别?

 

概述

从TridentTopology到基本的Topology有三层,下图给出一个全局的视图。

创建TridentTopology

下面的代码摘自StormStarter中的TridentWordCount.java

 

复制代码
    TridentTopology topology = new TridentTopology();
    topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
        new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
        new Count(), new Fields("count")).parallelismHint(16);

    return topology.build();
复制代码

 

上述代码的newStream一行,分两大部分,一是使用newStream来创建一个stream对象,然后针对该Stream进行各种操作,each/shuffle/persistentAggregate等就是各种operation.

用户在使用TridentTopology的时候,只需要熟悉Stream和TridentTopology中的API函数即可。

转换TridentTopology为Vanilla Topology

上一节创建了Stream,但是如何将其与原有的Spout及Bolt联系起来呢?问题的关键就在TridentTopology::build函数和TridentTopologyBuilder::buildTopology

TridentTopology::build

newStream及其后的函数调用创建了一个含有三大类节点的List,利用该List创建了一个有向非循环图(DAG)。这三类节点分别是operation, partition, spout,在build函数将节点分类分别加入到boltNodes或spoutNodes,注意此处的spout或bolt不能等同于普通的spout和bolt.

TridentTopologyBuilder::buildTopology

利用在build函数中创建的boltNodes,spoutNodes及生成的graph来创建vanilla topology所需要的bolt及spout.

在buildTopology中会看到类似的代码片段。

builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
for(String b: c.committerBatches) {
                specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
            
            BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);

最终生成的普通Topology,与普通Topology中的Spout相对应的是MasterBatchCoordinator,而在创建TridentTopology使用的spout则成了Bolt,使用于Stream上的各种Operation也存在于多个普通Bolt中。

 TridentTopology的执行

TridentTopology被转换为普通的Topology(vanilla Topology)之后提交到nimbus,它的具体执行过程有什么不同呢?

主要有几点:

  1. MasterBatchCoordinator通过Batch_stream_id来发送通知给TridentSpoutExecutor
  2. TridentSpoutExecutor收到通知发送成批的tuple给下一跳的Bolt
  3. 下一跳的Bolt收到tuple之后,使用TridentBoltExecutor来进行处理
    1. TridentBoltExecutor调用SubtopologyBolt::execute
    2. InitialReceiver::execute被调用
    3. TridentProcessor::execute被调用

MasterBatchCoordinator收到ack之后,会发送success消息给Spout

MasterBatchCoordinator在commit的时候,会发送commit消息给Spout,让Spout将缓存的消息删除

trident topology可靠性分析

本文详细分析TridentTopology的可靠性实现, TridentTopology通过transactional spout与transactional state相结合,能够做到tuple“只被处理一次,不多也不少”。也就是做到事务性处理exactly-once,要么成功,要么失败。

而一般的storm topology是无法保证eactly-once的处理的,它们要么是at-least-once(至少被处理一次,有可能被处理多次);要么是at-most-once(最多被处理一次,这样就存在遗漏的可能).

TridentTopology在设计中借鉴和保留了目前已经过期的transactional topology的设计思想。

Storm Topology的ack机制

 

 在进行TridentTopology的可靠性分析之前,我们先回顾一下在storm topology中的ack机制。ack bolt是在提交到storm cluster中,由系统自动产生的,一般来说一个topology只有一个ack bolt(当然可以通过配置参数指定多个)。

当bolt处理并下发完tuple给下一跳的bolt时,会发送一个ack给ack bolt。ack bolt通过简单的异或原理(即同一个数与自己异或结果为零)来判定从spout发出的某一个bolt是否已经被完全处理完毕。如果结果为真,ack bolt发送消息给spout,spout中的ack函数被调用并执行。如果超时,则发送fail消息给spout,spout中的fail函数被调用并执行,spout中的ack和fail的处理逻辑由用户自行填写。

如在github上的kerstel spout就能做到只有当某一个tuple被成功处理之后,它才会从缓存中移除,否则继续放入到处理队列再次进行处理。

TridentTopology的可靠性机制

在“走读之6”一文中分析了一个tridenttopology是如何转换成storm topology的,我想用上面这幅图再次阐述一下转变后的结果。

  • 一个tridenttopoloy会至少引入一个MasterBatchCoordinator,这个MBC就类似于storm topology中的spout
  • newStream时使用的入参spout会裂变成两个bolt,一是TridentSpoutCoordinator,另一个是TridentSpoutExecutor
  • 针对stream的各种操作则被分散到各个Bolt中,它们的执行上下文是TridentBoltExecutor

可以看出使用TridentTopology Api进行操作时,所有的东西其实都运行在bolt context中,而真正的spout是在调用TridentTopologyBuilder.buildTopology()的时候被添加的。

  • MasterBatchCoordinator使用batch_stream发送一个类似于seeder tuple的东西给tridentspoutcoordinator,tridentspoutcoordinator将该信号继续下发给TridentSpoutExecutor, TridentSpout是如何一步步被调用到的呢。
    • TridentBoltExecutor::execute
      •   TridentSpoutExecutor::execute
        •   BatchSpoutExecutor::execute
          •   ITridentSpout::emitBatch

emitBatch是产生真正需要被处理的tuple的,这些tuple会被各个Operation所在的bolt所接收。它们的调用顺序是

  • TridentBoltExecutor::execute
    •   SubtopologyBolt::execute
      •   InitialReceiver::receive
        •   TridentProcessor::execute

处理结束的判断依据

在TridentSpout中是如何判断所有的tuple都已经被处理的呢。

  1. 在每跳中认为自己处理完毕的时候,它都会告诉下一跳,即下游,我给你发送了多少tuple,如果下游将上游发送过来的确认消息与自身确实已经处理的消息比对一致的话,则认为处理都完成,于是发送ack.
  2. 问题的关键变成每一个bolt是如何判断自己已经处理完毕的呢,请看步骤3
  3. 总有一个bolt是没有上游的,即TridentSpoutExecutor,它只会收到启动指令,但不接收真正的业务数据,于是它会告诉下一跳,我发了多少tuple给你。

STREAM

在MasterBatchCoordinator中定义了三种不同的stream,这三种stream分别是

  1. BATCH_STREAM
  2. COMMIT_STREAM
  3. SUCCESS_STREAM

这些stream分别在什么时候被使用呢,下图给出一个大概的时序

简要说明:

  1. masterbatchcoordinator通过batch_stream发送seeder tuple给tridentspoutcoordinator
  2. tridentspoutcoordinator给tridentspoutexecutor继续传递该指令
  3. TridentSpoutExecutor在收到启动指令后,调用ITridentSpout接口的实现类进行emitBatch
  4. TridentSpoutExecutor在发送完一批batch后,finishBatch被调用,通过emitDirect会给下一跳通过coord_stream发送trackedinfo,即我已经发送了多少消息给你
  5. TridentSpoutExecutor紧接着还会给ack bolt发送ack消息,ack bolt将其传达到MasterBatchCoordinator
  6. MasterBatchCoordinator在收到第一个ack后,将状态置为processed
  7. 当MasterBatchCoordinator再次收到ack后,会将状态转为committing,同时通过commit_stream发送tuple给TridentSpoutExecutor
  8. 收到commit_stream上传来的tuple后,TridentSpoutExecutor会调用ITridentSpout中的emmitter, emmitter::commit()被执行,TridentSpoutExecutor会再次ack收到tuple
  9. MasterBatchCoordinator在收到这个tuple之后,会认为针对某一个seeder tuple的处理已经完全实现,于是通过SUCCESS_STREM告知TridentSpoutCoordinator,所有的活都已经都完成了,收工。
  10. 收到Success_stream上传来的信号后,ITridentSpout中的内嵌子类Emmit和Coordinator中相应的success方法会被调用执行。

注意:

  1. 为了描述方便,将TridentTopology进行了简化,认为其在转换成真正的storm topology时,只有一个TridentProcessor所在的bolt。真实的情况可能比这复杂,但消息的传递路径还是差不多的。
  2. 注意在TridentTopology中ack会被多次反复调用,这不同于普通的storm topology

状态机

在MasterBatchCoordinator中,针对每一个seeder tuple,其状态机如下图所示。注意这些状态是会被保存到zookeeper server中的,使用的api定义在TransactionalState中。

总结

通过上面的分析可以看出,TridentTopology实现了一个比较好的框架,但真正要做到exactly-once的处理,还需要用户自己去实现ITridentSpout中的两个重要内嵌类,Emmitter和Coordinator。

具体如何实现该接口,可以查看storm-core/src/jvm/storm/trident/testing目录下的FixedBatchSpout.java和FeederCommitterBatchSpout.java

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
8月前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
516 0
|
SQL 资源调度 前端开发
深度剖析Dinky源码(下)
深度剖析Dinky源码(下)
365 0
|
资源调度 前端开发 Java
深度剖析Dinky源码(上)
深度剖析Dinky源码
461 0
|
机器学习/深度学习 Java 程序员
Flink处理函数实战之三:KeyedProcessFunction类
通过实战学习和了解处理函数的KeyedProcessFunction类
141 1
Flink处理函数实战之三:KeyedProcessFunction类
|
SQL Web App开发 JSON
深度剖析Dinky源码
深度剖析Dinky源码
916 0
|
流计算
|
存储 NoSQL 算法
【Storm】Storm实战之频繁二项集挖掘(附源码)
针对大叔据实时处理的入门,除了使用WordCount示例之外,还需要相对更深入点的示例来理解Storm,因此,本篇博文利用Storm实现了频繁项集挖掘的案例,以方便更好的入门Storm。
120 0
【Storm】Storm实战之频繁二项集挖掘(附源码)
|
分布式计算 Java Hadoop
一脸懵逼学习Storm的搭建--(一个开源的分布式实时计算系统)
Storm的官方网址:http://storm.apache.org/index.html 1:集群部署的基本流程(基本套路): 集群部署的流程:下载安装包、解压安装包、修改配置文件、分发安装包、启动集群;  1:安装一个zookeeper集群,之前已经部署过,这里省略,贴一下步骤; 安装配置zooekeeper集群:        1.
1559 0