颠覆大数据分析之Storm简介

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:

颠覆大数据分析之Storm简介

译者:吴京润    购书

之前我们已经极为简单的介绍了Storm。现在我们要对它做一个更详细的了解。Storm是一个复杂事件处理引擎(CEP),最初由Twitter实现。在实时计算与分析领域,Storm正在得到日益广泛的应用。Storm可以辅助基本的流式处理,例如聚合数据流,以及基于数据流的机器学习(译者注:原文是ML,根据上下文判断,此处应是指机器学习,下文相同不再缀述)。通常情况,数据分析(译者注:原文为prestorage analytics,意义应是保存分析结果之前的分析计算)在Storm之上进行,然后把结果保存在NOSQL或关系数据库管理系统(RDBMSs)。以气象频道为例,使用Storm以并行方式处理大数据集(译者注:原文用到munging,意义应是洗数据)并为离线计算持久化它们。

下面是一些公司使用Storm的有趣方式:

  • Storm用于持续计算,p并把处理过的数据传输给一个可视化引擎。Data Salt,一个先行者,使用Storm处理大容量数据源。Twitter采用相同的方式,将Storm作为它的发布者分析产品的基础。
  • Groupon也采用Storm实现了低延迟、高吞吐量的数据处理。
  • Yahoo采用Storm作为CEP每天处理数以亿计的事件。他们还把Storm整合进了0和Hadoop YARN,以便Storm能够弹性的使用集群资源,以及更易于使用HBase和Hadoop生态系统中的其它组件。
  • Infochimps采用Storm-Kafka加强他们的数据交付云服务。
  • Storm还被Cerner公司用于医疗领域,用来处理增量更新,并低延迟的把它们保存在HBase,有效的运用Storm作为流式处理引擎和Hadoop作为批处理引擎。
  • Impetus将Storm与Kafka结合,运行机器学习算法,探索制造业的故障模式。他们的客户是一家大型的电子一站式服务商。他们运行分类算法,依据日志实时探测故障,识别故障根源。这是一个更一般的用例:日志实时分析。
  • Impetus还利用Storm在一个分布式系统中构建实时索引。这个系统非常强大,因为它搜索过程几乎是瞬时的。

数据流

Storm的一个基本概念是数据流,它可以被定义为无级的无界序列。Storm只提供多种去中心化且容错的数据流转换方式。流的模式可以指定它的数据类型为以下几种之一:整型、布尔型、字符串、短整型、长整型、字节、字节数组等等。类OutputFieldsDeclarer用来指定流的模式。还可以使用用户自定义类型,这种情况下,用户可能需要提供自定义序列化程序。一旦声明了一个数据流,它就有一个ID,并有一个默认类型的默认值。

拓扑

在Storm内部,数据流的处理由Storm拓扑完成。拓扑包含一个spout,数据源;bolt,负责处理来自spout和其它bolt的数据。目前已经有各种spout,包括从Kafka读取数据的spout(LinkedIn贡献的分布式发布-订阅系统),Twitter API的spout,Kestrel队列的spout,甚至还有从像Oracle这样的关系数据库读取数据的spoutspout可以是可靠的,一旦数据处理失败,它会重新发送数据流。不可靠的spout不跟踪流的状态,不会在失败时重新发送数据。Spout的一个重要方法是nextTuple——它返回下一条待处理的元组。还有两个分别是ack和fail,分别在流被处理成功或处理不成功时调用。Storm的每个spout必须实现IRichSpout接口。Spout可能会分发多个数据流作为输出。

拓扑中的另一个重要的实体是boltbolt执行数据流转换,包括比如计算、过滤、聚合、连接。一个拓扑可以有多个bolt,用来完成复杂的转换和聚合。在声明一个bolt的输入流时,必须订阅其它组件(要么是spout要么是其它bolt)的特定数据流。通过InputDeclarer类和基于数据流组的适当方法完成订阅,这个方法针对数据流组做了简短说明。

execute方法是bolt的一个重要方法,通过调用它处理数据。它从参数接收一个新的数据流,通过OutputCollector分发新的元组。这个方法是线程安全的,这意味着bolt可以是多线程的。bolt必须实现IBasicBolt接口,这个接口提供了ack方法的声明,用来发送确认通知。

