Storm的数据处理编程单元:Bolt 学习整理

简介:   Bolt是Topology中的数据处理的单元,也是Storm针对处理过程的编程单元。Topology中所有的处理都是在这些Bolt中完成的,编程人员可以实现自定义的处理过程,例如,过滤、函数、聚集、连接等计算。

  Bolt是Topology中的数据处理的单元,也是Storm针对处理过程的编程单元。Topology中所有的处理都是在这些Bolt中完成的,编程人员可以实现自定义的处理过程,例如,过滤、函数、聚集、连接等计算。如果是复杂的计算过程,往往需要多个步骤和使用多个Bolt。

  Bolt可以将数据项发送至多个数据流(Stream)。编程人员首先可以使用OutputFieldsDeclarer类的declareStream()方法来声明多个流,指定数据将要发送到的流,然后使用SpoutOutputCollector的emit方法将数据发送。

  当声明了一个Bolt的输入流后,可以从其他的组件中接收这些指定的流。当接收某个组件的所有流时,需要在程序中逐个声明接收的过程。InputDeclarer对象默认接收来自某组件默认的流。

//从名称为"1"的组件中接收默认的流。
declarer.shuffleGrouping("1")

 

IBolt 和 IComponent接口

IBolt接口:

//在组件的任务初被初始化时,由集群中的工作进程(worker)调用,prepare()用于实例化Bolt的已给运行时任务,被集群中的某一个进程调用,提供Bolt运行的环境。
//sormConf对象维护Storm中针对该Bolt的配置信息。(来自Topology);context对象是一个上下文对象,用于获取该组件运行时任务的信息。(例如Topology中该Bolt所有任务的位置,包括任务的id、组件id和输入输出信息等)
//collector对象用于从该Bolt发送数据项。数据项可以在任意时刻发送,包括调用open()和close()方法。
void prepare(java.util.Map stormConf,TopologyContext context,OutputCollector collector) //接收一个数据项并处理
//该方法用来接收一个数据项(Tuple),并可以将处理的结果作为新的数据项发送(emit),是Bolt需要实现的最重要的方法。
//参数imput是一个数据项对象,包含了众多的元数据(metadata),包括它来自的组件、流、任务等。数据项中的值,可以通过Tuple类的getValue()方法获得。
void execute(Tuple input) //在IBolt将关闭时调用 void cleanup()

Tuple类的方法,这个类的对象作为execute()方法的输入。(方法举例: int size() ; int fieldIndex(java.lang.String field) ; ......)

方法众多,可以整理分为以下五类:

1、获取属性的方法。 (size()、fieldIndex()和contains()三个方法)

2、获取元数据的方法。(getMessageId()、getSourceComponent()、getSourceTask()、getSourceStreamId()和getSourceGlobalStreamid()方法)

其中MessageId是在数据项被创建时,通过一定的规则赋值的。

3、根据域获取值的方法。(getValue()和多个get具体数据类型的方法)

4、根据域的名称获取值的方法。(这一类包括getFields()、getValues()和select()方法)

5、获取Tuple的值或域列表的方法。(getFields()、getValues()和select()方法)

分别获取该数据项的所有域列表、值列表和值列表子集。

 

简单的案例:

class SplitSentence implements IRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf,TopologyContext context,OutputCollector collector){
    this.collector = collector;
    }
    
    public void execute(Tuple tuple){
    String sentence = tuple.getString(0);
    for(String word : sentence.split(" "){
        collector.emit(new Values(word));
    }
}

public void cleanup(){
}

public void declareOutputFields(OutpuFieldsDeclarer declarer){
    declarer.declare(new Fields("word"));
    }
}

这里说下declareOutputFields()函数参数,声明了输出流的数据项的结构,也即Tuple的域。

结合上节给的Spout的示例,可以在Topology类的main函数中加入相关代码,增加Bolt。

Topology builder builder = new TopologyBuilder();
Builder.SetSpout ("SentenceGenSpout ",new TestWord Spout(),1);
builder.setBolt("splitBoult",new SplitSentence(),2).shuffleGrouping("sentenceGenSpout");

 

目录
相关文章
|
7月前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
339 2
|
7月前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
324 0
|
7月前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
509 0
|
7月前
|
SQL 数据处理 API
ForwardedFields:流处理中的重要概念
ForwardedFields:流处理中的重要概念
51 4
|
7月前
|
传感器 JSON Java
流计算中的流式图处理是什么?请解释其作用和常用操作。
流计算中的流式图处理是什么?请解释其作用和常用操作。
72 0
|
7月前
|
机器学习/深度学习 分布式计算 Hadoop
通过比喻理解-MapReduce的数据处理流程
通过比喻理解-MapReduce的数据处理流程
86 0
|
7月前
|
存储 传感器 数据挖掘
什么是流计算?请简要解释其概念和特点。
什么是流计算?请简要解释其概念和特点。
227 0
|
7月前
|
消息中间件 分布式计算 Java
流计算与批处理的区别是什么?请举例说明。
流计算与批处理的区别是什么?请举例说明。
118 0
|
流计算
106 Storm编程模型
106 Storm编程模型
51 0
|
消息中间件 存储 缓存
Storm核心组件、编程模型
Storm核心组件、编程模型
219 0