How do I use Flink's KafkaSource from Scala?
The problem appears to be that the SBT and Maven build setups do not play well together. The Flink POMs refer to the Scala version (2.10, 2.11, ...) through a variable, parts of which are defined in Maven build profiles. SBT does not evaluate those profiles, so dependency resolution and packaging break.
Use the following build.sbt:

    organization := "pl.japila.kafka"

    scalaVersion := "2.11.7"

    libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")

    libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"
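The exclude names the literal, unevaluated artifact kafka_${scala.binary.version} that leaks out of the Flink POM, dropping the broken transitive dependency; the %% in the last line then re-adds Kafka under the project's own Scala binary version. With scalaVersion set to 2.11.7, that line is equivalent to the following explicit form (shown only to illustrate what %% expands to):

    libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.8.2.1"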
Run it as follows:

    import org.apache.flink.streaming.api.environment._
    import org.apache.flink.streaming.connectors.kafka
    import org.apache.flink.streaming.connectors.kafka.api._
    import org.apache.flink.streaming.util.serialization._

    object TestKafka {
      def main(args: Array[String]) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream = env
          .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
          .print
      }
    }

Output:

    [kafka-flink]> run
    [info] Running TestKafka
    log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraph).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    [success] Total time: 0 s, completed Jul 15, 2015 9:29:31 AM
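Note that the run completes in 0 seconds without consuming any records: in Flink's streaming API, addSource and print only build the topology, and nothing actually runs until env.execute() is called. Below is a minimal sketch of the same program with an explicit execute call added, plus a default log4j console appender to silence the warnings above (assuming log4j 1.2 is on the classpath, which it is as a Flink dependency; the ZooKeeper address and topic are the ones from the question):

    import org.apache.flink.streaming.api.environment._
    import org.apache.flink.streaming.connectors.kafka.api._
    import org.apache.flink.streaming.util.serialization._

    object TestKafka {
      def main(args: Array[String]) {
        // Default console appender so log4j stops warning about
        // missing appenders (log4j 1.2 API).
        org.apache.log4j.BasicConfigurator.configure()

        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Build the topology: read strings from topic "test" via the
        // ZooKeeper quorum at localhost:2181 and print each record.
        env
          .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
          .print

        // Nothing is submitted until execute() is called; this blocks
        // while the job consumes messages.
        env.execute("TestKafka")
      }
    }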