Spark集群独立模式HA

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介:

一、Spark简介:


Spark是一种与Hadoop相似的开源集群计算环境

Spark基于MR算法实现的分布式计算,拥有Hadoop MR的优点,不同的是结果保存在内存中

Spark是一个针对超大数据集合的低延迟的集群分布式计算系统,比MapReduce快40倍左右

Spark是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架

Spark兼容Hadoop的API,能够读写Hadoop的HDFS HBASE 顺序文件等


传统的hadoop

wKioL1URY7nRlAbqAAFgqu76O_Y976.jpg



Spark

wKiom1URYpiR_49-AAGA5sVsokc685.jpg


环境概述:

192.168.1.2 master

192.168.1.3 worker

192.168.1.4 worker


二、Scala环境设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[root@master ~] # tar zxvf scala-2.10.4.tgz -C /home/hadoop/
[root@master ~] # cd /home/hadoop/
[root@master hadoop] # ln -s scala-2.10.4 scala
[root@master ~] # chown -R hadoop.hadoop /home/hadoop/
# Scala
export  SCALA_HOME= /home/hadoop/scala
export  PATH=$PATH:$HADOOP_DEV_HOME /sbin :$HADOOP_DEV_HOME /bin :$SCALA_HOME /bin
[root@master hadoop] # source /home/hadoop/.bashrc 
[root@master hadoop] # su - hadoop
[hadoop@master ~]$ scala
Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_25).
Type  in  expressions to have them evaluated.
Type :help  for  more  information.
 
scala> 
 
# slave1,slave2执行相同的操作

三、spark环境配置

1
2
3
4
5
6
7
8
9
10
11
[root@master ~] # tar zxvf spark-1.0.2-bin-hadoop2.tgz -C /home/hadoop/
[root@master hadoop] # ln -s spark-1.0.2-bin-hadoop2 spark
[root@master hadoop] # chown -R hadoop.hadoop /home/hadoop/
[root@master hadoop] # su - hadoop
# 修改.bashrc文件
# Spark
export  SPARK_HOME= /home/hadoop/spark
export  PATH=$PATH:$HADOOP_DEV_HOME /sbin :$HADOOP_DEV_HOME /bin :$SCALA_HOME /bin :$SPARK_HOME /bin :$SPARK_HOME /sbin
[hadoop@master ~]$  source  .bashrc 
 
# 在slave1,slave2执行相同的操作

四、spark独立模式配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
[hadoop@master ~]$  cd  spark /conf/
[hadoop@master conf]$  cp  spark- env .sh.template spark- env .sh
# 修改spark-env.sh
JAVA_HOME= /usr/java/jdk
SPARK_MASTER_IP=master
SPARK_WORKER_MEMORY=512m
 
# 修改slaves文件
slave1
slave2
 
# 在slave1,slave2节点做相同的操作
 
# 在master节点上启动spark
[hadoop@master sbin]$ . /start-all .sh 
starting org.apache.spark.deploy.master.Master, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-hadoop-org .apache.spark.deploy.master.Master-1-master.out
slave2: starting org.apache.spark.deploy.worker.Worker, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-hadoop-org .apache.spark.deploy.worker.Worker-1-slave2.out
slave1: starting org.apache.spark.deploy.worker.Worker, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-hadoop-org .apache.spark.deploy.worker.Worker-1-slave1.out
 
# 查看进程
[hadoop@master sbin]$ jps
44526 NameNode
44835 ResourceManager
47017 Master
45104 JobHistoryServer
46226 HMaster
44695 SecondaryNameNode
45169 QuorumPeerMain
47125 Jps
 
[hadoop@slave1 conf]$ jps
2302 NodeManager
2914 HRegionServer
2451 QuorumPeerMain
3431 Worker
3481 Jps
2213 DataNode    
 
[hadoop@slave2 ~]$ jps
11262 DataNode
12761 Worker
11502 QuorumPeerMain
11360 NodeManager
12811 Jps
12032 HRegionServer

master webUI:    http://192.168.1.2:8080/

wKioL1UQ6reRxa-3AAMLWQ04k2k233.jpg

worker web UI: http://192.168.1.3:8081/


wKiom1UQ6cnwByfWAAFxeDTpoZo401.jpg


五、spark实践

1
[hadoop@master conf]$ MASTER=spark: //master :7077 spark-shell

wKiom1UQ8JOhbBTcAANJxlXaziA372.jpg




1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
scala> val rdd_a = sc.textFile( "hdfs://master:9000/tmp/wordcount.txt" )
15 /03/24  13:20:31 INFO storage.MemoryStore: ensureFreeSpace(141503) called with curMem=0, maxMem=311387750
15 /03/24  13:20:31 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 138.2 KB,  free  296.8 
MB)rdd_a: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
scala> rdd_a.first()
15 /03/24  13:25:31 INFO mapred.FileInputFormat: Total input paths to process : 1
15 /03/24  13:25:31 INFO spark.SparkContext: Starting job: first at <console>:15
15 /03/24  13:25:31 INFO scheduler.DAGScheduler: Got job 0 (first at <console>:15) with 1 output partitions (allowLocal= true )
15 /03/24  13:25:31 INFO scheduler.DAGScheduler: Final stage: Stage 0(first at <console>:15)
15 /03/24  13:25:31 INFO scheduler.DAGScheduler: Parents of final stage: List()
15 /03/24  13:25:31 INFO scheduler.DAGScheduler: Missing parents: List()
15 /03/24  13:25:31 INFO scheduler.DAGScheduler: Computing the requested partition locally
15 /03/24  13:25:31 INFO rdd.HadoopRDD: Input  split : hdfs: //master :9000 /tmp/wordcount .txt:0+26
15 /03/24  13:25:31 INFO Configuration.deprecation: mapred.tip. id  is deprecated. Instead, use mapreduce.task. id
15 /03/24  13:25:31 INFO Configuration.deprecation: mapred.task. id  is deprecated. Instead, use mapreduce.task.attempt. id
15 /03/24  13:25:31 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15 /03/24  13:25:31 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15 /03/24  13:25:31 INFO Configuration.deprecation: mapred.job. id  is deprecated. Instead, use mapreduce.job. id
15 /03/24  13:25:32 INFO spark.SparkContext: Job finished: first at <console>:15, took 0.397477806 s
res1: String = hello world
 
