How to use the analytic functions LAG and LEAD on a streaming data source:
The Kafka input data looks like this:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}
The SQL is as follows:
INSERT INTO topic_sink
SELECT
  t,
  id,
  speed,
  LAG(speed, 1) OVER w AS speed_1,
  LAG(speed, 2) OVER w AS speed_2
FROM topic_source
WINDOW w AS (
  PARTITION BY id
  ORDER BY t
)
The result I expect is:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null, "speed_2":null}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0, "speed_1":1.0, "speed_2":null}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0, "speed_1":2.0, "speed_2":1.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0, "speed_1":3.0, "speed_2":2.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0, "speed_1":4.0, "speed_2":3.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0, "speed_1":5.0, "speed_2":4.0}
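To make the intended semantics concrete, here is a minimal Python sketch (not Flink code) of what the expected output amounts to: a per-id buffer of the last two speeds, updated one event at a time as a streaming operator would. It assumes events for each id already arrive ordered by t. The function name lag_stream and the max_lag parameter are hypothetical, chosen only for illustration.

```python
from collections import defaultdict, deque
from typing import Iterable, Iterator


def lag_stream(events: Iterable[dict], max_lag: int = 2) -> Iterator[dict]:
    """Yield each event enriched with speed_1 .. speed_{max_lag}.

    Hypothetical helper illustrating LAG(speed, k) OVER (PARTITION BY id ORDER BY t).
    Assumes the events of each id arrive already ordered by "t".
    """
    # Per-id buffer of the most recent `max_lag` speeds, newest at the right end.
    history = defaultdict(lambda: deque(maxlen=max_lag))
    for event in events:
        prev = history[event["id"]]
        out = dict(event)  # copy so the input event is not mutated
        for k in range(1, max_lag + 1):
            # LAG(speed, k): the speed k rows back, or None if no such row exists yet.
            out[f"speed_{k}"] = prev[-k] if len(prev) >= k else None
        # Append the current speed only after reading the buffer,
        # so a row never counts as its own predecessor.
        prev.append(event["speed"])
        yield out


if __name__ == "__main__":
    # The six sample rows from the question: t = 05:00 .. 05:25, speed = 1.0 .. 6.0.
    sample = [
        {"t": f"2020-04-01T05:{m:02d}:00Z", "id": "1", "speed": float(i + 1)}
        for i, m in enumerate(range(0, 30, 5))
    ]
    for row in lag_stream(sample):
        print(row)
```

Fed the six sample rows, it produces the speed_1/speed_2 values from the expected results, with null rendered as None. Because the buffer is keyed by id, rows of different ids never see each other's history, which mirrors PARTITION BY id.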
The result I actually get is:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0, "speed_2":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0, "speed_1":2.0, "speed_2":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0, "speed_1":3.0, "speed_2":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0, "speed_1":4.0, "speed_2":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0, "speed_1":5.0, "speed_2":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0, "speed_1":6.0, "speed_2":6.0}
Can the LAG function in Flink SQL perform the computation I expect? If so, how should the SQL be written?