开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问下面该问题如何解决呢?

flink1.15.2版本的的kafkasink写kafka的,写进去的数据的timestamp变成了long的最大值的情况有人遇到吗?

展开
收起
JWRRR 2023-04-03 15:10:09 236 0
2 条回答
写回答
取消 提交回答
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    Flink 1.15.2版本的KafkaSink在写入Kafka时,如果数据的timestamp为null,则会将当前系统时间作为timestamp写入Kafka。如果您的数据中的timestamp字段为null,那么写入Kafka的数据的timestamp就会是当前系统时间,而不是long的最大值。

    如果您的数据中确实存在timestamp为null且写入Kafka的数据的timestamp为long的最大值的情况,可能是因为在数据流转过程中,某些环节修改了数据的timestamp字段。您可以在Flink任务中打印出数据流中每条数据的timestamp,以便找出出现问题的环节。

    另外,需要注意的是,在Flink中使用KafkaSink写入Kafka时,可以通过设置KafkaProducerConfig中的"transaction.timeout.ms"参数来调整事务超时时间。如果事务超时时间过短,可能会导致写入Kafka失败或者部分数据写入失败。建议您适当调整该参数的值,以获得更好的写入性能和可靠性。

    2023-04-23 23:24:18
    赞同 展开评论 打赏
  • 存在即是合理

    可能是因为您没有设置正确的时间戳提取器或时间戳分配器。

    Flink的Kafka Sink提供了两种时间戳提取器:RecordTimestampExtractor和WatermarkExtractor,默认情况下使用RecordTimestampExtractor。如果使用RecordTimestampExtractor,则Flink会将每条记录的时间戳设置为Kafka消息的时间戳。如果使用WatermarkExtractor,则Flink会将每个水印(Watermark)的时间戳设置为Kafka消息的时间戳。

    另外,Kafka消息的时间戳是以毫秒为单位的long型整数,如果您的时间戳是以秒为单位的话,需要将其乘以1000。

    如果仍然遇到问题,建议检查一下您的代码,以确保正确地设置了时间戳提取器和时间戳分配器,并且将时间戳转换为正确的格式。

    2023-04-14 16:00:58
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载