scala> rdd_a.collect()
15 /03/24  14:00:32 INFO mapred.FileInputFormat: Total input paths to process : 1
15 /03/24  14:00:32 INFO spark.SparkContext: Starting job: collect at <console>:15
15 /03/24  14:00:32 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:15) with 2 output partitions (allowLocal= false )
15 /03/24  14:00:32 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect at <console>:15)
15 /03/24  14:00:32 INFO scheduler.DAGScheduler: Parents of final stage: List()
15 /03/24  14:00:32 INFO scheduler.DAGScheduler: Missing parents: List()
15 /03/24  14:00:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[1] at textFile at <console>:12),  which  has no missing parents
15 /03/24  14:00:32 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[1] at textFile at <console>:12)
15 /03/24  14:00:32 INFO scheduler.TaskSchedulerImpl: Adding task  set  0.0 with 2 tasks
15 /03/24  14:00:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 1: slave2 (NODE_LOCAL)
15 /03/24  14:00:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 1725 bytes  in  5 ms
15 /03/24  14:00:32 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 1: slave2 (NODE_LOCAL)
15 /03/24  14:00:32 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 1725 bytes  in  0 ms
15 /03/24  14:00:38 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
15 /03/24  14:00:38 INFO scheduler.TaskSetManager: Finished TID 1  in  5942 ms on slave2 (progress: 1 /2 )
15 /03/24  14:00:38 INFO scheduler.TaskSetManager: Finished TID 0  in  5974 ms on slave2 (progress: 2 /2 )
15 /03/24  14:00:38 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15 /03/24  14:00:38 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
15 /03/24  14:00:38 INFO scheduler.DAGScheduler: Stage 0 (collect at <console>:15) finished  in  6.015 s
15 /03/24  14:00:38 INFO spark.SparkContext: Job finished: collect at <console>:15, took 6.133297026 s
res0: Array[String] = Array(hello world, hello world1, hello world1, hello world1,  "" )
scala> val rdd_b = rdd_a.flatMap((line => line. split ( " " ))).map(word => (word, 1))
rdd_b: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[3] at map at <console>:14
scala> rdd_b.collect()
15 /03/24  14:11:41 INFO spark.SparkContext: Starting job: collect at <console>:17
15 /03/24  14:11:41 INFO scheduler.DAGScheduler: Got job 1 (collect at <console>:17) with 2 output partitions (allowLocal= false )
15 /03/24  14:11:41 INFO scheduler.DAGScheduler: Final stage: Stage 1(collect at <console>:17)
15 /03/24  14:11:41 INFO scheduler.DAGScheduler: Parents of final stage: List()
15 /03/24  14:11:41 INFO scheduler.DAGScheduler: Missing parents: List()
15 /03/24  14:11:41 INFO scheduler.DAGScheduler: Submitting Stage 1 (MappedRDD[3] at map at <console>:14),  which  has no missing 
parents15 /03/24  14:11:41 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (MappedRDD[3] at map at <console>:14)
15 /03/24  14:11:41 INFO scheduler.TaskSchedulerImpl: Adding task  set  1.0 with 2 tasks
15 /03/24  14:11:42 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 2 on executor 1: slave2 (NODE_LOCAL)
15 /03/24  14:11:42 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 1816 bytes  in  0 ms
15 /03/24  14:11:42 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID 3 on executor 1: slave2 (NODE_LOCAL)
15 /03/24  14:11:42 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as 1816 bytes  in  0 ms
15 /03/24  14:11:42 INFO scheduler.TaskSetManager: Finished TID 2  in  177 ms on slave2 (progress: 1 /2 )
15 /03/24  14:11:42 INFO scheduler.DAGScheduler: Completed ResultTask(1, 0)
15 /03/24  14:11:42 INFO scheduler.DAGScheduler: Completed ResultTask(1, 1)
15 /03/24  14:11:42 INFO scheduler.TaskSetManager: Finished TID 3  in  207 ms on slave2 (progress: 2 /2 )
15 /03/24  14:11:42 INFO scheduler.DAGScheduler: Stage 1 (collect at <console>:17) finished  in  0.209 s
15 /03/24  14:11:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15 /03/24  14:11:42 INFO spark.SparkContext: Job finished: collect at <console>:17, took 0.279714483 s
res1: Array[(String, Int)] = Array((hello,1), (world,1), (hello,1), (world1,1), (hello,1), (world1,1), (hello,1), (world1,1), 
( "" ,1))
scala> val rdd_c = rdd_b.reduceByKey((a, b) => a + b)
rdd_c: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at reduceByKey at <console>:16
scala> rdd_c.collect()
15 /03/24  14:14:42 INFO spark.SparkContext: Starting job: collect at <console>:19
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at <console>:16)
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Got job 2 (collect at <console>:19) with 2 output partitions (allowLocal= false )
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Final stage: Stage 2(collect at <console>:19)
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 3)
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Missing parents: List(Stage 3)
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[4] at reduceByKey at <console>:16),  which 
has no missing parents15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 3 (MapPartitionsRDD[4] at reduceByKey at 
<console>:16)15 /03/24  14:14:43 INFO scheduler.TaskSchedulerImpl: Adding task  set  3.0 with 2 tasks
15 /03/24  14:14:43 INFO scheduler.TaskSetManager: Starting task 3.0:0 as TID 4 on executor 1: slave2 (NODE_LOCAL)
15 /03/24  14:14:43 INFO scheduler.TaskSetManager: Serialized task 3.0:0 as 2074 bytes  in  36 ms
15 /03/24  14:14:43 INFO scheduler.TaskSetManager: Starting task 3.0:1 as TID 5 on executor 1: slave2 (NODE_LOCAL)
15 /03/24  14:14:43 INFO scheduler.TaskSetManager: Serialized task 3.0:1 as 2074 bytes  in  0 ms
15 /03/24  14:14:43 INFO scheduler.TaskSetManager: Finished TID 4  in  282 ms on slave2 (progress: 1 /2 )
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(3, 0)
15 /03/24  14:14:43 INFO scheduler.TaskSetManager: Finished TID 5  in  241 ms on slave2 (progress: 2 /2 )
15 /03/24  14:14:43 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(3, 1)
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Stage 3 (reduceByKey at <console>:16) finished  in  0.286 s
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: looking  for  newly runnable stages
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: running: Set()
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: waiting: Set(Stage 2)
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: failed: Set()
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Missing parents  for  Stage 2: List()
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[6] at reduceByKey at <console>:16),  which 
is now runnable15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 2 (MapPartitionsRDD[6] at reduceByKey at 
<console>:16)15 /03/24  14:14:43 INFO scheduler.TaskSchedulerImpl: Adding task  set  2.0 with 2 tasks
15 /03/24  14:14:43 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 6 on executor 1: slave2 (PROCESS_LOCAL)
15 /03/24  14:14:43 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 1953 bytes  in  1 ms
15 /03/24  14:14:43 INFO scheduler.TaskSetManager: Starting task 2.0:1 as TID 7 on executor 0: slave1 (PROCESS_LOCAL)
15 /03/24  14:14:43 INFO scheduler.TaskSetManager: Serialized task 2.0:1 as 1953 bytes  in  0 ms
15 /03/24  14:14:43 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations  for  shuffle 0 to spark@slave2:374
0415 /03/24  14:14:43 INFO spark.MapOutputTrackerMaster: Size of output statuses  for  shuffle 0 is 136 bytes
15 /03/24  14:14:43 INFO scheduler.DAGScheduler: Completed ResultTask(2, 0)
15 /03/24  14:14:43 INFO scheduler.TaskSetManager: Finished TID 6  in  211 ms on slave2 (progress: 1 /2 )
15 /03/24  14:14:45 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations  for  shuffle 0 to spark@slave1:57339
15 /03/24  14:14:46 INFO scheduler.DAGScheduler: Completed ResultTask(2, 1)
15 /03/24  14:14:46 INFO scheduler.TaskSetManager: Finished TID 7  in  3192 ms on slave1 (progress: 2 /2 )
15 /03/24  14:14:46 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
15 /03/24  14:14:46 INFO scheduler.DAGScheduler: Stage 2 (collect at <console>:19) finished  in  3.193 s
15 /03/24  14:14:46 INFO spark.SparkContext: Job finished: collect at <console>:19, took 3.634568622 s
res2: Array[(String, Int)] = Array(( "" ,1), (hello,4), (world,1), (world1,3))
scala> rdd_c.cache()
res3: rdd_c. type  = MapPartitionsRDD[6] at reduceByKey at <console>:16
scala> rdd_c.saveAsTextFile( "hdfs://master:9000/tmp/spark_result" )
15 /03/24  14:17:57 INFO Configuration.deprecation: mapred.tip. id  is deprecated. Instead, use mapreduce.task. id
15 /03/24  14:17:57 INFO Configuration.deprecation: mapred.task. id  is deprecated. Instead, use mapreduce.task.attempt. id
15 /03/24  14:17:57 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15 /03/24  14:17:57 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15 /03/24  14:17:57 INFO Configuration.deprecation: mapred.job. id  is deprecated. Instead, use mapreduce.job. id
15 /03/24  14:17:58 INFO spark.SparkContext: Starting job: saveAsTextFile at <console>:19
15 /03/24  14:17:58 INFO scheduler.DAGScheduler: Got job 3 (saveAsTextFile at <console>:19) with 2 output partitions (allowLocal
= false )15 /03/24  14:17:58 INFO scheduler.DAGScheduler: Final stage: Stage 4(saveAsTextFile at <console>:19)
15 /03/24  14:17:58 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 5)
15 /03/24  14:17:58 INFO scheduler.DAGScheduler: Missing parents: List()
15 /03/24  14:17:58 INFO scheduler.DAGScheduler: Submitting Stage 4 (MappedRDD[8] at saveAsTextFile at <console>:19),  which  has 
no missing parents15 /03/24  14:17:58 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 4 (MappedRDD[8] at saveAsTextFile at <con
sole>:19)15 /03/24  14:17:58 INFO scheduler.TaskSchedulerImpl: Adding task  set  4.0 with 2 tasks
15 /03/24  14:17:58 INFO scheduler.TaskSetManager: Starting task 4.0:0 as TID 8 on executor 0: slave1 (PROCESS_LOCAL)
15 /03/24  14:17:58 INFO scheduler.TaskSetManager: Serialized task 4.0:0 as 11506 bytes  in  1 ms
15 /03/24  14:17:58 INFO scheduler.TaskSetManager: Starting task 4.0:1 as TID 9 on executor 1: slave2 (PROCESS_LOCAL)
15 /03/24  14:17:58 INFO scheduler.TaskSetManager: Serialized task 4.0:1 as 11506 bytes  in  0 ms
15 /03/24  14:17:58 INFO storage.BlockManagerInfo: Added rdd_6_1  in  memory on slave2:37855 (size: 216.0 B,  free : 297.0 MB)
15 /03/24  14:17:58 INFO storage.BlockManagerInfo: Added rdd_6_0  in  memory on slave1:48694 (size: 408.0 B,  free : 297.0 MB)
15 /03/24  14:17:58 INFO scheduler.TaskSetManager: Finished TID 9  in  653 ms on slave2 (progress: 1 /2 )
15 /03/24  14:17:58 INFO scheduler.DAGScheduler: Completed ResultTask(4, 1)
15 /03/24  14:18:00 INFO scheduler.DAGScheduler: Completed ResultTask(4, 0)
15 /03/24  14:18:00 INFO scheduler.TaskSetManager: Finished TID 8  in  2104 ms on slave1 (progress: 2 /2 )
15 /03/24  14:18:00 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 
15 /03/24  14:18:00 INFO scheduler.DAGScheduler: Stage 4 (saveAsTextFile at <console>:19) finished  in  2.105 s
15 /03/24  14:18:00 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:19, took 2.197440038 s
 
