Writing and Running a Spark Application in Java

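Introduction:

We start with a simple requirement:
We want to analyze a website's access log, count the number of requests coming from each IP address, and then use Geo information to find out how visitors are distributed across countries and regions. Sample lines from my own site's access log look like this: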

121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.198.92 - - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.241.229 - - [21/Feb/2014:00:00:13 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.241.229 - - [21/Feb/2014:00:00:15 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"

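Implementing the Spark Application in Java

The statistics program we implement does the following:

  • Reads the log data file from HDFS
  • Extracts the first field (the IP address) from each line
  • Counts the number of occurrences of each IP address
  • Sorts the IP addresses by occurrence count in descending order
  • Looks up the country each IP address belongs to, using the GeoIP library
  • Prints the result, one line per IP address, in the format: [country code] IP address count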
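
The middle steps of this list (extract the IP field, count, sort in descending order) can be sketched in plain Java without Spark. This is only an illustration under assumptions: the class name IpCountSketch and its methods are hypothetical, the sample lines are shortened versions of the log lines above, and it uses Java 8 collections rather than the Spark API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, not part of the article's Spark job.
final class IpCountSketch {

    /** Returns the first space-separated field of an access-log line (the client IP). */
    static String extractIp(String line) {
        return line.split(" ")[0];
    }

    /** Counts each IP and returns the (IP, count) entries sorted by count, descending. */
    static List<Map.Entry<String, Integer>> countAndSort(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            // Add 1 to the running count of this line's IP address.
            counts.merge(extractIp(line), 1, Integer::sum);
        }
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(counts.entrySet());
        // Comparing b to a (instead of a to b) yields descending order.
        sorted.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        return sorted;
    }

    public static void main(String[] args) {
        // Shortened sample lines; only the first field matters here.
        List<String> lines = Arrays.asList(
            "121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] \"GET /archives/417.html HTTP/1.1\" 200 11465",
            "121.205.241.229 - - [21/Feb/2014:00:00:13 +0800] \"GET /archives/526.html HTTP/1.1\" 200 12080",
            "121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] \"GET /archives/417.html HTTP/1.1\" 200 11465");
        for (Map.Entry<String, Integer> entry : countAndSort(lines)) {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }
}
```

In the Spark version, the counting is distributed across the cluster with reduceByKey, while the sort still runs on the driver after collect().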

Here is the code of the statistics application, implemented in Java:

package org.shirdrn.spark.job;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.shirdrn.spark.job.maxmind.Country;
import org.shirdrn.spark.job.maxmind.LookupService;

import scala.Tuple2;

// The anonymous function classes below capture the enclosing instance,
// so this class must be Serializable.
public class IPAddressStats implements Serializable {

    private static final long serialVersionUID = 8533489548835413763L;
    private static final Log LOG = LogFactory.getLog(IPAddressStats.class);
    private static final Pattern SPACE = Pattern.compile(" ");
    // Not serializable and only used on the driver, hence transient.
    private transient LookupService lookupService;
    private final transient String geoIPFile;

    public IPAddressStats(String geoIPFile) {
        this.geoIPFile = geoIPFile;
        try {
            // lookupService: gets the country code for an IP address
            File file = new File(this.geoIPFile);
            LOG.info("GeoIP file: " + file.getAbsolutePath());
            // AdvancedLookupService is not shown in this article; since it is not
            // imported, it is assumed to live in this package.
            lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @SuppressWarnings("serial")
    public void stat(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
                System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
        JavaRDD<String> lines = ctx.textFile(args[1], 1);

        // split each line and extract the IP address field
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
                // the first field is the IP address
                return Arrays.asList(SPACE.split(s)[0]);
            }
        });

        // map: emit (ip, 1) for every line
        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // reduce: sum the ones per IP address
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // bring the (ip, count) pairs back to the driver
        List<Tuple2<String, Integer>> output = counts.collect();

        // sort the result by count, in descending order
        Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
            @Override
            public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
                return t2._2().compareTo(t1._2());
            }
        });

        writeTo(args, output);
    }

    // prints one line per IP address: [country code] IP address count
    private void writeTo(String[] args, List<Tuple2<String, Integer>> output) {
        for (Tuple2<String, Integer> tuple : output) {
            Country country = lookupService.getCountry(tuple._1());
            LOG.info("[" + country.getCode() + "] " + tuple._1() + "\t" + tuple._2());
        }
    }

    public static void main(String[] args) {
        // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
        if (args.length < 3) {
            System.err.println("Usage: IPAddressStats <master> <inFile> <GeoIPFile>");
            System.err.println(" Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");
            System.exit(1);
        }

        String geoIPFile = args[2];
        IPAddressStats stats = new IPAddressStats(geoIPFile);
        stats.stat(args);

        System.exit(0);
    }

}

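For the implementation logic, refer to the comments in the code. We use Maven to manage and build the Java program. First, here are the dependencies configured in my pom: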

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>0.9.0-incubating</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
    <dependency>
        <groupId>dnsjava</groupId>
        <artifactId>dnsjava</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>commons-net</groupId>
        <artifactId>commons-net</artifactId>
        <version>3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>1.2.1</version>
    </dependency>
</dependencies>

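Note that when the program runs on a Spark cluster, the Job we write has to be serializable. The anonymous function classes in stat() are inner classes, so each one holds a reference to the enclosing IPAddressStats instance, and that instance is serialized together with the task. A field that does not need to be serialized, or cannot be, can simply be marked transient. For example, the field lookupService above does not implement the Serializable interface, so it is declared transient to keep it out of serialization. Otherwise, an error similar to the following may occur (the class reported in this trace is IPAddressStats itself):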

14/03/10 22:34:06 INFO scheduler.DAGScheduler: Failed to run collect at IPAddressStats.java:76
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.shirdrn.spark.job.IPAddressStats
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:741)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:740)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:740)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
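
The effect of transient can be reproduced with plain Java serialization, independent of Spark. The following is a minimal sketch under assumptions: TransientSketch, Helper, GoodJob and BadJob are hypothetical names, and Helper merely stands in for a non-serializable helper object such as a lookup service. It shows that a transient field is skipped (and comes back as null after deserialization), while the same field without transient makes serialization fail with NotSerializableException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical demo class, not part of the article's Spark job.
final class TransientSketch {

    /** Stand-in for a helper object that does NOT implement Serializable. */
    static class Helper {
    }

    /** The helper is transient, so serialization skips it. */
    static class GoodJob implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        transient Helper helper = new Helper();

        GoodJob(String name) {
            this.name = name;
        }
    }

    /** The helper is a regular field, so serialization fails. */
    static class BadJob implements Serializable {
        private static final long serialVersionUID = 1L;
        final Helper helper = new Helper();
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    /** Serializes and deserializes an object, returning the copy. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T o) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(serialize(o)))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if serializing the object fails with NotSerializableException. */
    static boolean failsToSerialize(Object o) {
        try {
            serialize(o);
            return false;
        } catch (NotSerializableException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        GoodJob copy = roundTrip(new GoodJob("stats"));
        System.out.println(copy.name);                        // prints "stats"
        System.out.println(copy.helper == null);              // prints "true"
        System.out.println(failsToSerialize(new BadJob()));   // prints "true"
    }
}
```

Because a transient field is null in the deserialized copy, code that runs on the receiving side must not rely on it. In the program above this is not a problem, since lookupService is only used in writeTo(), which runs on the driver.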

Running the Java Program on a Spark Cluster

I use Maven to manage and build the Java program. With the code above in place, I configure Maven's maven-assembly-plugin as follows:

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <!-- Only consulted when the JAR is started with "java -jar"; the launch
                     script used in this article passes the class name on the command line. -->
                <mainClass>org.shirdrn.spark.job.UserAgentStats</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <excludes>
            <exclude>*.properties</exclude>
            <exclude>*.xml</exclude>
        </excludes>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

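This bundles all dependency libraries into the application JAR. Finally, copy the JAR file to a Linux machine (it does not have to be the Master node of the Spark cluster); it is enough that Spark's environment variables are configured correctly on that node. After unpacking the Spark distribution you will find the script bin/run-example. We can adapt this script so that the relevant paths point to our own Java package (change the variable EXAMPLES_DIR and the parts that refer to where our JAR file is stored) and then use it to run the program. The command further below invokes the adapted script as bin/run-my-java-example. The script looks like this: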

cygwin=false
case "`uname`" in
    CYGWIN*) cygwin=true;;
esac

SCALA_VERSION=2.10

# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"

# Load environment variables from conf/spark-env.sh, if it exists
if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
    . $FWDIR/conf/spark-env.sh
fi

if [ -z "$1" ]; then
    echo "Usage: run-example <example-class> [<args>]" >&2
    exit 1
fi

# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
EXAMPLES_DIR="$FWDIR"/java-examples
SPARK_EXAMPLES_JAR=""
if [ -e "$EXAMPLES_DIR"/*.jar ]; then
    export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/*.jar`
fi
if [[ -z $SPARK_EXAMPLES_JAR ]]; then
    echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2
    echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
    exit 1
fi


# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"

if $cygwin; then
    CLASSPATH=`cygpath -wp $CLASSPATH`
    export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
fi

# Find java binary
if [ -n "${JAVA_HOME}" ]; then
    RUNNER="${JAVA_HOME}/bin/java"
else
    if [ `command -v java` ]; then
        RUNNER="java"
    else
        echo "JAVA_HOME is not set" >&2
        exit 1
    fi
fi

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
    JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS

if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
    echo -n "Spark Command: "
    echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
    echo "========================================"
    echo
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"

To run our Java program on Spark, execute the following commands:

cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat

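The class org.shirdrn.spark.job.IPAddressStats that I implemented takes 3 arguments:

  1. The URL of the Spark cluster's master node: in my case spark://m1:7077
  2. The input file path: application-specific; here I read the file hdfs://m1:9000/user/shirdrn/wwwlog20140222.log from HDFS
  3. The GeoIP database file: application-specific; an external file used to determine which country an IP address belongs to

If the program has no errors and runs normally, it writes its run log to the console, for example: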

14/03/10 22:17:24 INFO job.IPAddressStats: GeoIP file: /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/03/10 22:17:25 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/03/10 22:17:25 INFO Remoting: Starting remoting
14/03/10 22:17:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO spark.SparkEnv: Registering BlockManagerMaster
14/03/10 22:17:25 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140310221725-c1cb
14/03/10 22:17:25 INFO storage.MemoryStore: MemoryStore started with capacity 143.8 MB.
14/03/10 22:17:25 INFO network.ConnectionManager: Bound socket to port 45189 with id = ConnectionManagerId(m1,45189)
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/03/10 22:17:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager m1:45189 with 143.8 MB RAM
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Registered BlockManager
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:49186
14/03/10 22:17:25 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.95.3.56:49186
14/03/10 22:17:25 INFO spark.SparkEnv: Registering MapOutputTracker
14/03/10 22:17:25 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-56c3e30d-a01b-4752-83d1-af1609ab2370
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52073
14/03/10 22:17:26 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/03/10 22:17:26 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/03/10 22:17:26 INFO ui.SparkUI: Started Spark Web UI at http://m1:4040
14/03/10 22:17:26 INFO spark.SparkContext: Added JAR /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://10.95.3.56:52073/jars/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1394515046396
14/03/10 22:17:26 INFO client.AppClient$ClientActor: Connecting to master spark://m1:7077...
14/03/10 22:17:26 INFO storage.MemoryStore: ensureFreeSpace(60341) called with curMem=0, maxMem=150837657
14/03/10 22:17:26 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 58.9 KB, free 143.8 MB)
14/03/10 22:17:26 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140310221726-0000
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor added: app-20140310221726-0000/0 on worker-20140310221648-s1-52544 (s1:52544) with 1 cores
14/03/10 22:17:27 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140310221726-0000/0 on hostPort s1:52544 with 1 cores, 512.0 MB RAM
14/03/10 22:17:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/03/10 22:17:27 WARN snappy.LoadSnappy: Snappy native library not loaded
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor updated: app-20140310221726-0000/0 is now RUNNING
14/03/10 22:17:27 INFO mapred.FileInputFormat: Total input paths to process : 1
14/03/10 22:17:27 INFO spark.SparkContext: Starting job: collect at IPAddressStats.java:77
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Got job 0 (collect at IPAddressStats.java:77) with 1 output partitions (allowLocal=false)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at IPAddressStats.java:77)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70), which has no missing parents
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/03/10 22:17:28 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@s1:59233/user/Executor#-671170811] with ID 0
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2396 bytes in 5 ms
14/03/10 22:17:29 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:47282 with 297.0 MB RAM
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 0 in 3376 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at IPAddressStats.java:70) finished in 4.420 s
14/03/10 22:17:32 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/03/10 22:17:32 INFO scheduler.DAGScheduler: running: Set()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: failed: Set()
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70), which is now runnable
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2255 bytes in 1 ms
14/03/10 22:17:32 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@s1:33534
14/03/10 22:17:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 120 bytes
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 1 in 282 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 0 (collect at IPAddressStats.java:77) finished in 0.314 s
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/03/10 22:17:32 INFO spark.SparkContext: Job finished: collect at IPAddressStats.java:77, took 4.870958309 s
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 58.246.49.218 312
14/03/10 22:17:32 INFO job.IPAddressStats: [KR] 1.234.83.77 300
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.16 212
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 110.85.72.254 207
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 27.150.229.134 185
14/03/10 22:17:32 INFO job.IPAddressStats: [HK] 180.178.52.181 181
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.37.210.212 180
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 222.77.226.83 176
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.205 169
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.9.19 165
...

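We can also check the status of the running application in the web console: the page served on port 8080 of the Master node (for example http://m1:8080/) shows the status of the applications on the cluster.
One more note: if you develop Spark applications in Java with Eclipse in a Unix environment, you can also connect to the Spark cluster directly from Eclipse and submit the application for the cluster to process.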
