Flink cdc到doris,starrocks,table store

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
简介: Flink cdc到doris,starrocks,table store


flink cdc到doris


flink cdc同步到doris主类:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
/***
 *
 * Synchronize the full database through flink cdc
 *
 */
public class DatabaseFullSync {
    private static final Logger LOG = LoggerFactory.getLogger(DatabaseFullSync.class);
    private static String HOST = "127.0.0.1";
    private static String MYSQL_PASSWD = "password";
    private static int MYSQL_PORT = 3306;
    private static int DORIS_PORT = 8030;
    private static String MYSQL_USER = "root";
    private static String SYNC_DB = "test";
    private static String SYNC_TBLS = "test.*";
    private static String TARGET_DORIS_DB = "test";
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname(HOST)
            .port(MYSQL_PORT)
            .databaseList(SYNC_DB) // set captured database
            .tableList(SYNC_TBLS) // set captured table
            .username(MYSQL_USER)
            .password(MYSQL_PASSWD)
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // enable checkpoint
        env.enableCheckpointing(10000);
        DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
        //get table list
        List<String> tableList = getTableList();
        LOG.info("sync table list:{}",tableList);
        for(String tbl : tableList){
            SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);
            SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
            DorisSink dorisSink = buildDorisSink(tbl);
            cleanStream.sinkTo(dorisSink).name("sink " + tbl);
        }
        env.execute("Full Database Sync ");
    }
    /**
     * Get real data
     * {
     *     "before":null,
     *     "after":{
     *         "id":1,
     *         "name":"zhangsan-1",
     *         "age":18
     *     },
     *     "source":{
     *         "db":"test",
     *         "table":"test_1",
     *         ...
     *     },
     *     "op":"c",
     *     ...
     * }
     * */
    private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
        return source.flatMap(new FlatMapFunction<String,String>(){
            @Override
            public void flatMap(String row, Collector<String> out) throws Exception {
                try{
                    JSONObject rowJson = JSON.parseObject(row);
                    String op = rowJson.getString("op");
                    //history,insert,update
                    if(Arrays.asList("r","c","u").contains(op)){
                        out.collect(rowJson.getJSONObject("after").toJSONString());
                    }else{
                        LOG.info("filter other op:{}",op);
                    }
                }catch (Exception ex){
                    LOG.warn("filter other format binlog:{}",row);
                }
            }
        });
    }
    /**
     * Divide according to tablename
     * */
    private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
        return source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String row) throws Exception {
                try {
                    JSONObject rowJson = JSON.parseObject(row);
                    JSONObject source = rowJson.getJSONObject("source");
                    String tbl = source.getString("table");
                    return table.equals(tbl);
                }catch (Exception ex){
                    ex.printStackTrace();
                    return false;
                }
            }
        });
    }
    /**
     * Get all MySQL tables that need to be synchronized
     * */
    private static List<String> getTableList() {
        List<String> tables = new ArrayList<>();
        String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
        List<JSONObject> tableList = JdbcUtil.executeQuery(HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
        for(JSONObject jsob : tableList){
            String schemaName = jsob.getString("TABLE_SCHEMA");
            String tblName = jsob.getString("TABLE_NAME");
            String schemaTbl = schemaName  + "." + tblName;
            if(schemaTbl.matches(SYNC_TBLS)){
                tables.add(tblName);
            }
        }
        return tables;
    }
    /**
     * create doris sink
     * */
    public static DorisSink buildDorisSink(String table){
        DorisSink.Builder<String> builder = DorisSink.builder();
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder.setFenodes(HOST + ":" + DORIS_PORT)
            .setTableIdentifier(TARGET_DORIS_DB + "." + table)
            .setUsername("root")
            .setPassword("");
        Properties pro = new Properties();
        //json data format
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
            .setLabelPrefix("label-" + table + UUID.randomUUID()) //streamload label prefix,
            .setStreamLoadProp(pro).build();
        builder.setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(executionOptions)
            .setSerializer(new SimpleStringSerializer()) //serialize according to string
            .setDorisOptions(dorisBuilder.build());
        return builder.build();
    }
}

