Flink CDC to Doris, StarRocks, and Table Store



Flink CDC to Doris


The main class for syncing a whole MySQL database to Doris via Flink CDC (the target Doris tables are assumed to already exist):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
/**
 * Synchronize the full database through Flink CDC.
 */
public class DatabaseFullSync {
    private static final Logger LOG = LoggerFactory.getLogger(DatabaseFullSync.class);
    private static String HOST = "127.0.0.1";
    private static String MYSQL_PASSWD = "password";
    private static int MYSQL_PORT = 3306;
    private static int DORIS_PORT = 8030;
    private static String MYSQL_USER = "root";
    private static String SYNC_DB = "test";
    private static String SYNC_TBLS = "test.*";
    private static String TARGET_DORIS_DB = "test";
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname(HOST)
            .port(MYSQL_PORT)
            .databaseList(SYNC_DB) // set captured database
            .tableList(SYNC_TBLS) // set captured table
            .username(MYSQL_USER)
            .password(MYSQL_PASSWD)
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // enable checkpoint
        env.enableCheckpointing(10000);
        DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
        //get table list
        List<String> tableList = getTableList();
        LOG.info("sync table list:{}",tableList);
        for(String tbl : tableList){
            SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);
            SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
            DorisSink<String> dorisSink = buildDorisSink(tbl);
            cleanStream.sinkTo(dorisSink).name("sink " + tbl);
        }
        env.execute("Full Database Sync ");
    }
    /**
     * Extract the actual row data from a Debezium-format change record, e.g.:
     * {
     *     "before":null,
     *     "after":{
     *         "id":1,
     *         "name":"zhangsan-1",
     *         "age":18
     *     },
     *     "source":{
     *         "db":"test",
     *         "table":"test_1",
     *         ...
     *     },
     *     "op":"c",
     *     ...
     * }
     * */
    private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
        return source.flatMap(new FlatMapFunction<String,String>(){
            @Override
            public void flatMap(String row, Collector<String> out) throws Exception {
                try{
                    JSONObject rowJson = JSON.parseObject(row);
                    String op = rowJson.getString("op");
                    // snapshot read (r), insert (c), update (u)
                    if(Arrays.asList("r","c","u").contains(op)){
                        out.collect(rowJson.getJSONObject("after").toJSONString());
                    }else{
                        LOG.info("filter other op:{}",op);
                    }
                }catch (Exception ex){
                    LOG.warn("filter other format binlog:{}",row);
                }
            }
        });
    }
    /**
     * Split the stream by table name
     * */
    private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
        return source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String row) throws Exception {
                try {
                    JSONObject rowJson = JSON.parseObject(row);
                    JSONObject source = rowJson.getJSONObject("source");
                    String tbl = source.getString("table");
                    return table.equals(tbl);
                }catch (Exception ex){
                    ex.printStackTrace();
                    return false;
                }
            }
        });
    }
    /**
     * Get all MySQL tables that need to be synchronized
     * */
    private static List<String> getTableList() {
        List<String> tables = new ArrayList<>();
        String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
        List<JSONObject> tableList = JdbcUtil.executeQuery(HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
        for(JSONObject jsob : tableList){
            String schemaName = jsob.getString("TABLE_SCHEMA");
            String tblName = jsob.getString("TABLE_NAME");
            String schemaTbl = schemaName  + "." + tblName;
            if(schemaTbl.matches(SYNC_TBLS)){
                tables.add(tblName);
            }
        }
        return tables;
    }
    /**
     * create doris sink
     * */
    public static DorisSink<String> buildDorisSink(String table) {
        DorisSink.Builder<String> builder = DorisSink.builder();
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder.setFenodes(HOST + ":" + DORIS_PORT)
            .setTableIdentifier(TARGET_DORIS_DB + "." + table)
            .setUsername("root")
            .setPassword("");
        Properties pro = new Properties();
        //json data format
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
            .setLabelPrefix("label-" + table + UUID.randomUUID()) // stream load label prefix
            .setStreamLoadProp(pro).build();
        builder.setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(executionOptions)
            .setSerializer(new SimpleStringSerializer()) //serialize according to string
            .setDorisOptions(dorisBuilder.build());
        return builder.build();
    }
}
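
Note that what reaches the Doris sink is the raw after JSON of each change record, e.g. {"id":1,"name":"zhangsan-1","age":18} for the sample record shown in the clean() javadoc; it is loaded via Stream Load with format=json, so the target Doris tables must already exist with matching column names. Also, since clean() only keeps the "r"/"c"/"u" operations, deletes are not propagated by this job.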

The JdbcUtil class:

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class JdbcUtil {
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);
    public static List<JSONObject> executeQuery(String hostUrl, int port, String user, String password, String sql){
        List<JSONObject> beJson = new ArrayList<>();
        String connectionUrl = String.format("jdbc:mysql://%s:%s/",hostUrl,port);
        Connection con = null;
        try {
            con = DriverManager.getConnection(connectionUrl,user,password);
            PreparedStatement ps = con.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            beJson = resultSetToJson(rs);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (con != null) {
                    con.close();
                }
            } catch (Exception ignore) {
            }
        }
        return beJson;
    }
    private static List<JSONObject> resultSetToJson(ResultSet rs) throws SQLException {
        List<JSONObject> list = new ArrayList<>();
        ResultSetMetaData metaData = rs.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= columnCount; i++) {
                String columnName =metaData.getColumnLabel(i);
                String value = rs.getString(columnName);
                jsonObj.put(columnName, value);
            }
            list.add(jsonObj);
        }
        return list;
    }
}


Flink CDC to StarRocks


The overall flow:

  1. Flink CDC captures MySQL change data
  2. The captured data is converted to JSON
  3. The database, table, and row data are extracted from the JSON
  4. The stream is keyed by database and table
  5. A process function handles each table's data, buffering rows in state; when the buffer reaches a size threshold or has been held for a time threshold (a timer covers the time-triggered case), a batch is emitted
  6. The sink assembles each batch and writes it to StarRocks via Stream Load


Main class: CdcToStarRocks

The main flow is simple: source -> map -> keyBy -> process -> sink. The source reads the MySQL binlog (or full snapshot + incremental), map converts the data format, keyBy partitions the data by database name + table name, process batches the data, and the sink writes the batches to StarRocks.

import com.venn.source.mysql.cdc.CommonStringDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * mysql cdc demo
 * <p>
 * Whole-database CDC sync to StarRocks
 * <p>
 * Limitations:
 * 1. Schema changes on the source are not yet propagated to the StarRocks side
 * 2. For efficiency, whether the target table exists is checked only when the first record of each table arrives; once a table is judged absent, all subsequent changes to it are ignored
 * 3. Tables that are not loaded are filtered only in the sink; upstream operators still process them. Consider filtering out tables absent from the target in the deserializer or the map function instead
 */
public class CdcToStarRocks {
    // max rows per batch and max wait time per batch
    private static int batchSize = 10000;
    private static long batchInterval = 10 *1000;
    public static void main(String[] args) throws Exception {
        String ip = "localhost";
        int port = 3306;
        String db = "venn";
//        String table = "venn.user_log";
        String table = "venn.*";
        String user = "root";
        String pass = "123456";
        String starrocksIp = "10.201.0.230";
        String starrocksPort = "29030";
        String starrocksLoadPort = "28030";
        String starrocksUser = "root";
        String starrocksPass = "123456";
        String starrocksDb = "test";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        MySqlSource<String> sourceFunction = MySqlSource.<String>builder()
                .hostname(ip)
                .port(port)
                // capture all tables in the configured database
                .databaseList(db)
                .tableList(table)
                .username(user)
                .password(pass)
                .startupOptions(StartupOptions.latest())
                // do not capture schema changes
//                .includeSchemaChanges(false)
//                .startupOptions(StartupOptions.initial())
                // custom deserializer that parses records into JSON strings
                .deserializer(new CommonStringDebeziumDeserializationSchema(ip, port))
                .build();
        env
                .fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "cdc")
                .name("source")
                .uid("source")
                // convert the JSON string to a CdcRecord
                .map(new CdcStarMapFunction())
                .name("map")
                .keyBy(  record -> record.getDb() + "_" + record.getTable())
                .process(new CdcStarProcessFunction(batchSize, batchInterval))
                .name("process")
                .uid("process")
                .addSink(new StarRocksSink(starrocksIp, starrocksPort, starrocksLoadPort, starrocksUser, starrocksPass, starrocksDb))
                .name("sink");
        env.execute("cdcToStarRocks");
    }
}

Deserializer: CommonStringDebeziumDeserializationSchema

The deserializer reuses a generic Flink CDC deserializer written earlier. It implements DebeziumDeserializationSchema and extracts the database, table, operation type, and row data. Pay particular attention to the following:

  1. For insert operations, only the data in after is needed
  2. For update operations, both before and after are parsed: before holds the pre-update row and after the post-update row; if the pre-image is not needed, after alone suffices
  3. For delete operations, the data comes from before
  4. DDL operations require special handling (not covered here)
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.JsonObject;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/**
 * deserialize debezium format binlog
 */
public class CommonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    private String host;
    private int port;
    public CommonStringDebeziumDeserializationSchema(String host, int port) {
        this.host = host;
        this.port = port;
    }
    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        JsonObject jsonObject = new JsonObject();
        String binlog = record.sourceOffset().get("file").toString();
        String offset = record.sourceOffset().get("pos").toString();
        String ts_sec = record.sourceOffset().get("ts_sec").toString();
        // TODO: handle schema change events
        jsonObject.addProperty("host", host);
        jsonObject.addProperty("port", port);
        // add binlog position metadata
        jsonObject.addProperty("binlog", binlog);
        jsonObject.addProperty("offset", offset);
        jsonObject.addProperty("ts_sec", ts_sec);
        String[] name = record.valueSchema().name().split("\\.");
        jsonObject.addProperty("db", name[1]);
        jsonObject.addProperty("table", name[2]);
        Struct value = ((Struct) record.value());
        String operatorType = value.getString("op");
        jsonObject.addProperty("operator_type", operatorType);
        // c : create, u: update, d: delete, r: read
        // insert / update / read: take the after image
        if (!"d".equals(operatorType)) {
            Struct after = value.getStruct("after");
            JsonObject afterJsonObject = parseRecord(after);
            jsonObject.add("after", afterJsonObject);
        }
        // update & delete
        if ("u".equals(operatorType) || "d".equals(operatorType)) {
            Struct source = value.getStruct("before");
            JsonObject beforeJsonObject = parseRecord(source);
            jsonObject.add("before", beforeJsonObject);
        }
        jsonObject.addProperty("parse_time", System.currentTimeMillis() / 1000);
        out.collect(jsonObject.toString());
    }
    private JsonObject parseRecord(Struct after) {
        JsonObject jo = new JsonObject();
        for (Field field : after.schema().fields()) {
            switch ((field.schema()).type()) {
                case INT8:
                    int resultInt8 = after.getInt8(field.name());
                    jo.addProperty(field.name(), resultInt8);
                    break;
                // MySQL SMALLINT/INT columns arrive as INT16/INT32
                case INT16:
                    Short resultInt16 = after.getInt16(field.name());
                    jo.addProperty(field.name(), resultInt16);
                    break;
                case INT32:
                    Integer resultInt32 = after.getInt32(field.name());
                    jo.addProperty(field.name(), resultInt32);
                    break;
                case INT64:
                    Long resultInt = after.getInt64(field.name());
                    jo.addProperty(field.name(), resultInt);
                    break;
                case FLOAT32:
                    Float resultFloat32 = after.getFloat32(field.name());
                    jo.addProperty(field.name(), resultFloat32);
                    break;
                case FLOAT64:
                    Double resultFloat64 = after.getFloat64(field.name());
                    jo.addProperty(field.name(), resultFloat64);
                    break;
                case BYTES:
                    // json ignore byte column
                    // byte[] resultByte = after.getBytes(field.name());
                    // jo.addProperty(field.name(), String.valueOf(resultByte));
                    break;
                case STRING:
                    String resultStr = after.getString(field.name());
                    jo.addProperty(field.name(), resultStr);
                    break;
                default:
            }
        }
        return jo;
    }
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
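
For reference, a record emitted by this deserializer looks roughly like the following (an illustrative, hand-written sample; all values are made up):

{"host":"localhost","port":3306,"binlog":"mysql-bin.000003","offset":"4951","ts_sec":"1667373900","db":"venn","table":"user_log","operator_type":"u","after":{"id":1,"name":"zhangsan"},"before":{"id":1,"name":"lisi"},"parse_time":1667373900}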

Map function: CdcStarMapFunction

CdcStarMapFunction is straightforward: it converts the JSON string into a CdcRecord object, keeping only the database, table, operation type, and row data.

When extracting the row data, insert and update take the values from after, while delete takes them from before.

import java.util.LinkedHashMap;
import java.util.Map;
import lombok.Data;
/**
 * Holds one CDC record: database, table, operation type, and column data
 */
@Data
public class CdcRecord {
    private String db;
    private String table;
    private String op;
    private Map<String, String> data = new LinkedHashMap<>();
    // @Data does not generate this constructor, but CdcStarMapFunction needs it
    public CdcRecord(String db, String table, String op) {
        this.db = db;
        this.table = table;
        this.op = op;
    }
}
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class CdcStarMapFunction extends RichMapFunction<String, CdcRecord> {
    private final static Logger LOG = LoggerFactory.getLogger(CdcStarMapFunction.class);
    private JsonParser parser;
    @Override
    public void open(Configuration parameters) throws Exception {
        parser = new JsonParser();
    }
    @Override
    public CdcRecord map(String element) throws Exception {
        LOG.debug("data : {}" , element );
        JsonObject object = parser.parse(element).getAsJsonObject();
        String db = object.get("db").getAsString();
        String table = object.get("table").getAsString();
        String op = object.get("operator_type").getAsString();
        CdcRecord record = new CdcRecord(db, table, op);
        // insert/update
        String dataLocation = "after";
        if("d".equals(op)){
            // if op is delete, get before
            dataLocation = "before";
        }
        JsonObject data = object.get(dataLocation).getAsJsonObject();
        for(Map.Entry<String, JsonElement> entry: data.entrySet()){
            String columnName = entry.getKey();
            String columnValue;
            JsonElement value = entry.getValue();
            if(!value.isJsonNull()){
                // if column value is not null, get as string
                columnValue = value.getAsString();
                // put column name/value to record.data
                record.getData().put(columnName, columnValue);
            }
        }
        return record;
    }
}

Process function: CdcStarProcessFunction

CdcStarProcessFunction has three parts:

  1. Three pieces of state, cacheTimer, cacheSize, and cache, hold the next timer trigger time, the number of cached rows, and the cached data respectively
  2. processElement handles each table's data: when the first record for a key arrives, it registers a processing-time timer at current time + batchInterval; records are appended to cache, and once the count reaches batchSize, flushData emits the batch downstream, the registered timer is deleted, and the state is cleared
  3. When the timer fires, flushData emits the cached data downstream and clears the state
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class CdcStarProcessFunction extends KeyedProcessFunction<String, CdcRecord, List<CdcRecord>> {
    private final static Logger LOG = LoggerFactory.getLogger(CdcStarProcessFunction.class);
    private int batchSize;
    private long batchInterval;
    // next timer time
    private ValueState<Long> cacheTimer;
    // current cache size
    private ValueState<Integer> cacheSize;
    // cache data
    private ListState<CdcRecord> cache;
    public CdcStarProcessFunction(int batchSize, long batchInterval) {
        this.batchSize = batchSize;
        this.batchInterval = batchInterval;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<CdcRecord> cacheDescriptor = new ListStateDescriptor<>("cache", TypeInformation.of(CdcRecord.class));
        cache = getRuntimeContext().getListState(cacheDescriptor);
        ValueStateDescriptor<Integer> cacheSizeDescriptor = new ValueStateDescriptor<>("cacheSize", Integer.class);
        cacheSize = getRuntimeContext().getState(cacheSizeDescriptor);
        ValueStateDescriptor<Long> cacheTimerDescriptor = new ValueStateDescriptor<>("cacheTimer", Long.class);
        cacheTimer = getRuntimeContext().getState(cacheTimerDescriptor);
    }
    @Override
    public void processElement(CdcRecord element, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.Context ctx, Collector<List<CdcRecord>> out) throws Exception {
        // cache size + 1
        if (cacheSize.value() != null) {
            cacheSize.update(cacheSize.value() + 1);
        } else {
            cacheSize.update(1);
            // add timer for interval flush
            long nextTimer = System.currentTimeMillis() + batchInterval;
            LOG.debug("register timer : {} , key : {}", nextTimer, ctx.getCurrentKey());
            cacheTimer.update(nextTimer);
            ctx.timerService().registerProcessingTimeTimer(nextTimer);
        }
        // add data to cache state
        cache.add(element);
        // flush once the cache reaches batchSize
        if (cacheSize.value() >= batchSize) {
            // remove next timer
            long nextTimer = cacheTimer.value();
            LOG.debug("{} remove timer, key : {}", nextTimer, ctx.getCurrentKey());
            ctx.timerService().deleteProcessingTimeTimer(nextTimer);
            // flush data to down stream
            flushData(out);
        }
    }
    /**
     * flush data to down stream
     */
    private void flushData(Collector<List<CdcRecord>> out) throws Exception {
        List<CdcRecord> tmpCache = new ArrayList<>();
        Iterator<CdcRecord> it = cache.get().iterator();
        while (it.hasNext()) {
            tmpCache.add(it.next());
        }
        if (tmpCache.size() > 0) {
            out.collect(tmpCache);
            // finish flush all cache data, clear state
            cache.clear();
            cacheSize.clear();
            cacheTimer.clear();
        }
    }
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.OnTimerContext ctx, Collector<List<CdcRecord>> out) throws Exception {
        LOG.info("{} trigger timer to flush data", ctx.getCurrentKey(), timestamp);
        // batch interval trigger flush data
        flushData(out);
    }
    @Override
    public void close() throws Exception {
    }
}

Sink: StarRocksSink

StarRocksSink is a bit more involved. Based on the table name in the data, it fetches the corresponding table schema from the target database (cached in memory so the database is not queried for every batch), extracts column values from the data in the target table's column order, and appends the operation type to each row.

  • When writing to StarRocks, StarRocksSink supports upsert and delete by appending a 0/1 flag to each row: 0 stands for UPSERT and 1 for DELETE
  • See reference document 1

The invoke method

The core logic of StarRocksSink lives in invoke:

  1. Extract the database and table from the data and build a cache key
  2. Fetch the target table's schema (whole-database mapping, so source and target table names are identical): the in-memory cache is checked first, and the database is queried on a miss
  3. Assemble the row data
  4. Build the load URL
  5. Write the data to StarRocks over HTTP
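
For example, with the configuration above (starrocksDb = "test", starrocksLoadPort = "28030") and a record from table user_log, the cache key is "test_user_log" (note that the sink uses the StarRocks database name) and the load URL is http://10.201.0.230:28030/api/test/user_log/_stream_load.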

The loadTargetTableSchema method

Runs desc db.table to fetch the target table's schema and builds two results: all column names joined with "," plus a trailing "__op", used in the HTTP request header to declare the columns of the data; and a list of the columns in order, used to pull the corresponding values from the source data.
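
For illustration, with a hypothetical target table user_log(id, name, age), the joined string would be "id,name,age,__op" and the column list [id, name, age].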

The parseUploadData method

Pulls column values from the data in target-table column order, joins them with the column separator, and finally appends a 0/1 flag based on the operation type: 1 for delete, 0 for everything else.
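
Continuing the hypothetical user_log(id, name, age) example, an upsert of row (1, zhangsan, 18) followed by a delete of row (2, lisi, 20) would be assembled as (writing <COL_SEP> and <ROW_SEP> for the separators; the trailing row separator is trimmed):

1<COL_SEP>zhangsan<COL_SEP>18<COL_SEP>0<ROW_SEP>2<COL_SEP>lisi<COL_SEP>20<COL_SEP>1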

The doHttp method

Writes the data to StarRocks via HTTP Stream Load; nothing special here.

import org.apache.commons.codec.binary.Base64;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpException;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StarRocksSink extends RichSinkFunction<List<CdcRecord>> {
    private final static Logger LOG = LoggerFactory.getLogger(StarRocksSink.class);
    public final static String COL_SEP = "\\\\x01";
    public final static String ROW_SEP = "\\\\x02";
    public final static String NULL_COL = "\\N";
    private String ip;
    private String port;
    private String loadPort;
    private String user;
    private String pass;
    private String db;
    private Connection connection;
    private Map<String, String> spliceColumnMap = new HashMap<>();
    private Map<String, List<String>> columnMap = new HashMap<>();
    public StarRocksSink() {
    }
    public StarRocksSink(String ip, String port, String loadPort, String user, String pass, String db) {
        this.ip = ip;
        this.port = port;
        this.loadPort = loadPort;
        this.user = user;
        this.pass = pass;
        this.db = db;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        reConnect();
    }
    @Override
    public void invoke(List<CdcRecord> element, Context context) throws Exception {
        LOG.info("write batch size: " + element.size());
        if(element == null || element.size() ==0){
            LOG.info("ignore empty element");
            return;
        }
        // use StarRocks db name
//        String db = cache.get(0).getDb();
        String table = element.get(0).getTable();
        String key = db + "_" + table;
        // get table schema
        List<String> columnList;
        if (!columnMap.containsKey(key)) {
            // db.table is first coming, load column, put to spliceColumnMap & columnMap
            loadTargetTableSchema(key, db, table);
        }
        String columns = spliceColumnMap.get(key);
        columnList = columnMap.get(key);
        if (columnList == null || columnList.size() == 0) {
            LOG.info("{}.{} does not exist in target StarRocks, ignore data change", db, table);
            return;
        }
        // make up data
        String data = parseUploadData(element, columnList);
        final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load", ip, loadPort, db, table);
        String label = db + "_" + table + "_" + System.currentTimeMillis();
        // send data to starrocks
        doHttp(loadUrl, data, label, columns);
    }
    /**
     * http send data to starrocks
     */
    private void doHttp(String loadUrl, String data, String label, String columns) throws IOException, SQLException {
        final HttpClientBuilder httpClientBuilder = HttpClients
                .custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                })
                .addInterceptorFirst(new ContentLengthHeaderRemover());
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            StringEntity entity = new StringEntity(data, "UTF-8");
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(user, pass));
            // the label header is optional, not necessary
            // use label header can ensure at most once semantics
            put.setHeader("label", label);
            put.setHeader("columns", columns);
            put.setHeader("row_delimiter", ROW_SEP);
            put.setHeader("column_separator", COL_SEP);
            put.setEntity(entity);
            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();
                // statusCode 200 only means the StarRocks HTTP service responded, not that the stream load succeeded;
                // check the response body to see whether the load actually succeeded
                if (statusCode != 200) {
                    throw new IOException(
                            String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
                }
            }
        }
    }
    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }
    private String parseUploadData(List<CdcRecord> cache, List<String> columnList) {
        StringBuilder builder = new StringBuilder();
        for (CdcRecord element : cache) {
            Map<String, String> data = element.getData();
            for (String column : columnList) {
                if (data.containsKey(column)) {
                    builder.append(data.get(column)).append(COL_SEP);
                } else {
                    // if target column not exists in source data, set as null
                    builder.append(NULL_COL).append(COL_SEP);
                }
            }
            // add __op
            if ("d".equals(element.getOp())) {
                // delete
                builder.append("1");
            } else {
                // upsert
                builder.append("0");
            }
            // add row separator
            builder.append(ROW_SEP);
        }
        // remove the trailing row separator
        builder = builder.delete(builder.length() - ROW_SEP.length(), builder.length());
        String data = builder.toString();
        return data;
    }
    /**
     * load table schema, parse to http column and column list for load source data
     */
    private void loadTargetTableSchema(String key, String db, String table) throws SQLException {
        List<String> columnList = new ArrayList<>();
        StringBuilder builder = new StringBuilder();
        try {
            // make sure the connection is still alive before querying
            reConnect();
            // load the target table schema
            PreparedStatement ps = connection.prepareStatement("desc " + db + "." + table);
            ResultSet result = ps.executeQuery();
            while (result.next()) {
                String column = result.getString(1);
                builder.append(column).append(",");
                columnList.add(column);
            }
        } catch (SQLException e) {
            LOG.warn("load {}.{} schema error", db, table, e);
        }
        builder.append("__op");
        String columns = builder.toString();
        spliceColumnMap.put(key, columns);
        columnMap.put(key, columnList);
    }
    /**
     * reconnect to starrocks
     *
     * @throws SQLException
     */
    private void reConnect() throws SQLException {
        String url = "jdbc:mysql://" + ip + ":" + port;
        if (connection == null || connection.isClosed()) {
            connection = DriverManager.getConnection(url, user, pass);
        }
    }
    @Override
    public void finish() throws Exception {
        LOG.info("finish");
    }
    @Override
    public void close() throws Exception {
        LOG.info("close...");
        connection.close();
    }
    private static class ContentLengthHeaderRemover implements HttpRequestInterceptor {
        @Override
        public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
            // fighting org.apache.http.protocol.RequestContent's ProtocolException("Content-Length header already present");
            request.removeHeaders(HTTP.CONTENT_LEN);
        }
    }
}

Limitations:

  1. Schema changes on the source are not yet propagated to the StarRocks side
  2. For efficiency, whether the target table exists is checked only when the first record of each table arrives; once a table is judged absent, all subsequent changes to it are ignored
  3. Tables that are not loaded are filtered only in the sink; upstream operators still process them. Consider instead filtering out tables absent from the target in the deserializer or the map function, as sketched below
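
As a rough sketch of the third point (this class is not part of the original pipeline, and loading the target table list is left as an assumption), a filter placed between map and keyBy could drop records for tables that do not exist on the target:

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import java.util.HashSet;
import java.util.Set;
/**
 * Sketch: drop CdcRecords whose target table does not exist, before keyBy/process
 */
public class TargetTableFilter extends RichFilterFunction<CdcRecord> {
    private Set<String> existingTables;
    @Override
    public void open(Configuration parameters) throws Exception {
        // assumption: load the target table list once here, e.g. by querying
        // information_schema.tables on StarRocks over JDBC
        existingTables = new HashSet<>();
    }
    @Override
    public boolean filter(CdcRecord record) {
        // keep only records whose db_table key exists on the target
        return existingTables.contains(record.getDb() + "_" + record.getTable());
    }
}

It would be wired in as .map(new CdcStarMapFunction()).filter(new TargetTableFilter()).keyBy(...).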


Flink CDC to Table Store

package name.lijiaqi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class MysqlToTableStoreExample {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // source table
        String sourceDDL =
                "CREATE TEMPORARY TABLE ods_lineitem (\n" +
                " l_orderkey INT NOT NULL,\n" +
                " l_partkey INT NOT NULL,\n" +
                " l_suppkey INT NOT NULL,\n" +
                " l_linenumber INT NOT NULL,\n" +
                " l_quantity DECIMAL(15, 2) NOT NULL,\n" +
                " l_extendedprice DECIMAL(15, 2) NOT NULL,\n" +
                " l_discount DECIMAL(15, 2) NOT NULL,\n" +
                " l_tax DECIMAL(15, 2) NOT NULL,\n" +
                " l_returnflag CHAR(1) NOT NULL,\n" +
                " l_linestatus CHAR(1) NOT NULL,\n" +
                " l_shipdate DATE NOT NULL,\n" +
                " l_commitdate DATE NOT NULL,\n" +
                " l_receiptdate DATE NOT NULL,\n" +
                " l_shipinstruct CHAR(25) NOT NULL,\n" +
                " l_shipmode CHAR(10) NOT NULL,\n" +
                " l_comment VARCHAR(44) NOT NULL,\n" +
                " PRIMARY KEY (l_orderkey, l_linenumber) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                // to connect by hostname instead, add "127.0.0.1 mysql.docker.internal" to /etc/hosts on the host machine
                "  'hostname' = '127.0.0.1',\n" +
                "  'port' = '3307',\n" +
                "  'username' = 'flink',\n" +
                "  'password' = 'flink',\n" +
                "  'database-name' = 'tpch_s10',\n" +
                "  'table-name' = 'lineitem'\n" +
                ")";
        // Table Store catalog; executeSql() runs a single statement at a time,
        // so CREATE CATALOG and USE CATALOG are kept as separate statements
        String createCatalogDDL =
                "CREATE CATALOG `table_store` WITH (\n" +
                "  'type' = 'table-store',\n" +
                "  'warehouse' = '/tmp/table-store-101'\n" +
                ")";
        String useCatalogDDL = "USE CATALOG `table_store`";
        // DWD sink table
        String sinkDWDDDL =
                "CREATE TABLE IF NOT EXISTS dwd_lineitem (\n" +
                " l_orderkey INT NOT NULL,\n" +
                " l_partkey INT NOT NULL,\n" +
                " l_suppkey INT NOT NULL,\n" +
                " l_linenumber INT NOT NULL,\n" +
                " l_quantity DECIMAL(15, 2) NOT NULL,\n" +
                " l_extendedprice DECIMAL(15, 2) NOT NULL,\n" +
                " l_discount DECIMAL(15, 2) NOT NULL,\n" +
                " l_tax DECIMAL(15, 2) NOT NULL,\n" +
                " l_returnflag CHAR(1) NOT NULL,\n" +
                " l_linestatus CHAR(1) NOT NULL,\n" +
                " l_shipdate DATE NOT NULL,\n" +
                " l_commitdate DATE NOT NULL,\n" +
                " l_receiptdate DATE NOT NULL,\n" +
                " l_shipinstruct CHAR(25) NOT NULL,\n" +
                " l_shipmode CHAR(10) NOT NULL,\n" +
                " l_comment VARCHAR(44) NOT NULL,\n" +
                " l_year BIGINT NOT NULL,\n" +
                " l_month BIGINT NOT NULL,\n" +
                " PRIMARY KEY (l_orderkey, l_linenumber, l_year, l_month) NOT ENFORCED\n" +
                ") PARTITIONED BY (l_year, l_month) WITH (\n" +
                // 2 buckets per partition
                "  'bucket' = '2',\n" +
                // 'input' makes the upstream CDC source keep update_before, so downstream
                // consumers of dwd_lineitem need no changelog-normalize node
                "  'changelog-producer' = 'input'\n" +
                ")";
        // ADS sink table
        String sinkADSDDL =
                "CREATE TABLE IF NOT EXISTS ads_pricing_summary (\n" +
                " l_returnflag CHAR(1) NOT NULL,\n" +
                " l_linestatus CHAR(1) NOT NULL,\n" +
                " sum_quantity DOUBLE NOT NULL,\n" +
                " sum_base_price DOUBLE NOT NULL,\n" +
                " sum_discount_price DOUBLE NOT NULL,\n" +
                " sum_charge_vat_inclusive DOUBLE NOT NULL,\n" +
                " avg_quantity DOUBLE NOT NULL,\n" +
                " avg_base_price DOUBLE NOT NULL,\n" +
                " avg_discount DOUBLE NOT NULL,\n" +
                " count_order BIGINT NOT NULL\n" +
                ") WITH (\n" +
                "  'bucket' = '2'\n" +
                ")";
        // simple aggregation
        String transformSQL1 =
                "INSERT INTO dwd_lineitem SELECT l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,\n" +
                "l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,\n" +
                "YEAR(l_shipdate) AS l_year,MONTH(l_shipdate) AS l_month FROM ods_lineitem";
        String transformSQL2 =
                "INSERT INTO ads_pricing_summary SELECT l_returnflag,l_linestatus,SUM(l_quantity) AS sum_quantity,\n" +
                "SUM(l_extendedprice) AS sum_base_price,SUM(l_extendedprice * (1-l_discount)) AS sum_discount_price,\n" +
                "SUM(l_extendedprice * (1-l_discount) * (1+l_tax)) AS sum_charge_vat_inclusive,AVG(l_quantity) AS avg_quantity,\n" +
                "AVG(l_extendedprice) AS avg_base_price,AVG(l_discount) AS avg_discount,COUNT(*) AS count_order FROM dwd_lineitem\n" +
                "WHERE (l_year < 1998 OR (l_year = 1998 AND l_month <= 9)) AND l_shipdate <= DATE '1998-12-01' - INTERVAL '90' DAY\n" +
                "GROUP BY l_returnflag,l_linestatus";
        // create the catalog and tables, then submit the two INSERT pipelines;
        // executeSql() submits the INSERT jobs itself, so no env.execute() is needed
        tableEnv.executeSql(createCatalogDDL);
        tableEnv.executeSql(useCatalogDDL);
        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDWDDDL);
        tableEnv.executeSql(sinkADSDDL);
        tableEnv.executeSql(transformSQL1);
        tableEnv.executeSql(transformSQL2);
    }
}

For details, see:

https://github.com/LadyForest/flink-table-store-101/blob/master/real-time-update/README.zh.md

