开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink需求: 两个设备日志流通过设备id和相近时间进行关联 请问我可能哪一步出问题了?

Flink需求:
两个设备日志流通过设备id和相近时间进行关联

设备日志流在kafka的aTopic和bTopic里面,由于有灰度设置,灰度设置分别对应aTopicGray,bTopicGray,他们互为灰度

所有flink的job消费的时候需要同时消费全量和灰度的topic。

会发生的情况:topic有多个分区, 切换过程中灰度的topic和非灰度的topic都有新数据,全量之后原Topic不会有新数据
并且topic里面数据只保留一段时间,过了几天之后数据会过期被清除

实现:
1,通过kafka消费两个日志流

KafkaSource source = KafkaSource.builder()
.setBootstrapServers(brokers)
.setTopics("aTopic,aTopicGray")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

2,将stream转成table,并且设置时间戳和水印,创建临时表。
Table table =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
.build());

tableEnv.createTemporaryView("MyView", dataStream);

3,使用sqlQuery来进行interval join,并且设置时间戳和水印,创建临时表

Table joinedTable =
tableEnv.sqlQuery(
"SELECT U.name, O.amount " +
"FROM UserTable U left join OrderTable O " +
"WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES");

DataStream join_table_stream = tableEnv.toDataStream(joinedTable);

Table join_table_stream_watermark =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("joinrowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
.watermark("joinrowtime", "joinrowtime - INTERVAL '10' SECOND")
.build());

tEnv.createTemporaryView("join_table_stream_watermark", join_table_stream_watermark);

4,使用Window Top-N 来对关联进行去重,只保留关联到的第一条记录
SELECT *
FROM (
SELECT bidtime, price, item, supplier_id, window_start, window_end, ROW_NUMBER() OVER (PARTITION BY window_start, window_end,objId,aTime ORDER BY bTime DESC) as rownum
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
) WHERE rownum <= 1;

5,进行后续操作

遇到的问题:
在测试环境没有灰度和单分区的环境是正常运行的,到了生产环境就在Window Top-N最后这一步遇到的时间戳推进缓慢,并且输出的关联到的数据少
1,请问我可能哪一步出问题了?是什么问题?可以怎么解决?
2,当灰度和全量topic有一个topic数据都过期了的情况下是不是会影响?
3,Window Top-N对通过sqlQuery产生的joinTable这种临时表是否能够正常作用?
4,flink的水印时间和水印在我这里面的datastream和table之间转换和sqlQuery之后流的转换过程中是怎样流转和变化的?flink 1.17.2

展开
收起
真的很搞笑 2024-05-06 18:00:50 45 0
1 条回答
写回答
取消 提交回答
  • 如果只是仅仅保证最新一条数据的话,我觉得还不如直接一个topic一个分区一个并行度,更新写,当然如果数据量大,可以并发写多个字段,每个并行度写一个,然后取多个字段的最新的数据再合并,这个只是我的思路,因为我不知道你具体想要做啥,但是有主键就可以,不过还是要看你们底层的数据库是啥,我们是doris 可以这么干。此回答来自钉群Flink CDC 社区。

    2024-05-24 19:42:32
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    PostgresChina2018_赖思超_PostgreSQL10_hash索引的WAL日志修改版final 立即下载
    Kubernetes下日志实时采集、存储与计算实践 立即下载
    日志数据采集与分析对接 立即下载