[hadoop@master ~]$ hadoop dfs - cat  /tmp/spark_result/ *
DEPRECATED: Use of this script to execute hdfs  command  is deprecated.
Instead use the hdfs  command  for  it.
 
15 /03/24  14:19:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library  for  your platform... using  builtin -java cla
sses where applicable(,1)
(hello,4)
(world,1)
(world1,3)

查看作业 http://192.168.1.2:4040/stages/


wKiom1URAlSCjebiAAQ0dPJivj0255.jpg


基于FileSystem的冗余

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[hadoop@master ~]$  cd  spark /conf/
# 修改spark-env.sh
JAVA_HOME= /usr/java/jdk
SPARK_DAEMON_JAVA_OPTS= "-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/app/hadoop/spark/recovery"
SPARK_MASTER_IP=master
SPARK_MASTER_PORT=7077
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=512m
MASTER=spark: // ${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}
 
 
[root@master ~] # mkdir /app/hadoop/spark/recovery -p
[root@master ~] # chown -R hadoop.hadoop /app/hadoop/spark/recovery/
 
# slave1,slave2做相同的操作
 
[hadoop@master ~]$  cd  /home/hadoop/spark/sbin/
[hadoop@master sbin]$ . /stop-all .sh 
slave2: stopping org.apache.spark.deploy.worker.Worker
slave1: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master
[hadoop@master sbin]$ . /start-all .sh 
starting org.apache.spark.deploy.master.Master, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-hadoop-org .
apache.spark.deploy.master.Master-1-master.outslave1: starting org.apache.spark.deploy.worker.Worker, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-had
oop-org.apache.spark.deploy.worker.Worker-1-slave1.outslave2: starting org.apache.spark.deploy.worker.Worker, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-had
oop-org.apache.spark.deploy.worker.Worker-1-slave2.out


