Is there a solution for the problem where time-type field values read by Flink CDC from a MySQL database do not match the actual values in the table?
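One approach shared in the community is to implement a custom DebeziumDeserializationSchema that converts Debezium's temporal logical types (io.debezium.time.Timestamp, MicroTimestamp, Date, Time, and so on), which arrive as raw integer counts (milliseconds or microseconds since the epoch, days since the epoch, etc.), back into readable date/time values, as in the class below.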
package idm.deserialize;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import idm.util.StringUtil;
import io.debezium.time.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.time.ZoneId;
/**
 * Custom deserialization schema that converts Debezium temporal logical types into readable values.
 *
 * @date 2024/3/4 13:08
 */
public class CustomDebeziumDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {

    private static final Logger logger = LoggerFactory.getLogger(CustomDebeziumDeserializationSchema.class);
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<JSONObject> collector) throws Exception {
        // logger.info("deserialize:" + JSON.toJSONString(sourceRecord.value()));
        JSONObject obj = new JSONObject();
        // Parse the record value
        Struct value = (Struct) sourceRecord.value();
        // Parse the primary key
        Struct key = (Struct) sourceRecord.key();
        JSONObject keys = parseStruct(key);
        Struct source = value.getStruct("source");
        // Include the table name so downstream logic can tell different tables apart
        String table = source.get("table").toString();
        JSONObject before = parseStruct(value.getStruct("before"));
        JSONObject after = parseStruct(value.getStruct("after"));
        // Operation type (c/u/d/r), useful when filtering for specific change types
        String op = value.get("op").toString();
        obj.put("key", keys);
        obj.put("before", before);
        obj.put("after", after);
        obj.put("op", op);
        obj.put("table", table);
        collector.collect(obj);
    }
    /**
     * Parse a Kafka Connect {@link Struct} into a JSON object.
     *
     * @param struct_ the struct to parse, may be null
     * @return the parsed fields as JSON, or null if the struct is null
     */
    public JSONObject parseStruct(Struct struct_) {
        if (struct_ == null) {
            return null;
        }
        JSONObject json = new JSONObject();
        struct_.schema().fields().forEach(field -> {
            Object v = struct_.get(field);
            // Logical type name, e.g. io.debezium.time.Timestamp (may be null for plain types)
            String typ = field.schema().name();
            Schema.Type type = field.schema().type();
            // System.out.println("v:" + v + ",typ:" + typ + ",type:" + type + ",fieldName:" + field.name());
            if (v != null) {
                json.put(field.name(), getFields(v, typ));
            }
        });
        return json;
    }
    /** Dispatch a raw field value to the matching temporal conversion based on its Java type. */
    public Object getFields(Object v, String type) {
        if (v == null) {
            return null;
        }
        if (v instanceof Long) {
            return convertLongToTime(type, (Long) v);
        } else if (v instanceof Integer) {
            return convertIntToDate((Integer) v, type);
        }
        return convertObjToTime(v, type);
    }
    /** Type conversion for non-integer values that carry a temporal logical type. */
    private Object convertObjToTime(Object obj, String type) {
        if (type == null) {
            return obj;
        }
        switch (type) {
            case Time.SCHEMA_NAME:
            case MicroTime.SCHEMA_NAME:
            case NanoTime.SCHEMA_NAME:
                return java.sql.Time.valueOf(TemporalConversions.toLocalTime(obj));
            case Timestamp.SCHEMA_NAME:
            case MicroTimestamp.SCHEMA_NAME:
            case NanoTimestamp.SCHEMA_NAME:
            case ZonedTimestamp.SCHEMA_NAME:
                return java.sql.Timestamp.valueOf(TemporalConversions.toLocalDateTime(obj, ZoneId.of("UTC")));
            default:
                return obj;
        }
    }
    /** Convert an int value carrying the Debezium Date logical type (days since the epoch) into a date string. */
    public Object convertIntToDate(Integer obj, String type) {
        if (StringUtil.isNullOrEmpty(type)) {
            return obj;
        }
        SchemaBuilder dateSchema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date");
        switch (type) {
            case Date.SCHEMA_NAME:
                java.util.Date date = org.apache.kafka.connect.data.Date.toLogical(dateSchema, obj);
                return date.toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime().toString();
            default:
                return obj;
        }
    }
    /**
     * Convert a long value carrying a Debezium temporal logical type into a time/timestamp value.
     */
    public static Object convertLongToTime(String type, Long obj) {
        if (StringUtil.isNullOrEmpty(type)) {
            return obj;
        }
        SchemaBuilder timeSchema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Time");
        SchemaBuilder dateSchema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date");
        SchemaBuilder timestampSchema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Timestamp");
        switch (type) {
            case Time.SCHEMA_NAME:
                // io.debezium.time.Time: milliseconds past midnight
                return org.apache.kafka.connect.data.Time.toLogical(timeSchema, obj.intValue())
                        .toInstant().atZone(ZoneId.of("UTC")).toLocalTime();
            case MicroTime.SCHEMA_NAME:
                // io.debezium.time.MicroTime: microseconds past midnight
                return org.apache.kafka.connect.data.Time.toLogical(timeSchema, (int) (obj / 1000))
                        .toInstant().atZone(ZoneId.of("UTC")).toLocalTime();
            case NanoTime.SCHEMA_NAME:
                // io.debezium.time.NanoTime: nanoseconds past midnight
                return org.apache.kafka.connect.data.Time.toLogical(timeSchema, (int) (obj / 1000 / 1000))
                        .toInstant().atZone(ZoneId.of("UTC")).toLocalTime();
            case Timestamp.SCHEMA_NAME:
                // io.debezium.time.Timestamp: milliseconds since the epoch
                LocalDateTime utc = org.apache.kafka.connect.data.Timestamp.toLogical(timestampSchema, obj)
                        .toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime();
                return java.sql.Timestamp.valueOf(utc);
            case MicroTimestamp.SCHEMA_NAME:
                // io.debezium.time.MicroTimestamp: microseconds since the epoch
                LocalDateTime utc1 = org.apache.kafka.connect.data.Timestamp.toLogical(timestampSchema, obj / 1000)
                        .toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime();
                return java.sql.Timestamp.valueOf(utc1);
            case NanoTimestamp.SCHEMA_NAME:
                // io.debezium.time.NanoTimestamp: nanoseconds since the epoch
                LocalDateTime t = org.apache.kafka.connect.data.Timestamp.toLogical(timestampSchema, obj / 1000 / 1000)
                        .toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime();
                return java.sql.Timestamp.valueOf(t);
            case Date.SCHEMA_NAME:
                // io.debezium.time.Date: days since the epoch
                return org.apache.kafka.connect.data.Date.toLogical(dateSchema, obj.intValue())
                        .toInstant().atZone(ZoneId.of("UTC")).toLocalDate();
            default:
                return obj;
        }
    }
    @Override
    public TypeInformation<JSONObject> getProducedType() {
        return TypeInformation.of(JSONObject.class);
    }
}

This answer is from the Flink CDC community DingTalk group.
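For reference, here is a minimal sketch of wiring this deserializer into a Flink CDC MySQL source. It assumes the flink-connector-mysql-cdc dependency; the job class name MySqlCdcJob, connection settings, and table names are placeholders. The serverTimeZone option should match the MySQL server's actual time zone, since a mismatch there is another common cause of shifted temporal values.

// Minimal usage sketch (placeholder connection values; assumes flink-connector-mysql-cdc on the classpath)
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import idm.deserialize.CustomDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlCdcJob {
    public static void main(String[] args) throws Exception {
        MySqlSource<JSONObject> mySqlSource = MySqlSource.<JSONObject>builder()
                .hostname("localhost")              // placeholder host
                .port(3306)
                .databaseList("test_db")            // placeholder database
                .tableList("test_db.test_table")    // placeholder table
                .username("user")                   // placeholder credentials
                .password("password")
                .serverTimeZone("UTC")              // should match the MySQL server time zone
                .deserializer(new CustomDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .print();
        env.execute("mysql-cdc-time-fields");
    }
}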