开发者社区> yichudu> 正文

Storm (实时分布式大数据处理系统) 简介

简介: 相比Hadoop的批处理,Storm的特点就是实时性。 组件 Storm集群主要由一个主节点和一群工作节点(worker node)组成,通过 Zookeeper进行协调。主节点 主节点通常运行一个后台程序 —— Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很类似于Hadoop中的Job Tracker。工作节点 工作节点叫worker,一般就是集群中的一个节
+关注继续查看

相比Hadoop的批处理,Storm的特点就是实时性。

组件

Storm集群主要由一个主节点和一群工作节点(worker node)组成,通过 Zookeeper进行协调。

主节点
主节点通常运行一个后台程序 —— Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很类似于Hadoop中的Job Tracker。

工作节点
工作节点叫worker,一般就是集群中的一个节点,也就是一个计算机。它同样会运行一个后台程序 ——Supervisor,用于收听工作指派并基于要求运行工作进程。每个工作节点都是topology中一个子集的实现。而Nimbus和Supervisor之间的协调则通过Zookeeper系统或者集群。

topology
[tə'pɒlədʒɪ]
topology是一组由Spouts(数据源)和Bolts(数据操作)通过Stream Groupings进行连接的图。也就是我们的整个应用程序。

Zookeeper
Zookeeper是完成Supervisor和Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装进Storm中的“topology”。

Spout
[spaʊt]  n. 喷水口
简而言之,Spout从来源处读取数据并放入topology。Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组,数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。而Spout中最主要的方法就是nextTuple(),该方法会发射一个新的tuple到topology,如果没有新tuple发射则会简单的返回。

Bolt
[bəʊlt] n.门闩

Topology中所有的处理都由Bolt完成。Bolt可以完成任何事,比如:连接的过滤、聚合、访问文件/数据库、等等。Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将tuple发送给另一个Bolt进行处理。而Bolt中最重要的方法是execute(),以新的tuple作为参数接收。不管是Spout还是Bolt,如果将tuple发射成多个流,这些流都可以通过declareStream()来声明。

并行参数

worker,进程数。

Executor,线程总数。

Task,具体的spout和bolt的实例个数。一个Executor可以负责1个或多个task。一般地,task数等于executor数。

tuple分组策略

分布式处理的主要思想就是把大的任务划分成多个子任务,它们在不同的机器或线程中并行处理,最后汇合。以统计班上同学的平均成绩为例,说明问题。
微笑bolt如何设计?
因为分布式的原因,同一bolt类的多个线程之间不能有数据共享。分十个机器,编号为i的机器统计学号末尾为i的同学数和他们的总成绩,最后汇总。
微笑如何控制每一个bolt实例接收哪些学号的数据呢?
结合上文,编号为i的bolt实例只接受学号末尾为i的数据。可以按照tuple的字段分组。

tuple传递是通过序列化,套接字传输,反序列化实现的。
所谓的grouping策略就是在Spout与Bolt、Bolt与Bolt之间传递Tuple的方式。总共有七种方式:
1)shuffleGrouping(随机分组)
2)fieldsGrouping(按照字段分组,在这里即是同一个单词只能发送给一个Bolt)

//declare与emit是一一对应的。
declarer.declareStream("http", new Fields("event", "key"));
collector.emit("http", new Values(new MyClass(), key));
//设置bolt的时候指定按照哪个字段散列
builder.setBolt("httpCount", new AnalyseBolt()).fieldsGrouping("XX", "http", new Fields("key"));
//接收到tuple时按指定字段取值
public void process(Tuple tuple, BasicOutputCollector collector) {
	tuple.getValueByField("event") instanceof MyClass
}	

3)allGrouping(广播发送,即每一个Tuple,每一个Bolt都会收到)
4)globalGrouping(全局分组,将Tuple分配到task id值最低的task里面)
5)noneGrouping(随机分派)
6)directGrouping(直接分组,指定Tuple与Bolt的对应发送关系)
7)Local or shuffle Grouping

