开发者社区> 问答> 正文

试图将Fuple写入Flink Kafka接收器

我正在尝试编写一个流媒体应用程序,它既可以读取也可以写入Kafka。我目前有这个,但我必须把我的元组课程串起来。

object StreamingJob {
def main(args: Array[String]) {

// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val consumer = env.addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))

val counts = consumer.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)

val producer = new FlinkKafkaProducer08[String](
  "localhost:9092",
  "my-topic",
  new SimpleStringSchema())

counts.map(_.toString()).addSink(producer)

env.execute("Window Stream WordCount")
env.execute("Flink Streaming Scala API Skeleton")

}
}
我可以得到最接近的工作如下,但FlinkKafkaProducer08拒绝接受type参数作为构造函数的一部分。

val producer = new FlinkKafkaProducer08[(String, Int)](
"localhost:9092",
"my-topic",
new TypeSerializerOutputFormat[(String, Int)])

counts.addSink(producer)
我想知道是否有办法将元组直接写入我的Kafka接收器。

展开
收起
flink小助手 2018-12-06 18:08:40 2246 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    你需要一个类似这样的类来序列化你的元组:

    private class SerSchema extends SerializationSchema[Tuple2[String, Int]] {
    override def serialize(tuple2: Tuple2[String, Int]): Array[Byte] = ...
    }

    2019-07-17 23:18:39
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载