问题1:请问flinkcdc中,哪里能看到写入upsert-kafka的数据不会过期呢?
问题2:没有找到说这个的。我查了topic好像是有过期的。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在 Flink CDC 中,可以使用 Kafka Connector 将数据写入 Kafka,同时可以设置 Kafka 中数据的过期时间。具体而言,可以在 Kafka 生产者(Producer)的配置中设置消息的过期时间,如下所示:
java
Copy
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");
properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "15000");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "100");
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");
properties.setProperty(ProducerConfig.MESSAGE_TIMEOUT_MS_CONFIG, "86400000"); // 设置消息的过期时间为 1 天
在上述示例中,通过设置 ProducerConfig.MESSAGE_TIMEOUT_MS_CONFIG 属性来设置消息的过期时间,单位为毫秒。在此示例中,将消息的过期时间设置为 1 天。
需要注意的是,Kafka 中的数据过期时间是由 Kafka 自身维护的,Flink CDC 并不直接管理数据的过期时间。因此,如果需要检查 Kafka 中的数据是否已过期
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。