开发者社区> 问答> 正文

Apache Flink:ProcessWindowFunction实现

我正在尝试使用Scala在我的Apache Flink项目中使用ProcessWindowFunction。不幸的是,我已经无法实现基本的ProcessWindowFunction,就像在Apache Flink文档中使用它一样。

这是我的代码:

object StreamingJob {
def main(args: Array[String]) {

val env = StreamExecutionEnvironment.getExecutionEnvironment
val eventStream = env.addSource(new OrionSource(9001))
val processedDataStream = eventStream.flatMap(event => event.entities)
.map(entity => (entity.id, entity.attrs("temperature").value.asInstanceOf[String]))

 .keyBy(_._1)
 .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
 .process(new MyProcessWindowFunction())

env.execute("Socket Window NgsiEvent")
}
}

private class MyProcessWindowFunction extends ProcessWindowFunction[(String, String), String, String, TimeWindow] {

def process(key: String, context: Context, input: Iterable[(String, String)], out: Collector[String]): Unit = {
var count: Int = 0
for (in <- input) {

count = count + 1

}
out.collect(s"Window ${context.window} count: $count")
}
}
从IntelliJ我得到以下提示:

1)显示创建新类对象的位置:

Type mismatch, expected: ProcessWindowFunction[(String, String), NotInferedR, String, TimeWindow], actual: MyProcessWindowFunction
2)这直接在课堂上显示:

Class 'MyProcessWindowFunction' must either be declared abstract or implement abstract member 'process(key:KEY, context:ProcessWindowFunction.Context, iterable:Iterable, collector:Collector):void' in 'org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction'
构建代码会显示以下错误:

Error:(51, 16) type mismatch;
found : org.apache.flink.MyProcessWindowFunction
required:
org.apache.flink.streaming.api.scala.function.ProcessWindowFunction[(String, String),?,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
.process(new MyProcessWindowFunction())
这些是我使用的进口:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.fiware.cosmos.orion.flink.connector.{NgsiEvent, OrionSource}
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.util.Collector

import scala.collection.TraversableOnce

展开
收起
flink小助手 2018-12-11 16:44:58 8876 0
3 条回答
写回答
取消 提交回答
  • 遇到了完全一样的问题,楼主最后咋解决的啊啊啊啊啊啊啊啊

    2020-08-06 15:40:41
    赞同 展开评论 打赏
  • 什么都不懂的小萌新

    怎么解决的啊?

    2019-07-17 23:19:54
    赞同 展开评论 打赏
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    该process()方法应具有Unit返回类型。

    它通过out: Collector[String]参数发出记录。

    2019-07-17 23:19:53
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载

相关镜像