flink cdc到doris
flink cdc同步到doris主类:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.writer.SimpleStringSerializer; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.UUID; /*** * * Synchronize the full database through flink cdc * */ public class DatabaseFullSync { private static final Logger LOG = LoggerFactory.getLogger(DatabaseFullSync.class); private static String HOST = ""; private static String MYSQL_PASSWD = "password"; private static int MYSQL_PORT = 3306; private static int DORIS_PORT = 8030; private static String MYSQL_USER = "root"; private static String SYNC_DB = "test"; private static String SYNC_TBLS = "test.*"; private static String TARGET_DORIS_DB = "test"; public static void main(String[] args) throws Exception { MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname(HOST) .port(MYSQL_PORT) .databaseList(SYNC_DB) // set captured database .tableList(SYNC_TBLS) // set captured table .username(MYSQL_USER) .password(MYSQL_PASSWD) .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON 
String .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // enable checkpoint env.enableCheckpointing(10000); DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source"); //get table list List<String> tableList = getTableList(); LOG.info("sync table list:{}",tableList); for(String tbl : tableList){ SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl); SingleOutputStreamOperator<String> cleanStream = clean(filterStream); DorisSink dorisSink = buildDorisSink(tbl); cleanStream.sinkTo(dorisSink).name("sink " + tbl); } env.execute("Full Database Sync "); } /** * Get real data * { * "before":null, * "after":{ * "id":1, * "name":"zhangsan-1", * "age":18 * }, * "source":{ * "db":"test", * "table":"test_1", * ... * }, * "op":"c", * ... * } * */ private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) { return source.flatMap(new FlatMapFunction<String,String>(){ @Override public void flatMap(String row, Collector<String> out) throws Exception { try{ JSONObject rowJson = JSON.parseObject(row); String op = rowJson.getString("op"); //history,insert,update if(Arrays.asList("r","c","u").contains(op)){ out.collect(rowJson.getJSONObject("after").toJSONString()); }else{ LOG.info("filter other op:{}",op); } }catch (Exception ex){ LOG.warn("filter other format binlog:{}",row); } } }); } /** * Divide according to tablename * */ private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) { return source.filter(new FilterFunction<String>() { @Override public boolean filter(String row) throws Exception { try { JSONObject rowJson = JSON.parseObject(row); JSONObject source = rowJson.getJSONObject("source"); String tbl = source.getString("table"); return table.equals(tbl); }catch (Exception ex){ ex.printStackTrace(); return false; } } }); } 
/** * Get all MySQL tables that need to be synchronized * */ private static List<String> getTableList() { List<String> tables = new ArrayList<>(); String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'"; List<JSONObject> tableList = JdbcUtil.executeQuery(HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql); for(JSONObject jsob : tableList){ String schemaName = jsob.getString("TABLE_SCHEMA"); String tblName = jsob.getString("TABLE_NAME"); String schemaTbl = schemaName + "." + tblName; if(schemaTbl.matches(SYNC_TBLS)){ tables.add(tblName); } } return tables; } /** * create doris sink * */ public static DorisSink buildDorisSink(String table){ DorisSink.Builder<String> builder = DorisSink.builder(); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder.setFenodes(HOST + ":" + DORIS_PORT) .setTableIdentifier(TARGET_DORIS_DB + "." + table) .setUsername("root") .setPassword(""); Properties pro = new Properties(); //json data format pro.setProperty("format", "json"); pro.setProperty("read_json_by_line", "true"); DorisExecutionOptions executionOptions = DorisExecutionOptions.builder() .setLabelPrefix("label-" + table + UUID.randomUUID()) //streamload label prefix, .setStreamLoadProp(pro).build(); builder.setDorisReadOptions(DorisReadOptions.builder().build()) .setDorisExecutionOptions(executionOptions) .setSerializer(new SimpleStringSerializer()) //serialize according to string .setDorisOptions(dorisBuilder.build()); return builder.build(); } }
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal JDBC helper that runs a query against MySQL and returns the rows as JSON objects.
 */
public class JdbcUtil {
    // declared before the static initializer, which uses it
    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            // JDBC 4 drivers also register themselves through ServiceLoader, so this is not fatal
            LOG.warn("JDBC driver class com.mysql.jdbc.Driver not found", e);
        }
    }

    public static void main(String[] args) throws SQLException {
        // intentionally empty
    }

    /**
     * Executes {@code sql} on {@code jdbc:mysql://hostUrl:port/} and returns every row as a JSON object
     * keyed by column label, with values read as strings.
     *
     * @return the rows, or an empty (mutable) list when the connection or query fails; the failure is logged
     */
    public static List<JSONObject> executeQuery(String hostUrl, int port, String user, String password, String sql) {
        String connectionUrl = String.format("jdbc:mysql://%s:%d/", hostUrl, port);
        // try-with-resources closes result set, statement and connection even when the query fails
        try (Connection con = DriverManager.getConnection(connectionUrl, user, password);
             PreparedStatement ps = con.prepareStatement(sql);
             ResultSet rs = ps.executeQuery()) {
            return resultSetToJson(rs);
        } catch (SQLException e) {
            LOG.error("query failed, url: {}, sql: {}", connectionUrl, sql, e);
            return new ArrayList<>();
        }
    }

    /** Converts every remaining row of {@code rs} into a JSON object of column label -> string value. */
    private static List<JSONObject> resultSetToJson(ResultSet rs) throws SQLException {
        List<JSONObject> list = new ArrayList<>();
        ResultSetMetaData metaData = rs.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= columnCount; i++) {
                // read by index: a lookup by label would return the first column for duplicate labels
                jsonObj.put(metaData.getColumnLabel(i), rs.getString(i));
            }
            list.add(jsonObj);
        }
        return list;
    }
}
flink cdc到starrocks
- Flink cdc 采集 mysql 数据
- 将 cdc 采集到的数据转为 json
- 从 json 中获取 数据库、表和数据
- 用数据库和表对数据做 key by
- 使用 process function 处理每个表的数据,用状态缓存数据,当缓存的数据达到一定条数或者缓存了一定时间(按时间触发的场景用 timer 实现)时,将缓存的数据批量输出到下游
- sink 中拼接数据,使用 Stream Load 往 StarRocks 写数据
主类 CdcToStarRocks
主要流程很简单: source -> map -> keyBy -> process -> sink,source 读取 mysql binlog(或者全量+增量),map 转换数据格式,keyBy 以数据库名 + 表名对数据分区,process 中对数据攒批,sink 输出数据到 StarRocks
import com.venn.source.mysql.cdc.CommonStringDebeziumDeserializationSchema; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.Properties; /** * mysql cdc demo * <p> * cdc 整库同步数据到 starrocks * <p> * 局限: * 1. 还未实现 starrocks 端表结构跟随 源端表结构同步变更 * 2. 为了保证效率,仅会在每一个表第一次来的时候判断目标段是否存在该表,如果已经判定该表不存在,后续直接忽略该表的数据变更 * 3. 部分不导入的表,只在sink 的时候做了过滤,前面的操作还是要继续,可以考虑在 反序列化活map中过滤掉目标库中不存在的表数据 */ public class CdcToStarRocks { // 每个批次最大条数和等待时间 private static int batchSize = 10000; private static long batchInterval = 10 *1000; public static void main(String[] args) throws Exception { String ip = "localhost"; int port = 3306; String db = "venn"; // String table = "venn.user_log"; String table = "venn.*"; String user = "root"; String pass = "123456"; String starrocksIp = ""; String starrocksPort = "29030"; String starrocksLoadPort = "28030"; String starrocksUser = "root"; String starrocksPass = "123456"; String starrocksDb = "test"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); MySqlSource<String> sourceFunction = MySqlSource.<String>builder() .hostname(ip) .port(port) // 获取两个数据库的所有表 .databaseList(db) .tableList(table) .username(user) .password(pass) .startupOptions(StartupOptions.latest()) // do not cache schema change // .includeSchemaChanges(false) // .startupOptions(StartupOptions.initial()) // 自定义 解析器,讲数据解析成 json .deserializer(new CommonStringDebeziumDeserializationSchema(ip, port)) .build(); env .fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "cdc") .name("source") .uid("source") // json 字符串转 CdcRecord .map(new CdcStarMapFunction()) .name("map") .keyBy( record -> record.getDb() + "_" + record.getTable()) .process(new CdcStarProcessFunction(batchSize, batchInterval)) 
.name("process") .uid("process") .addSink(new StarRocksSink(starrocksIp, starrocksPort, starrocksLoadPort, starrocksUser, starrocksPass, starrocksDb)) .name("sink"); env.execute("cdcToStarRocks"); } }
反序列化器 CommonStringDebeziumDeserializationSchema
反序列化器直接复用之前写的通用 flink cdc 反序列化器,实现 DebeziumDeserializationSchema 接口,主要是从数据中获取数据库、表、操作类型和数据,需要特别注意以下几点:
- insert 类型的操作,只需要获取 after 中的数据
- update 类型的操作,需要同时解析 before 和 after 的数据,before 是修改前的,after 是修改后的,如果不需要修改前的,可以只获取 after
- delete 类型的操作,需要获取 before
- 如果有 ddl 操作,需要特殊处理(本次不包含)
import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.google.gson.JsonObject; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; /** * deserialize debezium format binlog */ public class CommonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> { private String host; private int port; public CommonStringDebeziumDeserializationSchema(String host, int port) { this.host = host; this.port = port; } public void deserialize(SourceRecord record, Collector<String> out) { JsonObject jsonObject = new JsonObject(); String binlog = record.sourceOffset().get("file").toString(); String offset = record.sourceOffset().get("pos").toString(); String ts_sec = record.sourceOffset().get("ts_sec").toString(); // System.out.println("binlog : " + binlog + ", offset = " + offset); // todo get schame change jsonObject.addProperty("host", host); // add meta jsonObject.addProperty("binlog", binlog); jsonObject.addProperty("offset", offset); jsonObject.addProperty("ts_sec", ts_sec); jsonObject.addProperty("port", port); jsonObject.addProperty("file", (String) record.sourceOffset().get("file")); jsonObject.addProperty("pos", (Long) record.sourceOffset().get("pos")); jsonObject.addProperty("ts_sec", (Long) record.sourceOffset().get("ts_sec")); String[] name = record.valueSchema().name().split("\\."); jsonObject.addProperty("db", name[1]); jsonObject.addProperty("table", name[2]); Struct value = ((Struct) record.value()); String operatorType = value.getString("op"); jsonObject.addProperty("operator_type", operatorType); // c : create, u: update, d: delete, r: read // insert update if (!"d".equals(operatorType)) { Struct after = value.getStruct("after"); JsonObject afterJsonObject = 
parseRecord(after); jsonObject.add("after", afterJsonObject); } // update & delete if ("u".equals(operatorType) || "d".equals(operatorType)) { Struct source = value.getStruct("before"); JsonObject beforeJsonObject = parseRecord(source); jsonObject.add("before", beforeJsonObject); } jsonObject.addProperty("parse_time", System.currentTimeMillis() / 1000); out.collect(jsonObject.toString()); } private JsonObject parseRecord(Struct after) { JsonObject jo = new JsonObject(); for (Field field : after.schema().fields()) { switch ((field.schema()).type()) { case INT8: int resultInt8 = after.getInt8(field.name()); jo.addProperty(field.name(), resultInt8); break; case INT64: Long resultInt = after.getInt64(field.name()); jo.addProperty(field.name(), resultInt); break; case FLOAT32: Float resultFloat32 = after.getFloat32(field.name()); jo.addProperty(field.name(), resultFloat32); break; case FLOAT64: Double resultFloat64 = after.getFloat64(field.name()); jo.addProperty(field.name(), resultFloat64); break; case BYTES: // json ignore byte column // byte[] resultByte = after.getBytes(field.name()); // jo.addProperty(field.name(), String.valueOf(resultByte)); break; case STRING: String resultStr = after.getString(field.name()); jo.addProperty(field.name(), resultStr); break; default: } } return jo; } public TypeInformation<String> getProducedType() { return BasicTypeInfo.STRING_TYPE_INFO; } }
转换函数 CdcStarMapFunction
CdcStarMapFunction 比较简单,将 json 字符串,转成 CdcRecord 类型的对象,只获取了需要的 数据库、表、操作类型和数据。
获取数据时,insert 和 update 只获取 after 的值,delete 获取 before 的值
import java.util.LinkedHashMap;
import java.util.Map;

import lombok.Data;

/**
 * One row-level change: source database, table, Debezium op code and the row's
 * column name -> value map (insertion-ordered).
 */
@Data
public class CdcRecord {
    private String db;
    private String table;
    private String op;
    private Map<String, String> data = new LinkedHashMap<>();

    /**
     * No-arg constructor for Flink's POJO serializer. It must be written explicitly, because
     * Lombok generates no constructor once another one is declared.
     */
    public CdcRecord() {
    }

    /** Creates a record with an empty data map; used by CdcStarMapFunction. */
    public CdcRecord(String db, String table, String op) {
        this.db = db;
        this.table = table;
        this.op = op;
    }
}
import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; public class CdcStarMapFunction extends RichMapFunction<String, CdcRecord> { private final static Logger LOG = LoggerFactory.getLogger(CdcStarMapFunction.class); private JsonParser parser; @Override public void open(Configuration parameters) throws Exception { parser = new JsonParser(); } @Override public CdcRecord map(String element) throws Exception { LOG.debug("data : {}" , element ); JsonObject object = parser.parse(element).getAsJsonObject(); String db = object.get("db").getAsString(); String table = object.get("table").getAsString(); String op = object.get("operator_type").getAsString(); CdcRecord record = new CdcRecord(db, table, op); // insert/update String dataLocation = "after"; if("d".equals(op)){ // if op is delete, get before dataLocation = "before"; } JsonObject data = object.get(dataLocation).getAsJsonObject(); for(Map.Entry<String, JsonElement> entry: data.entrySet()){ String columnName = entry.getKey(); String columnValue; JsonElement value = entry.getValue(); if(!value.isJsonNull()){ // if column value is not null, get as string columnValue = value.getAsString(); // put column name/value to record.data record.getData().put(columnName, columnValue); } } return record; } }
处理函数 CdcStarProcessFunction
CdcStarProcessFunction 有三部分逻辑:
- 三个状态cacheTimer、cacheSize、cache,分别存下一次timer 触发时间、缓存的数据条数、缓存的数据
- process 处理每个表的数据,每个批次的第一条数据到达时,基于当前时间 + batchInterval 注册下一次触发的 timer。数据存储到 cache 中,如果缓存条数达到预定的 batchSize,触发 flushData 方法往下游输出数据,并删除之前注册的定时器,清理状态
- timer 触发 flushData 方法往下游输出数据,清理状态
import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Iterator; import java.util.List; public class CdcStarProcessFunction extends KeyedProcessFunction<String, CdcRecord, List<CdcRecord>> { private final static Logger LOG = LoggerFactory.getLogger(CdcStarProcessFunction.class); private int batchSize; private long batchInterval; // next timer time private ValueState<Long> cacheTimer; // current cache size private ValueState<Integer> cacheSize; // cache data private ListState<CdcRecord> cache; public CdcStarProcessFunction(int batchSize, long batchInterval) { this.batchSize = batchSize; this.batchInterval = batchInterval; } @Override public void open(Configuration parameters) throws Exception { ListStateDescriptor cacheDescriptor = new ListStateDescriptor<CdcRecord>("cache", TypeInformation.of(CdcRecord.class)); cache = getRuntimeContext().getListState(cacheDescriptor); ValueStateDescriptor cacheSizeDescriptor = new ValueStateDescriptor<Integer>("cacheSize", Integer.class); cacheSize = getRuntimeContext().getState(cacheSizeDescriptor); ValueStateDescriptor cacheTimerDescriptor = new ValueStateDescriptor<Long>("cacheTimer", Long.class); cacheTimer = getRuntimeContext().getState(cacheTimerDescriptor); } @Override public void processElement(CdcRecord element, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.Context ctx, Collector<List<CdcRecord>> out) throws Exception { // cache size + 1 if (cacheSize.value() != null) { cacheSize.update(cacheSize.value() + 1); } else { 
cacheSize.update(1); // add timer for interval flush long nextTimer = System.currentTimeMillis() + batchInterval; LOG.debug("register timer : {} , key : {}", nextTimer, ctx.getCurrentKey()); cacheTimer.update(nextTimer); ctx.timerService().registerProcessingTimeTimer(nextTimer); } // add data to cache state cache.add(element); // cache size max than batch Size if (cacheSize.value() >= batchSize) { // remove next timer long nextTimer = cacheTimer.value(); LOG.debug("{} remove timer, key : {}", nextTimer, ctx.getCurrentKey()); ctx.timerService().deleteProcessingTimeTimer(nextTimer); // flush data to down stream flushData(out); } } /** * flush data to down stream */ private void flushData(Collector<List<CdcRecord>> out) throws Exception { List<CdcRecord> tmpCache = new ArrayList<>(); Iterator<CdcRecord> it = cache.get().iterator(); while (it.hasNext()) { tmpCache.add(it.next()); } if (tmpCache.size() > 0) { out.collect(tmpCache); // finish flush all cache data, clear state cache.clear(); cacheSize.clear(); cacheTimer.clear(); } } @Override public void onTimer(long timestamp, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.OnTimerContext ctx, Collector<List<CdcRecord>> out) throws Exception { LOG.info("{} trigger timer to flush data", ctx.getCurrentKey(), timestamp); // batch interval trigger flush data flushData(out); } @Override public void close() throws Exception { } }
输出函数 StarRocksSink
StarRocksSink 稍微复杂一点,需要基于数据中的表名,去目标数据库中获取对应的表结构(为了避免每次查询数据库,将获取到的表结构存到内存中),基于目标表的字段顺序从数据中获取对应列的值,拼接上数据的操作类型。
- StarRocksSink 在往 StarRocks 写数据的时候,实现了 upsert 和 delete 操作,需要在数据中拼接上 0/1,0 代表 UPSERT 操作,1 代表 DELETE 操作
- 见参考文档1
invoke 方法
StarRocksSink 的核心逻辑都在 invoke 方法中,逻辑如下:
- 从数据中获取数据库和表,拼接成 key
- 获取目标表的 schema(整库映射,源端和目标端表名一致),先从缓存中获取,如果不存在就从数据库中获取
- 组装数据
- 拼接 load url
- 用 http 方式往 StarRocks 写数据
loadTargetTableSchema 方法
执行 desc db.table
获取目标表的表结构,组装成两种结果:将所有列名用 "," 拼接成字符串,再拼接 "__op" 用于 http header 请求中标识数据的列;将所有列按顺序添加到 list 中,用于从源数据中获取对应列的数据
parseUploadData 方法
用目标表列顺序,从数据中获取对应列的值,使用列分隔符拼接数据,最后基于操作类型拼接 0/1,删除拼接 1,其他类型拼接 0
doHttp 方法
用 http 的方式往 StarRocks 中写数据,没什么特别的,忽略
import org.apache.commons.codec.binary.Base64; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.http.HttpException; import org.apache.http.HttpHeaders; import org.apache.http.HttpRequest; import org.apache.http.HttpRequestInterceptor; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.DefaultRedirectStrategy; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.protocol.HTTP; import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.sql.*; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class StarRocksSink extends RichSinkFunction<List<CdcRecord>> { private final static Logger LOG = LoggerFactory.getLogger(StarRocksSink.class); public final static String COL_SEP = "\\\\x01"; public final static String ROW_SEP = "\\\\x02"; public final static String NULL_COL = "\\N"; private String ip; private String port; private String loadPort; private String user; private String pass; private String db; private Connection connection; private Map<String, String> spliceColumnMap = new HashMap<>(); private Map<String, List<String>> columnMap = new HashMap<>(); public StarRocksSink() { } public StarRocksSink(String ip, String port, String loadPort, String user, String pass, String db) { this.ip = ip; this.port = port; this.loadPort = loadPort; this.user = user; this.pass = pass; this.db = db; } @Override public void open(Configuration parameters) throws Exception { reConnect(); } @Override public void 
invoke(List<CdcRecord> element, Context context) throws Exception { LOG.info("write batch size: " + element.size()); if(element == null || element.size() ==0){ LOG.info("ignore empty element"); return; } // use StarRocks db name // String db = cache.get(0).getDb(); String table = element.get(0).getTable(); String key = db + "_" + table; // get table schema List<String> columnList; if (!columnMap.containsKey(key)) { // db.table is first coming, load column, put to spliceColumnMap & columnMap loadTargetTableSchema(key, db, table); } String columns = spliceColumnMap.get(key); columnList = columnMap.get(key); if (columnList.size() == 0) { LOG.info("{}.{} not exists in target starrocks, ingore data change", db, table); } // make up data String data = parseUploadData(element, columnList); final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load", ip, loadPort, db, table); String label = db + "_" + table + "_" + System.currentTimeMillis(); // send data to starrocks doHttp(loadUrl, data, label, columns); } /** * http send data to starrocks */ private void doHttp(String loadUrl, String data, String label, String columns) throws IOException, SQLException { final HttpClientBuilder httpClientBuilder = HttpClients .custom() .setRedirectStrategy(new DefaultRedirectStrategy() { @Override protected boolean isRedirectable(String method) { return true; } }) .addInterceptorFirst(new ContentLengthHeaderRemover()); try (CloseableHttpClient client = httpClientBuilder.build()) { HttpPut put = new HttpPut(loadUrl); StringEntity entity = new StringEntity(data, "UTF-8"); put.setHeader(HttpHeaders.EXPECT, "100-continue"); put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(user, pass)); // the label header is optional, not necessary // use label header can ensure at most once semantics put.setHeader("label", label); put.setHeader("columns", columns); put.setHeader("row_delimiter", ROW_SEP); put.setHeader("column_separator", COL_SEP); put.setEntity(entity); try 
(CloseableHttpResponse response = client.execute(put)) { String loadResult = ""; if (response.getEntity() != null) { loadResult = EntityUtils.toString(response.getEntity()); } final int statusCode = response.getStatusLine().getStatusCode(); // statusCode 200 just indicates that starrocks be service is ok, not stream load // you should see the output content to find whether stream load is success if (statusCode != 200) { throw new IOException( String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult)); } } } } private String basicAuthHeader(String username, String password) { final String tobeEncode = username + ":" + password; byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8)); return "Basic " + new String(encoded); } private String parseUploadData(List<CdcRecord> cache, List<String> columnList) { StringBuilder builder = new StringBuilder(); for (CdcRecord element : cache) { Map<String, String> data = element.getData(); for (String column : columnList) { if (data.containsKey(column)) { builder.append(data.get(column)).append(COL_SEP); } else { // if target column not exists in source data, set as null builder.append(NULL_COL).append(COL_SEP); } } // add __op if ("d".equals(element.getOp())) { // delete builder.append("1"); } else { // upsert builder.append("0"); } // add row separator builder.append(ROW_SEP); } // remove last row sep builder = builder.delete(builder.length() - 5, builder.length()); String data = builder.toString(); return data; } /** * load table schema, parse to http column and column list for load source data */ private void loadTargetTableSchema(String key, String db, String table) throws SQLException { List<String> columnList = new ArrayList<>(); StringBuilder builer = new StringBuilder(); try { // load table schema PreparedStatement insertPS = connection.prepareStatement("desc " + db + "." 
+ table); ResultSet result = insertPS.executeQuery(); while (result.next()) { String column = result.getString(1); builer.append(column).append(","); columnList.add(column); } } catch (SQLException e) { LOG.warn("load {}.{} schema error. {}", db, table, e.getStackTrace()); } builer.append("__op"); String columns = builer.toString(); spliceColumnMap.put(key, columns); columnMap.put(key, columnList); } /** * reconnect to starrocks * * @throws SQLException */ private void reConnect() throws SQLException { String driver = "jdbc:mysql://" + ip + ":" + port; if (connection == null || connection.isClosed()) { connection = DriverManager.getConnection(driver, user, pass); } } @Override public void finish() throws Exception { LOG.info("finish"); } @Override public void close() throws Exception { LOG.info("close..."); connection.close(); } private static class ContentLengthHeaderRemover implements HttpRequestInterceptor { @Override public void process(HttpRequest request, HttpContext context) throws HttpException, IOException { // fighting org.apache.http.protocol.RequestContent's ProtocolException("Content-Length header already present"); request.removeHeaders(HTTP.CONTENT_LEN); } } }
- 还未实现 starrocks 端表结构跟随 源端表结构同步变更
- 为了保证效率,仅会在每一个表第一次来的时候判断目标端是否存在该表,如果已经判定该表不存在,后续直接忽略该表的数据变更
- 部分不导入的表,只在 sink 的时候做了过滤,前面的操作还是要继续,可以考虑在反序列化或 map 中过滤掉目标库中不存在的表数据
flink cdc到table store
package name.lijiaqi; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class MysqlToHudiExample { public static void main(String[] args) throws Exception { EnvironmentSettings fsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings); tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); // 数据源表 String sourceDDL = "CREATE TEMPORARY TABLE ods_lineitem (\n" + " l_orderkey INT NOT NULL,\n" + " l_partkey INT NOT NULL,\n" + " l_suppkey INT NOT NULL,\n" + " l_linenumber INT NOT NULL,\n" + " l_quantity DECIMAL(15, 2) NOT NULL,\n" + " l_extendedprice DECIMAL(15, 2) NOT NULL,\n" + " l_discount DECIMAL(15, 2) NOT NULL,\n" + " l_tax DECIMAL(15, 2) NOT NULL,\n" + " l_returnflag CHAR(1) NOT NULL,\n" + " l_linestatus CHAR(1) NOT NULL,\n" + " l_shipdate DATE NOT NULL,\n" + " l_commitdate DATE NOT NULL,\n" + " l_receiptdate DATE NOT NULL,\n" + " l_shipinstruct CHAR(25) NOT NULL,\n" + " l_shipmode CHAR(10) NOT NULL,\n" + " l_comment VARCHAR(44) NOT NULL,\n" + " PRIMARY KEY (l_orderkey, l_linenumber) NOT ENFORCED \n" + ") WITH (\n" + " 'connector' = 'mysql-cdc',\n" + " 'hostname' = '',\n" + -- 如果想使用 host,可以修改宿主机 /etc/hosts 加入 mysql.docker.internal " 'port' = '3307',\n" + " 'username' = 'flink',\n" + " 'password' = 'flink',\n" + " 'database-name' = 'tpch_s10',\n" + " 'table-name' = 'lineitem' \n" + ");" String tableSourceDDL= "CREATE CATALOG `table_store` WITH (\n" + "'type' = 'table-store',\n" + "'warehouse' = '/tmp/table-store-101'\n" + ");\n" + "USE CATALOG `table_store`;" // 输出目标表 String sinkDWDDDL = 
"CREATE TABLE IF NOT EXISTS dwd_lineitem (\n" + " l_orderkey INT NOT NULL,\n" + " l_partkey INT NOT NULL,\n" + " l_suppkey INT NOT NULL,\n" + " l_linenumber INT NOT NULL,\n" + " l_quantity DECIMAL(15, 2) NOT NULL,\n" + " l_extendedprice DECIMAL(15, 2) NOT NULL,\n" + " l_discount DECIMAL(15, 2) NOT NULL,\n" + " l_tax DECIMAL(15, 2) NOT NULL,\n" + " l_returnflag CHAR(1) NOT NULL,\n" + " l_linestatus CHAR(1) NOT NULL,\n" + " l_shipdate DATE NOT NULL,\n" + " l_commitdate DATE NOT NULL,\n" + " l_receiptdate DATE NOT NULL,\n" + " l_shipinstruct CHAR(25) NOT NULL,\n" + " l_shipmode CHAR(10) NOT NULL,\n" + " l_comment VARCHAR(44) NOT NULL,\n" + " l_year BIGINT NOT NULL,\n" + " l_month BIGINT NOT NULL,\n" + " PRIMARY KEY (l_orderkey, l_linenumber, l_year, l_month) NOT ENFORCED \n" + " ) PARTITIONED BY (l_year, l_month) WITH ( " -- 每个 partition 下设置 2 个 bucket " 'bucket' = '2',\n" + " -- 设置 changelog-producer 为 'input',这会使得上游 CDC Source 不丢弃 update_before,并且下游消费 dwd_lineitem 时没有 changelog-normalize 节点 " 'changelog-producer' = 'input' \n" + " );" // 输出目标表 String sinkADSDDL = "CREATE TABLE IF NOT EXISTS ads_pricing_summary (\n" + "l_returnflag CHAR(1) NOT NULL,\n" + "l_linestatus CHAR(1) NOT NULL,\n" + "sum_quantity DOUBLE NOT NULL,\n" + "sum_base_price DOUBLE NOT NULL,\n" + "sum_discount_price DOUBLE NOT NULL,\n" + "sum_charge_vat_inclusive DOUBLE NOT NULL,\n" + "avg_quantity DOUBLE NOT NULL,\n" + "avg_base_price DOUBLE NOT NULL,\n" + "avg_discount DOUBLE NOT NULL,\n" + "count_order BIGINT NOT NULL \n" + ") WITH ( \n" + "'bucket' = '2'\n" + ");" // 简单的聚合处理 String transformSQL1 = "INSERT INTO dwd_lineitem SELECT l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,\n" + "l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,\n" + "YEAR(l_shipdate) AS l_year,MONTH(l_shipdate) AS l_month FROM ods_lineitem;" String transformSQL2= "INSERT INTO ads_pricing_summary SELECT 
l_returnflag,l_linestatus,SUM(l_quantity) AS sum_quantity,\n" + "SUM(l_extendedprice) AS sum_base_price,SUM(l_extendedprice * (1-l_discount)) AS sum_discount_price,\n" + "SUM(l_extendedprice * (1-l_discount) * (1+l_tax)) AS sum_charge_vat_inclusive,AVG(l_quantity) AS avg_quantity,\n" + "AVG(l_extendedprice) AS avg_base_price,AVG(l_discount) AS avg_discount,COUNT(*) AS count_order FROM dwd_lineitem \n" + "WHERE (l_year < 1998 OR (l_year = 1998 AND l_month<= 9)) AND l_shipdate <= DATE '1998-12-01' - INTERVAL '90' DAY \n" + "GROUP BY l_returnflag,l_linestatus;" // 插入hudi表 tableEnv.executeSql(tableSourceDDL); tableEnv.executeSql(sourceDDL); tableEnv.executeSql(sinkDWDDDL); tableEnv.executeSql(sinkADSDDL); TableResult result = tableEnv.executeSql(transformSQL1); TableResult result = tableEnv.executeSql(transformSQL2); env.execute("mysql-to-tableSource"); } }