开发者社区> 问答> 正文

在apache flink中拆分和加入流

"我想我有一个相当不规范的用例。我想使用以下filter函数将源流分割为多个流:

val dataStream:DataStream[MyEvent] = ...
val s1 = dataStream.filter(...).map(...)
val s2 = dataStream.filter(...).map(...)
我还有一个时间戳提取器(传入的事件将在XML中附加一个时间戳):

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...

dataStream.assignTimestampsAndWatermarks(new MyTimestampExtractor)
...

class MyTimestampExtractor extends AssignerWithPunctuatedWatermarks[Elem]
{
override def checkAndGetNextWatermark(lastElement:Elem, extractedTimestamp:Long):Watermark = new Watermark(extractedTimestamp)
override def extractTimestamp(element:Elem, previousElementTimestamp:Long):Long = XmlOperations.getDateTime(element, ""@timestamp"").getMillis
}
我之所以选择这种方法,并不是简单地做一个单一的流(val s = dataStream.filter(...).map(...).filter(...).map(...)),因为我想构建一个分割/组合任意流的网络(例如s1 + s2-> c1,s1 + s3-> c2,c2 + s4-> c3, ...)

现在,当通过上面的例子发送事件时,可能是事件E1在s1和s2中都结束了。根据我的理解,这意味着将同一事件E1作为第一个实例放入s1(E1a),并将第二个实例放入s2(E1b)。

所以我现在要做的是将E1a和E1b重新组合成一个组合E1,它类似于E1,它既是s1和s2的转换。

我试过了:
val c1 = s1.join(s2)
.where(_.key).equalTo(_.key)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply((e1a, e2b) => { printf(""Got e1a and e1b""); e1a })
然而,似乎事件永远不会达到应用功能,我无法找到原因。

我的例子有什么问题?我的这种流网络的方法/想法是否会起作用?"

展开
收起
flink小助手 2018-11-28 16:07:09 2297 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "你有没有安排水印?使用事件时间时,只有当水印到达时才会触发窗口,使事件时间时钟超过窗口结束。您使用时间戳提取器/水印生成器执行此操作; 有关详细信息,请参阅文档中的示例。

    如果其中一个流有时是空闲的,也可能导致问题,因为空闲流上缺少水印会阻止其连接的任何流的水印。

    根据你要执行的操作,你可能会发现使用CoProcessFunction比使用时间窗口连接更容易。例如,从Flink培训网站上看看有关状态丰富和到期状态的练习。"

    2019-07-17 23:16:49
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Apache Flink技术进阶 立即下载
Apache Spark: Cloud and On-Prem 立即下载
Hybrid Cloud and Apache Spark 立即下载

相关镜像