暂无个人介绍
2020年03月
原因不确定,可能有的原因有:(1)内存用超了OOM挂掉了;(2)内存用多了被yarn的nodemanager给killed了,可以看看nodemanger的日志;(3)内存不太够,在做GC耗时较长卡住了,可以看看TM的gc日志;(4)网络抖动,可以尝试把timeout时间调长,taskmanager.network.netty.client.connectTimeoutSec=1800;(5)其他原因。
可以试试在提交的时候加上名称: ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.execute("tablejoindemo");
The required configuration option Key: 'taskmanager.cpu.cores' , default: null (fallback keys: []) is not set taskmanager.cpu.cores这个参数要在配置文件中配置一个正值。
为了避免启动很多task,每个task又执行时间很短,运行时间比低,可以尝试让每个task处理多个小文件,把每个split包含多少文件数的参数调大。
除了设计理念和架构层面(内存计算和DAG),spark在后续的优化过程中不断进行优化,比如专门启动了一个钨丝计划(tungsten),钨丝计划主要涉及三块:(1)Memory Management and Binary Processing,(2)Cache-aware computation,(3)Code generation,可以查看blog:https://databricks.com/glossary/tungsten
阿里开源的Alink,同时社区正在发展flink上图计算框架Gelly。
还可以开启压缩,选择压缩算法。还有设置多少个spill文件开始进行merge。
storm是经典的流计算框架,特别是对一条一条数据pipeline的计算非常适合。Spark不光能进行批计算,也就是像MapReduce那样对大量数据进行批量计算,也能进行流计算,不过spark streaming处理逻辑是微批或者叫小批,一次处理几条数据,以这种方式来实现流计算,但是对于一条一条或者一次很少量数据,不太适合。
Spark和MR是不同的计算框架,业界通俗的说法是Spark是MR之后的第二代大数据计算框架。MR是一个stage的计算过程:map->reduce,应用在机器学习领域需要不断的迭代,会有很多个MR job连接起来成为DAG,这样job与job之间的数据通过HDFS来中转,这个过程比较慢,Spark应运而生。Spark最初对比MR优点是内存计算和DAG,map和reduce之间使用内存来shuffle数据,stage与stage之间使用内存来中转数据,这样在机器学习领域性能提升很多。随着Spark的发展,Spark不光能应用在批处理,还能应用于流处理,而MR只能用于批处理。
不知道你这个是不是使用的默认配置,内存配置不匹配,总Flink内存才224MB,但是里面分配的各个内存加起来超过这个内存了,要把里面的各个内存调小点。可以查看1.10文档里有各个内存的配置项。
可以的。不过有一些前提条件,hadoop集群的节点要能网络连通oss也就是MR运行起来的task要能读取到oss的数据;MR任务中map/reduce函数中要自己写读取oss数据的代码,hadoop本身没有自带读取oss;MR运行的task不同task读取oss的不同数据,如果读取的是相同的数据,可以使用hadoop自带的distributedcache功能来实现。
如果是想实际使用的集群的话,一般采用7台服务器就可以:4台datanode(因为数据存储三副本容错)可以同时跑nodemanager+1台resourcemanager/或者jobtracker用来做计算的master+1台namenode用来做存储的master+1台secondarynamenode用来做存储的冷备master。另外找一台比较差的机器作为任务提交机器和admin操作机。 服务器的操作系统可以选择多个发行版,centos可以。 磁盘不需要做raid,因为hadoop本身存储层有三副本容灾,节点上磁盘做raid会浪费。 网络局域网连接就可以。
如果数据量小的话一般不需要使用hadoop,现在关系型数据库越来越强大,再加上把机器配置提高,RDMS就更强大了,现在分布式数据库的产品越来越多,阿里云上就有分布式数据库产品可以使用。如果存储的数据很多(几百T以上)而且还要保存历史数据,再加上要对这些数据进行很多分析,一般使用hadoop比较合适,hadoop集群的节点可以比RDMS使用的机器配置差,搭建成集群之后,使用分布式的方法,让每个节点同时干活,就比较适合分析大量数据。
hadoop下线节点一般采用退役流程decommission,一般不直接把节点上nodemanager或者datanode进程暴力停掉,为了是让数据和任务迁移,特别是数据,所以一般对于datanode一定要采用退役过程。DataNode退役过程启动的时候,虽然新的数据块写入不会再写到这个datanode,但是这台datanode上的数据块要拷贝到别的datanode上,当集群比较小的时候,而且这台datanode上的数据块又很多的时候,拷贝数据块的过程占用集群带宽,其他的datanode节点还会不断的向namenode汇报meta,所以这个过程会导致集群比较卡。 针对这个过程,下线之前,可以先删除一部分不用的hdfs文件减少数据块;dfs.max-repl-streams(默认2)配置选项会对datanode下线时每个datanode同时能够进行多少个block的并行拷贝进行控制,可以配置少点。
主要有几个计算过程: map:数据映射,对数据进行清洗过滤和格式化;map的函数是程序编写; combiner:可选,成为map本地的reduce,对map本地端的数据进行局部聚合;函数自己编写或者直接使用reduce函数; shuffle:这个是mapreduce框架自带,主要负责数据从map分发给reduce任务; reduce:数据聚合。
但是我不知道如何在以前的reduce作业的输出上运行reduce作业。—— 第一个job会配置输出目录,一般是在hdfs上,这个目录就是这个job的输出。 第二个job把第一个job的输出目录作为第二个job的输入目录配置,第二个job就会读取这个目录下的文件来执行,结果再输出到另外一个目录。第二个job的处理逻辑就是max。
对于yarn模式下: 还有resourcemanager:负责节点管理,资源管理和调度,app管理等。 nodemanager:负责task的container启动,里面还有mapreduce托管的shuffle service管理shuffle数据。
对于计算层: 如果是standalone模式,master有jobtracker,slave有taskmanagre; 如果是yarn模式,master有resourcemanager,slave有nodemanager。 对于存储层: master有namenode,slave有datanode,master还有另外一个secondary namenode也算。
可以举个简单的例子来解释下。比如wordcount程序,对这样的语句"daddy finger, daddy finger, where are you, here I am, here I am, how do you do"进行workcount。有两个map分别处理三个短句,map1的输出结果是:<daddy, 1> <finger, 1> <daddy, 1> <finger, 1> <where, 1> <are, 1> <you, 1>;map2的输出结果是:<here, 1> <I, 1> <am ,1> <here, 1> <I, 1> <am, 1> <how, 1> <do, 1> <you, 1> <do, 1>。有一个reduce来处理count操作。如果不采用combiner的话,总共有17条记录传递给reduce task。如果采用combiner的话,combiner会对map task本地端的结果进行combine操作(这里是wordcount),这样map端的数据结果就是combiner之后的结果,比如map1的输出结果就变成<daddy, 2> <finger, 2> <where, 1> <are, 1> <you, 1>;map2的输出结果就变成<here, 2> <I, 2> <am ,2> <how, 1> <do, 2> <you, 1>。这样总共有11条记录传递给reduce task,比之前的少。所以combier主要目的是减少map输出。