开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问flink cdc中独立项目集成flink jar 部署,多实例时,如何解决监听Mysql?

请问flink cdc中独立项目集成flink jar 部署,多实例时,如何解决监听Mysql 时,重复消费(每个实例都会监听到同一条数据的变化) 的问题?spring集成了,比如有一个集群 ,多个实例,监听同

展开
收起
十一0204 2023-07-19 18:05:03 158 0
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 Flink CDC 中,如果需要在独立项目中集成 Flink Jar 并部署多个实例,可以使用以下方法来解决监听 MySQL 的问题:
    启动多个 Flink Job
    在独立项目中,可以使用 Flink 的 JobClient API 启动多个 Flink Job,每个 Job 监听 MySQL 的一个数据库实例。例如,可以编写一个启动脚本,按照不同的参数启动不同的 Job,例如:
    bash
    Copy
    $ flink run -c com.example.job.MyJob1 /path/to/my-job.jar --db-host=localhost --db-port=3306 --db-name=db1
    $ flink run -c com.example.job.MyJob2 /path/to/my-job.jar --db-host=localhost --db-port=3306 --db-name=db2
    在这个例子中,使用不同的参数启动两个 Flink Job,分别监听 MySQL 的 db1 和 db2 数据库实例。
    使用 FlinkKafkaConsumer
    在独立项目中,可以使用 FlinkKafkaConsumer 作为数据源,将 MySQL 数据库的 binlog 数据写入 Kafka,再使用 FlinkCDC 对 Kafka 中的数据进行消费和处理。这样可以将 MySQL 数据库和 Flink Job 进行解耦,使得多个 Flink Job 可以共享 Kafka 中的数据。例如:
    java
    Copy
    FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    DataStream kafkaStream = env.addSource(kafkaConsumer);

    TableSchema schema = new TableSchema(new String[]{"id", "name", "age"}, new TypeInformation[]{Types.INT(), Types.STRING(), Types.INT()});
    CDCSource cdcSource = MySQLSource.builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("my_db")
    .tableList("my_table")
    .username("root")
    .password("123456")
    .deserializer(new MyRecordDeserializationSchema(schema))
    .build();
    DataStreamSource mysqlStream = env.addSource(cdcSource);

    Table mysqlTable = tableEnv.fromDataStream(mysqlStream, "id, name, age");
    Table kafkaTable = tableEnv.fromDataStream(kafkaStream, "id, name, age, ts.rowtime");
    Table resultTable = mysqlTable.union(kafkaTable).window(Tumble.over("10.seconds").on("ts").as("w")).groupBy("w").select("sum(age)");

    DataStream resultStream = tableEnv.toDataStream(resultTable, Result.class);
    在这个例子中,使用 FlinkKafkaConsumer 从 Kafka 中读取数据,使用 FlinkCDC 从

    2023-07-29 19:39:43
    赞同 展开评论 打赏
  • 意中人就是我呀!

    伪装成子节点,是会收到的,你可以整库同步到kafka,然后消费kafka的binlog按照同一个group-id消费数据,就不会重复了,这样也会减轻mysql的压力。datastream,这是开发方式,你不是集成了spring吗,直接写java代码。
    此回答整理至钉群“Flink CDC 社区”。

    2023-07-19 18:57:51
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    阿里邮箱—安全高效集成 立即下载
    集成智能接入网关APP:优化企业级移动办公网络 立即下载
    云效助力企业集成安全到DevOps中 立即下载