Apache flink 1.52 Rowtime时间戳为空-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

Apache flink 1.52 Rowtime时间戳为空

2018-12-10 11:41:08 4181 1

我正在使用以下代码进行一些查询:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Row> ds = SourceHelp.builder().env(env).consumer010(MyKafka.builder().build().kafkaWithWaterMark2())
        .rowTypeInfo(MyRowType.builder().build().typeInfo())
        .build().source4();
//,proctime.proctime,rowtime.rowtime
String sql1 = "select a,b,max(rowtime)as rowtime from user_device group by a,b";
DataStream<Row> ds2 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device").fields("a,b,rowtime.rowtime")
        .rowTypeInfo(MyRowType.builder().build().typeInfo13())
        .sql(sql1).in(ds).build().result();

ds2.print();
// String sql2 = "select a,count(b) as b from user_device2 group by a";
String sql2 = "select a,count(b) as b,HOP_END(rowtime,INTERVAL '5' SECOND,INTERVAL '30' SECOND) as c from user_device2 group by HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '30' SECOND),a";
DataStream<Row> ds3 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device2").fields("a,b,rowtime.rowtime")
        .rowTypeInfo(MyRowType.builder().build().typeInfo14())
        .sql(sql2).in(ds2).build().result();

ds3.print();
env.execute("test");

注意:对于sql1,我使用带有rowtime的max函数,它不起作用,并抛出以下异常:

线程“main”中的异常org.apache.flink.runtime.client.JobExecutionException:java.lang.RuntimeException:Rowtime时间戳为null。请确保定义了正确的TimestampAssigner,并且流环境使用EventTime时间特性。在org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)在org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)在com.aicaigroup.water .WaterTest.testRowtimeWithMoreSqls5(WaterTest.java:158)at com.aicaigroup.water.WaterTest.main(WaterTest.java:20)引起:java.lang.RuntimeException:Rowtime timestamp为null。请确保定义了正确的TimestampAssigner,并且流环境使用EventTime时间特性。

然后我尝试像这样更新sql1“从user_device中选择a,b,rowtime”,它可以工作。那么如何修复错误呢?第一个sql应该使用group by,第二个sql应该使用timeWindow的rowtime。

取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:19:09

    用assignTimestampsAndWatermarks,只需使用默认和普通的实现BoundedOutOfOrdernessTimestampExtractor。您需要编写extractTimestamp函数来提取时间戳值并在构造函数中声明窗口间隔。
    append,proctime.proctime,rowtime.rowtime在字段的末尾(我使用fromDataStream(Flink 1.6)将流转换为表)
    如果要将exists字段用作rowtime。例如,数据源字段是“a,clicktime,c”,您可以声明“a,clicktime.rowtime,c”
    希望它可以帮助你。

    0 0
相关问答

1

回答

Flink SQL 的内置函数有什么?

2021-12-09 20:19:56 402浏览量 回答数 1

1

回答

如何用 Flink SQL 做简单的数据去重?

2021-12-07 17:22:10 335浏览量 回答数 1

1

回答

请教大神们关于flink-sql中数据赋值问题

2021-12-07 10:53:25 378浏览量 回答数 1

1

回答

Flink SQL 关键字 user?

2021-12-06 15:19:44 232浏览量 回答数 1

1

回答

FlinkSQL 窗口使用问题

2021-12-06 14:51:53 793浏览量 回答数 1

1

回答

flink sql实时计算分位数如何实现

2021-12-06 11:44:19 708浏览量 回答数 1

1

回答

Flink SQL共享source 问题

2021-12-06 12:31:23 503浏览量 回答数 1

1

回答

请问flink sql实时计算分位数如何实现哈?

2021-12-06 12:31:00 634浏览量 回答数 1

1

回答

flink sql 如何实时计算百分比

2021-12-06 12:18:21 378浏览量 回答数 1

1

回答

DataStreamAPI 与flink sql疑问

2021-12-05 11:38:01 313浏览量 回答数 1
+关注
flink小助手
flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。
0
文章
377
问答
问答排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载