Spark First Sample Demo

1. Runtime Environment

  • OS:

[root@karei spark-1.3.1-bin-hadoop2.6]# uname -a
Linux karei 2.6.18-371.4.1.el5 #1 SMP Wed Jan 8 18:42:07 EST 2014 x86_64 x86_64 x86_64 GNU/Linux
[root@karei spark-1.3.1-bin-hadoop2.6]# 

  • CPU:

[root@karei spark-1.3.1-bin-hadoop2.6]# cat /proc/cpuinfo | grep name | cut -d':' -f2 | uniq -c
      4  Intel(R) Xeon(R) CPU           X5270  @ 3.50GHz
[root@karei spark-1.3.1-bin-hadoop2.6]# 

[root@karei spark-1.3.1-bin-hadoop2.6]# 
[root@karei spark-1.3.1-bin-hadoop2.6]# cat /proc/cpuinfo | grep "physical id" | uniq -c
      1 physical id     : 0
      1 physical id     : 3
      1 physical id     : 0
      1 physical id     : 3
[root@karei spark-1.3.1-bin-hadoop2.6]# 

  • Memory:

[root@karei spark-1.3.1-bin-hadoop2.6]# 
[root@karei spark-1.3.1-bin-hadoop2.6]# grep MemTotal /proc/meminfo
MemTotal:     32961052 kB
[root@karei spark-1.3.1-bin-hadoop2.6]# 
[root@karei spark-1.3.1-bin-hadoop2.6]# 

2. Download Spark

I used spark-1.3.1-bin-hadoop2.6.tgz, downloaded from the official download page:

http://spark.apache.org/downloads.html

3. Extract the Spark Archive on Linux

[root@karei opt]# ll /opt/spark-1.3.1-bin-hadoop2.6/
total 460
-rwxrw-r-- 1 1000 1000 278851 Apr 11 08:32 CHANGES.txt
-rwxrw-r-- 1 1000 1000  46083 Apr 11 08:32 LICENSE
-rwxrw-r-- 1 1000 1000  22559 Apr 11 08:32 NOTICE
-rwxrw-r-- 1 1000 1000   3629 Apr 11 08:32 README.md
-rwxrw-r-- 1 1000 1000    157 Apr 11 08:32 RELEASE
drwxrwxr-x 2 1000 1000   4096 Apr 11 08:32 bin
drwxrwxr-x 2 1000 1000   4096 Apr 11 08:32 conf
drwxrwxr-x 3 1000 1000   4096 Apr 11 08:32 data
drwxrwxr-x 3 1000 1000   4096 Apr 11 08:32 ec2
drwxrwxr-x 3 1000 1000   4096 Apr 11 08:32 examples
drwxrwxr-x 2 1000 1000   4096 Apr 11 08:32 lib
drwxr-xr-x 2 root root   4096 Apr 27 10:49 mylib
drwxrwxr-x 7 1000 1000   4096 Apr 11 08:32 python
drwxrwxr-x 2 1000 1000   4096 Apr 11 08:32 sbin
drwxr-xr-x 4 root root   4096 Apr 27 11:10 tmp
[root@karei opt]# 

4. Configure the Spark Environment Variable on Linux

[root@karei opt]# export PATH=/opt/spark-1.3.1-bin-hadoop2.6/bin:$PATH
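
This export only affects the current shell session; to make it permanent, the same line can be appended to ~/.bashrc (or the relevant profile script) of the user that runs Spark.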

5. Spark Sample Development

Maven(3.2.3) + Eclipse(Luna 4.4.1) + Spark(1.3.1) + JDK(1.8.45)

  • pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>shuai.spark</groupId>
	<artifactId>spark-demo</artifactId>
	<version>1.0</version>
	<packaging>jar</packaging>

	<name>spark-demo</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<spark.version>1.3.1</spark.version>
		<junit.version>4.11</junit.version>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
			<version>3.1</version>
		</dependency>
		<dependency>
			<groupId>commons-io</groupId>
			<artifactId>commons-io</artifactId>
			<version>2.4</version>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>${spark.version}</version>
		</dependency>

		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<scope>test</scope>
			<version>${junit.version}</version>
		</dependency>
		<dependency>
			<groupId>org.hamcrest</groupId>
			<artifactId>hamcrest-all</artifactId>
			<scope>test</scope>
			<version>1.3</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.3</version>
			</plugin>

			<!--  
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
					</execution>
				</executions>
				<configuration>
					<finalName>spark-demo</finalName>
					<shadedArtifactAttached>true</shadedArtifactAttached>
					<transformers>
						<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
							<mainClass>shuai.spark.SparkDemo</mainClass>
						</transformer>
					</transformers>
				</configuration>
			</plugin>
			-->
		</plugins>
	</build>
</project>
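
The maven-shade-plugin section is kept but commented out. When enabled, it builds a single self-contained jar named spark-demo with shuai.spark.SparkDemo recorded as the Main-Class, which becomes useful once the application depends on libraries that are not already on Spark's runtime classpath. For this local demo the plain jar produced by the default packaging is sufficient.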
  • SparkDemo.java

package shuai.spark;

import java.io.File;
import java.util.Arrays;

import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkDemo {
	private static JavaSparkContext javaSparkContext = null;

	public static void init(String appName, String master) {
		SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
		javaSparkContext = new JavaSparkContext(conf);
	}

	@SuppressWarnings("serial")
	private static void wordCount(String filePath, String fileDir) {
		FileUtils.deleteQuietly(new File(fileDir));

		JavaRDD<String> file = javaSparkContext.textFile(filePath);

		JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
			@Override
			public Iterable<String> call(String s) {
				return Arrays.asList(s.split(" "));
			}
		});

		JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
			@Override
			public Tuple2<String, Integer> call(String s) {
				return new Tuple2<String, Integer>(s, 1);
			}
		});

		JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
			@Override
			public Integer call(Integer a, Integer b) {
				return a + b;
			}
		});

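		// saveAsTextFile is the first action in this pipeline; flatMap, mapToPair and
		// reduceByKey above are lazy and only run when this line triggers the job.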
		counts.saveAsTextFile(fileDir);
	}

	@SuppressWarnings("serial")
	private static void errorCount(String filePath) {
		JavaRDD<String> file = javaSparkContext.textFile(filePath);

		JavaRDD<String> errors = file.filter(new Function<String, Boolean>() {
			@Override
			public Boolean call(String s) {
				return s.contains("ERROR");
			}
		});

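		// Each of the three actions below (count, count, collect) launches its own job;
		// without errors.cache() the input file is re-read and re-filtered every time.
		// The results are simply discarded here, for demonstration purposes only.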
		errors.count();

		errors.filter(new Function<String, Boolean>() {
			@Override
			public Boolean call(String s) {
				return s.contains("ORACLE");
			}
		}).count();

		errors.filter(new Function<String, Boolean>() {
			@Override
			public Boolean call(String s) {
				return s.contains("ORACLE");
			}
		}).collect();
	}

	public static void main(String[] args) {
		if (args.length < 2) {
			System.err.println("Please provide the input file path and the output directory as arguments");
			System.exit(1);
		}

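		// Note: setMaster("local") set here in code takes precedence over the --master
		// flag passed to spark-submit, so the local[2] used later is effectively ignored.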
		SparkDemo.init("shuai.spark", "local");

		SparkDemo.wordCount(args[0], args[1]);

		SparkDemo.errorCount(args[0]);
	}
}
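
Since the project compiles with source/target 1.8, the same word-count pipeline can also be written with Java 8 lambda expressions instead of anonymous classes. The following is a minimal sketch that assumes the same javaSparkContext, filePath and fileDir as wordCount above; it is functionally equivalent and not part of the deployed demo:

// Word count with Java 8 lambdas (sketch; same semantics as wordCount above)
JavaRDD<String> lines = javaSparkContext.textFile(filePath);
JavaPairRDD<String, Integer> counts = lines
        .flatMap(line -> Arrays.asList(line.split(" ")))          // split each line into words
        .mapToPair(word -> new Tuple2<String, Integer>(word, 1))  // emit (word, 1) pairs
        .reduceByKey((a, b) -> a + b);                             // sum the counts per word
counts.saveAsTextFile(fileDir);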

6. Deploy the Demo on Linux

Create the required directories and files, and deploy spark-demo-1.0.jar onto the environment:
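
The jar can be produced with the standard Maven build (mvn clean package in the project root yields target/spark-demo-1.0.jar, given the pom above); it is then copied into the mylib directory under the Spark installation.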

[root@karei mylib]#
[root@karei mylib]# ll /usr/local/spark/mylib/spark-demo-1.0.jar
-rwxr--r-- 1 spark bigdata 9027 Apr 27 11:05 /usr/local/spark/mylib/spark-demo-1.0.jar
[root@karei mylib]#
[root@karei mylib]# ll /home/hadoop/input/test.log
-rw-r--r-- 1 root root 119 Apr 30 08:14 /home/hadoop/input/test.log
[root@karei mylib]#

[root@karei spark]# cat /home/hadoop/input/test.log
INFO : Spark testing start
WARN : Hadoop HDFS Configuration
ERROR : Database ORACLE Operation
INFO : Spark testing end
[root@karei spark]#

7. Run the Sample

[root@karei spark]#
[root@karei spark]# ./bin/spark-submit --class shuai.spark.SparkDemo --master local[2] mylib/spark-demo-1.0.jar /home/hadoop/input/test.log /home/hadoop/output
[root@karei spark]#
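
In this command, --class names the application entry point, --master local[2] asks for a local run with two worker threads (although, as noted in the code comment above, the master hard-coded in SparkConf wins), the jar path follows, and everything after the jar is handed to main() as application arguments: the input file and the output directory.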

[root@karei spark]#
[root@karei spark]# ./bin/spark-submit --class shuai.spark.SparkDemo --master local[2] mylib/spark-demo-1.0.jar /home/hadoop/input/test.log /home/hadoop/output
15/04/30 08:24:39 INFO spark.SparkContext: Running Spark version 1.3.1
15/04/30 08:24:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/04/30 08:24:39 INFO spark.SecurityManager: Changing view acls to: root
15/04/30 08:24:39 INFO spark.SecurityManager: Changing modify acls to: root
15/04/30 08:24:39 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/04/30 08:24:40 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/04/30 08:24:40 INFO Remoting: Starting remoting
15/04/30 08:24:40 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@karei.netact.noklab.net:41232]
15/04/30 08:24:40 INFO util.Utils: Successfully started service 'sparkDriver' on port 41232.
15/04/30 08:24:40 INFO spark.SparkEnv: Registering MapOutputTracker
15/04/30 08:24:40 INFO spark.SparkEnv: Registering BlockManagerMaster
15/04/30 08:24:40 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-5713b972-849e-44c9-9e92-8cd0b4de7668/blockmgr-24d0ad17-fef5-4179-9018-39057c519c5b
15/04/30 08:24:40 INFO storage.MemoryStore: MemoryStore started with capacity 265.0 MB
15/04/30 08:24:40 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-ea736e60-3575-49b7-b8ba-1b2f8587bac3/httpd-eb6af892-2e5c-4331-a510-d882af085db4
15/04/30 08:24:40 INFO spark.HttpServer: Starting HTTP Server
15/04/30 08:24:40 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/04/30 08:24:40 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:32977
15/04/30 08:24:40 INFO util.Utils: Successfully started service 'HTTP file server' on port 32977.
15/04/30 08:24:40 INFO spark.SparkEnv: Registering OutputCommitCoordinator
15/04/30 08:24:40 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/04/30 08:24:40 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/04/30 08:24:40 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/04/30 08:24:40 INFO ui.SparkUI: Started SparkUI at http://karei.netact.noklab.net:4040
15/04/30 08:24:40 INFO spark.SparkContext: Added JAR file:/usr/local/spark/mylib/spark-demo-1.0.jar at http://10.9.167.55:32977/jars/spark-demo-1.0.jar with timestamp 1430371480725
15/04/30 08:24:40 INFO executor.Executor: Starting executor ID <driver> on host localhost
15/04/30 08:24:40 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@karei.netact.noklab.net:41232/user/HeartbeatReceiver
15/04/30 08:24:40 INFO netty.NettyBlockTransferService: Server created on 34209
15/04/30 08:24:40 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/04/30 08:24:40 INFO storage.BlockManagerMasterActor: Registering block manager localhost:34209 with 265.0 MB RAM, BlockManagerId(<driver>, localhost, 34209)
15/04/30 08:24:40 INFO storage.BlockManagerMaster: Registered BlockManager
15/04/30 08:24:41 WARN util.SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
15/04/30 08:24:41 INFO storage.MemoryStore: ensureFreeSpace(165175) called with curMem=0, maxMem=277877882
15/04/30 08:24:41 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 161.3 KB, free 264.8 MB)
15/04/30 08:24:41 INFO storage.MemoryStore: ensureFreeSpace(25610) called with curMem=165175, maxMem=277877882
15/04/30 08:24:41 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.0 KB, free 264.8 MB)
15/04/30 08:24:41 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:34209 (size: 25.0 KB, free: 265.0 MB)
15/04/30 08:24:41 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
15/04/30 08:24:41 INFO spark.SparkContext: Created broadcast 0 from textFile at SparkDemo.java:30
15/04/30 08:24:41 INFO mapred.FileInputFormat: Total input paths to process : 1
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/04/30 08:24:41 INFO spark.SparkContext: Starting job: saveAsTextFile at SparkDemo.java:53
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Registering RDD 3 (mapToPair at SparkDemo.java:39)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at SparkDemo.java:53) with 1 output partitions (allowLocal=false)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Final stage: Stage 1(saveAsTextFile at SparkDemo.java:53)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 0)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Missing parents: List(Stage 0)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[3] at mapToPair at SparkDemo.java:39), which has no missing parents
15/04/30 08:24:41 INFO storage.MemoryStore: ensureFreeSpace(4344) called with curMem=190785, maxMem=277877882
15/04/30 08:24:41 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.2 KB, free 264.8 MB)
15/04/30 08:24:41 INFO storage.MemoryStore: ensureFreeSpace(3085) called with curMem=195129, maxMem=277877882
15/04/30 08:24:41 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.0 KB, free 264.8 MB)
15/04/30 08:24:41 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:34209 (size: 3.0 KB, free: 265.0 MB)
15/04/30 08:24:41 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
15/04/30 08:24:41 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[3] at mapToPair at SparkDemo.java:39)
15/04/30 08:24:41 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/04/30 08:24:41 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1343 bytes)
15/04/30 08:24:41 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
15/04/30 08:24:41 INFO executor.Executor: Fetching http://10.9.167.55:32977/jars/spark-demo-1.0.jar with timestamp 1430371480725
15/04/30 08:24:42 INFO util.Utils: Fetching http://10.9.167.55:32977/jars/spark-demo-1.0.jar to /tmp/spark-4cb44a60-1a2b-4490-bf6a-f123e6c13949/userFiles-22bb9d0d-6d44-48bd-910b-8a0b6581d11a/fetchFileTemp4845879217433172142.tmp
15/04/30 08:24:42 INFO executor.Executor: Adding file:/tmp/spark-4cb44a60-1a2b-4490-bf6a-f123e6c13949/userFiles-22bb9d0d-6d44-48bd-910b-8a0b6581d11a/spark-demo-1.0.jar to class loader
15/04/30 08:24:42 INFO rdd.HadoopRDD: Input split: file:/home/hadoop/input/test.log:0+119
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 2003 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 274 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 0 (mapToPair at SparkDemo.java:39) finished in 0.293 s
15/04/30 08:24:42 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/04/30 08:24:42 INFO scheduler.DAGScheduler: running: Set()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: waiting: Set(Stage 1)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: failed: Set()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Missing parents for Stage 1: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkDemo.java:53), which is now runnable
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(127584) called with curMem=198214, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 124.6 KB, free 264.7 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(77104) called with curMem=325798, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 75.3 KB, free 264.6 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:34209 (size: 75.3 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkDemo.java:53)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1114 bytes)
15/04/30 08:24:42 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 1)
15/04/30 08:24:42 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
15/04/30 08:24:42 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms
15/04/30 08:24:42 INFO output.FileOutputCommitter: Saved output of task 'attempt_201504300824_0001_m_000000_1' to file:/home/hadoop/output/_temporary/0/task_201504300824_0001_m_000000
15/04/30 08:24:42 INFO mapred.SparkHadoopMapRedUtil: attempt_201504300824_0001_m_000000_1: Committed
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 1). 1828 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 1 (saveAsTextFile at SparkDemo.java:53) finished in 0.171 s
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 171 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at SparkDemo.java:53, took 0.640220 s
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(76899) called with curMem=402902, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 75.1 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(36686) called with curMem=479801, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 35.8 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:34209 (size: 35.8 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_3_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 3 from textFile at SparkDemo.java:58
15/04/30 08:24:42 INFO mapred.FileInputFormat: Total input paths to process : 1
15/04/30 08:24:42 INFO spark.SparkContext: Starting job: count at SparkDemo.java:67
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Got job 1 (count at SparkDemo.java:67) with 1 output partitions (allowLocal=false)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Final stage: Stage 2(count at SparkDemo.java:67)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[8] at filter at SparkDemo.java:60), which has no missing parents
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(2936) called with curMem=516487, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.9 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(2102) called with curMem=519423, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 2.1 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:34209 (size: 2.1 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_4_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 2 (MapPartitionsRDD[8] at filter at SparkDemo.java:60)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, PROCESS_LOCAL, 1354 bytes)
15/04/30 08:24:42 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 2)
15/04/30 08:24:42 INFO rdd.HadoopRDD: Input split: file:/home/hadoop/input/test.log:0+119
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 2). 1830 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 8 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 2 (count at SparkDemo.java:67) finished in 0.008 s
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Job 1 finished: count at SparkDemo.java:67, took 0.017870 s
15/04/30 08:24:42 INFO spark.SparkContext: Starting job: count at SparkDemo.java:74
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Got job 2 (count at SparkDemo.java:74) with 1 output partitions (allowLocal=false)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Final stage: Stage 3(count at SparkDemo.java:74)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[9] at filter at SparkDemo.java:69), which has no missing parents
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(3040) called with curMem=521525, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 3.0 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(2156) called with curMem=524565, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2.1 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:34209 (size: 2.1 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_5_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 3 (MapPartitionsRDD[9] at filter at SparkDemo.java:69)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, PROCESS_LOCAL, 1354 bytes)
15/04/30 08:24:42 INFO executor.Executor: Running task 0.0 in stage 3.0 (TID 3)
15/04/30 08:24:42 INFO rdd.HadoopRDD: Input split: file:/home/hadoop/input/test.log:0+119
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 3.0 (TID 3). 1830 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 3 (count at SparkDemo.java:74) finished in 0.009 s
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 9 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Job 2 finished: count at SparkDemo.java:74, took 0.019982 s
15/04/30 08:24:42 INFO spark.SparkContext: Starting job: collect at SparkDemo.java:81
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Got job 3 (collect at SparkDemo.java:81) with 1 output partitions (allowLocal=false)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Final stage: Stage 4(collect at SparkDemo.java:81)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting Stage 4 (MapPartitionsRDD[10] at filter at SparkDemo.java:76), which has no missing parents
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(3056) called with curMem=526721, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 3.0 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(2164) called with curMem=529777, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 2.1 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:34209 (size: 2.1 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_6_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 4 (MapPartitionsRDD[10] at filter at SparkDemo.java:76)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 4, localhost, PROCESS_LOCAL, 1354 bytes)
15/04/30 08:24:42 INFO executor.Executor: Running task 0.0 in stage 4.0 (TID 4)
15/04/30 08:24:42 INFO rdd.HadoopRDD: Input split: file:/home/hadoop/input/test.log:0+119
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 4.0 (TID 4). 1828 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 4) in 7 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 4 (collect at SparkDemo.java:81) finished in 0.006 s
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Job 3 finished: collect at SparkDemo.java:81, took 0.017157 s
[root@karei spark]#
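
The log shows four jobs: job 0 is the saveAsTextFile from wordCount (two stages, because reduceByKey introduces a shuffle), and jobs 1-3 are the count, count and collect actions issued by errorCount.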

8. Output

[root@karei spark]# cat /home/hadoop/output/part-00000
(Spark,2)
(Operation,1)
(ORACLE,1)
(ERROR,1)
(start,1)
(INFO,2)
(Configuration,1)
(end,1)
(Database,1)
(testing,2)
(HDFS,1)
(WARN,1)
(:,4)
(Hadoop,1)
[root@karei spark]#
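
One detail worth noticing in the result: the standalone ":" is counted as a word four times, because each line is split only on single spaces. If that is unwanted, a hypothetical tweak to the flatMap body is to split on whitespace and the separator together, for example:

// hypothetical variant of the flatMap body: drop the ':' separators while splitting
return Arrays.asList(s.split("[\\s:]+"));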
