请问flinkcdc中,哪里能看到写入upsert-kafka的数据不会过期呢?

问题1:请问flinkcdc中,哪里能看到写入upsert-kafka的数据不会过期呢?
问题2:没有找到说这个的。我查了topic好像是有过期的。

展开
收起
十一0204 2023-07-19 18:38:27 120 分享 版权
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 Flink CDC 中,可以使用 Kafka Connector 将数据写入 Kafka,同时可以设置 Kafka 中数据的过期时间。具体而言,可以在 Kafka 生产者(Producer)的配置中设置消息的过期时间,如下所示:
    java
    Copy
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");
    properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
    properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "15000");
    properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "100");
    properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
    properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
    properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
    properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
    properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");
    properties.setProperty(ProducerConfig.MESSAGE_TIMEOUT_MS_CONFIG, "86400000"); // 设置消息的过期时间为 1 天
    在上述示例中,通过设置 ProducerConfig.MESSAGE_TIMEOUT_MS_CONFIG 属性来设置消息的过期时间,单位为毫秒。在此示例中,将消息的过期时间设置为 1 天。
    需要注意的是,Kafka 中的数据过期时间是由 Kafka 自身维护的,Flink CDC 并不直接管理数据的过期时间。因此,如果需要检查 Kafka 中的数据是否已过期

    2023-07-29 19:15:10
    赞同 展开评论
  • 意中人就是我呀!

    回答1:你百度upsert-kafka和kafka的区别。此回答整理至钉群“Flink CDC 社区”。

    2023-07-19 19:03:23
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理