3.MapReduce范式
熟悉函数式编程的宝子们应该熟悉MapReduce的思想,MapReduce范式对并行处理中的一种常见情况进行了建模,它应用map()函数和reduce()函数为并行提供支持。
3.1 为什么要使用MapReduce
考虑一个场景,编写一个程序统计文件中出现的单词及其频次。如果文件和单词的数量不多,问题还不算复杂,如果将上述问题扩展到一个拥有数万个文件的环境中,每个文件都包含数十至数百兆的数据,按顺序处理如此大量的数据是不可行的。你可以自己编写并行程序来扩展上述方案,最后整合每台机器的计算结果进行本地计数,不过你需要考虑好怎么利用“管道系统”来协调作业,处理故障。
不过,你的程序可能还不能够复用。费那劲造轮子干嘛。MapReduce来救你了。
3.2 MapReduce是什么?
MapReduce是一种分布式计算框架 ,以一种可靠的,具有容错能力的方式并行地处理上TB级别的海量数据集。主要用于搜索领域,解决海量数据的计算问题。
看看MapReduce做什么?
Map()负责把一个大的block块进行切片并计算。
Reduce() 负责把Map()切片的数据进行汇总、计算。
3.3 MapReduce示例:词汇统计
现在使用MapReduce
基于伪代码来实现词汇统计,并通过这个例子进一步理解Map()和Reduce()函数。
上面的map()函数会记录所有出现的词汇,将其计数设置为1,输出(word,count)对传递给reduce函数,而reduce函数则对数据重新进行分组或者排序,以便将具有特定键的所有记录收集到一起。最后通过增加计数整合词汇计数列表。
其详细过程可以参考下图(Mapreduce作业中的键值流),其中m表示map,r表示reduce,k表示key,v表示value.
3.4 MapReduce任务的并行处理
上面的例子其实并没有关注并行处理的问题,下面我们分析下它的并行处理时会做什么?
1.第一步对输入的数据进行切片,每个切片分配一个map()任务,map()对其中的数据进行计算,对每个数据用键值对的形式记录,然后输出到环形缓冲区(图中sort的位置)。
2.map()中输出的数据在环形缓冲区内进行快排,每个环形缓冲区默认大小100M,当数据达到80M时(默认),把数据输出到磁盘上。形成很多个内部有序整体无序的小文件。
3.框架把磁盘中的小文件传到Reduce()中来,然后进行归并排序,最终输出。
可以看到,MapReduce系统在多台机器上并行执行map()函数,每个map任务处理部分数据,reduce()函数也在多台机器上并行执行,每个reduce任务处理reduce键的一个子集(注意,对reduce函数的特定调用仍然是针对单个reduce键的)
值得关注的是,文件的输入输出会借助Hadoop分布式文件系统(HDFS)实现输入输出的并行化。除了HDFS,MapReduce还可以支持适配各种大数据存储系统,如HBASE,MongoDB,Amazon Dynamo。当然,生产中也会应用复制技术 ,通常是三台,来保证即使一些机器发生故障,也可以从存有故障机器数据副本的其他机器获得数据。
下图展示了并行处理的更多细节,供参考。
3.5 Hadoop中的MapReduce
Hadoop是一个开源的大数据框架,是一个分布式计算的解决方案。Hadoop的两个核心解决了数据存储问题(HDFS分布式文件系统)和分布式计算问题(MapRe-duce)。
Hadoop项目提供了用Java语言编写的、广泛使用的MapReduce开源实现。我们使用Java API来概述它的主要特性。它也提供了Python和C++语言实现的MapReduce API。
java通过继承Mapper和Reducer类,就可以重写map()和reduce()方法。除了reduce()方法,Hadoop还允许程序员定义combine()方法,它可以在执行map()方法的节点上执行部分reduce操作。使用combine的一个好处是他减少了必须通过网络发生的数据量:运行map任务的每个节点在网络上为每个词汇只发送一个条目,而不是多个条目。
Hadoop中的单个M apReduce步骤执行一个map方法和一个reduce方法,一个程序可以有多个MapReduce步骤。每个步骤的reduce()输出被写入(分布式)文件系统,并在后面的步骤中被读出。Hadoop还允许程序员控制为一项作业并行运行的map和reduce任务的数量。
下面显示了前面词汇统计的Hadoop的java实现。
public class WordCount { public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { // 每个词的初始频次设置为1 private final static IntWritable one = new IntWritable(1); // 定义输出参数:Text,文本类型 private Text word = new Text(); // 重写map方法 // key,输入键:LongWritable,长整型 // value,输入值:Text,文本 public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { String line = value.toString(); // StringTokenizer,将文本分解为词 StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); // 将单词-频次的键值对写入context,one表示频次为1 context.write(word, one); } } } public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { // 重写reduce方法 // key,输入键:Text,词,values,一个频次的列表 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; // 将频次的列表累加 for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "wordcount"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } }
3.6 MapReduce上的SQL
MapReduce的许多应用都是为了使用不容易用SQL表示的计算来并行处理大量的非关系数据库。比如,计算“倒排索引”(这是网络搜索引擎高效回答关键字查询的关键)和计算Google的PageRank。
关系运算可以使用map和reduce步骤来实现,不过这对于程序员来说非常麻烦,使用SQL会更加简洁。不过为了提高效率,新一代的系统已经被开发出来了,允许使用SQL语言(或变体)编写的查询在存储在文件系统中的数据上并行执行。比如Apache Hive和由Microsoft开发的Scope,以及Apache Pig。使用这些系统在MapReduce框架(如Hadoop)上编写查询比直接使用MapReduce范式编写的查询要多得多。
下图是企业中一种常见的大数据分析平台部署框架,Hive和Pig用于报表中心,Hive用于分析报表,Pig用于报表中数据的转换工作。
什么是hive
hive是一个构建在Hadoop上的数据仓库工具(框架)。
可以将结构化的数据文件映射成一张数据表,并可以使用类sql的方式来对这样的数据文件进行读,写以及管理(包括元数据)。
这套HIVE SQL 简称HQL。hive的执行引擎可以是MR、spark、tez。
4.超越MapReduce——代数运算
4.1 代数运算的动机
关系运算可以通过map和reduce步骤的序列来表示,但这样的方式来表示任务可能会相当麻烦。
例如,如果程序员需要计算两个输入的连接,他们应该能够将其表示为单个代数运算,而不必通过map和reduce函数间接的表示出来。访问诸如joins之类的函数可以大大简化程序员的工作。这比直接使用map和reduce更有效,即使是在程序员不必直接编写MapReduce代码的数据仓库(如hive)中,也会更好。
因此,新一代并行数据处理系统增加了对其他关系运算(如joins)的支持,并支持数据分析的各种其他运算。例如,机器模型可以被建模为运算符,这些运算符以训练记录集合作为输入并输出学习模型。数据处理通常设计多个步骤,这些步骤可以建模为序列(流水线)或运算符树。
这些运算的一种统一框架是将它们视为代数运算(algebraic operation),将一个或多个数据集作为输入并输出一个或多个数据集。关系运算的输入必须是原子类型数据,代数运算的输入则支持更复杂的表达式。
有许多框架支持复杂数据上的代数运算,目前最广泛的是Apache Tez和Apache Spark。Tez提供了系统实现者的低级别的API,并非是为了程序员直接使用设计,不展开。
4.2 Spark中的代数运算
Apache Spark是一个广泛使用的并行数据处理系统,支持各种代数运算。数据可以从各种存储系统输入或输出至这些存储系统。
关系数据库使用关系作为数据表示的主要抽象,Spark使用一种称为弹性分布式数据集(Resilient Distributed Dataset,RDD)的表示,它是可以跨多台机器存储的记录的集合。属于分布式distributed 是只存储在不同机器上的记录,而弹性Resilient 是指故障恢复能力:即使其中一台机器发生故障,也可以从存储记录的其他机器中检索出数据。
对开发者而言,RDD可以看作是Spark的一个对象,它本身运行于内存中,如读文件是一个RDD,对文件计算是一个RDD,结果集也是一个RDD ,不同的分片、 数据之间的依赖 、key-value类型的map数据都可以看做RDD。
Spark中的运算符接受一个或者多个RDD作为输入,其输出是一个RDD。存储在RDD中的记录类型不是预先定义的,可以是应用想要的任何类型。Spark还支持被称作DataSet的关系数据表示。
Spark为Java,Scala和Python提供了API,我们对Spark的介绍是基于Java API的。
下面展示使用Spark在Java中编写词汇统计应用,该程序使用RDD数据表示形式,被称为JavaRDD,它用一个尖括号<>指定记录类型。另外JavaPairRDD用于支持结构化数据类型。尽管可以使用任意用户自定义的数据类型,但预定义的数据类型仍然被广泛使用。预定义的数据类型有Tuple2(存储2个属性),Tuple3(存储3个属性),Tuple4(存储4个属性)
下面图片很好的解释了上面程序的内容。
使用spark处理数据的第一步是将数据从输入表示形式转换为RDD的表现形式,这是由spark.read.textfile()函数完成的,它的输入中的每一行创建一个记录。请注意,输入可以是一个文件或者具有多个文件的目录,在多个节点上运行的spark系统实际上会跨多台机器划分RDD。
仅看代码,用户根本体会不到数据在背后是并行计算。从图中能看出数据分布在不同分区(也可以理解不同机器上),数据经过flapMap、map和reduceByKey算子在不同RDD的分区中流转。(这些算子就是上面所说对RDD进行计算的函数)
理解如何实现并行处理的关键是弄明白以下内容:
RDD可以划分并存储在多台机器上;
每种运算可以在多台机器上、在机器上可用的RDD划分上并行执行。可以首先对运算的输入进行重新划分,以便在并行执行操作之前将相关记录放到同一台机器上。例如,reduceByKey将重新划分输入的RDD,使属于同一组的所有记录集中在但台机器上,不同组的记录可能再不同的机器上。
Spark的另一个重要特性是代数运算不需要在函数调用是立刻计算,尽管代码看上去似乎是这么做的。相反,上面展示的代码实际上创建了一颗运算树。这些运算不会一经定义就立刻执行,而是在后面savaASTextFile()执行时强制要求对树进行运算。其他函数比如collect()也有类似的作用,熟悉函数式编程的朋友应该不难理解这一点。
延迟计算(lazy evaluation)的优点是可以在计算时重写树,使得结果更快。
如果一种运算的结果作为不止其他一种运算的输入,这些操作可以形成一个有向无环图(Directed Asyclic Graph,DAG)
RDD很适合应用于诸如文本数据那样的特定数据类型,Spark支持的另一种DataSet类型则支持处理结构化数据。DataSet类型与广泛使用的Requet,ORC和Avro文件格式能够很好的契合。
下面代码说明Spark如何读取和处理Requet格式的数据。
HDFS,Hadoop,Hive和Spark
1.HDFS扮演者数据统一管理的角色,会统一管理100台服务器上的存储空间
2.HDFS中引入了一个MapReduce模块,MapReduce模块实际上是提供了一个任务并行的框架
3.Hadoop中采用HDFS来处理存储,MapRecude来处理计算。
4. 为了能够在大数据上使用SQL来处理数据,Hive应用而生。Hive实际上是在Hadoop上进行结构化数据处理的一个解决方案,目的是能让用户通过编写SQL来处理数据。
5.spark经常用来和Hadoop进行对比,更为精确的说是和Hadoop里面的MapReduce对比,因为Spark本身也是一个计算框架。Spark和MapReduce不同主要是Spak是基于内存的计算,而MapRecude是基于磁盘的计算,所以Spark的卖点就是快
5.流数据
5.1 流数据的应用
许多应用程序中需要在连续到达的数据上持续的执行查询,术语流数据是指以连续方式到达的数据。
常见的流数据有:股票市场的交易,电子商务中的购买行为,传感器的实施监测数据,网络的实时数据,社交媒体(诸如Twitter需要从用户哪里获得连续的消息流)
5.2 流数据查询
与流数据相反,存储在数据库中的数据有时被称为静态数据(data-at-rest),与存储的数据相比,流是无限的,也就是说,从概念上将流永远不会结束。只有在看到流的所有元组之后才能输出结果的查询永远无法输出任何结果。例如,询问流中元组个数的查询永远不会给出最终的结果。
处理流的无限特性的一种方式是在流上定义窗口(window),流上的每个窗口包含具有特定时间戳范围或特定数量的元组。查询可以针对一个或多个窗口,而不是整个流。
另一种选择是输出流中某个特定点的正确结果,但随着更多元组的到达输出会更新,比如计数查询。
基于上面两种选择,有好几种查询数据的方法。
连续查询。即实时对数据库执行插入、更新或删除,可使用SQL,这对用户希望看到所有插入的应用有优势,但如果输入率过高,使用者将会被这种大量更新所淹没。
流式查询语言。通过扩展SQL或者关系代数来定义查询语言。这些语言在语言层将流数据与存储的关系分开,并要求在执行关系运算之前应用窗口操作。有些流不能保证元组具有递增的时间戳,这样的流将包含标点(punctuation),标点定时发出来决定聚集结果何时完成。
流上的代数运算符。编写对输入元组执行的运算符(用户自定义函数)。元组由输入路由到运算符,运算符的每个输出可以路由到另外一个运算符,系统输出或存储在数据库中。运算符可以跨被处理的元组来维护内部状态,从而允许它们对输入数据进行聚集。它们还可以用来在数据库中持久的存储数据。这种方式得到广泛应用。
模式匹配。编写模式,匹配时执行动作,这种系统被称为复杂时间处理系统(Complex Event Processing,CEP)。流行的CEP有Oracle Event processing,FlinkCEP等
许多流系统将数据存在内存中,并不提供持久性保证。许多应用采用Lambada architecture,其中输入数据的一个副本被提供给流处理系统,另一个副本则保存在数据库中存储和后续处理。
然而,流媒体系统和数据库系统是分离的,从而导致以下问题。
查询可能需要用不同的语言编写两次,一次用于流处理系统,一次用于数据库系统。
流式查询可能无法有效地访问存储的数据。
支持带持久性存储的流式查询和跨流和存储数据的查询系统可以避免这些问题。
5.3 流上的代数运算
虽然对流式数据的SQL查询非常有用,但在许多应用中SQL查询并不是很适合,通过流处理的代数运算方法,可以为实现代数运算提供用户自定义的代码。
要执行运算,就必须把输入元组路由给运算符,运算符必须路由给使用者,这种容错的元组路由可以由Apache Storm和Kafka等实现。
元组路由在逻辑上是通过构建一个以操作符为节点的有向无环图来实现的。流处理系统的入口是数据源,从数据源作为入口把元组注入流处理系统。流处理系统的出口是数据接收节点,通过数据接收退出的元组可以存储在文件中或者以别的方式输出。图中间的节点是运算符,边则是元组流。
实现流处理的一种思路就是把图指定为系统配置的一部分,当系统开始处理元组时读取该图,Storm就是这么处理的。
另外一种方式是发布-订阅系统(publish-subcribe,也称为pub-sub系统),订阅者订阅特定的主题,发布者以文档或其他形式发布带有关联主题的数据,所有主题订阅者都会收到对应的副本。这种方式可以动态的增删运算符,较为灵活,kafka
采用的就是这种模式。
接下来讨论如何将流数据源作为此类运算的输入。因为流数据是源源不断的,可能会有无限的输入,为了解决这个问题,Spark允许将流数据分解为离散化流,其中特定时间窗口的流数据被视为代数运算的输入,就像处理文件或者关系那样进行处理。
但这样需要提前对流数据进行离散化。诸如Storm和Flink支持流式运算,它们将流作为输入并输出另一个流。这对于诸如映射或者选择关系那样的运算很简单,每个输出元组从输入元组继承一个时间戳。在整个流被处理完之前,关系聚集运算和reduce操作可能无法产生任何技术处。为了支持这些操作,Flink支持一种将流分解为窗口的窗口操作,聚集在每个窗口内计算并在窗口完成后输出。需要注意的是输出被视为流,其中元组时具有基于窗口结束点的时间戳。
下图是flink使用时间窗口的窗口划分示意。
关于Flink,您可以参考博客,全面解析流处理框架Flink
6.图数据库
对于道路网络等图类型的数据,可以采用图数据库存储。
图数据库提供了许多额外的特性。
允许将关系标识为节点或边,提供定义这种关系的特殊语法
允许设计可以轻松表示路径查询的查询语言
为此类查询提供高效实现
为图可视化等其他特性提供支持
Neo4j是广泛使用的图数据库,它是一种集中式数据库(截止至2018年)。因此,Neo4j并不支持处理非常大的图,诸如社交网络就是这种非常庞大的图的实例。
有两种常用的并行图处理方法用于处理复杂大图。
map-reduce和代数框架,图可以作为关系存储在并行存储系统中,跨多台机器进行划分,使用map-reduce程序,代数框架或并行关系数据库来实现跨多个节点并行处理。这种方法在许多应用中很有效,但当执行遍历图中长路径的迭代计算时,这些方法非常低效,因为他们通常每次迭代都要去读取整张图。
批量同步处理框架。用于图算法的批量同步处理(Bulk Synchronous Processing,BSP)框架将图算法制定以迭代方式操作的顶点相关联的计算。这里的图通常存储在内存中,顶点是跨多台机器进行划分的,最重要的是,不必在每次迭代中读取图。
BSP
已经被Pregel
系统所普及,Giraph
系统是基于Pregel的开源版本。Apache GraphX支持大图上的图计算。