postgresql cdc时区问题
1:以 PostgreSQL 作为数据源时,date 和 timestamp 等类型经 CDC 同步读出后,会发现以下几个问题:
时间、日期等类型的数据会被转换为 int、long 等数值类型(Debezium 将日期编码为自 1970-01-01 起的天数,将时间戳编码为自纪元起的毫秒/微秒/纳秒数)。
源表数据同步到 sink 表后,时间相差 8 小时。这是时区不一致造成的:Debezium 把不带时区的时间值按 UTC 编码为时间戳,而下游按本地时区(UTC+8)解析,于是多出 8 小时。
源表:
sink 表:
解决方案:在自定义序列化时进行处理。
java code
package pg.cdc.ds; import com.alibaba.fastjson.JSONObject; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.data.Envelope; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import java.text.SimpleDateFormat; import java.time.ZoneId; import java.util.Date; import java.util.List; public class CustomerDeserialization implements DebeziumDeserializationSchema<String> { ZoneId serverTimeZone; @Override public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { //1.创建JSON对象用于存储最终数据 JSONObject result = new JSONObject(); Struct value = (Struct) sourceRecord.value(); //2.获取库名&表名 Struct sourceStruct = value.getStruct("source"); String database = sourceStruct.getString("db"); String schema = sourceStruct.getString("schema"); String tableName = sourceStruct.getString("table"); //3.获取"before"数据 Struct before = value.getStruct("before"); JSONObject beforeJson = new JSONObject(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd"); if (before != null) { Schema beforeSchema = before.schema(); List<Field> beforeFields = beforeSchema.fields(); for (Field field : beforeFields) { Object beforeValue = before.get(field); if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.MicroTimestamp".equals(field.schema().name())) { if (beforeValue != null) { long times = (long) beforeValue / 1000; String dateTime = sdf.format(new Date((times - 8 * 60 * 60 * 1000))); beforeJson.put(field.name(), dateTime); } } else if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.NanoTimestamp".equals(field.schema().name())) { if 
(beforeValue != null) { long times = (long) beforeValue; String dateTime = sdf.format(new Date((times - 8 * 60 * 60 * 1000))); beforeJson.put(field.name(), dateTime); } } else if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.Timestamp".equals(field.schema().name())) { if (beforeValue != null) { long times = (long) beforeValue; String dateTime = sdf.format(new Date((times - 8 * 60 * 60 ))); beforeJson.put(field.name(), dateTime); } } else if("int32".equals(field.schema().type().getName()) && "io.debezium.time.Date".equals(field.schema().name())){ if(beforeValue != null) { int times = (int) beforeValue; String dateTime = sdf1.format(new Date(times * 24 * 60 * 60L * 1000)); beforeJson.put(field.name(), dateTime); } } else { beforeJson.put(field.name(), beforeValue); } } } //4.获取"after"数据 Struct after = value.getStruct("after"); JSONObject afterJson = new JSONObject(); if (after != null) { Schema afterSchema = after.schema(); List<Field> afterFields = afterSchema.fields(); for (Field field : afterFields) { Object afterValue = after.get(field); if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.MicroTimestamp".equals(field.schema().name())) { if (afterValue != null) { long times = (long) afterValue / 1000; String dateTime = sdf.format(new Date((times - 8 * 60 * 60 * 1000))); afterJson.put(field.name(), dateTime); } } else if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.NanoTimestamp".equals(field.schema().name())) { if (afterValue != null) { long times = (long) afterValue; String dateTime = sdf.format(new Date((times - 8 * 60 * 60 * 1000))); afterJson.put(field.name(), dateTime); } } else if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.Timestamp".equals(field.schema().name())) { if (afterValue != null) { long times = (long) afterValue; String dateTime = sdf.format(new Date((times - 8 * 60 * 60))); afterJson.put(field.name(), dateTime); } } else 
if("int32".equals(field.schema().type().getName()) && "io.debezium.time.Date".equals(field.schema().name())){ if(afterValue != null) { int times = (int) afterValue; String dateTime = sdf1.format(new Date(times * 24 * 60 * 60L * 1000)); afterJson.put(field.name(), dateTime); } } else { afterJson.put(field.name(), afterValue); } } } //5.获取操作类型 CREATE UPDATE DELETE Envelope.Operation operation = Envelope.operationFor(sourceRecord); String type = operation.toString().toLowerCase(); if ("create".equals(type) || "read".equals(type)) { type = "insert"; } //6.将字段写入JSON对象 result.put("database", database); result.put("schema", schema); result.put("tableName", tableName); result.put("before", beforeJson); result.put("after", afterJson); result.put("type", type); //7.输出数据 collector.collect(result.toJSONString()); } @Override public TypeInformation<String> getProducedType() { return BasicTypeInfo.STRING_TYPE_INFO; } }
scala code
import com.ververica.cdc.debezium.DebeziumDeserializationSchema
import com.ververica.cdc.debezium.utils.TemporalConversions
import io.debezium.time._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.{SchemaBuilder, Struct}
import org.apache.kafka.connect.source.SourceRecord

