1. Runtime Environment
- OS:
[root@karei spark-1.3.1-bin-hadoop2.6]# uname -a
Linux karei 2.6.18-371.4.1.el5 #1 SMP Wed Jan 8 18:42:07 EST 2014 x86_64 x86_64 x86_64 GNU/Linux
[root@karei spark-1.3.1-bin-hadoop2.6]#
- CPU:
[root@karei spark-1.3.1-bin-hadoop2.6]# cat /proc/cpuinfo | grep name | cut -d':' -f2 | uniq -c
4 Intel(R) Xeon(R) CPU X5270 @ 3.50GHz
[root@karei spark-1.3.1-bin-hadoop2.6]#
[root@karei spark-1.3.1-bin-hadoop2.6]# cat /proc/cpuinfo | grep "physical id" | uniq -c
1 physical id : 0
1 physical id : 3
1 physical id : 0
1 physical id : 3
[root@karei spark-1.3.1-bin-hadoop2.6]#
- Memory:
[root@karei spark-1.3.1-bin-hadoop2.6]# grep MemTotal /proc/meminfo
MemTotal: 32961052 kB
[root@karei spark-1.3.1-bin-hadoop2.6]#
2. Download Spark
I used spark-1.3.1-bin-hadoop2.6.tgz, downloaded from:
http://spark.apache.org/downloads.html
3. Extract the Spark Tarball on Linux
[root@karei opt]# ll /opt/spark-1.3.1-bin-hadoop2.6/
total 460
-rwxrw-r-- 1 1000 1000 278851 Apr 11 08:32 CHANGES.txt
-rwxrw-r-- 1 1000 1000 46083 Apr 11 08:32 LICENSE
-rwxrw-r-- 1 1000 1000 22559 Apr 11 08:32 NOTICE
-rwxrw-r-- 1 1000 1000 3629 Apr 11 08:32 README.md
-rwxrw-r-- 1 1000 1000 157 Apr 11 08:32 RELEASE
drwxrwxr-x 2 1000 1000 4096 Apr 11 08:32 bin
drwxrwxr-x 2 1000 1000 4096 Apr 11 08:32 conf
drwxrwxr-x 3 1000 1000 4096 Apr 11 08:32 data
drwxrwxr-x 3 1000 1000 4096 Apr 11 08:32 ec2
drwxrwxr-x 3 1000 1000 4096 Apr 11 08:32 examples
drwxrwxr-x 2 1000 1000 4096 Apr 11 08:32 lib
drwxr-xr-x 2 root root 4096 Apr 27 10:49 mylib
drwxrwxr-x 7 1000 1000 4096 Apr 11 08:32 python
drwxrwxr-x 2 1000 1000 4096 Apr 11 08:32 sbin
drwxr-xr-x 4 root root 4096 Apr 27 11:10 tmp
[root@karei opt]#
4. Configure the Spark Environment Variables on Linux
[root@karei opt]# export PATH=/opt/spark-1.3.1-bin-hadoop2.6/bin:$PATH
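Note that export only affects the current shell session. To make the setting permanent, one common option (assuming the root user's shell is bash) is to append the same line to ~/.bashrc:
[root@karei opt]# echo 'export PATH=/opt/spark-1.3.1-bin-hadoop2.6/bin:$PATH' >> ~/.bashrc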
5. Spark Sample Development
Maven (3.2.3) + Eclipse (Luna 4.4.1) + Spark (1.3.1) + JDK (1.8.0_45)
- pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>shuai.spark</groupId>
    <artifactId>spark-demo</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <name>spark-demo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>1.3.1</spark.version>
        <junit.version>4.11</junit.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.1</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <scope>test</scope>
            <version>1.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
            </plugin>
            <!-- Optional: package a runnable fat jar with the shade plugin.
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <finalName>spark-demo</finalName>
                    <shadedArtifactAttached>true</shadedArtifactAttached>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>shuai.spark.SparkDemo</mainClass>
                        </transformer>
                    </transformers>
                </configuration>
            </plugin>
            -->
        </plugins>
    </build>
</project>
- SparkDemo.java
package shuai.spark;

import java.io.File;
import java.util.Arrays;

import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkDemo {

    private static JavaSparkContext javaSparkContext = null;

    public static void init(String appName, String master) {
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        javaSparkContext = new JavaSparkContext(conf);
    }

    /** Classic word count: split each line on spaces, map each word to (word, 1), sum by key. */
    @SuppressWarnings("serial")
    private static void wordCount(String filePath, String fileDir) {
        // saveAsTextFile fails if the output directory already exists, so remove it first.
        FileUtils.deleteQuietly(new File(fileDir));

        JavaRDD<String> file = javaSparkContext.textFile(filePath);

        JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });

        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        });

        counts.saveAsTextFile(fileDir);
    }

    /** Count the lines containing "ERROR", then narrow to those that also contain "ORACLE". */
    @SuppressWarnings("serial")
    private static void errorCount(String filePath) {
        JavaRDD<String> file = javaSparkContext.textFile(filePath);

        JavaRDD<String> errors = file.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                return s.contains("ERROR");
            }
        });
        errors.count();

        errors.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                return s.contains("ORACLE");
            }
        }).count();

        errors.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                return s.contains("ORACLE");
            }
        }).collect();
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: SparkDemo <input file path> <output dir>");
            System.exit(1);
        }

        SparkDemo.init("shuai.spark", "local");
        SparkDemo.wordCount(args[0], args[1]);
        SparkDemo.errorCount(args[0]);
    }
}
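Since the build targets Java 8, the anonymous inner classes above can optionally be collapsed into lambdas (each of Spark's Java function interfaces declares a single abstract method). A minimal sketch of the same wordCount transformations in that style, reusing the file and fileDir variables from the method above:

    // Lambda version of the wordCount pipeline (Java 8+), same imports as above
    JavaRDD<String> words = file.flatMap(s -> Arrays.asList(s.split(" ")));
    JavaPairRDD<String, Integer> pairs = words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
    counts.saveAsTextFile(fileDir);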
6. Deploy Spark on Linux
Create the required directories and files, and deploy spark-demo-1.0.jar onto the machine. (The paths below use /usr/local/spark as the Spark home, presumably a symlink to the /opt/spark-1.3.1-bin-hadoop2.6 directory extracted above.)
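The jar itself comes out of the Maven build from section 5; run in the project root, something like:

    mvn clean package

should leave target/spark-demo-1.0.jar ready to copy into mylib/. Because the shade plugin is commented out in the pom.xml, this is a thin jar without the Spark classes, which is fine here: spark-submit supplies the Spark runtime on the classpath.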
[root@karei mylib]# ll /usr/local/spark/mylib/spark-demo-1.0.jar
-rwxr--r-- 1 spark bigdata 9027 Apr 27 11:05 /usr/local/spark/mylib/spark-demo-1.0.jar
[root@karei mylib]#
[root@karei mylib]# ll /home/hadoop/input/test.log
-rw-r--r-- 1 root root 119 Apr 30 08:14 /home/hadoop/input/test.log
[root@karei mylib]#
[root@karei spark]# cat /home/hadoop/input/test.log
INFO : Spark testing start
WARN : Hadoop HDFS Configuration
ERROR : Database ORACLE Operation
INFO : Spark testing end
[root@karei spark]#
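Given this input, errorCount() has exactly one line to work with: only the ERROR line matches contains("ERROR"), and that same line also contains "ORACLE", so errors.count() returns 1 and the two ORACLE-filtered actions (count() and collect()) each operate on a single element.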
7. Run the Sample
[root@karei spark]# ./bin/spark-submit --class shuai.spark.SparkDemo --master local[2] mylib/spark-demo-1.0.jar /home/hadoop/input/test.log /home/hadoop/output
15/04/30 08:24:39 INFO spark.SparkContext: Running Spark version 1.3.1
15/04/30 08:24:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/04/30 08:24:39 INFO spark.SecurityManager: Changing view acls to: root
15/04/30 08:24:39 INFO spark.SecurityManager: Changing modify acls to: root
15/04/30 08:24:39 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/04/30 08:24:40 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/04/30 08:24:40 INFO Remoting: Starting remoting
15/04/30 08:24:40 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@karei.netact.noklab.net:41232]
15/04/30 08:24:40 INFO util.Utils: Successfully started service 'sparkDriver' on port 41232.
15/04/30 08:24:40 INFO spark.SparkEnv: Registering MapOutputTracker
15/04/30 08:24:40 INFO spark.SparkEnv: Registering BlockManagerMaster
15/04/30 08:24:40 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-5713b972-849e-44c9-9e92-8cd0b4de7668/blockmgr-24d0ad17-fef5-4179-9018-39057c519c5b
15/04/30 08:24:40 INFO storage.MemoryStore: MemoryStore started with capacity 265.0 MB
15/04/30 08:24:40 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-ea736e60-3575-49b7-b8ba-1b2f8587bac3/httpd-eb6af892-2e5c-4331-a510-d882af085db4
15/04/30 08:24:40 INFO spark.HttpServer: Starting HTTP Server
15/04/30 08:24:40 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/04/30 08:24:40 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:32977
15/04/30 08:24:40 INFO util.Utils: Successfully started service 'HTTP file server' on port 32977.
15/04/30 08:24:40 INFO spark.SparkEnv: Registering OutputCommitCoordinator
15/04/30 08:24:40 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/04/30 08:24:40 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/04/30 08:24:40 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/04/30 08:24:40 INFO ui.SparkUI: Started SparkUI at http://karei.netact.noklab.net:4040
15/04/30 08:24:40 INFO spark.SparkContext: Added JAR file:/usr/local/spark/mylib/spark-demo-1.0.jar at http://10.9.167.55:32977/jars/spark-demo-1.0.jar with timestamp 1430371480725
15/04/30 08:24:40 INFO executor.Executor: Starting executor ID <driver> on host localhost
15/04/30 08:24:40 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@karei.netact.noklab.net:41232/user/HeartbeatReceiver
15/04/30 08:24:40 INFO netty.NettyBlockTransferService: Server created on 34209
15/04/30 08:24:40 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/04/30 08:24:40 INFO storage.BlockManagerMasterActor: Registering block manager localhost:34209 with 265.0 MB RAM, BlockManagerId(<driver>, localhost, 34209)
15/04/30 08:24:40 INFO storage.BlockManagerMaster: Registered BlockManager
15/04/30 08:24:41 WARN util.SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
15/04/30 08:24:41 INFO storage.MemoryStore: ensureFreeSpace(165175) called with curMem=0, maxMem=277877882
15/04/30 08:24:41 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 161.3 KB, free 264.8 MB)
15/04/30 08:24:41 INFO storage.MemoryStore: ensureFreeSpace(25610) called with curMem=165175, maxMem=277877882
15/04/30 08:24:41 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.0 KB, free 264.8 MB)
15/04/30 08:24:41 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:34209 (size: 25.0 KB, free: 265.0 MB)
15/04/30 08:24:41 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
15/04/30 08:24:41 INFO spark.SparkContext: Created broadcast 0 from textFile at SparkDemo.java:30
15/04/30 08:24:41 INFO mapred.FileInputFormat: Total input paths to process : 1
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/04/30 08:24:41 INFO spark.SparkContext: Starting job: saveAsTextFile at SparkDemo.java:53
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Registering RDD 3 (mapToPair at SparkDemo.java:39)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at SparkDemo.java:53) with 1 output partitions (allowLocal=false)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Final stage: Stage 1(saveAsTextFile at SparkDemo.java:53)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 0)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Missing parents: List(Stage 0)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[3] at mapToPair at SparkDemo.java:39), which has no missing parents
15/04/30 08:24:41 INFO storage.MemoryStore: ensureFreeSpace(4344) called with curMem=190785, maxMem=277877882
15/04/30 08:24:41 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.2 KB, free 264.8 MB)
15/04/30 08:24:41 INFO storage.MemoryStore: ensureFreeSpace(3085) called with curMem=195129, maxMem=277877882
15/04/30 08:24:41 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.0 KB, free 264.8 MB)
15/04/30 08:24:41 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:34209 (size: 3.0 KB, free: 265.0 MB)
15/04/30 08:24:41 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
15/04/30 08:24:41 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[3] at mapToPair at SparkDemo.java:39)
15/04/30 08:24:41 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/04/30 08:24:41 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1343 bytes)
15/04/30 08:24:41 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
15/04/30 08:24:41 INFO executor.Executor: Fetching http://10.9.167.55:32977/jars/spark-demo-1.0.jar with timestamp 1430371480725
15/04/30 08:24:42 INFO util.Utils: Fetching http://10.9.167.55:32977/jars/spark-demo-1.0.jar to /tmp/spark-4cb44a60-1a2b-4490-bf6a-f123e6c13949/userFiles-22bb9d0d-6d44-48bd-910b-8a0b6581d11a/fetchFileTemp4845879217433172142.tmp
15/04/30 08:24:42 INFO executor.Executor: Adding file:/tmp/spark-4cb44a60-1a2b-4490-bf6a-f123e6c13949/userFiles-22bb9d0d-6d44-48bd-910b-8a0b6581d11a/spark-demo-1.0.jar to class loader
15/04/30 08:24:42 INFO rdd.HadoopRDD: Input split: file:/home/hadoop/input/test.log:0+119
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 2003 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 274 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 0 (mapToPair at SparkDemo.java:39) finished in 0.293 s
15/04/30 08:24:42 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/04/30 08:24:42 INFO scheduler.DAGScheduler: running: Set()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: waiting: Set(Stage 1)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: failed: Set()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Missing parents for Stage 1: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkDemo.java:53), which is now runnable
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(127584) called with curMem=198214, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 124.6 KB, free 264.7 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(77104) called with curMem=325798, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 75.3 KB, free 264.6 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:34209 (size: 75.3 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkDemo.java:53)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1114 bytes)
15/04/30 08:24:42 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 1)
15/04/30 08:24:42 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
15/04/30 08:24:42 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms
15/04/30 08:24:42 INFO output.FileOutputCommitter: Saved output of task 'attempt_201504300824_0001_m_000000_1' to file:/home/hadoop/output/_temporary/0/task_201504300824_0001_m_000000
15/04/30 08:24:42 INFO mapred.SparkHadoopMapRedUtil: attempt_201504300824_0001_m_000000_1: Committed
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 1). 1828 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 1 (saveAsTextFile at SparkDemo.java:53) finished in 0.171 s
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 171 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at SparkDemo.java:53, took 0.640220 s
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(76899) called with curMem=402902, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 75.1 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(36686) called with curMem=479801, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 35.8 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:34209 (size: 35.8 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_3_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 3 from textFile at SparkDemo.java:58
15/04/30 08:24:42 INFO mapred.FileInputFormat: Total input paths to process : 1
15/04/30 08:24:42 INFO spark.SparkContext: Starting job: count at SparkDemo.java:67
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Got job 1 (count at SparkDemo.java:67) with 1 output partitions (allowLocal=false)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Final stage: Stage 2(count at SparkDemo.java:67)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[8] at filter at SparkDemo.java:60), which has no missing parents
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(2936) called with curMem=516487, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.9 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(2102) called with curMem=519423, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 2.1 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:34209 (size: 2.1 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_4_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 2 (MapPartitionsRDD[8] at filter at SparkDemo.java:60)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, PROCESS_LOCAL, 1354 bytes)
15/04/30 08:24:42 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 2)
15/04/30 08:24:42 INFO rdd.HadoopRDD: Input split: file:/home/hadoop/input/test.log:0+119
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 2). 1830 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 8 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 2 (count at SparkDemo.java:67) finished in 0.008 s
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Job 1 finished: count at SparkDemo.java:67, took 0.017870 s
15/04/30 08:24:42 INFO spark.SparkContext: Starting job: count at SparkDemo.java:74
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Got job 2 (count at SparkDemo.java:74) with 1 output partitions (allowLocal=false)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Final stage: Stage 3(count at SparkDemo.java:74)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[9] at filter at SparkDemo.java:69), which has no missing parents
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(3040) called with curMem=521525, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 3.0 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(2156) called with curMem=524565, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2.1 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:34209 (size: 2.1 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_5_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 3 (MapPartitionsRDD[9] at filter at SparkDemo.java:69)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, PROCESS_LOCAL, 1354 bytes)
15/04/30 08:24:42 INFO executor.Executor: Running task 0.0 in stage 3.0 (TID 3)
15/04/30 08:24:42 INFO rdd.HadoopRDD: Input split: file:/home/hadoop/input/test.log:0+119
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 3.0 (TID 3). 1830 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 3 (count at SparkDemo.java:74) finished in 0.009 s
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 9 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Job 2 finished: count at SparkDemo.java:74, took 0.019982 s
15/04/30 08:24:42 INFO spark.SparkContext: Starting job: collect at SparkDemo.java:81
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Got job 3 (collect at SparkDemo.java:81) with 1 output partitions (allowLocal=false)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Final stage: Stage 4(collect at SparkDemo.java:81)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting Stage 4 (MapPartitionsRDD[10] at filter at SparkDemo.java:76), which has no missing parents
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(3056) called with curMem=526721, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 3.0 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(2164) called with curMem=529777, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 2.1 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:34209 (size: 2.1 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_6_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 4 (MapPartitionsRDD[10] at filter at SparkDemo.java:76)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 4, localhost, PROCESS_LOCAL, 1354 bytes)
15/04/30 08:24:42 INFO executor.Executor: Running task 0.0 in stage 4.0 (TID 4)
15/04/30 08:24:42 INFO rdd.HadoopRDD: Input split: file:/home/hadoop/input/test.log:0+119
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 4.0 (TID 4). 1828 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 4) in 7 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 4 (collect at SparkDemo.java:81) finished in 0.006 s
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Job 3 finished: collect at SparkDemo.java:81, took 0.017157 s
[root@karei spark]#
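A note on --master: values set directly on the SparkConf take precedence over spark-submit flags, and init() hard-codes setMaster("local"), so this run effectively ignores local[2] and uses a single local thread. To let spark-submit choose the master (for example a standalone cluster; the host below is a placeholder), drop the setMaster() call and submit with something like:

[root@karei spark]# ./bin/spark-submit --class shuai.spark.SparkDemo --master spark://<master-host>:7077 mylib/spark-demo-1.0.jar /home/hadoop/input/test.log /home/hadoop/output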
8. Output
[root@karei spark]# cat /home/hadoop/output/part-00000
(Spark,2)
(Operation,1)
(ORACLE,1)
(ERROR,1)
(start,1)
(INFO,2)
(Configuration,1)
(end,1)
(Database,1)
(testing,2)
(HDFS,1)
(WARN,1)
(:,4)
(Hadoop,1)
[root@karei spark]#
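The (:,4) entry shows the effect of the naive tokenizer: every log line contains " : ", and split(" ") treats the bare colon as a word. If you wanted counts without the punctuation token, one illustrative tweak (not what the demo above runs) would be to split on runs of non-word characters in the flatMap of wordCount():

    return Arrays.asList(s.split("\\W+"));  // splits on runs of non-word characters, dropping ":" entirely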