开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC中mysql cdc支持全量的时候并发读取,这个怎么配置呀?

Flink CDC中mysql cdc支持全量的时候并发读取,这个怎么配置呀?我使用了flink -p 命令指定了,但是task还是单并行度image.png
上面是jobManager的config已经显示配置了4个并行度,但是task执行的时候还是一个

展开
收起
真的很搞笑 2023-12-04 07:34:22 200 0
4 条回答
写回答
取消 提交回答
  • 多并行度就行了 ,此回答整理自钉群“Flink CDC 社区”

    2023-12-05 08:27:40
    赞同 展开评论 打赏
  • Flink CDC从2.0版本开始支持全量数据的并发读取,这项特性在2021年8月的更新中被引入。要实现MySQL的全量并发读取,首先需要添加Flink CDC MySQL连接器的依赖。如果你使用Maven,可以在pom.xml文件中添加如下依赖:

    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.5-SNAPSHOT</version>
    

    注意这里的版本号应使用已发布的版本,对于snapshot版本的依赖需要本地自行编译。

    然后,你可以使用Flink的DataStream API或Table API来读取全量数据。以下是一个使用DataStream API的例子:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    // 创建流执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 创建表执行环境
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    // 设置MySQL的CDC连接器相关参数
    String url = "jdbc:mysql://localhost:3306/test?serverTimezone=UTC";
    String user = "root";
    String password = "password";
    String database = "test";
    String table = "my_table";
    
    // 使用Flink SQL读取全量数据
    DataStream<Row> result = tableEnv.toAppendStream(tableEnv.sqlQuery("SELECT * FROM " + database + "." + table), Row.class);
    

    以上代码将连接到MySQL数据库,并将指定表的所有数据作为初始状态读取出来。请确保替换上述代码中的数据库连接信息为你自己的数据库信息。

    2023-12-04 23:11:09
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    在Flink CDC中,MySQL CDC支持全量并发读取的配置如下:

    1. 首先,确保你已经添加了MySQL JDBC驱动和Debezium MySQL Connector的依赖。

    2. 在Flink SQL中创建源表时,使用debezium-sql-connector作为连接器名称,并指定MySQL的连接信息、要监控的数据库和表等参数。例如:

    CREATE TABLE my_source (
      id INT,
      name STRING,
      age INT
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = 'password',
      'database-name' = 'mydb',
      'table-name' = 'mytable',
      'debezium-internal.offset-storage' = 'org.apache.flink.streaming.connectors.kafka.KafkaOffsetBackingStore',
      'debezium-internal.offset-storage.topic' = 'mydb-offsets',
      'debezium-internal.offset-storage.partition.group.id' = 'mydb-group',
      'scan.startup.mode' = 'latest-offset',
      'scan.startup.timeout' = '30000',
      'format' = 'debezium-json',
      'debezium-sql-connector.history.kafka.bootstrap.servers' = 'localhost:9092',
      'debezium-sql-connector.history.kafka.topic' = 'dbhistory.mydb',
      'debezium-sql-connector.history.kafka.groupId' = 'mydb-group',
      'debezium-sql-connector.snapshot.mode' = 'initial',
      'debezium-sql-connector.max.queued.messages' = '10000',
      'debezium-sql-connector.max.retry.attempts' = '16',
      'debezium-sql-connector.heartbeat.interval' = '10000',
      'debezium-sql-connector.max.allowed.packet.size' = '5242880',
      'checkpointing.interval' = '60000',
      'parallelism' = '4' -- 设置并行度为4
    );
    
    1. 然后,执行Flink SQL查询以消费数据。例如:
    SELECT * FROM my_source;
    

    通过以上配置,Flink CDC中的MySQL CDC应该可以支持全量并发读取。如果仍然遇到问题,请检查Flink集群的资源分配情况,确保有足够的资源来支持并发读取。

    2023-12-04 14:28:26
    赞同 展开评论 打赏
  • 在Flink中,并行度的设置是在JobGraph层面进行的,而不是在Task层面进行的。JobGraph是Flink作业的静态描述,包含了作业的所有信息,包括输入输出、算子、并行度等。而Task是JobGraph在执行时的实例化,每个Task负责处理一部分数据。

    在你的截图中,我们可以看到JobManager的Config显示了Parallelism为4,这意味着整个Flink作业的并行度被设置为4。然而,这并不意味着每个Task都会以4个并行度运行。在每个Task中,并行度是由算子的并行度决定的。

    对于Flink CDC的MySQL连接器来说,它的并行度默认为1,也就是说,无论作业的并行度如何设置,Flink CDC的MySQL连接器只会启动一个并行任务来读取MySQL的binlog。这是因为Flink CDC的MySQL连接器需要保证数据的一致性,不能同时从同一个binlog position读取数据。

    如果你想提高Flink CDC的MySQL连接器的并行度,你需要修改连接器的配置,将parallelism属性设置为大于1的值。例如,你可以通过编程的方式,在创建SourceFunction时设置parallelism属性,或者通过命令行的方式,在启动Flink任务时设置parallelism属性。

    2023-12-04 11:59:27
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    搭建电商项目架构连接MySQL 立即下载
    搭建4层电商项目架构,实战连接MySQL 立即下载
    PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

    相关镜像