开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

谁有flinkcdc写oracle的连接器参数连接发一下?谢谢

谁有flink写oracle的连接器参数连接发一下,谢谢

展开
收起
防火防爆 2024-08-26 14:54:51 44 0
1 条回答
写回答
取消 提交回答
  • 利用flink的DatStream API 创建任务

     public static void main(String[] args) throws Exception {
            SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                    .url("jdbc:oracle:thin:@hadoop103:1521:ORCL")  // 用来连接oracle的url
                    .port(1521)    // oracle  的端口号
                    .database("ORCL") // 监听的数据库
                    .schemaList("SCOTT") // 监听的schema
                    .tableList("SCOTT.USERS ,SCOTT.APPS") // 监听的表
                    .username("system") // 用户名
                    .password("Oracle123") // 密码 
                    //    .deserializer(new JsonDebeziumDeserializationSchema()) // 将结果以json格式进行输出。
                    .deserializer(new OracleDeserializationSchema ())         // 自定义的输出格式
                    .build();
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.addSource(sourceFunction)
               .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    
            env.execute();
        }
    

    自定义输出格式 - -> {schema:xx , table:xx, data:{xx:xx,yy:yy}}

    import com.alibaba.fastjson.JSONObject;
    import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.data.Field;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;
    
    import java.math.BigDecimal;
    import java.util.LinkedHashMap;
    import java.util.List;
    
    public class OracleDeserializationSchema implements DebeziumDeserializationSchema<String> {
    
    
        @Override
        public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
            JSONObject result = new JSONObject(new LinkedHashMap<>());
            /*得到schema 和  table*/
            String topic = record.topic();
            String[] fields = topic.split("\\.");
            result.put("schema", fields[1]);
            result.put("table", fields[2]);
            /*得到after数据*/
            Struct value = (Struct) record.value();
            Struct after = (Struct) value.get("after");
            JSONObject afterJson = new JSONObject(new LinkedHashMap<>());
            if (after != null) {
                Schema schema = after.schema();
                List<Field> fieldList = schema.fields();
                for (Field field : fieldList) {
                        /*
                             由于通过归档日志读取到的number类型的数据格式为:{scale=2,value=[B@6e0782b5}
                            scale :数值的精度,及小数点后几位;value:数值的二进制数组的格式
                             需要将改格式转化成实际数据库中存储的格式
                            主要思路是,将value的值转化为 long类型,将scale的值转化为int类型,通过BigDecimal.valueOf()方法得到存储的值
                         */
                    if (after.get(field) instanceof Struct) {
                        Struct fieldValue = (Struct) after.get(field);
                        byte[] bytes = fieldValue.getBytes("value");
                        String l = Byte.toString(bytes[0]);
                        afterJson.put(field.name(), l);
                        Integer scale = fieldValue.getInt32("scale");
                        BigDecimal valueOf = BigDecimal.valueOf(Long.valueOf(l), scale);
                        afterJson.put(field.name(), valueOf.toString());
                    } else {
                        afterJson.put(field.name(), after.get(field));
                    }
                }
            }
            result.put("data", afterJson);
            out.collect(result.toJSONString());
        }
    
        @Override
        public TypeInformation<String> getProducedType() {
            return  BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
    

    ——参考链接

    2024-08-30 08:12:40
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    PostgresChina2018_樊文凯_ORACLE数据库和应用异构迁移最佳实践 立即下载
    PostgresChina2018_王帅_从Oracle到PostgreSQL的数据迁移 立即下载
    Oracle云上最佳实践 立即下载

    相关镜像