开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC中debezium-json消息消费能直接开多并行度不?

Flink CDC中debezium-json消息消费能直接开多并行度不?有没有同学基于这个格式,只保存after的数据呀

展开
收起
cuicuicuic 2023-12-04 08:17:55 58 0
5 条回答
写回答
取消 提交回答
  • 我们是同主键放一个分区,并行消费 ,此回答整理自钉群“Flink CDC 社区”

    2023-12-05 09:13:46
    赞同 展开评论 打赏
  • 在Flink CDC中,使用Debezium JSON消息格式进行消费时,可以通过调整并行度来实现多并发消费。您可以根据需求和系统资源情况,在Flink任务配置中设置适当的并行度。

    要配置Flink CDC任务的并行度,可以通过以下方式之一:

    1. 在代码中设置并行度:在Flink CDC任务的代码中,可以通过setParallelism()方法来设置任务的并行度。例如,env.setParallelism(4)将任务的并行度设置为4。

    2. 使用命令行参数设置并行度:将并行度作为命令行参数传递给Flink提交任务的命令。例如,flink run -p 4 your-cdc-job.jar将任务的并行度设置为4。

    需要注意的是,并行度的设置需要根据具体的场景需求、数据量、计算资源等因素进行评估和调整。合理的并行度设置可以提高任务的吞吐量和性能。

    关于只保存"After"数据的问题,您可以通过编写自定义的Flink函数或操作符来过滤掉不需要的数据。在处理Debezium JSON消息时,您可以解析JSON,获取相应字段的值,并根据条件判断是否保留该记录。这样可以实现只保存"After"数据的需求。

    需要注意的是,自定义过滤逻辑可能会增加一些额外的开销,如CPU计算和内存消耗。因此,在实际应用中,应根据数据量和性能需求进行评估,并进行适当的优化。

    2023-12-04 20:42:48
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    在Flink CDC中,Debezium-json消息消费确实可以开启多并行度。然而需要注意的是,当设置多并行度时,数据的顺序处理可能会受到影响。这是因为多个并行任务并行处理数据,并且数据到达的顺序可能无法保证。如果你需要保证数据的顺序处理,有以下几种方法可以尝试:使用单并行度,将 Flink CDC 作业的并行度设置为 1,这样只会有一个任务处理数据,确保了数据的顺序处理。但这也会限制作业的吞吐量和并行处理能力;使用时间属性进行分区,如果你的数据流中有时间属性(例如事件时间或处理时间),可以使用 Flink 的 EventTime 或 ProcessingTime 进行分区。通过对数据进行按键分区,确保同一键的数据由同一个任务处理,可以维护某种程度的顺序。

    2023-12-04 13:49:38
    赞同 展开评论 打赏
  • 在Flink CDC中,debezium-json消息消费确实支持多并行度处理。不过需要注意的是,当设置多并行度时,可能会影响数据的顺序处理,因为多个并行任务会并行处理数据,而数据到达的顺序可能无法保证。如果你需要保证数据的顺序处理,可以尝试使用单并行度,将Flink CDC作业的并行度设置为1,这样只会有一个任务处理数据,确保了数据的顺序处理,但这也会限制作业的吞吐量和并行处理能力。另一种方法是使用时间属性进行分区,如果你的数据流中有时间属性(例如事件时间或处理时间),可以使用Flink的EventTime或ProcessingTime进行分区,通过对数据进行按键分区,确保同一键的数据由同一个任务处理,可以维护某种程度的顺序。

    对于只保存after的数据的需求,你可以通过自定义转换函数来实现。具体的实现方式可能需要根据你的业务场景和数据源情况进行定制。同时,也需要注意,如果你在Flink CDC中使用Debezium JSON数据格式,但是只有"before"、"after"和"op"这三个字段,缺少其他字段(如timestamp)的话,可能的原因包括数据源配置问题,或者在CDC连接器中没有启用需要的字段。因此,在使用Debezium JSON格式处理数据时,确保正确的配置和使用对应的字段是非常重要的。

    2023-12-04 11:20:08
    赞同 展开评论 打赏
  • Flink CDC 中的 Debezium JSON 消息可以支持多并行度消费,这通常通过 Flink 的并行数据流处理机制来实现。在创建 Flink 作业时,你可以设置一个大于 1 的并行度(例如,setParallelism(4)),这样 Flink 就会在多个任务实例之间分配工作负载,从而提高处理速度和吞吐量。

    请注意,虽然可以开启多并行度,但需要考虑到数据分区和一致性的问题。在使用多并行度时,你需要确保数据被正确地分发到不同的并行任务上,以避免产生不一致的结果。这可能涉及到对消息中的键进行哈希或者其他形式的数据分区策略。

    至于只保存 after 数据的需求,这是完全可以做到的。在处理 Debezium JSON 消息时,你可以在自定义的 DeserializationSchema 或者 MapFunction 中解析 JSON 格式的消息,并仅提取 after 字段的值。以下是一个简单的示例:

    // 使用 org.apache.flink.api.common.serialization.SimpleStringSchema 或者其他合适的 JSON 解析库
    public class CustomDebeziumDeserializationSchema implements DeserializationSchema<Row> {
        @Override
        public Row deserialize(String message) throws IOException {
            // 假设这里使用了 JSON 库来解析 JSON 字符串为一个 JSON 对象
            JSONObject json = new JSONObject(message);
    
            // 提取 after 字段
            JSONObject afterJson = json.getJSONObject("after");
    
            // 将 after 字段转换为 Flink 的 Row 对象
            Row row = ...; // 使用 afterJson 创建 Row 对象
    
            return row;
        }
    
        ...
    }
    

    然后,在你的 Flink 作业中使用这个自定义的 DeserializationSchema:

    DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("my_topic", 
                               new CustomDebeziumDeserializationSchema(), 
                               kafkaProperties));
    
    source.print();
    

    这样,你就只会看到 after 数据字段的内容,而不会看到 beforeop 等其他字段的信息。

    2023-12-04 09:47:35
    赞同 1 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载