JdbcUtil类:

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class JdbcUtil {
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);
    public static void main(String[] args) throws SQLException {
    }
    public static List<JSONObject> executeQuery(String hostUrl, int port, String user, String password, String sql){
        List<JSONObject> beJson = new ArrayList<>();
        String connectionUrl = String.format("jdbc:mysql://%s:%s/",hostUrl,port);
        Connection con = null;
        try {
            con = DriverManager.getConnection(connectionUrl,user,password);
            PreparedStatement ps = con.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            beJson = resultSetToJson(rs);
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                con.close();
            } catch (Exception e) {
            }
        }
        return beJson;
    }
    private static List<JSONObject> resultSetToJson(ResultSet rs) throws SQLException {
        List<JSONObject> list = new ArrayList<>();
        ResultSetMetaData metaData = rs.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= columnCount; i++) {
                String columnName =metaData.getColumnLabel(i);
                String value = rs.getString(columnName);
                jsonObj.put(columnName, value);
            }
            list.add(jsonObj);
        }
        return list;
    }
}


flink cdc到starrocks


主要实现的流程:

  1. Flink cdc 采集 mysql 数据
  2. 将 cdc 采集到的数据转为 json
  3. 从 json 中获取 数据库、表和数据
  4. 用数据库和表对数据做 key by
  5. 使用 process function 处理每个表的数据,用状态缓存数据,缓存数据达到一定量或者缓存了一定时间(用 timer 触发缓存时间触发的场景)StarRocks 写数据
  6. sink 中拼接数据 使用 Stream Load 往 StarRocks 写数据


主类 CdcToStarRocks

       主要流程很简单: source -> map -> keyBy -> process -> sink,source 读取 mysql binlog(或者全量+增量),map 转换数据格式,keyBy 以数据库名 + 表名对数据分区,process 中对数据攒批,sink 输出数据到 StarRocks

import com.venn.source.mysql.cdc.CommonStringDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
/**
 * mysql cdc demo
 * <p>
 * cdc 整库同步数据到 starrocks
 * <p>
 * 局限:
 * 1. 还未实现 starrocks 端表结构跟随 源端表结构同步变更
 * 2. 为了保证效率,仅会在每一个表第一次来的时候判断目标段是否存在该表,如果已经判定该表不存在,后续直接忽略该表的数据变更
 * 3. 部分不导入的表,只在sink 的时候做了过滤,前面的操作还是要继续,可以考虑在 反序列化活map中过滤掉目标库中不存在的表数据
 */
public class CdcToStarRocks {
    // 每个批次最大条数和等待时间
    private static int batchSize = 10000;
    private static long batchInterval = 10 *1000;
    public static void main(String[] args) throws Exception {
        String ip = "localhost";
        int port = 3306;
        String db = "venn";
//        String table = "venn.user_log";
        String table = "venn.*";
        String user = "root";
        String pass = "123456";
        String starrocksIp = "10.201.0.230";
        String starrocksPort = "29030";
        String starrocksLoadPort = "28030";
        String starrocksUser = "root";
        String starrocksPass = "123456";
        String starrocksDb = "test";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        MySqlSource<String> sourceFunction = MySqlSource.<String>builder()
                .hostname(ip)
                .port(port)
                // 获取两个数据库的所有表
                .databaseList(db)
                .tableList(table)
                .username(user)
                .password(pass)
                .startupOptions(StartupOptions.latest())
                // do not cache schema change
//                .includeSchemaChanges(false)
//                .startupOptions(StartupOptions.initial())
                // 自定义 解析器,讲数据解析成 json
                .deserializer(new CommonStringDebeziumDeserializationSchema(ip, port))
                .build();
        env
                .fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "cdc")
                .name("source")
                .uid("source")
//                 json 字符串转 CdcRecord
                .map(new CdcStarMapFunction())
                .name("map")
                .keyBy(  record -> record.getDb() + "_" + record.getTable())
                .process(new CdcStarProcessFunction(batchSize, batchInterval))
                .name("process")
                .uid("process")
                .addSink(new StarRocksSink(starrocksIp, starrocksPort, starrocksLoadPort, starrocksUser, starrocksPass, starrocksDb))
                .name("sink");
        env.execute("cdcToStarRocks");
    }
}

