各位大佬,请教个问题,使用flink cdc读取数据时,如果配置一个表,数据过滤是发生在server端,即只读取一个表,发送一个表的数据;还是读取整个库的数据,发送到client端,然后在client端过滤出配置的表。
配置在server端那应该就是server端过滤
CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC。 目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛;
Flink 的 cdc 是基于日志的:实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog文件当作流的数据源;保障数据一致性,因为 binlog 文件包含了所有历史变更明细;保障实时性,因为类似 binlog的日志文件是可以流式消费的,提供的是实时数据。
目前使用 Flink CDC Connector 做数据同步时,每个表都需要建立一个数据库连接,在多表、整库同步等场景下,对数据库实例的压力非常大,Oceanus 引入了多 source 复用的优化来解决这种问题。
全量读取 全量读取某个数据库中的所有库中的所有表的Binlog方式代码如下: public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
properties.setProperty("debezium.snapshot.locking.mode", "none");
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//最大同时存在的ck数 和设置的间隔时间有一个就行
checkpointConfig.setMaxConcurrentCheckpoints(1);
//超时时间
checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
//2.3 指定从CK自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
//2.4 设置任务关闭的时候保留最后一次CK数据
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
.hostname("100.21.112.11")
.port(3306)
.deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
.username("root")
.password("xxxx")
.startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
.debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
.build();
/*
* .startupOptions(StartupOptions.latest()) 参数配置
* 1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
* 2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
* 3.latest() 从最新的binlog开始读取
* 4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
* 5.timestamp(long startupTimestampMillis) 指定时间戳读取
* */
env.addSource(MysqlSource).print();
env.execute("flink-cdc");
}
public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
Struct value = (Struct) sourceRecord.value();
Struct after = value.getStruct("after");
Struct source = value.getStruct("source");
String db = source.getString("db");//库名
String table = source.getString("table");//表名
//获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
/* READ("r"),
CREATE("c"),
UPDATE("u"),
DELETE("d");*/
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String opstr = operation.toString().toLowerCase();
//类型修正 会把insert识别成create
if (opstr.equals("create")) {
opstr = "insert";
}
//获取after结构体里面的表数据,封装成json输出
JSONObject json1 = new JSONObject();
JSONObject json2 = new JSONObject();
//加个判空
if (after != null) {
List<Field> data = after.schema().fields(); //获取结构体
for (Field field : data) {
String name = field.name(); //结构体的名字
Object value2 = after.get(field);//结构体的字段值
//放进json2里面去 json2放到json1里面去
json2.put(name, value2);
}
}
//整理成大json串输出
json1.put("db", db);
json1.put("table", table);
json1.put("data", json2);
json1.put("type", opstr);
collector.collect(json1.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
全量读取某个数据库指定DB中的所有表 可以在build之前 ,添加一个
databaseList,用来指定特定的DB public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
properties.setProperty("debezium.snapshot.locking.mode", "none");
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//最大同时存在的ck数 和设置的间隔时间有一个就行
checkpointConfig.setMaxConcurrentCheckpoints(1);
//超时时间
checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
//2.3 指定从CK自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
//2.4 设置任务关闭的时候保留最后一次CK数据
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
.hostname("100.21.112.11")
.port(3306)
.deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
.username("root")
.password("xxxx")
.databaseList("horse") // 指定某个特定的库
.startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
.debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
.build();
/*
* .startupOptions(StartupOptions.latest()) 参数配置
* 1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
* 2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
* 3.latest() 从最新的binlog开始读取
* 4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
* 5.timestamp(long startupTimestampMillis) 指定时间戳读取
* */
env.addSource(MysqlSource).print();
env.execute("flink-cdc");
}
public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
Struct value = (Struct) sourceRecord.value();
Struct after = value.getStruct("after");
Struct source = value.getStruct("source");
String db = source.getString("db");//库名
String table = source.getString("table");//表名
//获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
/* READ("r"),
CREATE("c"),
UPDATE("u"),
DELETE("d");*/
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String opstr = operation.toString().toLowerCase();
//类型修正 会把insert识别成create
if (opstr.equals("create")) {
opstr = "insert";
}
//获取after结构体里面的表数据,封装成json输出
JSONObject json1 = new JSONObject();
JSONObject json2 = new JSONObject();
//加个判空
if (after != null) {
List<Field> data = after.schema().fields(); //获取结构体
for (Field field : data) {
String name = field.name(); //结构体的名字
Object value2 = after.get(field);//结构体的字段值
//放进json2里面去 json2放到json1里面去
json2.put(name, value2);
}
}
//整理成大json串输出
json1.put("db", db);
json1.put("table", table);
json1.put("data", json2);
json1.put("type", opstr);
collector.collect(json1.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
全量读取某个数据库指定DB中的指定表 可以在build之前 ,添加一个
tableList,用来指定特定的DB中的特定表 public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
properties.setProperty("debezium.snapshot.locking.mode", "none");
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//最大同时存在的ck数 和设置的间隔时间有一个就行
checkpointConfig.setMaxConcurrentCheckpoints(1);
//超时时间
checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
//2.3 指定从CK自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
//2.4 设置任务关闭的时候保留最后一次CK数据
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
.hostname("100.21.112.11")
.port(3306)
.deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
.username("root")
.password("xxxx")
.databaseList("horse") // 指定某个特定的库
.tableList("horse.t_dri_info") //指定特定的表
.startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
.debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
.build();
/*
* .startupOptions(StartupOptions.latest()) 参数配置
* 1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
* 2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
* 3.latest() 从最新的binlog开始读取
* 4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
* 5.timestamp(long startupTimestampMillis) 指定时间戳读取
* */
env.addSource(MysqlSource).print();
env.execute("flink-cdc");
}
public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
Struct value = (Struct) sourceRecord.value();
Struct after = value.getStruct("after");
Struct source = value.getStruct("source");
String db = source.getString("db");//库名
String table = source.getString("table");//表名
//获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
/* READ("r"),
CREATE("c"),
UPDATE("u"),
DELETE("d");*/
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String opstr = operation.toString().toLowerCase();
//类型修正 会把insert识别成create
if (opstr.equals("create")) {
opstr = "insert";
}
//获取after结构体里面的表数据,封装成json输出
JSONObject json1 = new JSONObject();
JSONObject json2 = new JSONObject();
//加个判空
if (after != null) {
List<Field> data = after.schema().fields(); //获取结构体
for (Field field : data) {
String name = field.name(); //结构体的名字
Object value2 = after.get(field);//结构体的字段值
//放进json2里面去 json2放到json1里面去
json2.put(name, value2);
}
}
//整理成大json串输出
json1.put("db", db);
json1.put("table", table);
json1.put("data", json2);
json1.put("type", opstr);
collector.collect(json1.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。