开发者社区> 问答> 正文

如何在Scala中使用Flink的KafkaSource?

尝试用Flink的KafkaSource运行一个简单的测试程序,内容如下:
Flink 0.9
Scala 2.10.4
Kafka 0.8.2.1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka

object TestKafka {
def main(args: Array[String]) {

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env
 .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
 .print

}
}
报错:
[ERROR] TestKafka.scala:8: error: not found: type KafkaSource
[ERROR] .addSource(new KafkaSourceString)

展开
收起
【方向】 2018-11-17 18:19:53 3757 0
1 条回答
写回答
取消 提交回答
  • 欢迎各位对内容方向及质量提需求,我们尽量满足,将国外优质的内容呈现给大家!

    问题似乎是SBT和Maven配置文件不能很好地协同工作。Flink POM将Scala版本(2.10,2.11,...)称为变量,其中一些在构建配置文件中定义。 SBT未正确评估配置文件,因此包装无法正常工作。

    使用以下build.sbt:
    organization := "pl.japila.kafka"
    scalaVersion := "2.11.7"

    libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
    libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"
    运行如下:
    import org.apache.flink.streaming.api.environment._
    import org.apache.flink.streaming.connectors.kafka
    import org.apache.flink.streaming.connectors.kafka.api._
    import org.apache.flink.streaming.util.serialization._

    object TestKafka {
    def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
     .print

    }
    }
    输出:
    [kafka-flink]> run
    [info] Running TestKafka
    log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraph).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    [success] Total time: 0 s, completed Jul 15, 2015 9:29:31 AM

    2019-07-17 23:15:10
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink峰会 - 徐榜江 立即下载
Just Enough Scala for Spark 立即下载
JDK8新特性与生产-for“华东地区scala爱好者聚会” 立即下载