开发者社区> 问答> 正文

Flink SQL 如何在流式数据上使用LAG和LEAD函数

如何在流式数据源上使用分析函数LAG和EAD函数:

kafka输入数据如:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}

sql如下:

INSERT INTO topic_sink
SELECT
  t,
  id,
  speed,
  LAG(speed, 1) OVER w AS speed_1,
  LAG(speed, 2) OVER w AS speed_2
FROM topic_source
WINDOW w AS (
      PARTITION BY id
      ORDER BY t
)

我期望得到的结果数据是
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null, "speed_2":null}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0, "speed_2":null}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0, "speed_2":1.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0, "speed_2":2.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0, "speed_2":3.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0, "speed_2":4.0}

实际得到的结果数据是:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0, "speed_2":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0, "speed_2":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0, "speed_2":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0, "speed_2":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0, "speed_2":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0, "speed_2":6.0}

想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写?

展开
收起
Banji 2020-06-18 15:33:02 3877 0
1 条回答
写回答
取消 提交回答
  • 请问有解决方法了吗

    2022-10-24 18:43:06
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载