反序列化器 CommonStringDebeziumDeserializationSchema

反序列化器直接拿之前写的通用的 flink cdc 反序列化器过来,继承 DebeziumDeserializationSchema,主要是从数据中获取 数据库、表、操作类型和数据,需求特别注意以下几点:

  1. insert 类型的操心,只需要获取 after 中的数据
  2. update 类型的操作,需要同时解析 before 和 after 的数据,before 是修改前的,after 是修改后的,如果不需要修改前的,可以只获取 after
  3. delete 类型的操作,需要获取 before
  4. 如果有 ddl 操作,需要特殊处理(本次不包含)
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.JsonObject;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/**
 * deserialize debezium format binlog
 */
public class CommonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    private String host;
    private int port;
    public CommonStringDebeziumDeserializationSchema(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public void deserialize(SourceRecord record, Collector<String> out) {
        JsonObject jsonObject = new JsonObject();
        String binlog = record.sourceOffset().get("file").toString();
        String offset = record.sourceOffset().get("pos").toString();
        String ts_sec = record.sourceOffset().get("ts_sec").toString();
//        System.out.println("binlog : " + binlog + ", offset = " + offset);
        // todo get schame change
        jsonObject.addProperty("host", host);
        // add meta
        jsonObject.addProperty("binlog", binlog);
        jsonObject.addProperty("offset", offset);
        jsonObject.addProperty("ts_sec", ts_sec);
        jsonObject.addProperty("port", port);
        jsonObject.addProperty("file", (String) record.sourceOffset().get("file"));
        jsonObject.addProperty("pos", (Long) record.sourceOffset().get("pos"));
        jsonObject.addProperty("ts_sec", (Long) record.sourceOffset().get("ts_sec"));
        String[] name = record.valueSchema().name().split("\\.");
        jsonObject.addProperty("db", name[1]);
        jsonObject.addProperty("table", name[2]);
        Struct value = ((Struct) record.value());
        String operatorType = value.getString("op");
        jsonObject.addProperty("operator_type", operatorType);
        // c : create, u: update, d: delete, r: read
        // insert update
        if (!"d".equals(operatorType)) {
            Struct after = value.getStruct("after");
            JsonObject afterJsonObject = parseRecord(after);
            jsonObject.add("after", afterJsonObject);
        }
        // update & delete
        if ("u".equals(operatorType) || "d".equals(operatorType)) {
            Struct source = value.getStruct("before");
            JsonObject beforeJsonObject = parseRecord(source);
            jsonObject.add("before", beforeJsonObject);
        }
        jsonObject.addProperty("parse_time", System.currentTimeMillis() / 1000);
        out.collect(jsonObject.toString());
    }
    private JsonObject parseRecord(Struct after) {
        JsonObject jo = new JsonObject();
        for (Field field : after.schema().fields()) {
            switch ((field.schema()).type()) {
                case INT8:
                    int resultInt8 = after.getInt8(field.name());
                    jo.addProperty(field.name(), resultInt8);
                    break;
                case INT64:
                    Long resultInt = after.getInt64(field.name());
                    jo.addProperty(field.name(), resultInt);
                    break;
                case FLOAT32:
                    Float resultFloat32 = after.getFloat32(field.name());
                    jo.addProperty(field.name(), resultFloat32);
                    break;
                case FLOAT64:
                    Double resultFloat64 = after.getFloat64(field.name());
                    jo.addProperty(field.name(), resultFloat64);
                    break;
                case BYTES:
                    // json ignore byte column
                    // byte[] resultByte = after.getBytes(field.name());
                    // jo.addProperty(field.name(), String.valueOf(resultByte));
                    break;
                case STRING:
                    String resultStr = after.getString(field.name());
                    jo.addProperty(field.name(), resultStr);
                    break;
                default:
            }
        }
        return jo;
    }
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

转换函数 CdcStarMapFunction

CdcStarMapFunction 比较简单,将 json 字符串,转成 CdcRecord 类型的对象,只获取了需要的 数据库、表、操作类型和数据。

获取数据时,insert 和 update 只获取 after 的值

import java.util.LinkedHashMap;
import java.util.Map;
import lombok.Data;
/**
 * cdcRecord save
 */
 @Data
 public class CdcRecord {
    private String db;
    private String table;
    private String op;
    private Map<String, String> data = new LinkedHashMap<>();
}
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class CdcStarMapFunction extends RichMapFunction<String, CdcRecord> {
    private final static Logger LOG = LoggerFactory.getLogger(CdcStarMapFunction.class);
    private JsonParser parser;
    @Override
    public void open(Configuration parameters) throws Exception {
        parser = new JsonParser();
    }
    @Override
    public CdcRecord map(String element) throws Exception {
        LOG.debug("data : {}" , element );
        JsonObject object = parser.parse(element).getAsJsonObject();
        String db = object.get("db").getAsString();
        String table = object.get("table").getAsString();
        String op = object.get("operator_type").getAsString();
        CdcRecord record = new CdcRecord(db, table, op);
        // insert/update
        String dataLocation = "after";
        if("d".equals(op)){
            // if op is delete, get before
            dataLocation = "before";
        }
        JsonObject data = object.get(dataLocation).getAsJsonObject();
        for(Map.Entry<String, JsonElement> entry: data.entrySet()){
            String columnName = entry.getKey();
            String columnValue;
            JsonElement value = entry.getValue();
            if(!value.isJsonNull()){
                // if column value is not null, get as string
                columnValue = value.getAsString();
                // put column name/value to record.data
                record.getData().put(columnName, columnValue);
            }
        }
        return record;
    }
}

处理函数 CdcStarProcessFunction

CdcStarProcessFunction 有三部分逻辑:

  1. 三个状态cacheTimer、cacheSize、cache,分别存下一次timer 触发时间、缓存的数据条数、缓存的数据
  2. process 处理每个表的数据,每个表的数据第一次到的时候,基于当前时间 + batchInterval,注册下次时间触发的 timer。数据存储到 cache 中,如果数据量超过预定的 batchSize,触发 flushData 方法往下游输出数据,并删除之前注册的定时器,清理状态
  3. timer 触发 flushData 方法往下游输出数据,清理状态
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class CdcStarProcessFunction extends KeyedProcessFunction<String, CdcRecord, List<CdcRecord>> {
    private final static Logger LOG = LoggerFactory.getLogger(CdcStarProcessFunction.class);
    private int batchSize;
    private long batchInterval;
    // next timer time
    private ValueState<Long> cacheTimer;
    // current cache size
    private ValueState<Integer> cacheSize;
    // cache data
    private ListState<CdcRecord> cache;
    public CdcStarProcessFunction(int batchSize, long batchInterval) {
        this.batchSize = batchSize;
        this.batchInterval = batchInterval;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor cacheDescriptor = new ListStateDescriptor<CdcRecord>("cache", TypeInformation.of(CdcRecord.class));
        cache = getRuntimeContext().getListState(cacheDescriptor);
        ValueStateDescriptor cacheSizeDescriptor = new ValueStateDescriptor<Integer>("cacheSize", Integer.class);
        cacheSize = getRuntimeContext().getState(cacheSizeDescriptor);
        ValueStateDescriptor cacheTimerDescriptor = new ValueStateDescriptor<Long>("cacheTimer", Long.class);
        cacheTimer = getRuntimeContext().getState(cacheTimerDescriptor);
    }
    @Override
    public void processElement(CdcRecord element, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.Context ctx, Collector<List<CdcRecord>> out) throws Exception {
        // cache size + 1
        if (cacheSize.value() != null) {
            cacheSize.update(cacheSize.value() + 1);
        } else {
            cacheSize.update(1);
            // add timer for interval flush
            long nextTimer = System.currentTimeMillis() + batchInterval;
            LOG.debug("register timer : {} , key : {}", nextTimer, ctx.getCurrentKey());
            cacheTimer.update(nextTimer);
            ctx.timerService().registerProcessingTimeTimer(nextTimer);
        }
        // add data to cache state
        cache.add(element);
        // cache size max than batch Size
        if (cacheSize.value() >= batchSize) {
            // remove next timer
            long nextTimer = cacheTimer.value();
            LOG.debug("{} remove timer, key : {}", nextTimer, ctx.getCurrentKey());
            ctx.timerService().deleteProcessingTimeTimer(nextTimer);
            // flush data to down stream
            flushData(out);
        }
    }
    /**
     * flush data to down stream
     */
    private void flushData(Collector<List<CdcRecord>> out) throws Exception {
        List<CdcRecord> tmpCache = new ArrayList<>();
        Iterator<CdcRecord> it = cache.get().iterator();
        while (it.hasNext()) {
            tmpCache.add(it.next());
        }
        if (tmpCache.size() > 0) {
            out.collect(tmpCache);
            // finish flush all cache data, clear state
            cache.clear();
            cacheSize.clear();
            cacheTimer.clear();
        }
    }
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.OnTimerContext ctx, Collector<List<CdcRecord>> out) throws Exception {
        LOG.info("{} trigger timer to flush data", ctx.getCurrentKey(), timestamp);
        // batch interval trigger flush data
        flushData(out);
    }
    @Override
    public void close() throws Exception {
    }
}

