Created a Scala Maven project in IntelliJ IDEA with the Scala plugin; it runs fine from the IDE, but the packaged jar fails on the cluster
The code:
package com.chanct.idap.ssm.spark

import com.chanct.idap.ssm.common.{HbaseUtils, PlatformConfig}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by lichao on 16-4-4.
 */
object KafKa2SparkStreaming2Hbase {
  def main(args: Array[String]) {
    val zkQuorum = "c1d8:2181,c1d9:2181,c1d10:2181"
    val group = "1"
    val topics = "scala_api_topic"
    val numThreads = 2
    // val sparkConf = new SparkConf().setAppName("KafkaWordCount2Hbase").setMaster("local[2]").set("spark.eventLog.overwrite", "true")
    val sparkConf = new SparkConf().setAppName("KafkaWordCount2Hbase").set("spark.eventLog.overwrite", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()

    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val conf = PlatformConfig.loadHbaseConf() // load the HBase configuration
        val conn = HbaseUtils.getConnection(conf) // obtain an HBase connection
        val userTable = TableName.valueOf("WCTest")
        val table = conn.getTable(userTable)
        partition.foreach { w =>
          try {
            val put = new Put(Bytes.toBytes(System.currentTimeMillis().toString))
            put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("value"), Bytes.toBytes(w._1.toString))
            put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("count"), Bytes.toBytes(w._2.toString))
            table.put(put)
          } catch {
            case _: Exception => println("raw error!")
          }
        }
        table.close()
        conn.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
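HbaseUtils and PlatformConfig are in-house helpers (com.chanct.idap.ssm.common) whose source is not shown here; presumably they are thin wrappers over the HBase 1.0 client API, roughly like this hypothetical sketch:

package com.chanct.idap.ssm.common

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

// Hypothetical sketch only -- the real helpers are not part of this post.
object PlatformConfig {
  // Builds an HBase Configuration; picks up hbase-site.xml from the classpath.
  def loadHbaseConf(): Configuration = HBaseConfiguration.create()
}

object HbaseUtils {
  // Opens a connection through the HBase 1.0 client API.
  def getConnection(conf: Configuration): Connection =
    ConnectionFactory.createConnection(conf)
}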
The pom file:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.chanct.idap</groupId>
  <artifactId>ssm</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2016</inceptionYear>

  <properties>
    <scala.version>2.10.4</scala.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <!-- <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories> -->

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0-cdh5.4.5</version>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>servlet-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>2.6.0-mr1-cdh5.4.5</version>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>servlet-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase</artifactId>
      <version>1.0.0-cdh5.4.5</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.0.0-cdh5.4.5</version>
      <exclusions>
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>servlet-api-2.5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-assembly_2.10</artifactId>
      <version>1.3.0-cdh5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.8.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jettison</groupId>
      <artifactId>jettison</artifactId>
      <version>1.3.7</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-yarn_2.10</artifactId>
      <version>1.3.0-cdh5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.0-cdh5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.3.0-cdh5.4.5</version>
    </dependency>
  </dependencies>

  <build>
    <!-- <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory> -->
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.7</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
      <!-- <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin> -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
              </transformers>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <!-- <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting> -->
</project>

Maven packaging:
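With the shade plugin bound to the package phase as configured above, building the runnable fat jar is presumably just the standard invocation:

mvn clean package

which leaves the shaded jar under target/ as ssm-1.0-SNAPSHOT.jar.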

Spark submit command:
spark-submit --class com.chanct.idap.ssm.spark.KafKa2SparkStreaming2Hbase --master yarn-cluster --num-executors 3 --driver-memory 1g --executor-memory 512m --executor-cores 1 /home/lc/ssm-1.0-SNAPSHOT.jar
The final error:
Exception in thread "Thread-63" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1386)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:107)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
16/04/12 10:33:55 INFO cluster.YarnClusterSchedulerBackend: Shutting down all executors
16/04/12 10:33:55 INFO cluster.YarnClusterSchedulerBackend: Asking each executor to shut down
16/04/12 10:33:55 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor: OutputCommitCoordinator stopped!
16/04/12 10:33:55 INFO spark.MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
16/04/12 10:33:55 INFO storage.MemoryStore: MemoryStore cleared
16/04/12 10:33:55 INFO storage.BlockManager: BlockManager stopped
16/04/12 10:33:55 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
16/04/12 10:33:55 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/04/12 10:33:55 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/04/12 10:33:55 WARN scheduler.EventLoggingListener: Event log hdfs://nameservice1/user/spark/applicationHistory/application_1460352776638_0056 already exists. Overwriting...
16/04/12 10:33:55 INFO Remoting: Remoting shut down
16/04/12 10:33:55 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
16/04/12 10:33:55 INFO spark.SparkContext: Successfully stopped SparkContext
16/04/12 10:33:55 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: Job aborted due to stage failure: Task 2 in stage 7.0 failed 4 times, most recent failure: Lost task 2.3 in stage 7.0 (TID 84, c1d3): java.lang.NullPointerException
    at com.chanct.idap.ssm.spark.KafKa2SparkStreaming2Hbase$$anonfun$main$1$$anonfun$apply$1.apply(KafKa2SparkStreaming2Hbase.scala:47)
    at com.chanct.idap.ssm.spark.KafKa2SparkStreaming2Hbase$$anonfun$main$1$$anonfun$apply$1.apply(KafKa2SparkStreaming2Hbase.scala:42)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:)
16/04/12 10:33:55 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
16/04/12 10:33:55 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1460352776638_0056
Problem solved. Because of version issues in the Spark/HBase integration, some jars (htrace-core in particular) cannot be found at runtime. Add this to the Spark configuration:

spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar

and when submitting the job, put it on the driver classpath as well:

spark-submit --master yarn-cluster --driver-class-path /etc/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar ....
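Putting the two settings together with the original submit command, the full invocation would look roughly like this (paths assume the standard CDH parcel layout):

spark-submit \
  --class com.chanct.idap.ssm.spark.KafKa2SparkStreaming2Hbase \
  --master yarn-cluster \
  --num-executors 3 --driver-memory 1g --executor-memory 512m --executor-cores 1 \
  --driver-class-path /etc/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar \
  --conf spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar \
  /home/lc/ssm-1.0-SNAPSHOT.jar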
Reference: http://community.cloudera.com/t5/Apache-Hadoop-Concepts-and/Class-not-found-running-Spark-sample-hbase-importformat-py-in/td-p/27084
Finally done. It took a whole day, but it was worth it.