import java.sql
import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, OffsetDateTime, ZoneId, ZoneOffset}
import java.time.temporal.ChronoUnit
import scala.collection.JavaConverters._
import scala.util.parsing.json.JSONObject

/**
 * Deserializes a Debezium change record into a Flink [[Row]] with the named fields
 * `table`, `key`, `op`, `op_ts`, `current_ts`, `before` and `after`.
 * The key and the row images are rendered as JSON strings.
 *
 * Time-zone handling:
 *  - Debezium encodes `timestamp`, `time` and `date` columns (no time zone) as numbers that
 *    carry the wall-clock value as if it were UTC, so they are decoded in UTC. Shifting them
 *    into `serverTimeZone` would move them by the zone offset (8 hours for Asia/Shanghai).
 *  - `ZonedTimestamp` strings and the record `ts_ms` fields are real instants. They are rendered
 *    in `serverTimeZone`.
 *
 * @param serverTimeZone a [[ZoneId]] id (e.g. "Asia/Shanghai") used to render real instants
 */
class StructDebeziumDeserializationSchema(serverTimeZone: String) extends DebeziumDeserializationSchema[Row] {

  /** Parsed once (fails fast on an invalid id); ZoneId is serializable. */
  private val zone: ZoneId = ZoneId.of(serverTimeZone)

  override def deserialize(sourceRecord: SourceRecord, collector: Collector[Row]): Unit = {
    val value = sourceRecord.value().asInstanceOf[Struct]
    if (value == null) return // tombstone after a delete: nothing to emit
    val source = value.getStruct("source")

    val row = Row.withNames()
    row.setField("table", s"${source.get("db")}.${source.get("table")}")
    row.setField("key", parseStruct(sourceRecord.key().asInstanceOf[Struct]))
    row.setField("op", value.get("op"))
    row.setField("op_ts", epochMillisInZone(source.getInt64("ts_ms")))
    row.setField("current_ts", epochMillisInZone(value.getInt64("ts_ms")))
    row.setField("before", parseStruct(value.getStruct("before")))
    row.setField("after", parseStruct(value.getStruct("after")))
    collector.collect(row)
  }