8)customGrouping (自定义的Grouping)

消息可靠性交付

要理解这个问题,需要看一下tuple在离开spout之后的生命周期。作为参考,下面是spout实现的接口

public interface ISpout extendsSerializable {
         void	open(Map conf, TopologyContext context,SpoutOutputCollector collector);
         void	close();
         void	nextTuple();
         void	ack(Object msgId);
         void	fail(Object msgId);
}

首先,Storm会通过Spout的nextTuple()方法从Spout申请一个tuple。在open方法中,Spout使用此方法提供的SpoutOutputCollector去发射一个tuple到输出streams中去。当发射一个tuple时,Spout会提供一个“message id”,用来后面区分不同的tuple。例如, KestrelSpout从kestrel队列中读取消息,然后在发射时会将Kestrel为消息提供的id作为“message id”。发射一条消息到SpoutOutputCollector,如下所示:
_collector.emit(newValues("field1", "field2", 3), msgId);
然后,这个tuple会发送到消费bolts,同时Storm会跟踪已被创建的消息树状图。如果Storm检测到一个tuple已被“fully processed”, Storm将会原始的Spout task(即发射这个tuple的Spout)上调用ack()方法,参数msgId就是这个Spout提供给Storm的“message id”。类似的,如果这个tuple超时了, Storm会在原始的Spout task上调用fail()方法。注意, 一个tuple只能被创建它的Spouttask进行acked或者failed。因此,即使一个Spout在集群上正在执行很多tasks,一个tuple也只能被创建它的task进行acked或failed,而其他的task则不行。

storm UI

storm自带的一个仪表盘,见图1.


图1 storm自带的一个仪表盘

emitted,发射的tuple数。
transferred,若一个tuple被其他bolt读取,则transferred+1。
acked,tuple被完整处理。
failed,在处理过程中出现错误或超时的tuple数。超时参数可以设置。


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
史上最快! 10小时大数据入门实战(三)-分布式文件系统HDFS
HDFS 环境搭建 HDFS 伪分布式环境搭建 CentOS 环境安装步骤 MacOS安装环境 安装jd...
1474 0
突破Java面试(15)-分布式搜索引擎Elastic Search的工作流程
面试官就是想看看你是否了解ES的一些基本原理. ES无非就是写/查数据,你如果不明白你发起写入/搜索请求后,ES做了什么,那你该劝退了.
3046 0
Can't load AMD 64-bit .dll on a IA 32-bit platform错误消息的处理
Can't load AMD 64-bit .dll on a IA 32-bit platform错误消息的处理
17 0
实战 | canal 实现Mysql到Elasticsearch实时增量同步
关系型数据库Mysql/Oracle增量同步Elasticsearch是持续关注的问题,也是社区、QQ群等讨论最多的问题之一。 问题包含但不限于: 1、Mysql如何同步到Elasticsearch? 2、Logstash、kafka_connector、canal选型有什么不同,如何取舍? 3、能实现同步增删改查吗? … 本文给出答案。
76 0
Can't load AMD 64-bit .dll on a IA 32-bit platform错误消息的处理
Can't load AMD 64-bit .dll on a IA 32-bit platform错误消息的处理
30 0
分布式系统架构中高可用方案技术选型:Hystrix 框架实现服务保护使用详解
本篇文章介绍了分布式系统架构下的经典的高可用框架Hsytrix实现的断路器,服务降级,服务熔断,服务隔离以及解决服务雪崩效应的问题。详细分析了基于Hystrix框架解决分布式系统中服务雪崩效应的几种机制,以及在项目中如何集成Hystrix框架并且在项目中使用Hystrix框架实现分布式系统中的服务治理。
319 0
+关注
619
文章
1
问答
文章排行榜
最热
最新
相关电子书
更多
OceanBase 入门到实战教程
立即下载
阿里云图数据库GDB,加速开启“图智”未来.ppt
立即下载
实时数仓Hologres技术实战一本通2.0版(下)
立即下载