开发者社区> 问答> 正文

关于CEP处理事件遇到问题

大佬们,请教一下,我现在使用CEP时遇到一个问题,我现在的场景是需要输入三次相同字符串打印一次匹配的List集合,但是遇到的问题是每次都需要输入第四条数据才会触发Pattern的select函数去打印List。

具体实现代码如下:

public class Run3 {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

final DataStream source = env.socketTextStream("localhost", 8888)

.assignTimestampsAndWatermarks(

WatermarkStrategy. forMonotonousTimestamps()

.withTimestampAssigner((String s, long ts) -> System.currentTimeMillis())

)

.keyBy(s -> s);

source.print("source ");

final Pattern<String, String> pattern = Pattern. begin("begin", AfterMatchSkipStrategy.skipPastLastEvent())

.where(new SimpleCondition () {

@Override

public boolean filter(String s) throws Exception {

return true;

}

}).times(3);

final PatternStream patternStream = CEP.pattern(source, pattern);

patternStream.select(new PatternSelectFunction<String, Object>() {

@Override

public Object select(Map<String, List > pattern) {

return pattern.get("begin");

}

}).print("result ");

env.execute();

}

}

环境如下:

Flink 1.12.2

OS:Windows 10

编程工具:IDEA 2021.1.2

使用的是Flink默认的事件时间,水位线用的是单调递增的,使用的是系统时间

运行结果如下所示:

[cid:image001.png@01D75E43.4DB77F10]*来自志愿者整理的flink邮件归档

展开
收起
moonlightdisco 2021-12-02 14:07:19 820 0
1 条回答
写回答
取消 提交回答
  • hello ,使用EventTime的前提下是这样的。事件来了之后不会立即去触发匹配,而是会注册一个timer,然后将数据缓存起来。当后续有事件

    advanceWatermark 触发 timer之后才会开始计算。*来自志愿者整理的FLINK邮件归档

    2021-12-02 14:32:24
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载