Spark First Sample Demo

1. Runtime Environment

  • OS:

[root@karei spark-1.3.1-bin-hadoop2.6]# uname -a
Linux karei 2.6.18-371.4.1.el5 #1 SMP Wed Jan 8 18:42:07 EST 2014 x86_64 x86_64 x86_64 GNU/Linux
[root@karei spark-1.3.1-bin-hadoop2.6]# 

  • CPU:

[root@karei spark-1.3.1-bin-hadoop2.6]# cat /proc/cpuinfo | grep name | cut -d':' -f2 | uniq -c
      4  Intel(R) Xeon(R) CPU           X5270  @ 3.50GHz
[root@karei spark-1.3.1-bin-hadoop2.6]# 

[root@karei spark-1.3.1-bin-hadoop2.6]# 
[root@karei spark-1.3.1-bin-hadoop2.6]# cat /proc/cpuinfo | grep "physical id" | uniq -c
      1 physical id     : 0
      1 physical id     : 3
      1 physical id     : 0
      1 physical id     : 3
[root@karei spark-1.3.1-bin-hadoop2.6]# 

  • Memory:

[root@karei spark-1.3.1-bin-hadoop2.6]# 
[root@karei spark-1.3.1-bin-hadoop2.6]# grep MemTotal /proc/meminfo
MemTotal:     32961052 kB
[root@karei spark-1.3.1-bin-hadoop2.6]# 
[root@karei spark-1.3.1-bin-hadoop2.6]# 

2. Download Spark

I used spark-1.3.1-bin-hadoop2.6.tgz, downloaded from:

http://spark.apache.org/downloads.html
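
For a non-interactive download, something along these lines should work (a sketch; it assumes the Apache archive still serves the 1.3.1 release at its usual path):

wget https://archive.apache.org/dist/spark/spark-1.3.1/spark-1.3.1-bin-hadoop2.6.tgz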

3. Extract the Spark Tarball on Linux
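
The archive can be unpacked into /opt with a command like the following (a sketch; adjust the path to wherever the tarball was downloaded):

tar -xzf spark-1.3.1-bin-hadoop2.6.tgz -C /opt

The extracted directory then looks like this: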

[root@karei opt]# ll /opt/spark-1.3.1-bin-hadoop2.6/
total 460
-rwxrw-r-- 1 1000 1000 278851 Apr 11 08:32 CHANGES.txt
-rwxrw-r-- 1 1000 1000  46083 Apr 11 08:32 LICENSE
-rwxrw-r-- 1 1000 1000  22559 Apr 11 08:32 NOTICE
-rwxrw-r-- 1 1000 1000   3629 Apr 11 08:32 README.md
-rwxrw-r-- 1 1000 1000    157 Apr 11 08:32 RELEASE
drwxrwxr-x 2 1000 1000   4096 Apr 11 08:32 bin
drwxrwxr-x 2 1000 1000   4096 Apr 11 08:32 conf
drwxrwxr-x 3 1000 1000   4096 Apr 11 08:32 data
drwxrwxr-x 3 1000 1000   4096 Apr 11 08:32 ec2
drwxrwxr-x 3 1000 1000   4096 Apr 11 08:32 examples
drwxrwxr-x 2 1000 1000   4096 Apr 11 08:32 lib
drwxr-xr-x 2 root root   4096 Apr 27 10:49 mylib
drwxrwxr-x 7 1000 1000   4096 Apr 11 08:32 python
drwxrwxr-x 2 1000 1000   4096 Apr 11 08:32 sbin
drwxr-xr-x 4 root root   4096 Apr 27 11:10 tmp
[root@karei opt]# 

4. Configure the Spark Environment Variables on Linux

[root@karei opt]# export PATH=/opt/spark-1.3.1-bin-hadoop2.6/bin:$PATH
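
The export above only affects the current shell session. To make it persistent (an optional step, not part of the original walkthrough), append it to the shell profile, for example:

echo 'export PATH=/opt/spark-1.3.1-bin-hadoop2.6/bin:$PATH' >> ~/.bashrc
source ~/.bashrc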

5. Developing the Spark Sample

Maven(3.2.3) + Eclipse(Luna 4.4.1) + Spark(1.3.1) + JDK(1.8.45)
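
Before building, it is worth confirming that the toolchain versions match the ones above (assuming Maven and the JDK are already on PATH):

mvn -version
java -version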

  • pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>shuai.spark</groupId>
	<artifactId>spark-demo</artifactId>
	<version>1.0</version>
	<packaging>jar</packaging>

	<name>spark-demo</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<spark.version>1.3.1</spark.version>
		<junit.version>4.11</junit.version>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
			<version>3.1</version>
		</dependency>
		<dependency>
			<groupId>commons-io</groupId>
			<artifactId>commons-io</artifactId>
			<version>2.4</version>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>${spark.version}</version>
		</dependency>

		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<scope>test</scope>
			<version>${junit.version}</version>
		</dependency>
		<dependency>
			<groupId>org.hamcrest</groupId>
			<artifactId>hamcrest-all</artifactId>
			<scope>test</scope>
			<version>1.3</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.3</version>
			</plugin>

			<!--  
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
					</execution>
				</executions>
				<configuration>
					<finalName>spark-demo</finalName>
					<shadedArtifactAttached>true</shadedArtifactAttached>
					<transformers>
						<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
							<mainClass>shuai.spark.SparkDemo</mainClass>
						</transformer>
					</transformers>
				</configuration>
			</plugin>
			-->
		</plugins>
	</build>
</project>
  • SparkDemo.java

package shuai.spark;

import java.io.File;
import java.util.Arrays;

import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkDemo {
	private static JavaSparkContext javaSparkContext = null;

	public static void init(String appName, String master) {
		SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
		javaSparkContext = new JavaSparkContext(conf);
	}

	@SuppressWarnings("serial")
	private static void wordCount(String filePath, String fileDir) {
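		// saveAsTextFile fails if the output directory already exists, so remove any previous results first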
		FileUtils.deleteQuietly(new File(fileDir));

		JavaRDD<String> file = javaSparkContext.textFile(filePath);

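		// Split each line into words on single spaces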
		JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
			@Override
			public Iterable<String> call(String s) {
				return Arrays.asList(s.split(" "));
			}
		});

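		// Map each word to a (word, 1) pair; reduceByKey below then sums the counts per word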
		JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
			@Override
			public Tuple2<String, Integer> call(String s) {
				return new Tuple2<String, Integer>(s, 1);
			}
		});

		JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
			@Override
			public Integer call(Integer a, Integer b) {
				return a + b;
			}
		});

		counts.saveAsTextFile(fileDir);
	}

	@SuppressWarnings("serial")
	private static void errorCount(String filePath) {
		JavaRDD<String> file = javaSparkContext.textFile(filePath);

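		// Keep only the lines that contain "ERROR"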
		JavaRDD<String> errors = file.filter(new Function<String, Boolean>() {
			@Override
			public Boolean call(String s) {
				return s.contains("ERROR");
			}
		});

		errors.count();

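		// Of the ERROR lines, count and then collect those that also mention "ORACLE"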
		errors.filter(new Function<String, Boolean>() {
			@Override
			public Boolean call(String s) {
				return s.contains("ORACLE");
			}
		}).count();

		errors.filter(new Function<String, Boolean>() {
			@Override
			public Boolean call(String s) {
				return s.contains("ORACLE");
			}
		}).collect();
	}

	public static void main(String[] args) {
		if (args.length < 2) {
			System.err.println("Please provide the input file full path and the output directory as arguments");
			System.exit(1);
		}

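		// Note: setMaster("local") set in code takes precedence over the --master value passed to spark-submit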
		SparkDemo.init("shuai.spark", "local");

		SparkDemo.wordCount(args[0], args[1]);

		SparkDemo.errorCount(args[0]);
	}
}
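
The errorCount() method triggers its count and collect jobs but throws the results away, so nothing from it shows up on the console. To see the numbers on the driver, a small variation inside errorCount(), after the errors RDD is defined, could print them (a sketch, not part of the original demo):

		long errorLines = errors.count();
		long oracleErrors = errors.filter(new Function<String, Boolean>() {
			@Override
			public Boolean call(String s) {
				return s.contains("ORACLE");
			}
		}).count();
		System.out.println("ERROR lines: " + errorLines + ", mentioning ORACLE: " + oracleErrors);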

6. Deploy the Demo on Linux

Create the required directories and files, and deploy spark-demo-1.0.jar onto the target machine.
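
A minimal sketch of these steps (assuming the jar is built with Maven on the development machine and the target paths match the ones used below):

# on the development machine: build the artifact from the project root
mvn clean package        # produces target/spark-demo-1.0.jar

# on the server: create the directories, then copy over the jar and the input file
mkdir -p /usr/local/spark/mylib /home/hadoop/input
cp spark-demo-1.0.jar /usr/local/spark/mylib/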

[root@karei mylib]#
[root@karei mylib]# ll /usr/local/spark/mylib/spark-demo-1.0.jar
-rwxr--r-- 1 spark bigdata 9027 Apr 27 11:05 /usr/local/spark/mylib/spark-demo-1.0.jar
[root@karei mylib]#
[root@karei mylib]# ll /home/hadoop/input/test.log
-rw-r--r-- 1 root root 119 Apr 30 08:14 /home/hadoop/input/test.log
[root@karei mylib]#

[root@karei spark]# cat /home/hadoop/input/test.log
INFO : Spark testing start
WARN : Hadoop HDFS Configuration
ERROR : Database ORACLE Operation
INFO : Spark testing end
[root@karei spark]#

7. Run the Sample

[root@karei spark]#
[root@karei spark]# ./bin/spark-submit --class shuai.spark.SparkDemo --master local[2] mylib/spark-demo-1.0.jar /home/hadoop/input/test.log /home/hadoop/output
15/04/30 08:24:39 INFO spark.SparkContext: Running Spark version 1.3.1
15/04/30 08:24:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/04/30 08:24:39 INFO spark.SecurityManager: Changing view acls to: root
15/04/30 08:24:39 INFO spark.SecurityManager: Changing modify acls to: root
15/04/30 08:24:39 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/04/30 08:24:40 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/04/30 08:24:40 INFO Remoting: Starting remoting
15/04/30 08:24:40 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@karei.netact.noklab.net:41232]
15/04/30 08:24:40 INFO util.Utils: Successfully started service 'sparkDriver' on port 41232.
15/04/30 08:24:40 INFO spark.SparkEnv: Registering MapOutputTracker
15/04/30 08:24:40 INFO spark.SparkEnv: Registering BlockManagerMaster
15/04/30 08:24:40 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-5713b972-849e-44c9-9e92-8cd0b4de7668/blockmgr-24d0ad17-fef5-4179-9018-39057c519c5b
15/04/30 08:24:40 INFO storage.MemoryStore: MemoryStore started with capacity 265.0 MB
15/04/30 08:24:40 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-ea736e60-3575-49b7-b8ba-1b2f8587bac3/httpd-eb6af892-2e5c-4331-a510-d882af085db4
15/04/30 08:24:40 INFO spark.HttpServer: Starting HTTP Server
15/04/30 08:24:40 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/04/30 08:24:40 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:32977
15/04/30 08:24:40 INFO util.Utils: Successfully started service 'HTTP file server' on port 32977.
15/04/30 08:24:40 INFO spark.SparkEnv: Registering OutputCommitCoordinator
15/04/30 08:24:40 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/04/30 08:24:40 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/04/30 08:24:40 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/04/30 08:24:40 INFO ui.SparkUI: Started SparkUI at http://karei.netact.noklab.net:4040
15/04/30 08:24:40 INFO spark.SparkContext: Added JAR file:/usr/local/spark/mylib/spark-demo-1.0.jar at http://10.9.167.55:32977/jars/spark-demo-1.0.jar with timestamp 1430371480725
15/04/30 08:24:40 INFO executor.Executor: Starting executor ID <driver> on host localhost
15/04/30 08:24:40 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@karei.netact.noklab.net:41232/user/HeartbeatReceiver
15/04/30 08:24:40 INFO netty.NettyBlockTransferService: Server created on 34209
15/04/30 08:24:40 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/04/30 08:24:40 INFO storage.BlockManagerMasterActor: Registering block manager localhost:34209 with 265.0 MB RAM, BlockManagerId(<driver>, localhost, 34209)
15/04/30 08:24:40 INFO storage.BlockManagerMaster: Registered BlockManager
15/04/30 08:24:41 WARN util.SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
15/04/30 08:24:41 INFO storage.MemoryStore: ensureFreeSpace(165175) called with curMem=0, maxMem=277877882
15/04/30 08:24:41 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 161.3 KB, free 264.8 MB)
15/04/30 08:24:41 INFO storage.MemoryStore: ensureFreeSpace(25610) called with curMem=165175, maxMem=277877882
15/04/30 08:24:41 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.0 KB, free 264.8 MB)
15/04/30 08:24:41 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:34209 (size: 25.0 KB, free: 265.0 MB)
15/04/30 08:24:41 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
15/04/30 08:24:41 INFO spark.SparkContext: Created broadcast 0 from textFile at SparkDemo.java:30
15/04/30 08:24:41 INFO mapred.FileInputFormat: Total input paths to process : 1
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/04/30 08:24:41 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/04/30 08:24:41 INFO spark.SparkContext: Starting job: saveAsTextFile at SparkDemo.java:53
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Registering RDD 3 (mapToPair at SparkDemo.java:39)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at SparkDemo.java:53) with 1 output partitions (allowLocal=false)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Final stage: Stage 1(saveAsTextFile at SparkDemo.java:53)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 0)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Missing parents: List(Stage 0)
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[3] at mapToPair at SparkDemo.java:39), which has no missing parents
15/04/30 08:24:41 INFO storage.MemoryStore: ensureFreeSpace(4344) called with curMem=190785, maxMem=277877882
15/04/30 08:24:41 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.2 KB, free 264.8 MB)
15/04/30 08:24:41 INFO storage.MemoryStore: ensureFreeSpace(3085) called with curMem=195129, maxMem=277877882
15/04/30 08:24:41 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.0 KB, free 264.8 MB)
15/04/30 08:24:41 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:34209 (size: 3.0 KB, free: 265.0 MB)
15/04/30 08:24:41 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
15/04/30 08:24:41 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:41 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[3] at mapToPair at SparkDemo.java:39)
15/04/30 08:24:41 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/04/30 08:24:41 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1343 bytes)
15/04/30 08:24:41 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
15/04/30 08:24:41 INFO executor.Executor: Fetching http://10.9.167.55:32977/jars/spark-demo-1.0.jar with timestamp 1430371480725
15/04/30 08:24:42 INFO util.Utils: Fetching http://10.9.167.55:32977/jars/spark-demo-1.0.jar to /tmp/spark-4cb44a60-1a2b-4490-bf6a-f123e6c13949/userFiles-22bb9d0d-6d44-48bd-910b-8a0b6581d11a/fetchFileTemp4845879217433172142.tmp
15/04/30 08:24:42 INFO executor.Executor: Adding file:/tmp/spark-4cb44a60-1a2b-4490-bf6a-f123e6c13949/userFiles-22bb9d0d-6d44-48bd-910b-8a0b6581d11a/spark-demo-1.0.jar to class loader
15/04/30 08:24:42 INFO rdd.HadoopRDD: Input split: file:/home/hadoop/input/test.log:0+119
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 2003 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 274 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 0 (mapToPair at SparkDemo.java:39) finished in 0.293 s
15/04/30 08:24:42 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/04/30 08:24:42 INFO scheduler.DAGScheduler: running: Set()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: waiting: Set(Stage 1)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: failed: Set()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Missing parents for Stage 1: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkDemo.java:53), which is now runnable
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(127584) called with curMem=198214, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 124.6 KB, free 264.7 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(77104) called with curMem=325798, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 75.3 KB, free 264.6 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:34209 (size: 75.3 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkDemo.java:53)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1114 bytes)
15/04/30 08:24:42 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 1)
15/04/30 08:24:42 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
15/04/30 08:24:42 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms
15/04/30 08:24:42 INFO output.FileOutputCommitter: Saved output of task 'attempt_201504300824_0001_m_000000_1' to file:/home/hadoop/output/_temporary/0/task_201504300824_0001_m_000000
15/04/30 08:24:42 INFO mapred.SparkHadoopMapRedUtil: attempt_201504300824_0001_m_000000_1: Committed
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 1). 1828 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 1 (saveAsTextFile at SparkDemo.java:53) finished in 0.171 s
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 171 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at SparkDemo.java:53, took 0.640220 s
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(76899) called with curMem=402902, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 75.1 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(36686) called with curMem=479801, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 35.8 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:34209 (size: 35.8 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_3_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 3 from textFile at SparkDemo.java:58
15/04/30 08:24:42 INFO mapred.FileInputFormat: Total input paths to process : 1
15/04/30 08:24:42 INFO spark.SparkContext: Starting job: count at SparkDemo.java:67
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Got job 1 (count at SparkDemo.java:67) with 1 output partitions (allowLocal=false)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Final stage: Stage 2(count at SparkDemo.java:67)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[8] at filter at SparkDemo.java:60), which has no missing parents
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(2936) called with curMem=516487, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.9 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(2102) called with curMem=519423, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 2.1 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:34209 (size: 2.1 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_4_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 2 (MapPartitionsRDD[8] at filter at SparkDemo.java:60)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, PROCESS_LOCAL, 1354 bytes)
15/04/30 08:24:42 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 2)
15/04/30 08:24:42 INFO rdd.HadoopRDD: Input split: file:/home/hadoop/input/test.log:0+119
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 2). 1830 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 8 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 2 (count at SparkDemo.java:67) finished in 0.008 s
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Job 1 finished: count at SparkDemo.java:67, took 0.017870 s
15/04/30 08:24:42 INFO spark.SparkContext: Starting job: count at SparkDemo.java:74
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Got job 2 (count at SparkDemo.java:74) with 1 output partitions (allowLocal=false)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Final stage: Stage 3(count at SparkDemo.java:74)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[9] at filter at SparkDemo.java:69), which has no missing parents
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(3040) called with curMem=521525, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 3.0 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(2156) called with curMem=524565, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2.1 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:34209 (size: 2.1 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_5_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 3 (MapPartitionsRDD[9] at filter at SparkDemo.java:69)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, PROCESS_LOCAL, 1354 bytes)
15/04/30 08:24:42 INFO executor.Executor: Running task 0.0 in stage 3.0 (TID 3)
15/04/30 08:24:42 INFO rdd.HadoopRDD: Input split: file:/home/hadoop/input/test.log:0+119
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 3.0 (TID 3). 1830 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 3 (count at SparkDemo.java:74) finished in 0.009 s
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 9 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Job 2 finished: count at SparkDemo.java:74, took 0.019982 s
15/04/30 08:24:42 INFO spark.SparkContext: Starting job: collect at SparkDemo.java:81
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Got job 3 (collect at SparkDemo.java:81) with 1 output partitions (allowLocal=false)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Final stage: Stage 4(collect at SparkDemo.java:81)
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting Stage 4 (MapPartitionsRDD[10] at filter at SparkDemo.java:76), which has no missing parents
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(3056) called with curMem=526721, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 3.0 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.MemoryStore: ensureFreeSpace(2164) called with curMem=529777, maxMem=277877882
15/04/30 08:24:42 INFO storage.MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 2.1 KB, free 264.5 MB)
15/04/30 08:24:42 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:34209 (size: 2.1 KB, free: 264.9 MB)
15/04/30 08:24:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_6_piece0
15/04/30 08:24:42 INFO spark.SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:839
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 4 (MapPartitionsRDD[10] at filter at SparkDemo.java:76)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 4, localhost, PROCESS_LOCAL, 1354 bytes)
15/04/30 08:24:42 INFO executor.Executor: Running task 0.0 in stage 4.0 (TID 4)
15/04/30 08:24:42 INFO rdd.HadoopRDD: Input split: file:/home/hadoop/input/test.log:0+119
15/04/30 08:24:42 INFO executor.Executor: Finished task 0.0 in stage 4.0 (TID 4). 1828 bytes result sent to driver
15/04/30 08:24:42 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 4) in 7 ms on localhost (1/1)
15/04/30 08:24:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Stage 4 (collect at SparkDemo.java:81) finished in 0.006 s
15/04/30 08:24:42 INFO scheduler.DAGScheduler: Job 3 finished: collect at SparkDemo.java:81, took 0.017157 s
[root@karei spark]#

8. Output

[root@karei spark]# cat /home/hadoop/output/part-00000
(Spark,2)
(Operation,1)
(ORACLE,1)
(ERROR,1)
(start,1)
(INFO,2)
(Configuration,1)
(end,1)
(Database,1)
(testing,2)
(HDFS,1)
(WARN,1)
(:,4)
(Hadoop,1)
[root@karei spark]#
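
Note that the (:,4) entry shows up because every log line uses " : " as a separator and the demo splits on a single space, so the colon is counted as a word of its own.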