模拟故障

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
[hadoop@master ~]$ spark-shell
scala> val rdd1 = sc.textFile( "hdfs://master:9000/tmp/wordcount.txt" )
15 /03/24  19:57:04 INFO storage.MemoryStore: ensureFreeSpace(70225) called with curMem=141503, maxMem=311387750
15 /03/24  19:57:04 INFO storage.MemoryStore: Block broadcast_1 stored as values to memory (estimated size 68.6 KB,  free  296.8 MB)
rdd1: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at <console>:12
 
scala> rdd1.first()
15 /03/24  19:57:06 INFO mapred.FileInputFormat: Total input paths to process : 1
15 /03/24  19:57:06 INFO spark.SparkContext: Starting job: first at <console>:15
15 /03/24  19:57:06 INFO scheduler.DAGScheduler: Got job 0 (first at <console>:15) with 1 output partitions (allowLocal= true )
15 /03/24  19:57:06 INFO scheduler.DAGScheduler: Final stage: Stage 0(first at <console>:15)
15 /03/24  19:57:06 INFO scheduler.DAGScheduler: Parents of final stage: List()
15 /03/24  19:57:06 INFO scheduler.DAGScheduler: Missing parents: List()
15 /03/24  19:57:06 INFO scheduler.DAGScheduler: Computing the requested partition locally
15 /03/24  19:57:06 INFO rdd.HadoopRDD: Input  split : hdfs: //master :9000 /tmp/wordcount .txt:0+26
15 /03/24  19:57:06 INFO Configuration.deprecation: mapred.tip. id  is deprecated. Instead, use mapreduce.task. id
15 /03/24  19:57:06 INFO Configuration.deprecation: mapred.task. id  is deprecated. Instead, use mapreduce.task.attempt. id
15 /03/24  19:57:06 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15 /03/24  19:57:06 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15 /03/24  19:57:06 INFO Configuration.deprecation: mapred.job. id  is deprecated. Instead, use mapreduce.job. id
15 /03/24  19:57:06 INFO spark.SparkContext: Job finished: first at <console>:15, took 0.320624426 s
res1: String = hello world
 
