
Is there a solution for the problem where time-type field values read from a MySQL database via Flink CDC do not match the actual values?

小小鹿鹿鹿 2024-04-23 22:13:13
1 answer
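Background on why the values drift: with its default settings, Debezium's MySQL connector emits DATETIME columns as epoch-based integers (io.debezium.time.Timestamp in milliseconds or MicroTimestamp in microseconds, depending on column precision), DATE as epoch days (io.debezium.time.Date), TIME as microseconds past midnight (io.debezium.time.MicroTime), and TIMESTAMP as a UTC ISO-8601 string (io.debezium.time.ZonedTimestamp). Serialized to JSON as-is, these show up as raw numbers or time-zone-shifted strings. The answer below converts them back to readable values with a custom DebeziumDeserializationSchema: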
    package idm.deserialize;

    import com.alibaba.fastjson.JSONObject;
    import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
    import com.ververica.cdc.debezium.utils.TemporalConversions;
    import idm.util.StringUtil;
    import io.debezium.time.*;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;

    /**
     * @author lvmei
     * @date 2024/3/4 13:08
     */
    public class CustomDebeziumDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {

        private static final Logger logger = LoggerFactory.getLogger(CustomDebeziumDeserializationSchema.class);

        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<JSONObject> collector) throws Exception {
            // logger.info("deserialize:" + JSON.toJSONString(sourceRecord.value()));
            JSONObject obj = new JSONObject();

            // Parse the record value
            Struct value = (Struct) sourceRecord.value();
            // Parse the primary key
            Struct key = (Struct) sourceRecord.key();
            JSONObject keys = parseStruct(key);

            Struct source = value.getStruct("source");
            // Carry the table name so downstream logic can handle each table separately
            String table = source.get("table").toString();

            JSONObject before = parseStruct(value.getStruct("before"));
            JSONObject after = parseStruct(value.getStruct("after"));
            // The operation type (c/u/d/r); can be used to filter for updates
            String op = value.get("op").toString();

            obj.put("key", keys);
            obj.put("before", before);
            obj.put("after", after);
            obj.put("op", op);
            obj.put("table", table);
            collector.collect(obj);
        }

        /**
         * Flattens a Kafka Connect {@link Struct} into a JSON object,
         * converting Debezium temporal values along the way.
         */
        public JSONObject parseStruct(Struct struct_) {
            if (struct_ == null) {
                return null;
            }

            JSONObject json = new JSONObject();

            struct_.schema().fields().forEach(field -> {
                Object v = struct_.get(field);
                // The logical type name, e.g. "io.debezium.time.MicroTimestamp"
                String typ = field.schema().name();

                if (v != null) {
                    json.put(field.name(), getFields(v, typ));
                }
            });
            return json;
        }
    
        /** Dispatches a raw value to the converter matching its Java type. */
        public Object getFields(Object v, String type) {
            if (v == null) {
                return null;
            }
            if (v instanceof Long) {
                return convertLongToTime(type, (Long) v);
            }
            if (v instanceof Integer) {
                return convertIntToDate((Integer) v, type);
            }
            return convertObjToTime(v, type);
        }
    
        /** Converts non-numeric temporal values to java.sql types. */
        private Object convertObjToTime(Object obj, String type) {
            if (null == type) {
                return obj;
            }
            switch (type) {
                case Time.SCHEMA_NAME:
                case MicroTime.SCHEMA_NAME:
                case NanoTime.SCHEMA_NAME:
                    return java.sql.Time.valueOf(TemporalConversions.toLocalTime(obj));
                case ZonedTimestamp.SCHEMA_NAME:
                    // ZonedTimestamp is delivered as an ISO-8601 UTC string,
                    // e.g. "2024-03-04T05:08:00Z", so parse it directly
                    return java.sql.Timestamp.valueOf(
                            Instant.parse(obj.toString()).atZone(ZoneId.of("UTC")).toLocalDateTime());
                case Timestamp.SCHEMA_NAME:
                case MicroTimestamp.SCHEMA_NAME:
                case NanoTimestamp.SCHEMA_NAME:
                    return java.sql.Timestamp.valueOf(TemporalConversions.toLocalDateTime(obj, ZoneId.of("UTC")));
                default:
                    return obj;
            }
        }

        /** Converts an epoch-day integer (io.debezium.time.Date) to a date string. */
        public Object convertIntToDate(Integer obj, String type) {
            if (StringUtil.isNullOrEmpty(type)) {
                return obj;
            }
            SchemaBuilder date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date");
            switch (type) {
                case Date.SCHEMA_NAME:
                    java.util.Date date = org.apache.kafka.connect.data.Date.toLogical(date_schema, obj);
                    return date.toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime().toString();
                default:
                    return obj;
            }
        }
    
        /**
         * Converts a long (millis/micros/nanos since epoch or past midnight,
         * depending on the Debezium logical type) to a time type.
         */
        public static Object convertLongToTime(String type, Long obj) {
            if (StringUtil.isNullOrEmpty(type)) {
                return obj;
            }

            SchemaBuilder time_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Time");
            SchemaBuilder date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date");
            SchemaBuilder timestamp_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Timestamp");

            switch (type) {
                case Time.SCHEMA_NAME:
                    // milliseconds past midnight
                    return org.apache.kafka.connect.data.Time.toLogical(time_schema, obj.intValue())
                            .toInstant().atZone(ZoneId.of("UTC")).toLocalTime();

                case MicroTime.SCHEMA_NAME:
                    // microseconds past midnight
                    return org.apache.kafka.connect.data.Time.toLogical(time_schema, (int) (obj / 1000))
                            .toInstant().atZone(ZoneId.of("UTC")).toLocalTime();

                case NanoTime.SCHEMA_NAME:
                    // nanoseconds past midnight
                    return org.apache.kafka.connect.data.Time.toLogical(time_schema, (int) (obj / 1000 / 1000))
                            .toInstant().atZone(ZoneId.of("UTC")).toLocalTime();

                case Timestamp.SCHEMA_NAME:
                    // milliseconds since epoch
                    LocalDateTime utc = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj)
                            .toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime();
                    return java.sql.Timestamp.valueOf(utc);

                case MicroTimestamp.SCHEMA_NAME:
                    // microseconds since epoch
                    LocalDateTime utc1 = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000)
                            .toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime();
                    return java.sql.Timestamp.valueOf(utc1);

                case NanoTimestamp.SCHEMA_NAME:
                    // nanoseconds since epoch
                    LocalDateTime t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000 / 1000)
                            .toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime();
                    return java.sql.Timestamp.valueOf(t);

                case Date.SCHEMA_NAME:
                    // days since epoch
                    return org.apache.kafka.connect.data.Date.toLogical(date_schema, obj.intValue())
                            .toInstant().atZone(ZoneId.of("UTC")).toLocalDate();
                default:
                    return obj;
            }
        }
    
    
        @Override
        public TypeInformation<JSONObject> getProducedType() {
            return TypeInformation.of(JSONObject.class);
        }
    }

This answer comes from the "Flink CDC 社区" DingTalk group.

    2024-04-23 22:41:08
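For context, here is a minimal sketch of how a custom deserializer like the one above is typically wired into the Flink CDC MySQL source. The hostname, database, and table names are placeholder assumptions, not values from the original answer:

    package idm;

    import com.alibaba.fastjson.JSONObject;
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import idm.deserialize.CustomDebeziumDeserializationSchema;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MysqlCdcJob {
        public static void main(String[] args) throws Exception {
            MySqlSource<JSONObject> source = MySqlSource.<JSONObject>builder()
                    .hostname("localhost")          // placeholder connection settings
                    .port(3306)
                    .databaseList("test_db")
                    .tableList("test_db.orders")
                    .username("root")
                    .password("password")
                    // Keep this aligned with the MySQL server's time zone; a mismatch
                    // here is itself a common cause of shifted TIMESTAMP values
                    .serverTimeZone("UTC")
                    // Use the custom schema instead of JsonDebeziumDeserializationSchema
                    .deserializer(new CustomDebeziumDeserializationSchema())
                    .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-cdc-source")
                    .print();
            env.execute("mysql-cdc-demo");
        }
    }

If only TIMESTAMP columns are off by a fixed number of hours, setting serverTimeZone (the server-time-zone option in SQL jobs) to match the database is often enough on its own; the custom deserializer additionally normalizes the epoch-based DATETIME, DATE, and TIME encodings.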

