开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC 自定义时间转换?

Flink CDC 自定义时间转换?import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class MySqlDateTimeConverter implements CustomConverter {

private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
private ZoneId timestampZoneId = ZoneId.systemDefault();

@Override
public void configure(Properties props) {

}

@Override
public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {

    String sqlType = column.typeName().toUpperCase();
    SchemaBuilder schemaBuilder = null;
    Converter converter = null;

    if ("DATE".equals(sqlType)) {
        schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
        converter = this::convertDate;
    }

    if ("TIME".equals(sqlType)) {
        schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
        converter = this::convertTime;
    }

    if ("DATETIME".equals(sqlType)) {
        schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
        converter = this::convertDateTime;
    }

    if ("TIMESTAMP".equals(sqlType)) {
        schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
        converter = this::convertTimestamp;
    }

    if (schemaBuilder != null) {
        registration.register(schemaBuilder, converter);
    }

}


private String convertDate(Object input) {

    if (input == null) return null;
    if (input instanceof LocalDate) {
        return dateFormatter.format((LocalDate) input);
    }

    if (input instanceof Integer) {
        LocalDate date = LocalDate.ofEpochDay((Integer) input);
        return dateFormatter.format(date);
    }
    return String.valueOf(input);

}


private String convertTime(Object input) {
    if (input == null) return null;
    if (input instanceof Duration) {
        Duration duration = (Duration) input;
        long seconds = duration.getSeconds();
        int nano = duration.getNano();
        LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
        return timeFormatter.format(time);
    }

    return String.valueOf(input);

}


private String convertDateTime(Object input) {
    if (input == null) return null;
    if (input instanceof LocalDateTime) {
        return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " ");
    }
    return String.valueOf(input);
}

private String convertTimestamp(Object input) {
    if (input == null) return null;
    if (input instanceof ZonedDateTime) {
        // mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间
        ZonedDateTime zonedDateTime = (ZonedDateTime) input;
        LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
        return timestampFormatter.format(localDateTime).replaceAll("T", " ");
    }
    return String.valueOf(input);
}

}
image.png

展开
收起
真的很搞笑 2023-09-12 18:56:27 205 0
1 条回答
写回答
取消 提交回答
  • 在 Flink CDC 中,您可以通过自定义时间转换函数来处理时间字段的转换和格式化。以下是一个示例,展示了如何自定义时间转换函数:

    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;

    import java.sql.Timestamp;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;

    public class CustomTimeConverter extends ScalarFunction {
    private transient DateTimeFormatter inputFormatter;
    private transient DateTimeFormatter outputFormatter;

    @Override
    public void open(FunctionContext context) {
        // 初始化转换器
        inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        outputFormatter = DateTimeFormatter.ofPattern("yyyy/MM/dd");
    }
    
    public String convert(String inputTime) {
        // 执行时间转换逻辑
        LocalDateTime dateTime = LocalDateTime.parse(inputTime, inputFormatter);
        return dateTime.format(outputFormatter);
    }
    

    }
    在上述示例中,CustomTimeConverter 类继承了 ScalarFunction 类,实现了时间转换逻辑。在 open() 方法中,您可以初始化您所需的时间格式化器。convert() 方法接收一个输入时间字符串,并将其转换为另一种格式,然后返回转换后的结果。

    接下来,您可以在 Flink SQL 中使用自定义时间转换函数。例如,假设您有一个名为 source_table

    2023-10-23 11:12:53
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载