输出函数 StarRocksSink

StarRocksSink 稍微复杂一点,需要基于数据中的表名,去目标数据库中获取对应的表结构(为了避免每次查询数据库,将获取到的表结构存到内存中),基于目标表的字段顺序从数据中获取对应列的值,拼接上数据的操作类型。

  • StarRocksSink 在往 StarRocks 写数据的时候,实现了 upsert 和 delete 操作,需要在数据中拼接上 0/1,0 代表 UPSERT 操作,1 代表 DELETE 操作
  • 见参考文档1

invoke 方法

StarRocksSink 的核心逻辑都在 invoke 方法中,逻辑如下:

  1. 从数据中获取数据库和表,拼接成 key
  2. 获取目标表的 schema(整库映射,源端和目标端表名一致),先从缓存中获取,如果不存在就从数据库中获取
  3. 组装数据
  4. 拼接 load url
  5. 用 http 方式往 StarRocks 写数据

loadTargetTableSchema 方法

执行 desc db.table 获取目标表的表结构,组装成两种结果:将所有列名用 "," 拼接成字符串,再拼接 "__op" 用于 http header 请求中标识数据的列;将所有列按顺序添加到 list 中,用于从源数据中获取对应列的数据

parseUploadData 方法

用目标表列顺序,从数据中获取对应列的值,使用列分隔符拼接数据,最后基于操作类型拼接 0/1,删除拼接 1,其他类型拼接 0

