如何使用Flink CEP创建批处理或幻灯片窗口?-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

如何使用Flink CEP创建批处理或幻灯片窗口?

2018-12-06 18:11:30 2544 1

Flink CEP来自Esper CEP引擎。正如您可能(或不知道)所知,在Esper使用他们的语法(EPL)时,您可以轻松地创建一个batch或一个slide窗口,在这些窗口中对事件进行分组,并允许您将这些事件与函数(avg,max,min,...)一起使用。

例如,使用以下模式,您可以创建5秒的批处理窗口,并计算在指定窗口中收到price的所有Stock事件的属性的平均值。

select avg(price) from Stock#time_batch(5 sec)
问题是我想知道如何实现这一点Flink CEP。我知道,可能的目标或方法可能Flink CEP不同,因此实现这一目标的方式可能并不像in中那么简单Esper CEP。

我已经看过关于时间窗口的文档,但我无法实现这个窗口Flink CEP。因此,给出以下代码:

DataStream stream = ...; // Consume events from Kafka

// Filtering events with negative price
Pattern pattern = Pattern.begin("start")

        .where(
                new SimpleCondition<Stock>() {
                    public boolean filter(Stock event) {
                        return event.getPrice() >= 0;
                    }
                }
        );

PatternStream patternStream = CEP.pattern(stream, pattern);

/**
CREATE A BATCH WINDOW OF 5 SECONDS IN WHICH
I COMPUTE OVER THE AVERAGE PRICES AND, IF IT IS
GREATER THAN A THREESHOLD, AN ALERT IS DETECTED

return avg(allEventsInWindow.getPrice()) > 1;
*/

DataStream result = patternStream.select(

        new PatternSelectFunction<Stock, Alert>() {
            @Override
            public Alert select(Map<String, List<Stock>> pattern) throws Exception {
                return new Alert(pattern.toString());
            }
        }
);

如何创建该窗口,从第一个窗口开始,我开始计算5秒内后续事件的平均值。例如:

t = 0 seconds
Stock(price = 1); (...starting batch window...)
Stock(price = 1);
Stock(price = 1);
Stock(price = 2);
Stock(price = 2);
Stock(price = 2);
t = 5 seconds (...end of batch window...)
Avg = 1.5 => Alert detected!
5秒后的平均值为1.5,并将触发警报。我该如何编码呢?

取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:18:39

    使用Flink的CEP库时,此行为无法表达。我建议使用Flink DataStream或Table API来计算平均值。基于此,您可以再次使用CEP生成其他事件。

    final DataStream input = env

    .fromElements(
            new Stock(1L, 1.0),
            new Stock(2L, 2.0),
            new Stock(3L, 1.0),
            new Stock(4L, 2.0))
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Stock>(Time.seconds(0L)) {
        @Override
        public long extractTimestamp(Stock element) {
            return element.getTimestamp();
        }
    });
    

    final DataStream windowAggregation = input

    .timeWindowAll(Time.milliseconds(2))
    .aggregate(new AggregateFunction<Stock, Tuple2<Integer, Double>, Double>() {
        @Override
        public Tuple2<Integer, Double> createAccumulator() {
            return Tuple2.of(0, 0.0);
        }
    
        @Override
        public Tuple2<Integer, Double> add(Stock value, Tuple2<Integer, Double> accumulator) {
            return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value.getValue());
        }
    
        @Override
        public Double getResult(Tuple2<Integer, Double> accumulator) {
            return accumulator.f1 / accumulator.f0;
        }
    
        @Override
        public Tuple2<Integer, Double> merge(Tuple2<Integer, Double> a, Tuple2<Integer, Double> b) {
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    });
    0 0
相关问答

1

回答

如何用 Flink SQL 做简单的数据去重?

2021-12-07 17:22:10 380浏览量 回答数 1

1

回答

请教大神们关于flink-sql中数据赋值问题

2021-12-07 10:53:25 428浏览量 回答数 1

1

回答

如何设置FlinkSQL并行度?

2021-12-07 16:44:04 413浏览量 回答数 1

1

回答

Flink SQL No Watermark如何解决?

2021-12-07 17:33:52 636浏览量 回答数 1

1

回答

Flink SQL的实现原理是什么?

2021-12-07 19:47:18 313浏览量 回答数 1

1

回答

FlinkSQL的是如何实现的?

2021-12-07 19:57:54 171浏览量 回答数 1

1

回答

Flink SQL 自定义函数有哪些?

2021-12-07 20:31:33 170浏览量 回答数 1

1

回答

Flink sql join问题

2021-12-07 22:20:17 370浏览量 回答数 1

1

回答

flink sql实时计算分位数如何实现

2021-12-06 11:44:19 764浏览量 回答数 1

1

回答

Flink SQL读取复杂JSON格式

2021-12-06 12:16:54 581浏览量 回答数 1
+关注
flink小助手
flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。
0
文章
377
问答
问答排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载