开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

在使用 Flink CDC 进行数据流处理时,这个 Kafka sink 是在哪里实现的?

在使用 Flink CDC 进行数据流处理时,我注意到在Flink的源码中似乎没有看到对应 Kafka 的 sink 连接器实现。然而,官方示例中提到了 Kafka sink,我想请问这个 Kafka sink 是在哪里实现的?此外,在尝试编译并运行相关的 pull request 时,无法控制发送 schema change,且发送的Log里字段名称变成了f1,f2序列,没有按预期展示字段名称。请问有人遇到相同的问题吗?
debe994c8ff1fc1250820ecdf2e21b83.jpg
9ee4c8672de63867838d8df5f5b19238.jpg

展开
收起
小小鹿鹿鹿 2024-02-25 22:38:58 105 0
2 条回答
写回答
取消 提交回答
  • 仅仅是demo, 具体的操作还是得看pipeline那一栏,kafka sink目前PR还没合并,
    https://github.com/ververica/flink-cdc-connectors/pull/2938
    。此回答来自钉群Flink CDC 社区。

    2024-02-26 17:53:20
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    Flink CDC 的源码中确实没有直接实现 Kafka Sink,但是可以通过 Flink 提供的 Kafka Connector 来实现将数据写入到 Kafka。

    在 Flink CDC 的官方文档和示例中,通常会使用 Flink 的 Kafka Connector 来将数据写入到 Kafka。Kafka Connector 是 Flink 提供的一个通用的 Kafka 连接器,它支持将数据流写入到 Kafka Topic 中。

    要使用 Flink 的 Kafka Connector,你需要在代码中添加相应的依赖,并配置相关的参数。以下是一个简单的示例代码,演示了如何使用 Flink 的 Kafka Connector 将数据写入到 Kafka:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    // 创建 Kafka 生产者配置
    Properties properties = System.getProperties();
    properties.setProperty("bootstrap.servers", "localhost:9092"); // 设置 Kafka 服务器地址
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    // 创建 Kafka 序列化器
    KafkaSerializationSchema<String> kafkaSerializationSchema = new SimpleStringSchema();
    
    // 创建 Kafka 生产者
    FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
        "my-topic", // 目标 Kafka Topic
        kafkaSerializationSchema,
        properties,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE // 设置语义为精确一次处理
    );
    
    // 将数据写入到 Kafka
    dataStream.addSink(kafkaProducer);
    

    上述代码中,我们首先设置了 Kafka 服务器的地址和序列化器的配置,然后创建了一个 Kafka 序列化器和一个 Kafka 生产者。最后,我们将数据流添加到 Kafka 生产者中,实现了将数据写入到 Kafka Topic 的功能。

    需要注意的是,具体的配置和使用方法可能会根据你使用的 Flink 版本和环境而有所不同。因此,在实际使用时,请根据你的具体情况进行适当的调整和配置。

    2024-02-26 13:23:05
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
    MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
    消息队列kafka介绍 立即下载