利用flink的DatStream API 创建任务
public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = OracleSource.<String>builder()
.url("jdbc:oracle:thin:@hadoop103:1521:ORCL") // 用来连接oracle的url
.port(1521) // oracle 的端口号
.database("ORCL") // 监听的数据库
.schemaList("SCOTT") // 监听的schema
.tableList("SCOTT.USERS ,SCOTT.APPS") // 监听的表
.username("system") // 用户名
.password("Oracle123") // 密码
// .deserializer(new JsonDebeziumDeserializationSchema()) // 将结果以json格式进行输出。
.deserializer(new OracleDeserializationSchema ()) // 自定义的输出格式
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(sourceFunction)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute();
}
自定义输出格式 - -> {schema:xx , table:xx, data:{xx:xx,yy:yy}}
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.List;
public class OracleDeserializationSchema implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
JSONObject result = new JSONObject(new LinkedHashMap<>());
/*得到schema 和 table*/
String topic = record.topic();
String[] fields = topic.split("\\.");
result.put("schema", fields[1]);
result.put("table", fields[2]);
/*得到after数据*/
Struct value = (Struct) record.value();
Struct after = (Struct) value.get("after");
JSONObject afterJson = new JSONObject(new LinkedHashMap<>());
if (after != null) {
Schema schema = after.schema();
List<Field> fieldList = schema.fields();
for (Field field : fieldList) {
/*
由于通过归档日志读取到的number类型的数据格式为:{scale=2,value=[B@6e0782b5}
scale :数值的精度,及小数点后几位;value:数值的二进制数组的格式
需要将改格式转化成实际数据库中存储的格式
主要思路是,将value的值转化为 long类型,将scale的值转化为int类型,通过BigDecimal.valueOf()方法得到存储的值
*/
if (after.get(field) instanceof Struct) {
Struct fieldValue = (Struct) after.get(field);
byte[] bytes = fieldValue.getBytes("value");
String l = Byte.toString(bytes[0]);
afterJson.put(field.name(), l);
Integer scale = fieldValue.getInt32("scale");
BigDecimal valueOf = BigDecimal.valueOf(Long.valueOf(l), scale);
afterJson.put(field.name(), valueOf.toString());
} else {
afterJson.put(field.name(), after.get(field));
}
}
}
result.put("data", afterJson);
out.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
——参考链接。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。