开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

大佬有序列化flinkcdc数据到这个格式的序列化方法么?

大佬有序列化flinkcdc数据到这个格式的序列化方法么?我这边用java来解析有些卡住了。 0cedd962745dab9f9377719794603f32.png

展开
收起
爱喝咖啡嘿 2022-12-20 16:04:26 271 0
1 条回答
写回答
取消 提交回答
  • 自定义反序列化器

    package com.yyds;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
    import io.debezium.data.Envelope;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.data.Field;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;
    
    import java.util.List;
    
    /**
     * 自定义序列化器
     */
    public class MyDeserialization implements DebeziumDeserializationSchema<String> {
    
    
    
        /**
         *封装为json字符串
         * {
         *     "database":"",
         *     "tableName":"",
         *     "type":"c u d",
         *     "before":{
         *         "":"",
         *         "":"",
         *         "":""
         *     },
         *     "after":{
         *               "":"",
         *               "":"",
         *              "":""
         *     }
         * }
         */
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
    
            JSONObject res = new JSONObject();
    
            // 获取数据库和表名称
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
    
    
    
            Struct value = (Struct)sourceRecord.value();
            // 获取before数据
            Struct before = value.getStruct("before");
            JSONObject beforeJson = new JSONObject();
            if(before != null){
                Schema beforeSchema = before.schema();
                List<Field> beforeFields = beforeSchema.fields();
                for (Field field : beforeFields) {
                    Object beforeValue = before.get(field);
                    beforeJson.put(field.name(),beforeValue);
                }
            }
    
    
            // 获取after数据
            Struct after = value.getStruct("after");
            JSONObject afterJson = new JSONObject();
            if(after != null){
                Schema afterSchema = after.schema();
                List<Field> afterFields = afterSchema.fields();
                for (Field field : afterFields) {
                    Object afterValue = after.get(field);
                    afterJson.put(field.name(),afterValue);
                }
            }
    
            //获取操作类型 READ DELETE UPDATE CREATE
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toLowerCase();
            if("create".equals(type)){
                type = "insert";
            }
    
            // 将字段写到json对象中
            res.put("database",database);
            res.put("tableName",tableName);
            res.put("before",beforeJson);
            res.put("after",afterJson);
            res.put("type",type);
    
            //输出数据
            collector.collect(res.toString());
        }
    
        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
    
    
    
    2022-12-26 07:57:43
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载