我正在尝试编写一个流媒体应用程序,它既可以读取也可以写入Kafka。我目前有这个,但我必须把我的元组课程串起来。
object StreamingJob {
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val consumer = env.addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
val counts = consumer.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
val producer = new FlinkKafkaProducer08[String](
"localhost:9092",
"my-topic",
new SimpleStringSchema())
counts.map(_.toString()).addSink(producer)
env.execute("Window Stream WordCount")
env.execute("Flink Streaming Scala API Skeleton")
}
}
我可以得到最接近的工作如下,但FlinkKafkaProducer08拒绝接受type参数作为构造函数的一部分。
val producer = new FlinkKafkaProducer08[(String, Int)](
"localhost:9092",
"my-topic",
new TypeSerializerOutputFormat[(String, Int)])
counts.addSink(producer)
我想知道是否有办法将元组直接写入我的Kafka接收器。
你需要一个类似这样的类来序列化你的元组:
private class SerSchema extends SerializationSchema[Tuple2[String, Int]] {
override def serialize(tuple2: Tuple2[String, Int]): Array[Byte] = ...
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。