Storm集群

一个Storm集群由主节点和从节点构成。主节点通常运行着Nimbus守护进程。Storm已经实现了在Hadoop YARN之上运行——它可以请求YARN的资源管理器额外启动一个应用主节点的守护进程。Nimbus守护进程负责在集群中传输代码,分派任务,监控集群健康状态。在YARN之上实现的Storm可以与YARN的资源管理器配合完成监控及分派任务的工作。

每个从节点运行一个叫做supervisor的守护进程。这是一个工人进程,负责执行拓扑的一部分工作。一个典型的拓扑由运行在多个集群节点中的进程组成。supervisor接受主节点分派的任务后启动工人进程处理。

主从节点之间的协调通讯由ZooKeeper集群完成。(ZooKeeper是一个apache的分布式协作项目,被广泛应用于诸如Storm,Hadoop YARN,以及Kafka等多个项目中。)集群状态由ZooKeeper集群维护,确保集群可恢复性,故障发生时可选举出新的主节点,并继续执行拓扑。

拓扑本身是由spoutsbolts,以及它们连接在一起的方式构成的图结构。它与Map-Reduce任务的主要区别在于,MR任务是短命的,而Storm拓扑一直运行。Storm提供了杀死与重启拓扑的方法。

简单的实时计算例子

一个Kafka spout就是下面展示的样子:

Kafka Spout的open()方法:

1 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
2     _collector = collector;
3    _rand = new Random();
4 }

Kafka Spout的nextTuple()方法:

1 public void nextTuple() {
2     KafkaConsumer consumer = new KafkaConsumer(kafkaServerURL, kafkaTopic);
3     ArrayList<String> input_data = consumer.getKafkaStreamData();
4     while(true) {
5        for(String inputTuple : input_data){
6            _collector.emit(new Values(inputTuple));
7        }
8     }
9 }

KafkaConsumer类来自开源项目storm-kafka:https:// github.com/nathanmarz/ storm-contrib/tree/master/storm-kafka。

01 public void prepare(Map stormConf, TopologyContext context){
02 //创建输出日志文件,记录输出结果日志
03  
04     try{
05         String logFileName = logFileLocation;
06         //"file"与"outputFile"已作为类属性定义
07         file = new FileWriter(logFileName);
08         outputFile = new PrintWriter(file);
09         outputFile.println("In the prepare() method of bolt");
10     } catch (IOException e){
11         System.out.println("an exception has occured");
12         e.printStackTrace();
13     }
14 }
15 public void execute(Tuple input, BasicOutputCollector collector){
16     //从元组取得要处理的字符串
17     String inputMsg=input.getString(0);
18     inputMsg=inputMsg = "I am a bolt";
19     outputFile.println("接收的消息:" + inputMsg);
20     outputFile.flush();
21     collector.emit(tuple(inputMsg));
22 }

前面创建的spout与这个bolt连接,这个bolt向数据流的字符串域添加一条消息:我是一个bolt。前文显示的就是这个bolt的代码。接下来的代码是构建拓扑的最后一步。它显示了spoutbolts连接在一起构成拓扑,并运行在集群中。

01 public static void main(String[] args){
02     int numberOfWorkers = 2;
03     //拜年中的工人进程数量
04     int numberOfExecutorsSpout = 1;
05     //spout 执行者数量
06     int numberOfExecutorsBolt = 1;
07     //bolt执行者数量
08     String numbersHost = "192.168.0.0";
09     // Storm集群中运行Nimbus的节点IP
10     TopologyBuilder builder = new TopologyBuilder();
11  
12     Config conf = new Config();
13     builder.setSpout("spout", new TestSpout(false), numberOfExecutorsSpout);
14     //set the spout for the topology
15     builder.setBolt("bolt",new TestBolt(), numberOfExecutorsBolt).shuffleGrouping("spout");
16     //set the bolt for the topology
17     //启动远程 Storm集群的配置
18     conf.setNumWorkers(numberOfWorkers);
19     conf.put(Config.NIMBUS_HOST,nimbusHost);
20     conf.put(Config.NIMBUS_THRIFT_PORT, 6627L);
21  
22     //远程Storm集群配置
23     try{
24         StormSubmitter.submitTopology("testing_topology", conf, builder.createTopology());
25     } catch (AlreadyAliveException e){
26         System.out.println("Topology with the Same name is already running on the cluster.");
27         e.printStackTrace();
28     } catch (InvalidTopologyException e) {
29         System.out.println("Topology seems to be invalid.");
30         e.printStackTrace();
31     }
32 }