doHttp 方法

用 http 的方式往 StarRocks 中写数据,没什么特别的,忽略

import org.apache.commons.codec.binary.Base64;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpException;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StarRocksSink extends RichSinkFunction<List<CdcRecord>> {
    private final static Logger LOG = LoggerFactory.getLogger(StarRocksSink.class);
    public final static String COL_SEP = "\\\\x01";
    public final static String ROW_SEP = "\\\\x02";
    public final static String NULL_COL = "\\N";
    private String ip;
    private String port;
    private String loadPort;
    private String user;
    private String pass;
    private String db;
    private Connection connection;
    private Map<String, String> spliceColumnMap = new HashMap<>();
    private Map<String, List<String>> columnMap = new HashMap<>();
    public StarRocksSink() {
    }
    public StarRocksSink(String ip, String port, String loadPort, String user, String pass, String db) {
        this.ip = ip;
        this.port = port;
        this.loadPort = loadPort;
        this.user = user;
        this.pass = pass;
        this.db = db;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        reConnect();
    }
    @Override
    public void invoke(List<CdcRecord> element, Context context) throws Exception {
        LOG.info("write batch size: " + element.size());
        if(element == null || element.size() ==0){
            LOG.info("ignore empty element");
            return;
        }
        // use StarRocks db name
//        String db = cache.get(0).getDb();
        String table = element.get(0).getTable();
        String key = db + "_" + table;
        // get table schema
        List<String> columnList;
        if (!columnMap.containsKey(key)) {
            // db.table is first coming, load column, put to spliceColumnMap & columnMap
            loadTargetTableSchema(key, db, table);
        }
        String columns = spliceColumnMap.get(key);
        columnList = columnMap.get(key);
        if (columnList.size() == 0) {
            LOG.info("{}.{} not exists in target starrocks, ingore data change", db, table);
        }
        // make up data
        String data = parseUploadData(element, columnList);
        final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load", ip, loadPort, db, table);
        String label = db + "_" + table + "_" + System.currentTimeMillis();
        // send data to starrocks
        doHttp(loadUrl, data, label, columns);
    }
    /**
     * http send data to starrocks
     */
    private void doHttp(String loadUrl, String data, String label, String columns) throws IOException, SQLException {
        final HttpClientBuilder httpClientBuilder = HttpClients
                .custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                })
                .addInterceptorFirst(new ContentLengthHeaderRemover());
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            StringEntity entity = new StringEntity(data, "UTF-8");
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(user, pass));
            // the label header is optional, not necessary
            // use label header can ensure at most once semantics
            put.setHeader("label", label);
            put.setHeader("columns", columns);
            put.setHeader("row_delimiter", ROW_SEP);
            put.setHeader("column_separator", COL_SEP);
            put.setEntity(entity);
            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();
                // statusCode 200 just indicates that starrocks be service is ok, not stream load
                // you should see the output content to find whether stream load is success
                if (statusCode != 200) {
                    throw new IOException(
                            String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
                }
            }
        }
    }
    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }
    private String parseUploadData(List<CdcRecord> cache, List<String> columnList) {
        StringBuilder builder = new StringBuilder();
        for (CdcRecord element : cache) {
            Map<String, String> data = element.getData();
            for (String column : columnList) {
                if (data.containsKey(column)) {
                    builder.append(data.get(column)).append(COL_SEP);
                } else {
                    // if target column not exists in source data, set as null
                    builder.append(NULL_COL).append(COL_SEP);
                }
            }
            // add __op
            if ("d".equals(element.getOp())) {
                // delete
                builder.append("1");
            } else {
                // upsert
                builder.append("0");
            }
            // add row separator
            builder.append(ROW_SEP);
        }
        // remove last row sep
        builder = builder.delete(builder.length() - 5, builder.length());
        String data = builder.toString();
        return data;
    }
    /**
     * load table schema, parse to http column and column list for load source data
     */
    private void loadTargetTableSchema(String key, String db, String table) throws SQLException {
        List<String> columnList = new ArrayList<>();
        StringBuilder builer = new StringBuilder();
        try {
            // load table schema
            PreparedStatement insertPS = connection.prepareStatement("desc " + db + "." + table);
            ResultSet result = insertPS.executeQuery();
            while (result.next()) {
                String column = result.getString(1);
                builer.append(column).append(",");
                columnList.add(column);
            }
        } catch (SQLException e) {
            LOG.warn("load {}.{} schema error. {}", db, table, e.getStackTrace());
        }
        builer.append("__op");
        String columns = builer.toString();
        spliceColumnMap.put(key, columns);
        columnMap.put(key, columnList);
    }
    /**
     * reconnect to starrocks
     *
     * @throws SQLException
     */
    private void reConnect() throws SQLException {
        String driver = "jdbc:mysql://" + ip + ":" + port;
        if (connection == null || connection.isClosed()) {
            connection = DriverManager.getConnection(driver, user, pass);
        }
    }
    @Override
    public void finish() throws Exception {
        LOG.info("finish");
    }
    @Override
    public void close() throws Exception {
        LOG.info("close...");
        connection.close();
    }
    private static class ContentLengthHeaderRemover implements HttpRequestInterceptor {
        @Override
        public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
            // fighting org.apache.http.protocol.RequestContent's ProtocolException("Content-Length header already present");
            request.removeHeaders(HTTP.CONTENT_LEN);
        }
    }
}

