开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC全库订阅binlog你们怎么配置的,我们这好像有漏数据的情况?

Flink CDC全库订阅binlog你们怎么配置的,我们这好像有漏数据的情况?

展开
收起
真的很搞笑 2024-01-01 09:01:42 64 0
3 条回答
写回答
取消 提交回答
  • 对表结构有要求, 我这边试的结果是要满足这些条件
    ● 不使用 blob 字段
    ● 不用 char 类型字段做主键
    ● 建表须有主键
    ● 不要用 double 类型做主键 ,此回答整理自钉群“Flink CDC 社区”

    2024-01-02 08:13:32
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    Flink CDC 全库订阅binlog的配置可以参考以下步骤:

    1. 配置MySQL的binlog格式为ROW。在MySQL的配置文件中添加以下内容:
    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server-id=1
    
    1. 在Flink中创建BinlogSourceFunction,并指定要订阅的binlog文件名和位置。例如:
    BinlogSourceFunction<String> binlogSourceFunction = new BinlogSourceFunction<>(
        "localhost", 3306, "root", "password", "test_db", "test_table");
    binlogSourceFunction.setStartPosition(BinlogEntry.Position.START_EVENT_V4);
    
    1. 将BinlogSourceFunction添加到Flink数据流中,并进行相应的转换和处理操作。例如:
    DataStream<String> binlogStream = env.addSource(binlogSourceFunction)
        .map(new MyMapFunction());
    

    如果您发现有漏数据的情况,可能是由于某些原因导致Flink无法正常连接到MySQL或无法读取到完整的binlog信息。您可以检查以下几点:

    1. 确保MySQL的binlog格式已经设置为ROW,并且Flink中的配置与MySQL一致。
    2. 确保Flink能够正常连接到MySQL,并且具有足够的权限来读取binlog信息。
    3. 如果使用的是MySQL 5.7及以上版本,请确保开启了并行复制功能,否则可能会导致部分binlog信息无法被读取到。可以在MySQL的配置文件中添加以下内容来开启并行复制功能:
    [mysqld]
    gtid_mode=ON
    enforce_gtid_consistency=ON
    
    2024-01-01 12:44:51
    赞同 展开评论 打赏
  • Flink CDC全库订阅binlog的配置可以参考以下步骤:

    1. 引入Flink CDC依赖

    在项目的pom.xml文件中添加如下依赖:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.1.0</version>
    </dependency>
    
    1. 创建Flink SQL执行环境
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    public class FlinkCDCDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        }
    }
    
    1. 注册MySQL源表

    ```java
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.descriptors.*;
    import org.apache.flink.table.sources.tsextractors.RowtimeAttributeDescriptor;
    import org.apache.flink.types.Row;

    public class FlinkCDCDemo {
    public static void main(String[] args) throws Exception {
    // ... 创建Flink SQL执行环境和表环境的代码 ...

        // 注册MySQL源表
        tableEnv.executeSql("""
            CREATE TABLE my_mysql (
                id INT,
                name STRING,
                age INT,
                create_time TIMESTAMP(3),
                update_time TIMESTAMP(3),
                PRIMARY KEY (id) NOT ENFORCED
            ) WITH (
                'connector' = 'mysql-cdc', -- 使用MySQL CDC连接器
                'hostname' = 'localhost', -- MySQL服务器地址
                'port' = '3306', -- MySQL服务器端口
                'username' = 'root', -- MySQL用户名
                'password' = 'password', -- MySQL密码
                'database-name' = 'test', -- 需要订阅的数据库名
                'table-name' = 'my_table', -- 需要订阅的表名
                'scan.startup.mode' = 'latest-offset', -- 从最新的位置开始消费数据,避免重复消费和丢失数据
                'debezium-internal.offset-storage' = 'org.apache.kafka.connect.storage.FileOffsetBackingStore', -- 使用Kafka作为Offset存储后端,以便在发生故障时恢复消费位置
                'debezium-internal.history' = 'io.debezium.relational.history.FileDatabaseHistory', -- 使用文件系统作为历史记录存储后端,以便在发生故障时恢复消费位置
                'debezium-sqlserver-heartbeat-interval' = '60000', -- 设置心跳间隔,默认为60秒,可以根据实际需求调整
                'debezium-sqlserver-paging-size' = '5000', -- 设置分页大小,默认为5000条记录,可以根据实际需求调整
                'debezium-sqlserver-max-queued-messages' = '1000', -- 设置最大队列消息数,默认为1000条,可以根据实际需求调整
                'debezium-sqlserver-snapshot-mode' = 'initial', -- 设置快照模式,默认为initial,可以根据实际需求调整
                'format.type' = 'json', -- 设置数据格式类型,默认为json,可以根据实际需求调整
                'debezium-sqlserver-include-schema-changes' = 'false', -- 是否包含模式更改事件,默认为false,可以根据实际需求调整
                'debezium-sqlserver-exclude-tables' = 'dbo.__RefactorLog, dbo.__RefactorTable, dbo.__SysAgentEvents, dbo.__SysSchemas, dbo.__SysObjects, dbo.__Triggers, dbo.__Views, dbo.__XmlSchemaCollections, dbo.__XmlTableCollections, sysdiagrams, sysschobjs, syscomments, sysdepends, sysfiles, syscolpars, sysconstraints, sysstatgroups, sysstats, sysusers, sysallocunits, syslanguages, sysremotelogins, sysfts_configurations, sysfts_templates, sysfts_contents, syspartitionstats, sysfilegroups, sysindexes, syscolumns, sysdenypermissions, sysrolemembership, syslogins, sysprocedures, syssequenceobjects, sysremoteprocedures, systypes, sysuserdefinedtabletypes, sysuserdefinedaggregates, sysrulesets, sysforeignkeys, syssqlmodules, sysparamsets, sysxlogins, sysxmlextensions', -- 排除不需要订阅的表和视图,可以根据实际需求调整
            ) WITH (
                'connector' = 'jdbc', -- 使用JDBC连接器,以便将数据转换为Flink支持的数据类型和格式
                'url' = 'jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC', -- JDBC连接URL,根据实际情况修改数据库地址、端口和时区等参数
                'table-name' = 'my_table', -- 需要订阅的表名,与上面的MySQL源表保持一致,以便正确映射到对应的源表和目标表
                'username' = 'root', -- JDBC用户名,根据实际情况修改数据库用户名和密码等参数
                'password' = 'password', -- JDBC密码,根据实际情况修改数据库用户名和密码等参数
                'driver-name' = 'com.mysql.cj.jdbc.Driver', -- JDBC驱动名称,根据实际情况修改数据库驱动名称和版本等参数,
    
    2024-01-01 10:09:28
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink峰会 - 徐榜江 立即下载
    Flink CDC Meetup PPT - 龚中强 立即下载
    PolarDB-X 2.0 全局 Binlog 与备份恢复能 立即下载