大佬们,请教一下,我现在使用CEP时遇到一个问题,我现在的场景是需要输入三次相同字符串打印一次匹配的List集合,但是遇到的问题是每次都需要输入第四条数据才会触发Pattern的select函数去打印List。
具体实现代码如下:
public class Run3 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
final DataStream source = env.socketTextStream("localhost", 8888)
.assignTimestampsAndWatermarks(
WatermarkStrategy. forMonotonousTimestamps()
.withTimestampAssigner((String s, long ts) -> System.currentTimeMillis())
)
.keyBy(s -> s);
source.print("source ");
final Pattern<String, String> pattern = Pattern. begin("begin", AfterMatchSkipStrategy.skipPastLastEvent())
.where(new SimpleCondition () {
@Override
public boolean filter(String s) throws Exception {
return true;
}
}).times(3);
final PatternStream patternStream = CEP.pattern(source, pattern);
patternStream.select(new PatternSelectFunction<String, Object>() {
@Override
public Object select(Map<String, List > pattern) {
return pattern.get("begin");
}
}).print("result ");
env.execute();
}
}
环境如下:
Flink 1.12.2
OS:Windows 10
编程工具:IDEA 2021.1.2
使用的是Flink默认的事件时间,水位线用的是单调递增的,使用的是系统时间
运行结果如下所示:
[cid:image001.png@01D75E43.4DB77F10]*来自志愿者整理的flink邮件归档
hello ,使用EventTime的前提下是这样的。事件来了之后不会立即去触发匹配,而是会注册一个timer,然后将数据缓存起来。当后续有事件
advanceWatermark 触发 timer之后才会开始计算。*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。