开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

各位大佬,请教个问题,使用flink cdc读取数据时,如果配置一个表,数据过滤是发生在server

各位大佬,请教个问题,使用flink cdc读取数据时,如果配置一个表,数据过滤是发生在server端,即只读取一个表,发送一个表的数据;还是读取整个库的数据,发送到client端,然后在client端过滤出配置的表。

展开
收起
雪哥哥 2022-11-13 20:18:52 1809 0
3 条回答
写回答
取消 提交回答
  • GitHub https://github.com/co63oc/cloud

    配置在server端那应该就是server端过滤

    2022-11-24 16:35:39
    赞同 展开评论 打赏
  • 十年摸盘键,代码未曾试。 今日码示君,谁有上云事。

    CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC。 目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛;

    Flink 的 cdc 是基于日志的:实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog文件当作流的数据源;保障数据一致性,因为 binlog 文件包含了所有历史变更明细;保障实时性,因为类似 binlog的日志文件是可以流式消费的,提供的是实时数据。

    目前使用 Flink CDC Connector 做数据同步时,每个表都需要建立一个数据库连接,在多表、整库同步等场景下,对数据库实例的压力非常大,Oceanus 引入了多 source 复用的优化来解决这种问题。

    2022-11-23 14:29:41
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    全量读取 全量读取某个数据库中的所有库中的所有表的Binlog方式代码如下: public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        Properties properties = new Properties();
    
        // 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
        properties.setProperty("debezium.snapshot.locking.mode", "none");
    
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));
    
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    
        //最大同时存在的ck数 和设置的间隔时间有一个就行
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //超时时间
        checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
        //2.3 指定从CK自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
        //2.4 设置任务关闭的时候保留最后一次CK数据
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
        DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
                .hostname("100.21.112.11")
                .port(3306)
                .deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
                .username("root")
                .password("xxxx")
                .startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
                .debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
                .build();
    
        /*
         *  .startupOptions(StartupOptions.latest()) 参数配置
         *  1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
         *  2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
         *  3.latest() 从最新的binlog开始读取
         *  4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
         *  5.timestamp(long startupTimestampMillis) 指定时间戳读取
         * */
    
        env.addSource(MysqlSource).print();
    
        env.execute("flink-cdc");
    
    }
    
    
    public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
    
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after");
            Struct source = value.getStruct("source");
    
            String db = source.getString("db");//库名
            String table = source.getString("table");//表名
    
            //获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
            /* READ("r"),
               CREATE("c"),
                UPDATE("u"),
                DELETE("d");*/
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String opstr = operation.toString().toLowerCase();
            //类型修正 会把insert识别成create
            if (opstr.equals("create")) {
                opstr = "insert";
            }
    
            //获取after结构体里面的表数据,封装成json输出
            JSONObject json1 = new JSONObject();
            JSONObject json2 = new JSONObject();
            //加个判空
            if (after != null) {
                List<Field> data = after.schema().fields(); //获取结构体
                for (Field field : data) {
                    String name = field.name(); //结构体的名字
                    Object value2 = after.get(field);//结构体的字段值
                    //放进json2里面去 json2放到json1里面去
                    json2.put(name, value2);
                }
            }
    
    
            //整理成大json串输出
            json1.put("db", db);
            json1.put("table", table);
            json1.put("data", json2);
            json1.put("type", opstr);
    
            collector.collect(json1.toJSONString());
    
        }
    
        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }
    

    全量读取某个数据库指定DB中的所有表 可以在build之前 ,添加一个

    databaseList,用来指定特定的DB public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        Properties properties = new Properties();
    
        // 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
        properties.setProperty("debezium.snapshot.locking.mode", "none");
    
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));
    
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    
        //最大同时存在的ck数 和设置的间隔时间有一个就行
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //超时时间
        checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
        //2.3 指定从CK自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
        //2.4 设置任务关闭的时候保留最后一次CK数据
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
        DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
                .hostname("100.21.112.11")
                .port(3306)
                .deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
                .username("root")
                .password("xxxx")
                .databaseList("horse") // 指定某个特定的库
                .startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
                .debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
                .build();
    
        /*
         *  .startupOptions(StartupOptions.latest()) 参数配置
         *  1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
         *  2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
         *  3.latest() 从最新的binlog开始读取
         *  4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
         *  5.timestamp(long startupTimestampMillis) 指定时间戳读取
         * */
    
        env.addSource(MysqlSource).print();
    
        env.execute("flink-cdc");
    
    }
    
    
    public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
    
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after");
            Struct source = value.getStruct("source");
    
            String db = source.getString("db");//库名
            String table = source.getString("table");//表名
    
            //获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
            /* READ("r"),
               CREATE("c"),
                UPDATE("u"),
                DELETE("d");*/
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String opstr = operation.toString().toLowerCase();
            //类型修正 会把insert识别成create
            if (opstr.equals("create")) {
                opstr = "insert";
            }
    
            //获取after结构体里面的表数据,封装成json输出
            JSONObject json1 = new JSONObject();
            JSONObject json2 = new JSONObject();
            //加个判空
            if (after != null) {
                List<Field> data = after.schema().fields(); //获取结构体
                for (Field field : data) {
                    String name = field.name(); //结构体的名字
                    Object value2 = after.get(field);//结构体的字段值
                    //放进json2里面去 json2放到json1里面去
                    json2.put(name, value2);
                }
            }
    
    
            //整理成大json串输出
            json1.put("db", db);
            json1.put("table", table);
            json1.put("data", json2);
            json1.put("type", opstr);
    
            collector.collect(json1.toJSONString());
    
        }
    
        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }
    

    全量读取某个数据库指定DB中的指定表 可以在build之前 ,添加一个

    tableList,用来指定特定的DB中的特定表 public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        Properties properties = new Properties();
    
        // 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
        properties.setProperty("debezium.snapshot.locking.mode", "none");
    
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));
    
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    
        //最大同时存在的ck数 和设置的间隔时间有一个就行
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //超时时间
        checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
        //2.3 指定从CK自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
        //2.4 设置任务关闭的时候保留最后一次CK数据
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
        DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
                .hostname("100.21.112.11")
                .port(3306)
                .deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
                .username("root")
                .password("xxxx")
                .databaseList("horse") // 指定某个特定的库
                .tableList("horse.t_dri_info") //指定特定的表
                .startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
                .debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
                .build();
    
        /*
         *  .startupOptions(StartupOptions.latest()) 参数配置
         *  1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
         *  2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
         *  3.latest() 从最新的binlog开始读取
         *  4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
         *  5.timestamp(long startupTimestampMillis) 指定时间戳读取
         * */
    
        env.addSource(MysqlSource).print();
    
        env.execute("flink-cdc");
    
    }
    
    
    public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
    
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after");
            Struct source = value.getStruct("source");
    
            String db = source.getString("db");//库名
            String table = source.getString("table");//表名
    
            //获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
            /* READ("r"),
               CREATE("c"),
                UPDATE("u"),
                DELETE("d");*/
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String opstr = operation.toString().toLowerCase();
            //类型修正 会把insert识别成create
            if (opstr.equals("create")) {
                opstr = "insert";
            }
    
            //获取after结构体里面的表数据,封装成json输出
            JSONObject json1 = new JSONObject();
            JSONObject json2 = new JSONObject();
            //加个判空
            if (after != null) {
                List<Field> data = after.schema().fields(); //获取结构体
                for (Field field : data) {
                    String name = field.name(); //结构体的名字
                    Object value2 = after.get(field);//结构体的字段值
                    //放进json2里面去 json2放到json1里面去
                    json2.put(name, value2);
                }
            }
    
    
            //整理成大json串输出
            json1.put("db", db);
            json1.put("table", table);
            json1.put("data", json2);
            json1.put("type", opstr);
    
            collector.collect(json1.toJSONString());
    
        }
    
        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }
    
    2022-11-23 11:45:15
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载