数据流组

spoutbolt都可能并行执行多个任务。必须有一种方法指定哪个数据流路由到哪个spout/bolt。数据流组用来指定一个拓扑内必须遵守的路由过程。下面是Storm内建数据流组:

  • 随机数据流组:随机分发数据流,不过它确保所有任务都可得到相同数量的数据流。
  • 域数据流组:基于元组中域的数据流组。比如,有一个machine_id域,拥有相同machine_id域的元组由相同的任务处理。
  • 全部数据流组:它向所有任务分发元组——它可能导致处理冲突。
  • 直接数据流组:一种特殊的数据流组,实现动态路由。元组生产者决定哪个消费者应该接收这个元组。可能是基于运行时的任务ID。bolt可以通过TopologyContext类得到消费者的任务ID,或OutputCollector的emit方法也可使用直接直接数据流组。
  • 本地数据流组:如果目标bolt在相同进程中有一个以上的任务,元组将被随机分配(就像随机数据流组),但是只分配相同进程中的那些任务。
  • 全局数据流组:所有元组到达拥有最小ID的bolt
  • 不分组:目前与随机数据流组一样。

Storm的消息处理担保

spout生成的元组能够触发进一步的元组分发,基于拓扑和所应用的转换。这意味着可能是整个消息树。Storm担保每个元组被完整的处理了——树上的每个节点已被处理过了。这一担保不能没有程序员的支持。每当消息树中创建了一个新的节点或者一个节点被处理了,程序员都必须向Storm指明。第一点通过锚定实现,也就是将处理完成的元组作为OutputCollector的emit方法的第一个参数。这就保证了消息被锚定到了合适的元组。消息也可以锚定到多个元组,这样就构成了一个消息的非循环有向图(DAG),而不只是一棵树。即使在消息的循环有向图存在的情况下,Storm也可以担保消息处理。

在每条消息被处理后,程序员可通过调用ack或fail方法,告诉Storm这条消息已被成功处理或处理失败。Storm会在失败时重新发送数据流——这里满足至少处理一次的语义。Storm也会在发送数据流时采用超时机制——这是一个storm.yaml的配置参数(config.TOPOLOGY_MESSAGE_TIMEOUT_ SECS)。

在Storm内部,有一组“ackeer”任务持续追踪来自每条元组消息的DAG。这些任务的数量可通过storm.yaml中的TOPOLOGY_ACKERS参数设定。在处理大量消息时,可能将不得不增大这个数字。每个消息元组得到一个64-bit ID,用于ackers追踪。元组的DAG状态由一个叫做ack val的64-bit值维护,只是简单的把树中每个确认过(译者注:原谅是acked)的ID执行异或运算。当ack val成为0时,acker任务就认为这棵元组树被完全处理了。

在某些情况下,当性能至关重要,而可靠性又不是问题时,可靠性也可以关闭。在这些情况下,程序员可以指定TOPOLOGY_ACKERS为0,并在分发新元组时,不指定输入元组的非锚定消息(译者注:原文为unanchor messages)。这样就跳过了确认消息,节省了带宽,提高了吞吐量。到目前为止我们已经讨论且只讨论了至少处理一次数据流的语义(译者注:原文为at-least-once stream semantics)。

仅处理一次数据流的语义可以采用事务性拓扑实现。Storm通过为每条元组提供相关联的事务ID为数据流处理提供事务性语义(仅一次,不完全等同于关系数据库的ACID语义)。对于重新发送数据流来说,相同的事务ID也会被发送并担保这个元组不会被重复处理。这方面牵涉到对于消息处理的严格顺序,就像是在处理一个元组。由于这样做的低效率,Storm允许批量处理由一个事务ID关联的元组。不像早先的情况 ,程序不得不将消息锚定到输入元组,事务性拓扑对程序员是透明的。Storm内部将元组的处理分为两阶段——第一阶段为处理阶段,可以并行处理多个批次,第二阶段为提交阶段,强制严格按照批次ID提交。

