Storm入门之第五章Bolts

简介:

本文翻译自《Getting Started With Storm》  译者:吴京润   编辑:方腾飞

第5章 Bolts

正如你已经看到的,bolts是一个Storm集群中的关键组件。你将在这一章学到bolt生命周期,一些bolt设计策略,以及几个有关这些内容的例子。

Bolt生命周期

Bolt是这样一种组件,它把元组作为输入,然后产生新的元组作为输出。实现一个bolt时,通常需要实现IRichBolt接口。Bolts对象由客户端机器创建,序列化为拓扑,并提交给集群中的主机。然后集群启动工人进程反序列化bolt,调用prepare,最后开始处理元组。

NOTE:要创建一个bolt对象,它通过构造器参数初始化成员属性,bolt被提交到集群时,这些属性值会随着一起序列化。

Bolt结构

Bolts拥有如下方法:


declareOutputFields(OutputFieldsDeclarer declarer)
    为bolt声明输出模式
prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector)
    仅在bolt开始处理元组之前调用
execute(Tuple input)
    处理输入的单个元组
cleanup()
    在bolt即将关闭时调用


下面看一个例子,在这个例子中bolt把一句话分割成单词列表:


class SplitSentence implements IRichBolt {
    private OutputCollector collector;
    publlic void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void cleanup(){}

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}


正如你所看到的,这是一个很简单的bolt。值得一提的是在这个例子里,没有消息担保。这就意味着,如果bolt因为某些原因丢弃了一些消息——不论是因为bolt挂了,还是因为程序故意丢弃的——生成这条消息的spout不会收到任何通知,任何其它的spoutsbolts也不会收到。

然而在许多情况下,你想确保消息在整个拓扑范围内都被处理过了。

可靠的bolts和不可靠的bolts

正如前面所说的,Storm保证通过spout发送的每条消息会得到所有bolt的全面处理。基于设计上的考虑,这意味着你要自己决定你的bolts是否保证这一点。

拓扑是一个树型结构,消息(元组)穿过其中一条或多条分支。树上的每个节点都会调用ack(tuple)fail(tuple),Storm因此知道一条消息是否失败了,并通知那个/那些制造了这些消息的spout(s)。既然一个Storm拓扑运行在高度并行化的环境里,跟踪始发spout实例的最好方法就是在消息元组内包含一个始发spout引用。这一技巧称做锚定(译者注:原文为Anchoring)。修改一下刚刚讲过的SplitSentence,使它能够确保消息都被处理了。


class SplitSentence implenents IRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(tuple, new Values(word));
        }
        collector.ack(tuple);
    }

    public void cleanup(){}

    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declar.declare(new Fields("word"));
    }
}


锚定发生在调用collector.emit()时。正如前面提到的,Storm可以沿着元组追踪到始发spoutcollector.ack(tuple)collector.fail(tuple)会告知spout每条消息都发生了什么。当树上的每条消息都已被处理了,Storm就认为来自spout的元组被全面的处理了。如果一个元组没有在设置的超时时间内完成对消息树的处理,就认为这个元组处理失败。默认超时时间为30秒。

NOTE:你可以通过修改Config.TOPOLOGY_MESSAGE_TIMEOUT修改拓扑的超时时间。

当然了spout需要考虑消息的失败情况,并相应的重试或丢弃消息。

NOTE:你处理的每条消息要么是确认的(译者注:collector.ack())要么是失败的(译者注:collector.fail())。Storm使用内存跟踪每个元组,所以如果你不调用这两个方法,该任务最终将耗尽内存。

多数据流

一个bolt可以使用emit(streamId, tuple)把元组分发到多个流,其中参数streamId是一个用来标识流的字符串。然后,你可以在TopologyBuilder决定由哪个流订阅它。

多锚定

为了用bolt连接或聚合数据流,你需要借助内存缓冲元组。为了在这一场景下确保消息完成,你不得不把流锚定到多个元组上。可以向emit方法传入一个元组列表来达成目的。


...
List anchors = new ArrayList();
anchors.add(tuple1);
anchors.add(tuple2);
collector.emit(anchors, values);
...


通过这种方式,bolt在任意时刻调用ackfail方法,都会通知消息树,而且由于流锚定了多个元组,所有相关的spout都会收到通知。

使用IBasicBolt自动确认

你可能已经注意到了,在许多情况下都需要消息确认。简单起见,Storm提供了另一个用来实现bolt的接口,IBasicBolt。对于该接口的实现类的对象,会在执行execute方法之后自动调用ack方法。


class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
}

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}


NOTE:分发消息的BasicOutputCollector自动锚定到作为参数传入的元组。 

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
流计算
106 Storm编程模型
106 Storm编程模型
45 0
|
大数据 数据挖掘
大数据框架原理简介(2)
大数据框架原理简介(2)
142 0
大数据框架原理简介(2)
|
消息中间件 分布式计算 Java
一文搞懂Kafka核心基础知识
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。Kafka 官网的下载地址是 https://kafka.apache.org/downloads ;打开下载页面后我们可以看 到不同版本的
|
jstorm 分布式计算 Java
storm笔记:storm基本概念
本文主要介绍storm中的基本概念,从基础上了解strom的体系结构,便于后续编程过程中作为基础指导。主要的概念包括
342 0
storm笔记:storm基本概念
|
消息中间件 IDE Java
Flink实战(三) - 编程范式及核心概念(一)
Flink实战(三) - 编程范式及核心概念(一)
157 0
Flink实战(三) - 编程范式及核心概念(一)
|
分布式计算 Java API
Flink实战(三) - 编程范式及核心概念(三)
Flink实战(三) - 编程范式及核心概念(三)
125 0
Flink实战(三) - 编程范式及核心概念(三)
|
分布式计算 Java Hadoop
Flink实战(三) - 编程范式及核心概念(四)
Flink实战(三) - 编程范式及核心概念(四)
157 0
Flink实战(三) - 编程范式及核心概念(四)
|
存储 分布式计算 Java
Flink实战(三) - 编程范式及核心概念(二)
Flink实战(三) - 编程范式及核心概念(二)
161 0
Flink实战(三) - 编程范式及核心概念(二)
|
SQL 大数据 OLAP
大数据框架原理简介(4)
大数据框架原理简介(4)
134 0
下一篇
无影云桌面