  /** Renders an epoch-millisecond instant as local date-time in `serverTimeZone`. */
  private def epochMillisInZone(epochMillis: Long): LocalDateTime =
    LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone)

  /** Renders a [[Struct]] as a JSON string, dropping null fields; returns null for a null struct. */
  private def parseStruct(struct: Struct): String = {
    if (struct == null) return null
    val fields: Map[String, Any] = struct.schema().fields().asScala
      .flatMap(field => Option(convertValue(struct.get(field), field.schema().name())).map(field.name() -> _))
      .toMap
    JSONObject(fields).toString()
  }

  /** Dispatches a raw Connect value to the converter for its runtime type. */
  private def convertValue(v: Any, typ: String): Any = v match {
    case null    => null
    case l: Long => convertLongToTime(l, typ)
    case i: Int  => convertIntToDate(i, typ)
    case other   => convertObjToTime(other, typ)
  }

  /** Converts non-numeric temporal values; ZonedTimestamp strings are real instants. */
  private def convertObjToTime(obj: Any, typ: String): Any = typ match {
    case ZonedTimestamp.SCHEMA_NAME if obj.isInstanceOf[String] =>
      // Debezium emits ZonedTimestamp as an ISO-8601 string with offset (UTC, e.g. "...Z").
      val instant = OffsetDateTime.parse(obj.asInstanceOf[String]).toInstant
      sql.Timestamp.valueOf(LocalDateTime.ofInstant(instant, zone)).toString
    case Time.SCHEMA_NAME | MicroTime.SCHEMA_NAME | NanoTime.SCHEMA_NAME =>
      sql.Time.valueOf(TemporalConversions.toLocalTime(obj)).toString
    case Timestamp.SCHEMA_NAME | MicroTimestamp.SCHEMA_NAME | NanoTimestamp.SCHEMA_NAME | ZonedTimestamp.SCHEMA_NAME =>
      sql.Timestamp.valueOf(TemporalConversions.toLocalDateTime(obj, zone)).toString
    case _ => obj
  }

  /** Converts INT64 temporal encodings. These are wall-clock values, so they are decoded in UTC. */
  private def convertLongToTime(obj: Long, typ: String): Any = typ match {
    case Time.SCHEMA_NAME           => LocalTime.ofNanoOfDay(obj * 1000000L).toString // millis of day
    case MicroTime.SCHEMA_NAME      => LocalTime.ofNanoOfDay(obj * 1000L).toString    // micros of day
    case NanoTime.SCHEMA_NAME       => LocalTime.ofNanoOfDay(obj).toString            // nanos of day
    case Timestamp.SCHEMA_NAME      => utcTimestamp(Instant.ofEpochMilli(obj))
    case MicroTimestamp.SCHEMA_NAME => utcTimestamp(Instant.EPOCH.plus(obj, ChronoUnit.MICROS))
    case NanoTimestamp.SCHEMA_NAME  => utcTimestamp(Instant.EPOCH.plus(obj, ChronoUnit.NANOS))
    case Date.SCHEMA_NAME           => LocalDate.ofEpochDay(obj).toString
    case _                          => obj
  }

  /** Converts INT32 temporal encodings: Date (days since epoch) and Time (millis of day). */
  private def convertIntToDate(obj: Int, typ: String): Any = typ match {
    case Date.SCHEMA_NAME => LocalDate.ofEpochDay(obj.toLong).toString
    case Time.SCHEMA_NAME => LocalTime.ofNanoOfDay(obj.toLong * 1000000L).toString
    case _                => obj
  }

  /** Formats the UTC wall-clock time encoded by `instant`, keeping sub-second precision. */
  private def utcTimestamp(instant: Instant): String =
    sql.Timestamp.valueOf(LocalDateTime.ofInstant(instant, ZoneOffset.UTC)).toString

  override def getProducedType: TypeInformation[Row] = {
    TypeInformation.of(classOf[Row])
  }
}
mysql cdc时区问题
MySQL CDC 也会出现上述时区问题:Debezium 默认将 MySQL 中的 datetime 类型转成 UTC 时间戳(io.debezium.time.Timestamp),时区是写死的,无法更改,导致数据库中按 UTC+8 存储的值到了 Kafka 中变成了多 8 个小时的 long 型时间戳。此外,Debezium 默认将 MySQL 中的 timestamp 类型转成 UTC 的字符串。
解决思路有两种:
1:自定义序列化方式的时候做时区转换。
2:自定义时间转换类(实现 Debezium 的 CustomConverter 接口),通过 Debezium 配置项指定转换格式。
这里主要使用第二种方式。
package com.zmn.schema; import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import org.apache.kafka.connect.data.SchemaBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.*; import java.time.format.DateTimeFormatter; import java.util.Properties; import java.util.function.Consumer; /** * 处理Debezium时间转换的问题 * Debezium默认将MySQL中datetime类型转成UTC的时间戳({@link io.debezium.time.Timestamp}),时区是写死的无法更改, * 导致数据库中设置的UTC+8,到kafka中变成了多八个小时的long型时间戳 * Debezium默认将MySQL中的timestamp类型转成UTC的字符串。 * | mysql | mysql-binlog-connector | debezium | * | ----------------------------------- | ---------------------------------------- | --------------------------------- | * | date<br>(2021-01-28) | LocalDate<br/>(2021-01-28) | Integer<br/>(18655) | * | time<br/>(17:29:04) | Duration<br/>(PT17H29M4S) | Long<br/>(62944000000) | * | timestamp<br/>(2021-01-28 17:29:04) | ZonedDateTime<br/>(2021-01-28T09:29:04Z) | String<br/>(2021-01-28T09:29:04Z) | * | Datetime<br/>(2021-01-28 17:29:04) | LocalDateTime<br/>(2021-01-28T17:29:04) | Long<br/>(1611854944000) | * * @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter */ public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> { private final static Logger logger = LoggerFactory.getLogger(MySqlDateTimeConverter.class); private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE; private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME; private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME; private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME; private ZoneId timestampZoneId = ZoneId.systemDefault(); @Override public void configure(Properties props) { readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p)); readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p)); readProps(props, "format.datetime", 
p -> datetimeFormatter = DateTimeFormatter.ofPattern(p)); readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p)); readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z)); } private void readProps(Properties properties, String settingKey, Consumer<String> callback) { String settingValue = (String) properties.get(settingKey); if (settingValue == null || settingValue.length() == 0) { return; } try { callback.accept(settingValue.trim()); } catch (IllegalArgumentException | DateTimeException e) { logger.error("The {} setting is illegal: {}",settingKey,settingValue); throw e; } } @Override public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) { String sqlType = column.typeName().toUpperCase(); SchemaBuilder schemaBuilder = null; Converter converter = null; if ("DATE".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string"); converter = this::convertDate; } if ("TIME".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string"); converter = this::convertTime; } if ("DATETIME".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string"); converter = this::convertDateTime; } if ("TIMESTAMP".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string"); converter = this::convertTimestamp; } if (schemaBuilder != null) { registration.register(schemaBuilder, converter); } } private String convertDate(Object input) { if (input instanceof LocalDate) { return dateFormatter.format((LocalDate) input); } if (input instanceof Integer) { LocalDate date = LocalDate.ofEpochDay((Integer) input); return dateFormatter.format(date); } return null; } private String convertTime(Object input) { if (input instanceof Duration) { Duration duration = (Duration) input; long 
seconds = duration.getSeconds(); int nano = duration.getNano(); LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano); return timeFormatter.format(time); } return null; } private String convertDateTime(Object input) { if (input instanceof LocalDateTime) { return datetimeFormatter.format((LocalDateTime) input); } return null; } private String convertTimestamp(Object input) { if (input instanceof ZonedDateTime) { // mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间 ZonedDateTime zonedDateTime = (ZonedDateTime) input; LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime(); return timestampFormatter.format(localDateTime); } return null; } }
使用方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
// Incremental read: capture the schema only, skip the initial data snapshot.
// NOTE(review): with MySqlSource the startup mode is normally chosen via
// .startupOptions(StartupOptions.latest()); confirm this property is honoured by your flink-cdc version.
properties.setProperty("snapshot.mode", "schema_only");

// Register the custom temporal converter defined above (com.zmn.schema.MySqlDateTimeConverter).
properties.setProperty("converters", "dateConverters");
properties.setProperty("dateConverters.type", "com.zmn.schema.MySqlDateTimeConverter");
properties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
properties.setProperty("dateConverters.format.time", "HH:mm:ss");
properties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
properties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
properties.setProperty("dateConverters.format.timestamp.zone", "UTC+8");

// Skip the global read lock, which can affect online traffic.
// Keys passed via debeziumProperties() reach Debezium verbatim, so no "debezium." prefix here.
properties.setProperty("snapshot.locking.mode", "none");
// NOTE(review): MySqlSource also exposes .includeSchemaChanges(true) for this.
properties.setProperty("include.schema.changes", "true");
// With JsonDebeziumDeserializationSchema, BIGINT UNSIGNED columns were captured as strings;
// "long" emits them as int64 instead.
properties.setProperty("bigint.unsigned.handling.mode", "long");
// "double" trades DECIMAL precision for plain JSON numbers.
properties.setProperty("decimal.handling.mode", "double");

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("192.168.10.102")
        .port(3306)
        .username("yusys")
        .password("yusys")
        .databaseList("gmall")
        .tableList("gmall.faker_user1")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .debeziumProperties(properties)
        // Every parallel reader needs its own server id: the range must cover the parallelism (10).
        .serverId("5400-5409")
        .build();

// MySqlSource implements the new Source API, so it is attached with fromSource (not addSource).
SingleOutputStreamOperator<String> dataSource = env
        .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "binlog-source")
        .setParallelism(10);