谁有flink写oracle的连接器参数连接发一下,谢谢
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
利用flink的DatStream API 创建任务
 public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .url("jdbc:oracle:thin:@hadoop103:1521:ORCL")  // 用来连接oracle的url
                .port(1521)    // oracle  的端口号
                .database("ORCL") // 监听的数据库
                .schemaList("SCOTT") // 监听的schema
                .tableList("SCOTT.USERS ,SCOTT.APPS") // 监听的表
                .username("system") // 用户名
                .password("Oracle123") // 密码 
                //    .deserializer(new JsonDebeziumDeserializationSchema()) // 将结果以json格式进行输出。
                .deserializer(new OracleDeserializationSchema ())         // 自定义的输出格式
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(sourceFunction)
           .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
        env.execute();
    }
自定义输出格式 - -> {schema:xx , table:xx, data:{xx:xx,yy:yy}}
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.List;
public class OracleDeserializationSchema implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
        JSONObject result = new JSONObject(new LinkedHashMap<>());
        /*得到schema 和  table*/
        String topic = record.topic();
        String[] fields = topic.split("\\.");
        result.put("schema", fields[1]);
        result.put("table", fields[2]);
        /*得到after数据*/
        Struct value = (Struct) record.value();
        Struct after = (Struct) value.get("after");
        JSONObject afterJson = new JSONObject(new LinkedHashMap<>());
        if (after != null) {
            Schema schema = after.schema();
            List<Field> fieldList = schema.fields();
            for (Field field : fieldList) {
                    /*
                         由于通过归档日志读取到的number类型的数据格式为:{scale=2,value=[B@6e0782b5}
                        scale :数值的精度,及小数点后几位;value:数值的二进制数组的格式
                         需要将改格式转化成实际数据库中存储的格式
                        主要思路是,将value的值转化为 long类型,将scale的值转化为int类型,通过BigDecimal.valueOf()方法得到存储的值
                     */
                if (after.get(field) instanceof Struct) {
                    Struct fieldValue = (Struct) after.get(field);
                    byte[] bytes = fieldValue.getBytes("value");
                    String l = Byte.toString(bytes[0]);
                    afterJson.put(field.name(), l);
                    Integer scale = fieldValue.getInt32("scale");
                    BigDecimal valueOf = BigDecimal.valueOf(Long.valueOf(l), scale);
                    afterJson.put(field.name(), valueOf.toString());
                } else {
                    afterJson.put(field.name(), after.get(field));
                }
            }
        }
        result.put("data", afterJson);
        out.collect(result.toJSONString());
    }
    @Override
    public TypeInformation<String> getProducedType() {
        return  BasicTypeInfo.STRING_TYPE_INFO;
    }
}
——参考链接。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。