"我有2个数据流(例如)
ts | device | custId | temp
1 | 'device1'| 1 | 10
1 | 'device2'| 4 | 7
2 | 'device1'| 1 | 10
3 | 'device1'| 1 | 10
4 | 'device1'| 1 | 10
5 | 'device2'| 4 | 10
我创建了一个CEP模式,我想在4秒内检查温度是否大于30。
val pattern = Pattern.beginDevice
.where(_.sumtemp >= 30)
.within(Time.seconds(4))
有没有办法将此模式流的输出连接到另一个传入数据流以获得低于?
ts | custId | morethanthiry
1 | 1 | yes
2 | 4 | no
如果可以分享一个例子,我将非常感激。"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
"有多个选项。你可以加入你的溪stream coGroup
例:
set1.coGroup(set2).where().equalTo().with(new MyCoGroupFunction());
您可以将其视为SQL中的连接。
实现的一个小例子:
class MyCoGroupFunction extends RichCoGroupFunction[DataTypeOfStream1, DataTypeOfStream2, DataTypeOfOutput] {
override def coGroup(first: DataTypeOfStream1,
second: DataTypeOfStream2],
out: DataTypeOfOutput): Unit = {
out.collect(...)
//your output
}
}
如果需要,您还可以使用状态。比如
union(如果要连接的流具有相同的数据类型)
连
coFlatMap这些方法之间的差异很小。
有关 更多信息,请参阅https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/。"