[hadoop@master ~]$ jps
3543 QuorumPeerMain
3631 ResourceManager
3388 SecondaryNameNode
10261 Jps
9935 Master
10071 SparkSubmit
3245 NameNode
[hadoop@master ~]$  kill  9935

触发故障

wKioL1URUuHTkZWJAAb4QlnGEDk536.jpg

重新启动Master

1
2
3
[hadoop@master ~]$  cd  spark /sbin/
[hadoop@master sbin]$ . /start-master .sh 
starting org.apache.spark.deploy.master.Master, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-hadoop-org .apache.spark.deploy.master.Master-1-master.out


查看数据是否还存在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
scala> rdd1.count()
15 /03/24  20:00:02 INFO spark.SparkContext: Starting job: count at <console>:15
15 /03/24  20:00:02 INFO scheduler.DAGScheduler: Got job 1 (count at <console>:15) with 2 output partitions (allowLocal= false )
15 /03/24  20:00:02 INFO scheduler.DAGScheduler: Final stage: Stage 1(count at <console>:15)
15 /03/24  20:00:02 INFO scheduler.DAGScheduler: Parents of final stage: List()
15 /03/24  20:00:02 INFO scheduler.DAGScheduler: Missing parents: List()
15 /03/24  20:00:02 INFO scheduler.DAGScheduler: Submitting Stage 1 (MappedRDD[3] at textFile at <console>:12),  which  has no mis
sing parents15 /03/24  20:00:02 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (MappedRDD[3] at textFile at <console>:
12)15 /03/24  20:00:02 INFO scheduler.TaskSchedulerImpl: Adding task  set  1.0 with 2 tasks
15 /03/24  20:00:02 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 1: slave1 (NODE_LOCAL)
15 /03/24  20:00:02 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 1717 bytes  in  4 ms
15 /03/24  20:00:02 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID 1 on executor 0: slave2 (NODE_LOCAL)
15 /03/24  20:00:02 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as 1717 bytes  in  1 ms
15 /03/24  20:00:04 INFO scheduler.TaskSetManager: Finished TID 1  in  2530 ms on slave2 (progress: 1 /2 )
15 /03/24  20:00:04 INFO scheduler.DAGScheduler: Completed ResultTask(1, 1)
15 /03/24  20:00:04 INFO scheduler.DAGScheduler: Completed ResultTask(1, 0)
15 /03/24  20:00:04 INFO scheduler.TaskSetManager: Finished TID 0  in  2641 ms on slave1 (progress: 2 /2 )
15 /03/24  20:00:04 INFO scheduler.DAGScheduler: Stage 1 (count at <console>:15) finished  in  2.645 s
15 /03/24  20:00:04 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15 /03/24  20:00:04 INFO spark.SparkContext: Job finished: count at <console>:15, took 2.778519654 s
res2: Long = 5
 
# 数据正常
 
# 查看备份文件
[hadoop@master sbin]$  cd  /app/hadoop/spark/recovery/
[hadoop@master recovery]$  ls
app_app-20150324195542-0000  worker_worker-20150324195414-slave1-56995  worker_worker-20150324195414-slave2-33947


基于Zookeeper的HA,在这里3台节点,已经部署好zookeeper,并启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
[hadoop@master ~]$  cd  /home/hadoop/spark/conf/
# 修改spark-env.conf
JAVA_HOME= /usr/java/jdk
SPARK_DAEMON_JAVA_OPTS= "-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=master:2181,slave1:2181,slave2:2181 -Dspark.deploy.zookeeper.dir=/app/hadoop/spark/zookeeper"
#SPARK_MASTER_IP=master
SPARK_MASTER_PORT=7077
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=512m
#MASTER=spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}
 
# 在slave1,slave2执行相同的操作
 
# 在master节点重启spark
[hadoop@master ~]$  cd  /home/hadoop/spark/sbin/
[hadoop@master sbin]$ . /stop-all .sh 
slave1: stopping org.apache.spark.deploy.worker.Worker
slave2: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master
[hadoop@master sbin]$ . /start-all .sh 
starting org.apache.spark.deploy.master.Master, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-hadoop-org .
apache.spark.deploy.master.Master-1-master.outslave2: starting org.apache.spark.deploy.worker.Worker, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-had
oop-org.apache.spark.deploy.worker.Worker-1-slave2.outslave1: starting org.apache.spark.deploy.worker.Worker, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-had
oop-org.apache.spark.deploy.worker.Worker-1-slave1.out 
 
[hadoop@master sbin]$ jps
3543 QuorumPeerMain
3631 ResourceManager
3388 SecondaryNameNode
10692 Master
10812 Jps
3245 NameNode
 
# 在slave1上启动另一个master
[hadoop@slave1 sbin]$ . /start-master .sh 
starting org.apache.spark.deploy.master.Master, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-hadoop-org .apache.spark.deploy.master.Master-1-slave1.out
[hadoop@slave1 sbin]$ jps
4020 DataNode
4182 QuorumPeerMain
6435 Worker
6619 Jps
6545 Master
4262 NodeManager


