Let's start with a simple requirement:
We want to analyze the access log of a website and count how many requests come from each distinct IP address, and then use GeoIP information to work out the country/region distribution of the visitors. Here are a few sample lines taken from my own site's access log:
121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.198.92 - - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.241.229 - - [21/Feb/2014:00:00:13 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.241.229 - - [21/Feb/2014:00:00:15 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
Implementing the Spark Application in Java
The statistics program we implement covers the following functional points:
- Read the log data file from HDFS
- Extract the first field of each line (the IP address)
- Count how many times each IP address occurs
- Sort the results by occurrence count in descending order
- Look up the country each IP address belongs to via the GeoIP library
- Print the results, one entry per line, in the format: [country code] IP address count
The Java implementation of the statistics application is shown below:
package org.shirdrn.spark.job;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.shirdrn.spark.job.maxmind.AdvancedLookupService;
import org.shirdrn.spark.job.maxmind.Country;
import org.shirdrn.spark.job.maxmind.LookupService;

import scala.Serializable;
import scala.Tuple2;

public class IPAddressStats implements Serializable {

    private static final long serialVersionUID = 8533489548835413763L;
    private static final Log LOG = LogFactory.getLog(IPAddressStats.class);
    private static final Pattern SPACE = Pattern.compile(" ");

    // The GeoIP lookup service cannot be serialized, so it is marked transient
    // and only used on the driver side (in writeTo, after collect()).
    private transient LookupService lookupService;
    private transient final String geoIPFile;

    public IPAddressStats(String geoIPFile) {
        this.geoIPFile = geoIPFile;
        try {
            // Load the GeoIP database used to map an IP address to a country
            File file = new File(this.geoIPFile);
            LOG.info("GeoIP file: " + file.getAbsolutePath());
            lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @SuppressWarnings("serial")
    public void stat(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
                System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
        JavaRDD<String> lines = ctx.textFile(args[1], 1);

        // Split each log line on spaces and keep only the first field, the client IP address
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(SPACE.split(s)[0]);
            }
        });

        // Map each IP address to a (ip, 1) pair
        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Reduce by key to get the occurrence count of each IP address
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Collect the results to the driver and sort by count in descending order
        List<Tuple2<String, Integer>> output = counts.collect();
        Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
            @Override
            public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
                if (t1._2 < t2._2) {
                    return 1;
                } else if (t1._2 > t2._2) {
                    return -1;
                }
                return 0;
            }
        });

        writeTo(args, output);
    }

    private void writeTo(String[] args, List<Tuple2<String, Integer>> output) {
        for (Tuple2<?, ?> tuple : output) {
            // Resolve the country of each IP address and print one line per entry
            Country country = lookupService.getCountry((String) tuple._1);
            LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2);
        }
    }

    public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println("Usage: IPAddressStats <master> <inFile> <GeoIPFile>");
            System.exit(1);
        }
        String geoIPFile = args[2];
        IPAddressStats stats = new IPAddressStats(geoIPFile);
        stats.stat(args);
    }
}
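Before packaging the job for the cluster, it can be handy to exercise it in-process. Below is a minimal sketch (not part of the original program) that drives IPAddressStats with "local" as the master; the class name and both file paths are made-up placeholders you would replace with your own:

package org.shirdrn.spark.job;

// Hypothetical helper for a quick local smoke test of IPAddressStats.
public class IPAddressStatsLocalRunner {

    public static void main(String[] args) {
        IPAddressStats.main(new String[] {
                "local",                          // run Spark in-process instead of on a cluster
                "file:///tmp/wwwlog20140222.log", // placeholder: local copy of the access log
                "/tmp/GeoIP_DATABASE.dat"         // placeholder: local GeoIP database file
        });
    }
}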
The implementation details are explained in the comments in the code. I use Maven to manage and build the Java program; the dependencies declared in my pom are as follows:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>0.9.0-incubating</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
    <dependency>
        <groupId>dnsjava</groupId>
        <artifactId>dnsjava</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>commons-net</groupId>
        <artifactId>commons-net</artifactId>
        <version>3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>1.2.1</version>
    </dependency>
</dependencies>
One thing worth pointing out: when the program runs on a Spark cluster, Spark requires the job we write to be serializable. If a field does not need to be, or cannot be, serialized, simply mark it transient. In the code above, the lookupService field does not implement a serialization interface, so it is declared transient to keep it out of serialization; otherwise you may hit an error like the following:
14/03/10 22:34:06 INFO scheduler.DAGScheduler: Failed to run collect at IPAddressStats.java:76
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.shirdrn.spark.job.IPAddressStats
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:741)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:740)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:740)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
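To make the serialization point above concrete, here is a minimal, illustrative sketch (not taken from the project) of the pattern: the job class itself is serializable, while a helper that cannot be serialized is marked transient and re-created lazily wherever it is actually used. The class and method names below are invented for illustration.

import java.io.Serializable;

// Minimal illustration of the transient pattern described above.
// NonSerializableHelper stands in for something like the GeoIP LookupService.
public class SerializableJob implements Serializable {

    private static final long serialVersionUID = 1L;

    // Excluded from serialization; after deserialization this field is null.
    private transient NonSerializableHelper helper;

    private NonSerializableHelper helper() {
        if (helper == null) {
            helper = new NonSerializableHelper(); // re-create lazily where it is used
        }
        return helper;
    }

    public String resolve(String ip) {
        return helper().lookup(ip);
    }

    // Placeholder for a third-party class that does not implement Serializable.
    static class NonSerializableHelper {
        String lookup(String ip) {
            return "UNKNOWN";
        }
    }
}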
Running the Java Program on the Spark Cluster
As mentioned, I use Maven to manage and build the Java program. After implementing the code above, I configure the maven-assembly-plugin as follows:
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <mainClass>org.shirdrn.spark.job.UserAgentStats</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <excludes>
            <exclude>*.properties</exclude>
            <exclude>*.xml</exclude>
        </excludes>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
This packages all required dependency libraries into the application JAR. Then copy the JAR file to a Linux machine (it does not have to be the Master node of the Spark cluster); it is enough that the Spark environment variables are configured correctly on that node. After unpacking the Spark distribution you will find the script bin/run-example. We can modify this script directly so that it points to our own Java program package (change the EXAMPLES_DIR variable and the parts related to where our JAR file is located), and then use the script to run the job. The modified script looks like this:
#!/usr/bin/env bash

cygwin=false
case "`uname`" in
    CYGWIN*) cygwin=true;;
esac

# Figure out where Spark is installed and export it as SPARK_HOME
FWDIR="$(cd `dirname $0`/..; pwd)"
export SPARK_HOME="$FWDIR"

# Load environment variables from conf/spark-env.sh, if it exists
if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
  . $FWDIR/conf/spark-env.sh
fi

if [ -z "$1" ]; then
  echo "Usage: run-example <example-class> [<args>]" >&2
  exit 1
fi

# Modified: point EXAMPLES_DIR at the directory holding our own application JAR
EXAMPLES_DIR="$FWDIR"/java-examples
if [ -e "$EXAMPLES_DIR"/*.jar ]; then
  export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/*.jar`
fi
if [[ -z $SPARK_EXAMPLES_JAR ]]; then
  echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2
  echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
  exit 1
fi

# Our JAR plus the standard Spark classpath built by compute-classpath.sh
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"

if $cygwin; then
  CLASSPATH=`cygpath -wp $CLASSPATH`
  export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
fi

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

JAVA_OPTS="$SPARK_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
if [ -e "$FWDIR/conf/java-opts" ] ; then
  JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS

if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
  echo -n "Spark Command: "
  echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
  echo "========================================"
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
To run the Java program we developed on Spark, execute the following commands:
cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
./bin/run-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
The class I implemented, org.shirdrn.spark.job.IPAddressStats, takes three arguments:
- The Spark master URL: for example, mine is spark://m1:7077
- The input file path: application specific; here I read the file hdfs://m1:9000/user/shirdrn/wwwlog20140222.log from HDFS
- The GeoIP database file: application specific; an external file used to determine which country an IP address belongs to
If the program has no errors and runs normally, the console prints the run log, for example:
14/03/10 22:17:24 INFO job.IPAddressStats: GeoIP file: /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/03/10 22:17:25 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/03/10 22:17:25 INFO Remoting: Starting remoting
14/03/10 22:17:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO spark.SparkEnv: Registering BlockManagerMaster
14/03/10 22:17:25 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140310221725-c1cb
14/03/10 22:17:25 INFO storage.MemoryStore: MemoryStore started with capacity 143.8 MB.
14/03/10 22:17:25 INFO network.ConnectionManager: Bound socket to port 45189 with id = ConnectionManagerId(m1,45189)
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/03/10 22:17:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager m1:45189 with 143.8 MB RAM
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Registered BlockManager
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:49186
14/03/10 22:17:25 INFO spark.SparkEnv: Registering MapOutputTracker
14/03/10 22:17:25 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-56c3e30d-a01b-4752-83d1-af1609ab2370
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52073
14/03/10 22:17:26 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/03/10 22:17:26 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/03/10 22:17:26 INFO ui.SparkUI: Started Spark Web UI at http://m1:4040
14/03/10 22:17:26 INFO client.AppClient$ClientActor: Connecting to master spark://m1:7077...
14/03/10 22:17:26 INFO storage.MemoryStore: ensureFreeSpace(60341) called with curMem=0, maxMem=150837657
14/03/10 22:17:26 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 58.9 KB, free 143.8 MB)
14/03/10 22:17:26 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140310221726-0000
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor added: app-20140310221726-0000/0 on worker-20140310221648-s1-52544 (s1:52544) with 1 cores
14/03/10 22:17:27 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140310221726-0000/0 on hostPort s1:52544 with 1 cores, 512.0 MB RAM
14/03/10 22:17:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/03/10 22:17:27 WARN snappy.LoadSnappy: Snappy native library not loaded
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor updated: app-20140310221726-0000/0 is now RUNNING
14/03/10 22:17:27 INFO mapred.FileInputFormat: Total input paths to process : 1
14/03/10 22:17:27 INFO spark.SparkContext: Starting job: collect at IPAddressStats.java:77
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Got job 0 (collect at IPAddressStats.java:77) with 1 output partitions (allowLocal=false)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at IPAddressStats.java:77)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70), which has no missing parents
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2396 bytes in 5 ms
14/03/10 22:17:29 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:47282 with 297.0 MB RAM
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 0 in 3376 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at IPAddressStats.java:70) finished in 4.420 s
14/03/10 22:17:32 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/03/10 22:17:32 INFO scheduler.DAGScheduler: running: Set()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: failed: Set()
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70), which is now runnable
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2255 bytes in 1 ms
14/03/10 22:17:32 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@s1:33534
14/03/10 22:17:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 120 bytes
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 1 in 282 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 0 (collect at IPAddressStats.java:77) finished in 0.314 s
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/03/10 22:17:32 INFO spark.SparkContext: Job finished: collect at IPAddressStats.java:77, took 4.870958309 s
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 58.246.49.218 312
14/03/10 22:17:32 INFO job.IPAddressStats: [KR] 1.234.83.77 300
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.16 212
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 110.85.72.254 207
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 27.150.229.134 185
14/03/10 22:17:32 INFO job.IPAddressStats: [HK] 180.178.52.181 181
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.37.210.212 180
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 222.77.226.83 176
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.205 169
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.9.19 165
We can also check the status of the running application through the web console: the Master node's port 8080 (e.g. http://m1:8080/) shows the status of the applications running on the cluster.
In addition, it is worth mentioning that if you develop Spark applications in Java with Eclipse in a Unix-like environment, you can also connect to the Spark cluster directly from Eclipse, submit the application you developed, and let the cluster execute it.
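For example, the driver can be started directly from the IDE by pointing the context at the cluster master and telling Spark which JAR to ship to the executors. A minimal sketch of that idea (the class name and JAR path below are placeholders, assuming the jar-with-dependencies artifact built above):

import org.apache.spark.api.java.JavaSparkContext;

// Sketch of driver-side submission from an IDE, using the Spark 0.9 Java API.
public class EclipseSubmitExample {

    public static void main(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(
                "spark://m1:7077",               // cluster master URL
                "IPAddressStats",                // application name
                System.getenv("SPARK_HOME"),     // Spark installation on the driver machine
                new String[] { "/path/to/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar" }); // placeholder JAR path
        // ... build RDDs and run the job as in IPAddressStats.stat(), then stop the context
        ctx.stop();
    }
}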