事务性拓扑已经过时了——它已被整合进了一个叫做Trident的更大的框架。Trident允许对流数据进行查询,包括聚合、连接、分组函数,还有过滤器。Trident构建于事务性拓扑之上并提供了一致的一次性语义。更多关于Trident的细节请参考wiki:https://github.com/nathanmarz/storm/wiki/ Trident- tutorial。 

目录
相关文章
|
1月前
|
存储 机器学习/深度学习 SQL
大数据处理与分析技术
大数据处理与分析技术
121 2
|
6天前
|
机器学习/深度学习 数据可视化 大数据
机器学习与大数据分析的结合:智能决策的新引擎
机器学习与大数据分析的结合:智能决策的新引擎
71 15
|
12天前
|
SQL 分布式计算 DataWorks
DataWorks产品测评|基于DataWorks和MaxCompute产品组合实现用户画像分析
本文介绍了如何使用DataWorks和MaxCompute产品组合实现用户画像分析。首先,通过阿里云官网开通DataWorks服务并创建资源组,接着创建MaxCompute项目和数据源。随后,利用DataWorks的数据集成和数据开发模块,将业务数据同步至MaxCompute,并通过ODPS SQL完成用户画像的数据加工,最终将结果写入`ads_user_info_1d`表。文章详细记录了每一步的操作过程,包括任务开发、运行、运维操作和资源释放,帮助读者顺利完成用户画像分析。此外,还指出了文档中的一些不一致之处,并提供了相应的解决方法。
|
11天前
|
分布式计算 DataWorks 搜索推荐
用户画像分析(MaxCompute简化版)
通过本教程,您可以了解如何使用DataWorks和MaxCompute产品组合进行数仓开发与分析,并通过案例体验DataWorks数据集成、数据开发和运维中心模块的相关能力。
|
1月前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
70 4
|
1月前
|
关系型数据库 分布式数据库 数据库
PolarDB 以其出色的性能和可扩展性,成为大数据分析的重要工具
在数字化时代,企业面对海量数据的挑战,PolarDB 以其出色的性能和可扩展性,成为大数据分析的重要工具。它不仅支持高速数据读写,还通过数据分区、索引优化等策略提升分析效率,适用于电商、金融等多个行业,助力企业精准决策。
35 4
|
1月前
|
机器学习/深度学习 分布式计算 算法
【大数据分析&机器学习】分布式机器学习
本文主要介绍分布式机器学习基础知识,并介绍主流的分布式机器学习框架,结合实例介绍一些机器学习算法。
208 5
|
1月前
|
存储 监控 数据挖掘
【Clikhouse 探秘】ClickHouse 物化视图:加速大数据分析的新利器
ClickHouse 的物化视图是一种特殊表,通过预先计算并存储查询结果,显著提高查询性能,减少资源消耗,适用于实时报表、日志分析、用户行为分析、金融数据分析和物联网数据分析等场景。物化视图的创建、数据插入、更新和一致性保证通过事务机制实现。
166 14
|
1月前
|
消息中间件 分布式计算 大数据
数据为王:大数据处理与分析技术在企业决策中的力量
【10月更文挑战第29天】在信息爆炸的时代,大数据处理与分析技术为企业提供了前所未有的洞察力和决策支持。本文探讨了大数据技术在企业决策中的重要性和实际应用,包括数据的力量、实时分析、数据驱动的决策以及数据安全与隐私保护。通过这些技术,企业能够从海量数据中提取有价值的信息,预测市场趋势,优化业务流程,从而在竞争中占据优势。
117 2
|
1月前
|
数据采集 机器学习/深度学习 搜索推荐
大数据与社交媒体:用户行为分析
【10月更文挑战第31天】在数字化时代,社交媒体成为人们生活的重要部分,大数据技术的发展使其用户行为分析成为企业理解用户需求、优化产品设计和提升用户体验的关键手段。本文探讨了大数据在社交媒体用户行为分析中的应用,包括用户画像构建、情感分析、行为路径分析和社交网络分析,以及面临的挑战与机遇。