"问题1:2023-07-13 11:32:51,308 INFO org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=consumer-group7-2, groupId=group7] Error sending fetch request (sessionId=1029083009, epoch=INITIAL) to node 3: {}.
org.apache.kafka.common.errors.DisconnectException: null
2023-07-13 11:32:51,311 WARN org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher [] - Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 60000 ms.
flinkcdc中flink消费kafka的数据报这个错是为什么?放的是upsert-kafka,又从upsert-kafka dataStream方式读的。org.apache.kafka.common.errors.DisconnectException,kafka没挂。
问题2:不可能吧,kafka又不是我一个人用的。别的业务线都在用,如果真是kafka挂了,所有的人都炸了。等不到我说。上次我往kafka写了一个2千多万的表,kafka崩了。他们都炸了。增量的数据到目前为止150多万。"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在 Flink CDC 中,如果 Flink 消费 Kafka 的数据时报错,可能是因为多种原因导致的。常见的错误类型包括:
序列化错误
如果 Flink 消费 Kafka 的数据时报错,并提示类似 java.io.IOException: Failed to deserialize Avro record 的错误信息,可能是因为 Kafka 中的数据格式与 Flink 的数据格式不匹配,或者数据序列化/反序列化的代码有误导致的。解决方法通常包括检查数据格式和序列化/反序列化代码是否正确,以及针对特定的数据格式和序列化库进行调整和优化。
数据丢失
如果 Flink 消费 Kafka 的数据时报错,并提示类似 OffsetOutOfRangeException 或者 kafka.common.OffsetOutOfRangeException 的错误信息,可能是因为 Flink 消费的数据已经超出了 Kafka 的消息偏移量范围,导致数据无法正确消费和处理。解决方法通常包括重新设置消费的起始偏移量,或者对数据进行去重和补偿等操作。
配置错误
如果 Flink 消费 Kafka 的数据时报错,并提示类似 java.lang.IllegalArgumentException: requirement failed: Could not find a 'KafkaProducer' configuration property
"回答1:你现在看到的是没挂,当时程序运行的时候你不知道,你这个日志就是显示flink和kafka连接失败。
回答2:你写的是upsert-kakfa,upsert-kakfa相当于持久化数据。和你没关系。upser-kafka慎用。此回答整理至钉群“Flink CDC 社区”。"
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。