开发者社区> 问答> 正文

kafkaScala开发问题

祈祷繁星 2017-09-20 23:39:20 3156

package rizhichuli.sparkstreaming
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import java.util.Properties
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.kafka
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.util._
//import org.apache.spark.examples.streaming._
import org.apache.kafka.serializer.StringDecoder
import org.apache.spark.streaming.util._
import kafka.utils.VerifiableProperties    //红色的为后加上的,并且有错,无法找到kafka下的utils等包


/**
* @author ${穆金星}
*/
object App {

  def main(args : Array[String]) {
    //if(args.length<4){
    // System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
    // System.exit(1)
  //  }
    System.setProperty("hadoop.home.dir", "D:\\appbase\\hadoop-2.7.4\\hadoop-2.7.4")

    val topic=Set("linuxSysteminfos")
    val brokers="10.218.7.232:9092"

    val sparkConf=new SparkConf().setMaster("local[2]").setAppName("App")
    val sc=new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))

    //ssc.sparkContext.setLogLevel("ERROR")

    val kafkaParams=Map[String,String]("metadata.broker.list"->brokers,"serializer.class"->"kafka.serializer.StringEnvoder")
    val kafkaStream=KafkaUtils.createDirectStream(ssc,kafkaParams,topic)

    kafkaStream.print()
    ssc.start()
    ssc.awaitTermination()

    }

}








运行后出现的错误是

17/09/20 22:35:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:3074)
at java.lang.Class.getConstructor(Class.java:1817)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:150)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/09/20 22:35:11 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:3074)
at java.lang.Class.getConstructor(Class.java:1817)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:150)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)




添加kafka.utils.VerifiableProperties类后发现

object utils is not a member of package org.apache.kafka




但是确定kafka包已经包含在了maven中且其中有此类,忘各位大神能够帮忙解答
消息中间件 Java Kafka Maven
分享到
取消 提交回答
全部回答(1)
  • 祈祷繁星
    2017-09-21 09:25:58
    自己顶,期待大神的到来
    自己顶,期待大神的到来
    0 0
+ 订阅

构建可靠、高效、易扩展的技术基石

推荐文章
相似问题