局限性:

  1. 还未实现 starrocks 端表结构跟随 源端表结构同步变更
  2. 为了保证效率,仅会在每一个表第一次来的时候判断目标段是否存在该表,如果已经判定该表不存在,后续直接忽略该表的数据变更
  3. 部分不导入的表,只在sink 的时候做了过滤,前面的操作还是要继续,可以考虑在 反序列化活map中过滤掉目标库中不存在的表数据


flink cdc到table store

package name.lijiaqi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class MysqlToHudiExample {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // 数据源表
    String sourceDDL =
            "CREATE TEMPORARY TABLE ods_lineitem (\n" +
            " l_orderkey INT NOT NULL,\n" +
            " l_partkey INT NOT NULL,\n" +
            " l_suppkey INT NOT NULL,\n" +
            " l_linenumber INT NOT NULL,\n" +
            " l_quantity DECIMAL(15, 2) NOT NULL,\n" +
            " l_extendedprice DECIMAL(15, 2) NOT NULL,\n" +
            " l_discount DECIMAL(15, 2) NOT NULL,\n" +
            " l_tax DECIMAL(15, 2) NOT NULL,\n" +
            " l_returnflag CHAR(1) NOT NULL,\n" +
            " l_linestatus CHAR(1) NOT NULL,\n" +
            " l_shipdate DATE NOT NULL,\n" +
            " l_commitdate DATE NOT NULL,\n" +
            " l_receiptdate DATE NOT NULL,\n" +
            " l_shipinstruct CHAR(25) NOT NULL,\n" +
            " l_shipmode CHAR(10) NOT NULL,\n" +
            " l_comment VARCHAR(44) NOT NULL,\n" +
            " PRIMARY KEY (l_orderkey, l_linenumber) NOT ENFORCED \n" +
            ") WITH (\n" +
            "  'connector' = 'mysql-cdc',\n" +
            "  'hostname' = '127.0.0.1',\n" + -- 如果想使用 host,可以修改宿主机 /etc/hosts 加入 127.0.0.1 mysql.docker.internal
            "  'port' = '3307',\n" +
            "  'username' = 'flink',\n" +
            "  'password' = 'flink',\n" +
            "  'database-name' = 'tpch_s10',\n" +
            "  'table-name' = 'lineitem' \n" +
            ");"  
    String tableSourceDDL=
      "CREATE CATALOG `table_store` WITH (\n" +
      "'type' = 'table-store',\n" +
      "'warehouse' = '/tmp/table-store-101'\n" +
      ");\n" +
      "USE CATALOG `table_store`;"
        // 输出目标表
        String sinkDWDDDL =
            "CREATE TABLE IF NOT EXISTS dwd_lineitem (\n" +
        " l_orderkey INT NOT NULL,\n" +
        " l_partkey INT NOT NULL,\n" +
        " l_suppkey INT NOT NULL,\n" +
        " l_linenumber INT NOT NULL,\n" +
        " l_quantity DECIMAL(15, 2) NOT NULL,\n" +
        " l_extendedprice DECIMAL(15, 2) NOT NULL,\n" +
        " l_discount DECIMAL(15, 2) NOT NULL,\n" +
        " l_tax DECIMAL(15, 2) NOT NULL,\n" +
        " l_returnflag CHAR(1) NOT NULL,\n" +
        " l_linestatus CHAR(1) NOT NULL,\n" +
        " l_shipdate DATE NOT NULL,\n" +
        " l_commitdate DATE NOT NULL,\n" +
        " l_receiptdate DATE NOT NULL,\n" +
        " l_shipinstruct CHAR(25) NOT NULL,\n" +
        " l_shipmode CHAR(10) NOT NULL,\n" +
        " l_comment VARCHAR(44) NOT NULL,\n" +
        " l_year BIGINT NOT NULL,\n" +
        " l_month BIGINT NOT NULL,\n" +
        " PRIMARY KEY (l_orderkey, l_linenumber, l_year, l_month) NOT ENFORCED \n" +
        " ) PARTITIONED BY (l_year, l_month) WITH (
        " -- 每个 partition 下设置 2 个 bucket
        " 'bucket' = '2',\n" +
        " -- 设置 changelog-producer 为 'input',这会使得上游 CDC Source 不丢弃 update_before,并且下游消费 dwd_lineitem 时没有 changelog-normalize 节点
        " 'changelog-producer' = 'input' \n" +
        " );"
         // 输出目标表
        String sinkADSDDL =
      "CREATE  TABLE IF NOT EXISTS ads_pricing_summary (\n" +
      "l_returnflag CHAR(1) NOT  NULL,\n" +
      "l_linestatus CHAR(1) NOT  NULL,\n" +
      "sum_quantity DOUBLE  NOT NULL,\n" +
      "sum_base_price DOUBLE  NOT NULL,\n" +
      "sum_discount_price DOUBLE  NOT NULL,\n" +
      "sum_charge_vat_inclusive DOUBLE  NOT NULL,\n" +
      "avg_quantity DOUBLE  NOT NULL,\n" +
      "avg_base_price DOUBLE  NOT NULL,\n" +
      "avg_discount DOUBLE  NOT NULL,\n" +
      "count_order BIGINT  NOT NULL \n" +
      ") WITH ( \n" +
      "'bucket' = '2'\n" +
      ");"
        // 简单的聚合处理
        String transformSQL1 =
                "INSERT  INTO dwd_lineitem SELECT l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,\n" +
                 "l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,\n" +
          "YEAR(l_shipdate) AS l_year,MONTH(l_shipdate) AS l_month FROM ods_lineitem;"
    String transformSQL2=
               "INSERT  INTO ads_pricing_summary SELECT l_returnflag,l_linestatus,SUM(l_quantity) AS sum_quantity,\n" +
               "SUM(l_extendedprice) AS sum_base_price,SUM(l_extendedprice * (1-l_discount)) AS sum_discount_price,\n" + 
                "SUM(l_extendedprice * (1-l_discount) * (1+l_tax)) AS sum_charge_vat_inclusive,AVG(l_quantity) AS avg_quantity,\n" + 
                "AVG(l_extendedprice) AS avg_base_price,AVG(l_discount) AS avg_discount,COUNT(*) AS count_order FROM dwd_lineitem \n" + 
                "WHERE (l_year < 1998 OR (l_year = 1998 AND l_month<= 9)) AND l_shipdate <= DATE '1998-12-01' - INTERVAL '90' DAY \n" + 
                "GROUP BY  l_returnflag,l_linestatus;"    
        // 插入hudi表
        tableEnv.executeSql(tableSourceDDL);
        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDWDDDL);
        tableEnv.executeSql(sinkADSDDL);
        TableResult result = tableEnv.executeSql(transformSQL1);
        TableResult result = tableEnv.executeSql(transformSQL2);
        env.execute("mysql-to-tableSource");
    }
}

详细请参阅:

https://github.com/LadyForest/flink-table-store-101/blob/master/real-time-update/README.zh.md


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
3月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
25天前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
52 9
|
3月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
635 2
Flink CDC:新一代实时数据集成框架
|
3月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
570 14
Flink CDC 在货拉拉的落地与实践
|
4月前
|
Oracle 关系型数据库 新能源
Flink CDC 在新能源制造业的实践
本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。
464 13
Flink CDC 在新能源制造业的实践
|
3月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
1月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
960 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
18天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
128 56
|
5月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
845 7
阿里云实时计算Flink在多行业的应用和实践
|
4月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章