Flink sql 支持在流式数据上使用LAG和LEAD函数吗-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

Flink sql 支持在流式数据上使用LAG和LEAD函数吗

Banji 2020-06-18 15:37:00 222

如何在流式数据源上使用分析函数LAG和EAD函数:

kafka输入数据如:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}

sql如下:

INSERT INTO topic_sink
SELECT
  t,
  id,
  speed,
  LAG(speed, 1) OVER w AS speed_1,
  LAG(speed, 2) OVER w AS speed_2
FROM topic_source
WINDOW w AS (
      PARTITION BY id
      ORDER BY t
)

我期望得到的结果数据是
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null, "speed_2":null}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0, "speed_2":null}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0, "speed_2":1.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0, "speed_2":2.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0, "speed_2":3.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0, "speed_2":4.0}

实际得到的结果数据是:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0, "speed_2":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0, "speed_2":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0, "speed_2":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0, "speed_2":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0, "speed_2":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0, "speed_2":6.0}

想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写?

SQL 消息中间件 Kafka 流计算
分享到
取消 提交回答
全部回答(1)
  • python小菜菜
    2020-06-20 13:49:33

    SQL Server从2012开始,引入了LEAD和LAG函数,这两个函数可以把之前要关联查询的方法,改为可直接获取当前数据上下相邻多少行数据,描述的有点不清楚,上数据吧,测试数据:

    --测试数据 if not object_id(N'Tempdb..#T') is null drop table #T Go Create table #T([Id] int,[Value] int) Insert #T select 1,10 union all select 2,20 union all select 3,30 union all select 4,40 union all select 5,50 Go --测试数据结束        以LEAD方法为例(LAG一样),语法是这样的:LEAD ( scalar_expression [ ,offset ] , [ default ] ) OVER ( [ partition_by_clause ] order_by_clause ),读取的数据,偏移多少行,如果没有值默认值是多少,写法如下:

    SELECT *, LEAD(Value, 1, 666) OVER (ORDER BY Value) AS LEADVALUE, --提前1行,默认值666 LAG(Value, 2, 888) OVER (ORDER BY Value) AS LAGVALUE --滞后2行,默认值888 FROM #T;        结果如下:

           我们也可以不写偏移数和默认值,结果就是这样的:

    SELECT *, LEAD(Value) OVER (ORDER BY Value) AS LEADVALUE, --提前1行 LAG(Value) OVER (ORDER BY Value) AS LAGVALUE --滞后1行

    0 0
大数据
使用钉钉扫一扫加入圈子
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题
推荐课程