大家有测试过 一个脚本采用flink cdc 同步mysql 能同时同步多少表吗 ?
在 OLTP 系统中,为了解决单表数据量大的问题,通常采用分库分表的方式将单个大表进行拆分以提高系统的吞吐量。 但是为了方便数据分析,通常需要将分库分表拆分出的表在同步到数据仓库、数据湖时,再合并成一个大表。
进入 MySQL 容器中: 创建两个不同的数据库,并在每个数据库中创建两个表,作为 user 表分库分表下拆分出的表。
使用 Flink DDL 创建表 1. Flink sql 开启 checkpoint
2. Flink sql 创建 MySQL 分库分表 source 表
3.Flink sql 创建 Iceberg sink 表
流式写入 Iceberg 1. 使用Flink SQL 语句将数据从 MySQL 写入 Iceberg 中,就会启动一个流式作业,源源不断将 MySQL 数据库中的全量和增量数据同步到 Iceberg 中。
然后就可以使用命令看到 Iceberg 中的写入的文件,使用Flink SQL 语句查询表 all_users_sink 中的数据,修改 MySQL 中表的数据,Iceberg 中的表 all_users_sink 中的数据也将实时更新,上面就是介绍了使用 Flink CDC 同步 MySQL 分库分表的数据,快速构建 Icberg 实时数据湖。
MySQL 存储的数据量大了之后往往会出现查询性能下降的问题,这时候通过 Flink SQL 里的 MySQL CDC Connector 将数据同步到其他数据存储是常见的一种处理方式。
例如 CDC 到 ES 实现数据检索,CDC 到 ClikHouse 进行 OLAP 分析,CDC 到 Kafka 实现数据同步等,然而目前官方 MySQL CDC Connector 还无法实现动态同步表结构,如果新增字段,则下游无法收到新增字段的数据,如果删除字段,那 Flink 任务将会报错退出,需要修改 SQL 后才能正常启动。
mysql-cdc支撑正则表达式的库名表名来匹配多个库多个表来获取分库分表情况下的mysql数据。只需要在创建flink源表时在数据库和表名上使用正则匹配即可。
java调用sql(也可以直接在flinksql客户端执行其中的sql)。
执行后,flink会启动任务将存量数据同步到目标表,并且如果增量修改数据也会被同步过去,可以修改源表数据后再查看目标表中的数据库是否变化。
按正常来说一个job只能同步一张表,但是可以动态同步,主要难度在于同步表结构,如果高版本的支持多表同步的话也可以尝试一下,这里解决一下同步表结构的问题 首先我们需要实现自己的 DebeziumDeserializationSchema,这里实现了一个名为 JsonStringDebeziumDeserializationSchema 的简单示例,实现将 binlog 数据转换为 JSON,在实际业务中可以根据业务需求实现更个性化的操作,例如向下游发送自定义的 Schema 变更通知等等。
public class JsonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema {
@Override public void deserialize(SourceRecord record, Collector out) throws Exception { Envelope.Operation op = Envelope.operationFor(record); Struct value = (Struct) record.value(); Schema valueSchema = record.valueSchema(); if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { String insert = extractAfterRow(value, valueSchema); out.collect(new Tuple2<>(true, insert)); } else if (op == Envelope.Operation.DELETE) { String delete = extractBeforeRow(value, valueSchema); out.collect(new Tuple2<>(false, delete)); }else { String after = extractAfterRow(value, valueSchema); out.collect(new Tuple2<>(true, after)); } }
private Map<String,Object> getRowMap(Struct after){ return after.schema().fields().stream().collect(Collectors.toMap(Field::name,f->after.get(f))); }
private String extractAfterRow(Struct value, Schema valueSchema) throws Exception { Struct after = value.getStruct(Envelope.FieldName.AFTER); Map<String,Object> rowMap = getRowMap(after); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(rowMap); } private String extractBeforeRow(Struct value, Schema valueSchema) throws Exception { Struct after = value.getStruct(Envelope.FieldName.BEFORE); Map<String,Object> rowMap = getRowMap(after); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(rowMap); }
@Override public TypeInformation getProducedType() { return TypeInformation.of(new TypeHint<Tuple2<Boolean,String>>(){}); } }
实现 DebeziumDeserializationSchema 需要实现 deserialize、getProducedType 两个函数。deserialize 实现转换数据的逻辑,getProducedType 定义返回的类型,这里返回两个参数,第一个Boolean 类型的参数表示数据是 upsert 或是 delete,第二个参数返回转换后的 JSON string,这里的 JSON 将会包含 Schema 变更后的 Column 与对应的 Value。
编写启动 Main 函数,将我们自定义的 DebeziumDeserializationSchema 实现设置到 SourceFunction 中
public class MySQLCDC{ public static void main(String[] args) throws Exception { ParameterTool params = ParameterTool.fromArgs(args); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 关闭 Operator Chaining, 令运行图更容易初学者理解 env.disableOperatorChaining(); env.setParallelism(1); //checkpoint的一些配置 env.enableCheckpointing(params.getInt("checkpointInterval",60000)); env.getCheckpointConfig().setCheckpointTimeout(5000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); SourceFunction source = MySQLSource.builder() .hostname(params.get("hostname","127.0.0.1")) .port(params.getInt("port",3306)) .username(params.get("username","root")) .password(params.get("password","")) .serverTimeZone("Asia/Shanghai") //设置我们自己的实现 .deserializer(new JsonStringDebeziumDeserializationSchema()) .databaseList(params.get("databaseList","test")) .tableList(params.get("tableList","test.my_test")) .build(); // 定义数据源 DataStream<Tuple2<Boolean, String>> streamSource = env.addSource(source).name("MySQLSource"); ... env.execute(MySQLCDC.class.getSimpleName()); } }
建立测试数据库,并插入几条数据
CREATE TABLE my_test
( f_sequence
int(11) DEFAULT NULL, f_random
int(11) DEFAULT NULL, f_random_str
varchar(255) NOT NULL DEFAULT '', Name
varchar(255) DEFAULT '', f_date
date DEFAULT NULL, f_datetime
datetime DEFAULT NULL, f_timestamp
bigint(20) DEFAULT NULL, PRIMARY KEY (f_random_str
) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 这个时候运行程序,已经可以看到一些输出了。
Schema 变更前输出:
(true,{"f_date":18545,"f_random_str":"1","f_sequence":1,"f_timestamp":1630486762,"f_datetime":1602328271000,"f_random":1,"Name":"1"}) (true,{"f_date":18545,"f_random_str":"2","f_sequence":2,"f_timestamp":1630486762,"f_datetime":1602328271000,"f_random":2,"Name":"2"}) (true,{"f_date":18545,"f_random_str":"3","f_sequence":3,"f_timestamp":1630486762,"f_datetime":1602328271000,"f_random":3,"Name":"3"}) (true,{"f_date":18545,"f_random_str":"4","f_sequence":33333,"f_timestamp":1630486762,"f_datetime":1602328271000,"f_random":4,"Name":"3"}) 但是与数据库对比可以发现,这里的时间戳与数据库时间刚好相差了 8 个小时
f_sequence|f_random|f_random_str|Name|f_date |f_datetime |f_timestamp| ----------+--------+------------+----+----------+-------------------+-----------+ 1| 1|1 |1 |2020-10-10|2020-10-10 11:11:11| 1630486762| 2| 2|2 |2 |2020-10-10|2020-10-10 11:11:11| 1630486762| 3| 3|3 |3 |2020-10-10|2020-10-10 11:11:11| 1630486762| 33333| 4|4 |3 |2020-10-10|2020-10-10 11:11:11| 1630486762| 说明我们启动时设置的 .serverTimeZone("Asia/Shanghai") 并没有生效,查源码可以发现,底层的 Debezium 并没有实现 serverTimeZone 的配置,相应的转换是在 RowDataDebeziumDeserializeSchema 内实现的,源码如下:
private TimestampData convertToTimestamp(Object dbzObj, Schema schema) { if (dbzObj instanceof Long) { switch (schema.name()) { case Timestamp.SCHEMA_NAME: return TimestampData.fromEpochMillis((Long) dbzObj); case MicroTimestamp.SCHEMA_NAME: long micro = (long) dbzObj; return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000)); case NanoTimestamp.SCHEMA_NAME: long nano = (long) dbzObj; return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000)); } } //这里的serverTimeZone来自于Bean构造函数传入的配置项 LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone); return TimestampData.fromLocalDateTime(localDateTime); }
因此如果要实现完整的功能,那么我们自己实现的 JsonStringDebeziumDeserializationSchema 也需要包含对应的 Converter,最终代码如下:
public class JsonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema {
public JsonStringDebeziumDeserializationSchema(int zoneOffset) { //实现一个用于转换时间的Converter
this.runtimeConverter = (dbzObj,schema) -> { if(schema.name() != null){ switch (schema.name()) { case Timestamp.SCHEMA_NAME: return TimestampData.fromEpochMillis((Long) dbzObj).toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset)).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); case MicroTimestamp.SCHEMA_NAME: long micro = (long) dbzObj; return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000)).toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset)).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); case NanoTimestamp.SCHEMA_NAME: long nano = (long) dbzObj; return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000)).toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset)).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); case Date.SCHEMA_NAME: return TemporalConversions.toLocalDate(dbzObj).format(DateTimeFormatter.ISO_LOCAL_DATE); } } return dbzObj; }; }
//定义接口
private interface DeserializationRuntimeConverter extends Serializable {
Object convert(Object dbzObj, Schema schema);
}
private final JsonStringDebeziumDeserializationSchema.DeserializationRuntimeConverter runtimeConverter;
private Map<String,Object> getRowMap(Struct after){
//转换时使用对应的转换器
return after.schema().fields().stream()
.collect(Collectors.toMap(Field::name,f->runtimeConverter.convert(after.get(f),f.schema())));
}
...
}
同时修改 Main 函数,在构造 JsonStringDebeziumDeserializationSchema 时传入对应的时区,再次运行时就可以看到符合我们预期的输出了。
修改时区后输出:
(true,{"f_date":"2020-10-10","f_random_str":"1","f_sequence":1,"f_timestamp":1630486762,"f_datetime":"2020-10-10T11:11:11+08:00","f_random":1,"Name":"1"}) (true,{"f_date":"2020-10-10","f_random_str":"2","f_sequence":2,"f_timestamp":1630486762,"f_datetime":"2020-10-10T11:11:11+08:00","f_random":2,"Name":"2"}) (true,{"f_date":"2020-10-10","f_random_str":"3","f_sequence":3,"f_timestamp":1630486762,"f_datetime":"2020-10-10T11:11:11+08:00","f_random":3,"Name":"3"}) (true,{"f_date":"2020-10-10","f_random_str":"4","f_sequence":33333,"f_timestamp":1630486762,"f_datetime":"2020-10-10T11:11:11+08:00","f_random":4,"Name":"3"}) 最后我们可以验证一下 Schema 变更是不是可以及时同步到输出的 JSON 中,通过语句在数据库中新增一个字段,并插入一条新数据:
ALTER TABLE my_test ADD f_added_string varchar(255) NOT NULL DEFAULT '' COMMENT '新增字段'; INSERT INTO my_test VALUES(1,1,'new','new','2020-10-10 10:10:10',1630486762,'new'); 可以看到输出中已经出现了新增的字段
其实这个同步的mysql的表的个数,与当前的服务器配置是有关系的,根据当前服务器的配置不同,同步的个数也不会相同。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。