flink cdc 默认的序列化对象是什么?

flink cdc 默认的序列化对象是什么?

展开
收起
wenti 2023-02-06 15:28:11 368 分享 版权
2 条回答
写回答
取消 提交回答
  • Apache Flink CDC (Change Data Capture) 连接器通常会使用 Flink 自带的序列化机制来序列化和反序列化数据。在 Flink 中,默认的序列化框架是 Kryo。Kryo 是一个快速高效的Java序列化框架,能够处理各种类型的对象。

    当使用 Flink CDC 连接器读取变更数据时,Flink 会将读取到的数据(比如数据库表中的行)封装成一种特定的数据类型,通常是RowData类型的事件。RowData是 Flink 的内部数据表示,用于高效的内存计算。如果你没有指定自己的序列化方式,则 Flink 会用 Kryo 来进行序列化。

    不过,对于 Flink 的状态后端来说,默认会采用 Flink 自己实现的序列化机制。如果需要在状态后端存储RowData,它通常会通过内置的RowDataSerializer来进行序列化和反序列化操作。RowDataSerializer是专为RowData设计的,能够很好地处理它的内部结构。

    然而,在 Flink 应用中,当数据需要经过网络传输或者需要持久化时,可以通过自定义序列化器来对数据进行序列化和反序列化,这样就可以优化这些过程的性能或满足特定的需求。例如,你可以为 Flink 程序定义 Avro、Protobuf 或 JSON 等格式的序列化器。

    总之,Flink CDC 默认使用 Flink 的 RowDataSerializer 来序列化RowData类型的对象,而对于其他类型的对象,如果没有指定序列化器,则会默认使用 Kryo。在实际开发中,可以根据数据处理的需要选择合适的序列化方式。

    2024-02-26 17:46:07
    赞同 展开评论
  • 序列化如下:

    import com.alibaba.fastjson.JSONObject;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
    import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
    import io.debezium.data.Envelope;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.List;
    /**
     * @Author 
     * @Date 2022/8/26 16:14
     */
    public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
    
    
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
    
            //1.创建 JSON 对象用于存储最终数据
            JSONObject result = new JSONObject();
    
            //2.获取库名&表名放入 source
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            JSONObject source = new JSONObject();
            source.put("db",database);
            source.put("table",tableName);
    
            Struct value = (Struct) sourceRecord.value();
            //3.获取"before"数据
            Struct before = value.getStruct("before");
            JSONObject beforeJson = new JSONObject();
            if (before != null) {
                Schema beforeSchema = before.schema();
                List<Field> beforeFields = beforeSchema.fields();
                for (Field field : beforeFields) {
                    Object beforeValue = before.get(field);
                    beforeJson.put(field.name(), beforeValue);
                }
            }
    
            //4.获取"after"数据
            Struct after = value.getStruct("after");
            JSONObject afterJson = new JSONObject();
            if (after != null) {
                Schema afterSchema = after.schema();
                List<Field> afterFields = afterSchema.fields();
                for (Field field : afterFields) {
                    Object afterValue = after.get(field);
                    afterJson.put(field.name(), afterValue);
                }
            }
    
            //5.获取操作类型  CREATE UPDATE DELETE 进行符合 Debezium-op 的字母
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toLowerCase();
            if ("insert".equals(type)) {
                type = "c";
            }
            if ("update".equals(type)) {
                type = "u";
            }
            if ("delete".equals(type)) {
                type = "d";
            }
            if ("create".equals(type)) {
                type = "c";
            }
    
            //6.将字段写入 JSON 对象
            result.put("source", source);
            result.put("before", beforeJson);
            result.put("after", afterJson);
            result.put("op", type);
    
            //7.输出数据
            collector.collect(result.toJSONString());
    
        }
    
        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
    
    2023-02-09 07:49:33
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

收录在圈子:
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
还有其他疑问?
咨询AI助理