flink cdc 默认的序列化对象是什么?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
Apache Flink CDC (Change Data Capture) 连接器通常会使用 Flink 自带的序列化机制来序列化和反序列化数据。在 Flink 中,默认的序列化框架是 Kryo。Kryo 是一个快速高效的Java序列化框架,能够处理各种类型的对象。
当使用 Flink CDC 连接器读取变更数据时,Flink 会将读取到的数据(比如数据库表中的行)封装成一种特定的数据类型,通常是RowData类型的事件。RowData是 Flink 的内部数据表示,用于高效的内存计算。如果你没有指定自己的序列化方式,则 Flink 会用 Kryo 来进行序列化。
不过,对于 Flink 的状态后端来说,默认会采用 Flink 自己实现的序列化机制。如果需要在状态后端存储RowData,它通常会通过内置的RowDataSerializer来进行序列化和反序列化操作。RowDataSerializer是专为RowData设计的,能够很好地处理它的内部结构。
然而,在 Flink 应用中,当数据需要经过网络传输或者需要持久化时,可以通过自定义序列化器来对数据进行序列化和反序列化,这样就可以优化这些过程的性能或满足特定的需求。例如,你可以为 Flink 程序定义 Avro、Protobuf 或 JSON 等格式的序列化器。
总之,Flink CDC 默认使用 Flink 的 RowDataSerializer 来序列化RowData类型的对象,而对于其他类型的对象,如果没有指定序列化器,则会默认使用 Kryo。在实际开发中,可以根据数据处理的需要选择合适的序列化方式。
序列化如下:
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
/**
* @Author
* @Date 2022/8/26 16:14
*/
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1.创建 JSON 对象用于存储最终数据
JSONObject result = new JSONObject();
//2.获取库名&表名放入 source
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
JSONObject source = new JSONObject();
source.put("db",database);
source.put("table",tableName);
Struct value = (Struct) sourceRecord.value();
//3.获取"before"数据
Struct before = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4.获取"after"数据
Struct after = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5.获取操作类型 CREATE UPDATE DELETE 进行符合 Debezium-op 的字母
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("insert".equals(type)) {
type = "c";
}
if ("update".equals(type)) {
type = "u";
}
if ("delete".equals(type)) {
type = "d";
}
if ("create".equals(type)) {
type = "c";
}
//6.将字段写入 JSON 对象
result.put("source", source);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("op", type);
//7.输出数据
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。