查看master节点上,状态为ALIVE

wKiom1URV5GwfcsPAAMULm5id_k174.jpg

查看slave1节点的master状态为:STANDBY

wKioL1URWPuRVrA2AAJgQuWqm5s628.jpg


模拟故障,杀死master节点的Master进程

1
2
3
4
5
6
7
8
[hadoop@master sbin]$ jps
3543 QuorumPeerMain
3631 ResourceManager
10834 Jps
3388 SecondaryNameNode
10692 Master
3245 NameNode
[hadoop@master sbin]$  kill  10692

查看slave1上的Master状态变为ALIVE,已自动切换

wKioL1URWd_DzoviAAMFoRr4M7A401.jpg


使用spark-shell验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
[hadoop@master sbin]$ MASTER=spark: //master :7077,slave1:7077 spark-shell
Spark assembly has been built with Hive, including Datanucleus jars on classpath
15 /03/24  20:42:28 INFO spark.SecurityManager: Changing view acls to: hadoop
15 /03/24  20:42:28 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled;  users  with view permissions: Set(hadoop)
15 /03/24  20:42:28 INFO spark.HttpServer: Starting HTTP Server
15 /03/24  20:42:28 INFO server.Server: jetty-8.y.z-SNAPSHOT
15 /03/24  20:42:28 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:42811
Welcome to
       ____              __
      / __ /__   ___ _____/  /__
     _\ \/ _ \/ _ `/ __/  '_/
    /___/  .__/\_,_ /_/  /_/ \_\   version 1.0.2
       /_/
 
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_25)
Type  in  expressions to have them evaluated.
Type :help  for  more  information.
15 /03/24  20:42:36 INFO spark.SecurityManager: Changing view acls to: hadoop
15 /03/24  20:42:36 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled;  users  with view permissions: Set(hadoop)
15 /03/24  20:42:36 INFO slf4j.Slf4jLogger: Slf4jLogger started
15 /03/24  20:42:36 INFO Remoting: Starting remoting
15 /03/24  20:42:37 INFO Remoting: Remoting started; listening on addresses :[akka.tcp: //spark @master:35418]
15 /03/24  20:42:37 INFO Remoting: Remoting now listens on addresses: [akka.tcp: //spark @master:35418]
15 /03/24  20:42:37 INFO spark.SparkEnv: Registering MapOutputTracker
15 /03/24  20:42:37 INFO spark.SparkEnv: Registering BlockManagerMaster
15 /03/24  20:42:37 INFO storage.DiskBlockManager: Created  local  directory at  /tmp/spark-local-20150324204237-ed6e
15 /03/24  20:42:37 INFO storage.MemoryStore: MemoryStore started with capacity 297.0 MB.
15 /03/24  20:42:37 INFO network.ConnectionManager: Bound socket to port 39310 with  id  = ConnectionManagerId(master,39310)
15 /03/24  20:42:37 INFO storage.BlockManagerMaster: Trying to register BlockManager
15 /03/24  20:42:37 INFO storage.BlockManagerInfo: Registering block manager master:39310 with 297.0 MB RAM
15 /03/24  20:42:37 INFO storage.BlockManagerMaster: Registered BlockManager
15 /03/24  20:42:37 INFO spark.HttpServer: Starting HTTP Server
15 /03/24  20:42:37 INFO server.Server: jetty-8.y.z-SNAPSHOT
15 /03/24  20:42:37 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:54434
15 /03/24  20:42:37 INFO broadcast.HttpBroadcast: Broadcast server started at http: //192 .168.1.2:54434
15 /03/24  20:42:37 INFO spark.HttpFileServer: HTTP File server directory is  /tmp/spark-9c9136b5-274f-4ce0-82ba-4eeabae0e392
15 /03/24  20:42:37 INFO spark.HttpServer: Starting HTTP Server
15 /03/24  20:42:37 INFO server.Server: jetty-8.y.z-SNAPSHOT
15 /03/24  20:42:37 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:39100
15 /03/24  20:42:38 INFO server.Server: jetty-8.y.z-SNAPSHOT
15 /03/24  20:42:38 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15 /03/24  20:42:38 INFO ui.SparkUI: Started SparkUI at http: //master :4040
15 /03/24  20:42:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library  for  your platform... using  builtin -java classes where applicable
15 /03/24  20:42:39 INFO client.AppClient$ClientActor: Connecting to master spark: //master :7077...
15 /03/24  20:42:39 INFO client.AppClient$ClientActor: Connecting to master spark: //slave1 :7077...
15 /03/24  20:42:39 INFO repl.SparkILoop: Created spark context..
15 /03/24  20:42:40 WARN client.AppClient$ClientActor: Could not connect to akka.tcp: //sparkMaster @master:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp: //spar
kMaster@master:7077]15 /03/24  20:42:40 WARN client.AppClient$ClientActor: Could not connect to akka.tcp: //sparkMaster @master:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp: //spar
kMaster@master:7077]15 /03/24  20:42:40 WARN client.AppClient$ClientActor: Could not connect to akka.tcp: //sparkMaster @master:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp: //spar
kMaster@master:7077]15 /03/24  20:42:40 WARN client.AppClient$ClientActor: Could not connect to akka.tcp: //sparkMaster @master:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp: //spar
kMaster@master:7077]Spark context available as sc.
 
scala> 15 /03/24  20:42:40 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150324204239-0000
15 /03/24  20:42:40 INFO client.AppClient$ClientActor: Executor added: app-20150324204239-0000 /0  on worker-20150324202125-slave2-60861 (slave2:60861) with 1 cores
15 /03/24  20:42:40 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150324204239-0000 /0  on hostPort slave2:60861 with 1 cores, 512.0 MB RAM
15 /03/24  20:42:40 INFO client.AppClient$ClientActor: Executor added: app-20150324204239-0000 /1  on worker-20150324202125-slave1-48347 (slave1:48347) with 1 cores
15 /03/24  20:42:40 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150324204239-0000 /1  on hostPort slave1:48347 with 1 cores, 512.0 MB RAM
15 /03/24  20:42:40 INFO client.AppClient$ClientActor: Executor updated: app-20150324204239-0000 /0  is now RUNNING
15 /03/24  20:42:40 INFO client.AppClient$ClientActor: Executor updated: app-20150324204239-0000 /1  is now RUNNING
15 /03/24  20:42:45 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp: //sparkExecutor @slave2:56126 /user/Executor #251519544] with ID 0
15 /03/24  20:42:46 INFO storage.BlockManagerInfo: Registering block manager slave2:42208 with 297.0 MB RAM
15 /03/24  20:42:51 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp: //sparkExecutor @slave1:36476 /user/Executor #1937793409] with ID 1
15 /03/24  20:42:53 INFO storage.BlockManagerInfo: Registering block manager slave1:40644 with 297.0 MB RAM
 
 
scala>


发现使用正常


查看zookeeper上面的注册,信息

1
2
3
[hadoop@master bin]$ . /zkCli .sh
[zk: localhost:2181(CONNECTED) 3]  ls  /app/hadoop/spark/zookeeper
[master_status, leader_election]


重新启动master上面的Master进程

1
2
[hadoop@master sbin]$ . /start-master .sh 
starting org.apache.spark.deploy.master.Master, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-hadoop-org .apache.spark.deploy.master.Master-1-master.out

发现已经变为STANDBY

wKiom1URWxiyTkcOAAJRLVJSZZc457.jpg



配置历史任务服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
[hadoop@master ~]$  cd  /home/hadoop/spark/conf/
[hadoop@master conf]$  cp  spark-defaults.conf.template spark-defaults.conf
 
# 修改spark-defaults.conf
spark.eventLog.enabled   true
spark.eventLog. dir       hdfs: //master :9000 /spark/log
spark.yarn.historyServer.address master:18080
 
# 将配置文件传送到slave1,slave2
 
# 创建日志目录
[hadoop@master ~]$ hdfs dfs - mkdir  -p  /spark/log
15 /03/25  10:54:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library  for  your platform... using  builtin -java cla
sses where applicable[hadoop@master ~]$ hdfs dfs - ls  /
15 /03/25  10:54:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library  for  your platform... using  builtin -java cla
sses where applicableFound 3 items
drwxr-xr-x   - hadoop supergroup          0 2015-03-24 12:47  /hbase
drwxr-xr-x   - hadoop supergroup          0 2015-03-25 10:54  /spark
drwxrwx---   - hadoop supergroup          0 2015-03-24 14:17  /tmp
 
# 修改spark-env.conf文件
JAVA_HOME= /usr/java/jdk
#SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=master:2181,slave1:2181,slave2:218
1 -Dspark.deploy.zookeeper. dir = /app/hadoop/spark/zookeeper "SPARK_MASTER_IP=master
SPARK_MASTER_PORT=7077
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=512m
MASTER=spark: // ${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}
 
# 将配置文件传送到slave1,slave2
 
 
# 重新启动spark集群
[hadoop@master sbin]$ . /start-all .sh 
starting org.apache.spark.deploy.master.Master, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-hadoop-org .
apache.spark.deploy.master.Master-1-master.outslave2: starting org.apache.spark.deploy.worker.Worker, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-had
oop-org.apache.spark.deploy.worker.Worker-1-slave2.outslave1: starting org.apache.spark.deploy.worker.Worker, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-had
oop-org.apache.spark.deploy.worker.Worker-1-slave1.out[hadoop@master sbin]$ jps
2298 SecondaryNameNode
2131 NameNode
2593 JobHistoryServer
2481 ResourceManager
3125 Master
3214 Jps
 
 
# 启动historyserver
[hadoop@master sbin]$ . /start-history-server .sh hdfs: //master :9000 /spark/log
starting org.apache.spark.deploy. history .HistoryServer, logging to  /home/hadoop/spark-1 .0.2-bin-hadoop2 /sbin/ .. /logs/spark-had
oop-org.apache.spark.deploy. history .HistoryServer-1-master.out[hadoop@master sbin]$ jps
2298 SecondaryNameNode
2131 NameNode
2593 JobHistoryServer
3550 HistoryServer
2481 ResourceManager
3362 Master
3600 Jps
 
# 提交一个应用
[hadoop@master sbin]$ spark-shell
scala> val rdd1 = sc.textFile( "hdfs://master:9000/tmp/wordcount.txt" )
15 /03/25  11:11:32 INFO storage.MemoryStore: ensureFreeSpace(180779) called with curMem=180731, maxMem=311387750
15 /03/25  11:11:32 INFO storage.MemoryStore: Block broadcast_1 stored as values to memory (estimated size 176.5 KB,  free  296.6 
MB)rdd1: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at <console>:12
scala> rdd1.count()
15 /03/25  11:11:57 INFO mapred.FileInputFormat: Total input paths to process : 1
15 /03/25  11:11:57 INFO spark.SparkContext: Starting job: count at <console>:15
15 /03/25  11:11:57 INFO scheduler.DAGScheduler: Got job 0 (count at <console>:15) with 2 output partitions (allowLocal= false )
15 /03/25  11:11:57 INFO scheduler.DAGScheduler: Final stage: Stage 0(count at <console>:15)
15 /03/25  11:11:57 INFO scheduler.DAGScheduler: Parents of final stage: List()
15 /03/25  11:11:57 INFO scheduler.DAGScheduler: Missing parents: List()
15 /03/25  11:11:57 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[3] at textFile at <console>:12),  which  has no mis
sing parents15 /03/25  11:11:57 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[3] at textFile at <console>:
12)15 /03/25  11:11:57 INFO scheduler.TaskSchedulerImpl: Adding task  set  0.0 with 2 tasks
15 /03/25  11:11:57 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 2: slave2 (NODE_LOCAL)
15 /03/25  11:11:57 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 1717 bytes  in  7 ms
15 /03/25  11:11:57 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 0: slave1 (NODE_LOCAL)
15 /03/25  11:11:57 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 1717 bytes  in  1 ms
15 /03/25  11:12:04 INFO scheduler.TaskSetManager: Finished TID 0  in  6578 ms on slave2 (progress: 1 /2 )
15 /03/25  11:12:04 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
15 /03/25  11:12:04 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
15 /03/25  11:12:04 INFO scheduler.TaskSetManager: Finished TID 1  in  7216 ms on slave1 (progress: 2 /2 )
15 /03/25  11:12:04 INFO scheduler.DAGScheduler: Stage 0 (count at <console>:15) finished  in  7.232 s
15 /03/25  11:12:04 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15 /03/25  11:12:04 INFO spark.SparkContext: Job finished: count at <console>:15, took 7.564410596 s
res1: Long = 5
scala> sc.stop()
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /metrics/json ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /stages/stage/kill ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /static ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /executors/json ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /executors ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /environment/json ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /environment ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /storage/rdd/json ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /storage/rdd ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /storage/json ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /storage ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /stages/pool/json ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /stages/pool ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /stages/stage/json ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /stages/stage ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /stages/json ,null}
15 /03/25  11:13:06 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{ /stages ,null}
15 /03/25  11:13:06 INFO ui.SparkUI: Stopped Spark web UI at http: //master :4040
15 /03/25  11:13:06 INFO scheduler.DAGScheduler: Stopping DAGScheduler
15 /03/25  11:13:06 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
15 /03/25  11:13:06 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down
15 /03/25  11:13:08 WARN thread.QueuedThreadPool: 1 threads could not be stopped
15 /03/25  11:13:08 INFO thread.QueuedThreadPool: Couldn't stop Thread[qtp491327803-55 Acceptor0 SocketConnector@0.0.0.0:46318,5
,main]15 /03/25  11:13:08 INFO thread.QueuedThreadPool:  at java.net.SocketException.<init>(SocketException.java:47)
15 /03/25  11:13:08 INFO thread.QueuedThreadPool:  at java.net.PlainSocketImpl.socketAccept(Native Method)
15 /03/25  11:13:08 INFO thread.QueuedThreadPool:  at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
15 /03/25  11:13:08 INFO thread.QueuedThreadPool:  at java.net.ServerSocket.implAccept(ServerSocket.java:530)
15 /03/25  11:13:08 INFO thread.QueuedThreadPool:  at java.net.ServerSocket.accept(ServerSocket.java:498)
15 /03/25  11:13:08 INFO thread.QueuedThreadPool:  at org.eclipse.jetty.server.bio.SocketConnector.accept(SocketConnector.java:1
17)15 /03/25  11:13:08 INFO thread.QueuedThreadPool:  at org.eclipse.jetty.server.AbstractConnector$Acceptor.run(AbstractConnector.
java:938)15 /03/25  11:13:08 INFO thread.QueuedThreadPool:  at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.jav
a:608)15 /03/25  11:13:08 INFO thread.QueuedThreadPool:  at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java
:543)15 /03/25  11:13:08 INFO thread.QueuedThreadPool:  at java.lang.Thread.run(Thread.java:724)
15 /03/25  11:13:08 INFO spark.MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15 /03/25  11:13:09 INFO network.ConnectionManager: Selector thread was interrupted!
15 /03/25  11:13:09 INFO network.ConnectionManager: ConnectionManager stopped
15 /03/25  11:13:09 INFO storage.MemoryStore: MemoryStore cleared
15 /03/25  11:13:09 INFO storage.BlockManager: BlockManager stopped
15 /03/25  11:13:09 INFO storage.BlockManagerMasterActor: Stopping BlockManagerMaster
15 /03/25  11:13:09 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
15 /03/25  11:13:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15 /03/25  11:13:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing rem
ote transports.15 /03/25  11:13:10 INFO spark.SparkContext: Successfully stopped SparkContext
 
scala>  exit
warning: there were 1 deprecation warning(s); re-run with -deprecation  for  details


查看历史任务信息,http://192.168.1.2:18080/

wKiom1USKR2D_ph2AAHTG_nZ6_8935.jpg









     本文转自ljl_19880709 51CTO博客,原文链接:http://blog.51cto.com/luojianlong/1622837,如需转载请自行联系原作者





相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
分布式计算 Java Hadoop
|
22天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
59 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
62 0
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
42 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
90 0
|
23天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
48 6
|
21天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
72 2
|
22天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
59 1