Flink cdc到doris,starrocks,table store

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: Flink cdc到doris,starrocks,table store


flink cdc到doris


flink cdc同步到doris主类:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
/***
 *
 * Synchronize the full database through flink cdc
 *
 */
public class DatabaseFullSync {
    private static final Logger LOG = LoggerFactory.getLogger(DatabaseFullSync.class);
    private static String HOST = "127.0.0.1";
    private static String MYSQL_PASSWD = "password";
    private static int MYSQL_PORT = 3306;
    private static int DORIS_PORT = 8030;
    private static String MYSQL_USER = "root";
    private static String SYNC_DB = "test";
    private static String SYNC_TBLS = "test.*";
    private static String TARGET_DORIS_DB = "test";
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname(HOST)
            .port(MYSQL_PORT)
            .databaseList(SYNC_DB) // set captured database
            .tableList(SYNC_TBLS) // set captured table
            .username(MYSQL_USER)
            .password(MYSQL_PASSWD)
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // enable checkpoint
        env.enableCheckpointing(10000);
        DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
        //get table list
        List<String> tableList = getTableList();
        LOG.info("sync table list:{}",tableList);
        for(String tbl : tableList){
            SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);
            SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
            DorisSink dorisSink = buildDorisSink(tbl);
            cleanStream.sinkTo(dorisSink).name("sink " + tbl);
        }
        env.execute("Full Database Sync ");
    }
    /**
     * Get real data
     * {
     *     "before":null,
     *     "after":{
     *         "id":1,
     *         "name":"zhangsan-1",
     *         "age":18
     *     },
     *     "source":{
     *         "db":"test",
     *         "table":"test_1",
     *         ...
     *     },
     *     "op":"c",
     *     ...
     * }
     * */
    private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
        return source.flatMap(new FlatMapFunction<String,String>(){
            @Override
            public void flatMap(String row, Collector<String> out) throws Exception {
                try{
                    JSONObject rowJson = JSON.parseObject(row);
                    String op = rowJson.getString("op");
                    //history,insert,update
                    if(Arrays.asList("r","c","u").contains(op)){
                        out.collect(rowJson.getJSONObject("after").toJSONString());
                    }else{
                        LOG.info("filter other op:{}",op);
                    }
                }catch (Exception ex){
                    LOG.warn("filter other format binlog:{}",row);
                }
            }
        });
    }
    /**
     * Divide according to tablename
     * */
    private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
        return source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String row) throws Exception {
                try {
                    JSONObject rowJson = JSON.parseObject(row);
                    JSONObject source = rowJson.getJSONObject("source");
                    String tbl = source.getString("table");
                    return table.equals(tbl);
                }catch (Exception ex){
                    ex.printStackTrace();
                    return false;
                }
            }
        });
    }
    /**
     * Get all MySQL tables that need to be synchronized
     * */
    private static List<String> getTableList() {
        List<String> tables = new ArrayList<>();
        String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
        List<JSONObject> tableList = JdbcUtil.executeQuery(HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
        for(JSONObject jsob : tableList){
            String schemaName = jsob.getString("TABLE_SCHEMA");
            String tblName = jsob.getString("TABLE_NAME");
            String schemaTbl = schemaName  + "." + tblName;
            if(schemaTbl.matches(SYNC_TBLS)){
                tables.add(tblName);
            }
        }
        return tables;
    }
    /**
     * create doris sink
     * */
    public static DorisSink buildDorisSink(String table){
        DorisSink.Builder<String> builder = DorisSink.builder();
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder.setFenodes(HOST + ":" + DORIS_PORT)
            .setTableIdentifier(TARGET_DORIS_DB + "." + table)
            .setUsername("root")
            .setPassword("");
        Properties pro = new Properties();
        //json data format
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
            .setLabelPrefix("label-" + table + UUID.randomUUID()) //streamload label prefix,
            .setStreamLoadProp(pro).build();
        builder.setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(executionOptions)
            .setSerializer(new SimpleStringSerializer()) //serialize according to string
            .setDorisOptions(dorisBuilder.build());
        return builder.build();
    }
}

JdbcUtil类:

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class JdbcUtil {
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);
    public static void main(String[] args) throws SQLException {
    }
    public static List<JSONObject> executeQuery(String hostUrl, int port, String user, String password, String sql){
        List<JSONObject> beJson = new ArrayList<>();
        String connectionUrl = String.format("jdbc:mysql://%s:%s/",hostUrl,port);
        Connection con = null;
        try {
            con = DriverManager.getConnection(connectionUrl,user,password);
            PreparedStatement ps = con.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            beJson = resultSetToJson(rs);
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                con.close();
            } catch (Exception e) {
            }
        }
        return beJson;
    }
    private static List<JSONObject> resultSetToJson(ResultSet rs) throws SQLException {
        List<JSONObject> list = new ArrayList<>();
        ResultSetMetaData metaData = rs.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= columnCount; i++) {
                String columnName =metaData.getColumnLabel(i);
                String value = rs.getString(columnName);
                jsonObj.put(columnName, value);
            }
            list.add(jsonObj);
        }
        return list;
    }
}


flink cdc到starrocks


主要实现的流程:

  1. Flink cdc 采集 mysql 数据
  2. 将 cdc 采集到的数据转为 json
  3. 从 json 中获取 数据库、表和数据
  4. 用数据库和表对数据做 key by
  5. 使用 process function 处理每个表的数据,用状态缓存数据,缓存数据达到一定量或者缓存了一定时间(用 timer 触发缓存时间触发的场景)StarRocks 写数据
  6. sink 中拼接数据 使用 Stream Load 往 StarRocks 写数据


主类 CdcToStarRocks

       主要流程很简单: source -> map -> keyBy -> process -> sink,source 读取 mysql binlog(或者全量+增量),map 转换数据格式,keyBy 以数据库名 + 表名对数据分区,process 中对数据攒批,sink 输出数据到 StarRocks

import com.venn.source.mysql.cdc.CommonStringDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
/**
 * mysql cdc demo
 * <p>
 * cdc 整库同步数据到 starrocks
 * <p>
 * 局限:
 * 1. 还未实现 starrocks 端表结构跟随 源端表结构同步变更
 * 2. 为了保证效率,仅会在每一个表第一次来的时候判断目标段是否存在该表,如果已经判定该表不存在,后续直接忽略该表的数据变更
 * 3. 部分不导入的表,只在sink 的时候做了过滤,前面的操作还是要继续,可以考虑在 反序列化活map中过滤掉目标库中不存在的表数据
 */
public class CdcToStarRocks {
    // 每个批次最大条数和等待时间
    private static int batchSize = 10000;
    private static long batchInterval = 10 *1000;
    public static void main(String[] args) throws Exception {
        String ip = "localhost";
        int port = 3306;
        String db = "venn";
//        String table = "venn.user_log";
        String table = "venn.*";
        String user = "root";
        String pass = "123456";
        String starrocksIp = "10.201.0.230";
        String starrocksPort = "29030";
        String starrocksLoadPort = "28030";
        String starrocksUser = "root";
        String starrocksPass = "123456";
        String starrocksDb = "test";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        MySqlSource<String> sourceFunction = MySqlSource.<String>builder()
                .hostname(ip)
                .port(port)
                // 获取两个数据库的所有表
                .databaseList(db)
                .tableList(table)
                .username(user)
                .password(pass)
                .startupOptions(StartupOptions.latest())
                // do not cache schema change
//                .includeSchemaChanges(false)
//                .startupOptions(StartupOptions.initial())
                // 自定义 解析器,讲数据解析成 json
                .deserializer(new CommonStringDebeziumDeserializationSchema(ip, port))
                .build();
        env
                .fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "cdc")
                .name("source")
                .uid("source")
//                 json 字符串转 CdcRecord
                .map(new CdcStarMapFunction())
                .name("map")
                .keyBy(  record -> record.getDb() + "_" + record.getTable())
                .process(new CdcStarProcessFunction(batchSize, batchInterval))
                .name("process")
                .uid("process")
                .addSink(new StarRocksSink(starrocksIp, starrocksPort, starrocksLoadPort, starrocksUser, starrocksPass, starrocksDb))
                .name("sink");
        env.execute("cdcToStarRocks");
    }
}

反序列化器 CommonStringDebeziumDeserializationSchema

反序列化器直接拿之前写的通用的 flink cdc 反序列化器过来,继承 DebeziumDeserializationSchema,主要是从数据中获取 数据库、表、操作类型和数据,需求特别注意以下几点:

  1. insert 类型的操心,只需要获取 after 中的数据
  2. update 类型的操作,需要同时解析 before 和 after 的数据,before 是修改前的,after 是修改后的,如果不需要修改前的,可以只获取 after
  3. delete 类型的操作,需要获取 before
  4. 如果有 ddl 操作,需要特殊处理(本次不包含)
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.JsonObject;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/**
 * deserialize debezium format binlog
 */
public class CommonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    private String host;
    private int port;
    public CommonStringDebeziumDeserializationSchema(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public void deserialize(SourceRecord record, Collector<String> out) {
        JsonObject jsonObject = new JsonObject();
        String binlog = record.sourceOffset().get("file").toString();
        String offset = record.sourceOffset().get("pos").toString();
        String ts_sec = record.sourceOffset().get("ts_sec").toString();
//        System.out.println("binlog : " + binlog + ", offset = " + offset);
        // todo get schame change
        jsonObject.addProperty("host", host);
        // add meta
        jsonObject.addProperty("binlog", binlog);
        jsonObject.addProperty("offset", offset);
        jsonObject.addProperty("ts_sec", ts_sec);
        jsonObject.addProperty("port", port);
        jsonObject.addProperty("file", (String) record.sourceOffset().get("file"));
        jsonObject.addProperty("pos", (Long) record.sourceOffset().get("pos"));
        jsonObject.addProperty("ts_sec", (Long) record.sourceOffset().get("ts_sec"));
        String[] name = record.valueSchema().name().split("\\.");
        jsonObject.addProperty("db", name[1]);
        jsonObject.addProperty("table", name[2]);
        Struct value = ((Struct) record.value());
        String operatorType = value.getString("op");
        jsonObject.addProperty("operator_type", operatorType);
        // c : create, u: update, d: delete, r: read
        // insert update
        if (!"d".equals(operatorType)) {
            Struct after = value.getStruct("after");
            JsonObject afterJsonObject = parseRecord(after);
            jsonObject.add("after", afterJsonObject);
        }
        // update & delete
        if ("u".equals(operatorType) || "d".equals(operatorType)) {
            Struct source = value.getStruct("before");
            JsonObject beforeJsonObject = parseRecord(source);
            jsonObject.add("before", beforeJsonObject);
        }
        jsonObject.addProperty("parse_time", System.currentTimeMillis() / 1000);
        out.collect(jsonObject.toString());
    }
    private JsonObject parseRecord(Struct after) {
        JsonObject jo = new JsonObject();
        for (Field field : after.schema().fields()) {
            switch ((field.schema()).type()) {
                case INT8:
                    int resultInt8 = after.getInt8(field.name());
                    jo.addProperty(field.name(), resultInt8);
                    break;
                case INT64:
                    Long resultInt = after.getInt64(field.name());
                    jo.addProperty(field.name(), resultInt);
                    break;
                case FLOAT32:
                    Float resultFloat32 = after.getFloat32(field.name());
                    jo.addProperty(field.name(), resultFloat32);
                    break;
                case FLOAT64:
                    Double resultFloat64 = after.getFloat64(field.name());
                    jo.addProperty(field.name(), resultFloat64);
                    break;
                case BYTES:
                    // json ignore byte column
                    // byte[] resultByte = after.getBytes(field.name());
                    // jo.addProperty(field.name(), String.valueOf(resultByte));
                    break;
                case STRING:
                    String resultStr = after.getString(field.name());
                    jo.addProperty(field.name(), resultStr);
                    break;
                default:
            }
        }
        return jo;
    }
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

转换函数 CdcStarMapFunction

CdcStarMapFunction 比较简单,将 json 字符串,转成 CdcRecord 类型的对象,只获取了需要的 数据库、表、操作类型和数据。

获取数据时,insert 和 update 只获取 after 的值

import java.util.LinkedHashMap;
import java.util.Map;
import lombok.Data;
/**
 * cdcRecord save
 */
 @Data
 public class CdcRecord {
    private String db;
    private String table;
    private String op;
    private Map<String, String> data = new LinkedHashMap<>();
}
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class CdcStarMapFunction extends RichMapFunction<String, CdcRecord> {
    private final static Logger LOG = LoggerFactory.getLogger(CdcStarMapFunction.class);
    private JsonParser parser;
    @Override
    public void open(Configuration parameters) throws Exception {
        parser = new JsonParser();
    }
    @Override
    public CdcRecord map(String element) throws Exception {
        LOG.debug("data : {}" , element );
        JsonObject object = parser.parse(element).getAsJsonObject();
        String db = object.get("db").getAsString();
        String table = object.get("table").getAsString();
        String op = object.get("operator_type").getAsString();
        CdcRecord record = new CdcRecord(db, table, op);
        // insert/update
        String dataLocation = "after";
        if("d".equals(op)){
            // if op is delete, get before
            dataLocation = "before";
        }
        JsonObject data = object.get(dataLocation).getAsJsonObject();
        for(Map.Entry<String, JsonElement> entry: data.entrySet()){
            String columnName = entry.getKey();
            String columnValue;
            JsonElement value = entry.getValue();
            if(!value.isJsonNull()){
                // if column value is not null, get as string
                columnValue = value.getAsString();
                // put column name/value to record.data
                record.getData().put(columnName, columnValue);
            }
        }
        return record;
    }
}

处理函数 CdcStarProcessFunction

CdcStarProcessFunction 有三部分逻辑:

  1. 三个状态cacheTimer、cacheSize、cache,分别存下一次timer 触发时间、缓存的数据条数、缓存的数据
  2. process 处理每个表的数据,每个表的数据第一次到的时候,基于当前时间 + batchInterval,注册下次时间触发的 timer。数据存储到 cache 中,如果数据量超过预定的 batchSize,触发 flushData 方法往下游输出数据,并删除之前注册的定时器,清理状态
  3. timer 触发 flushData 方法往下游输出数据,清理状态
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class CdcStarProcessFunction extends KeyedProcessFunction<String, CdcRecord, List<CdcRecord>> {
    private final static Logger LOG = LoggerFactory.getLogger(CdcStarProcessFunction.class);
    private int batchSize;
    private long batchInterval;
    // next timer time
    private ValueState<Long> cacheTimer;
    // current cache size
    private ValueState<Integer> cacheSize;
    // cache data
    private ListState<CdcRecord> cache;
    public CdcStarProcessFunction(int batchSize, long batchInterval) {
        this.batchSize = batchSize;
        this.batchInterval = batchInterval;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor cacheDescriptor = new ListStateDescriptor<CdcRecord>("cache", TypeInformation.of(CdcRecord.class));
        cache = getRuntimeContext().getListState(cacheDescriptor);
        ValueStateDescriptor cacheSizeDescriptor = new ValueStateDescriptor<Integer>("cacheSize", Integer.class);
        cacheSize = getRuntimeContext().getState(cacheSizeDescriptor);
        ValueStateDescriptor cacheTimerDescriptor = new ValueStateDescriptor<Long>("cacheTimer", Long.class);
        cacheTimer = getRuntimeContext().getState(cacheTimerDescriptor);
    }
    @Override
    public void processElement(CdcRecord element, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.Context ctx, Collector<List<CdcRecord>> out) throws Exception {
        // cache size + 1
        if (cacheSize.value() != null) {
            cacheSize.update(cacheSize.value() + 1);
        } else {
            cacheSize.update(1);
            // add timer for interval flush
            long nextTimer = System.currentTimeMillis() + batchInterval;
            LOG.debug("register timer : {} , key : {}", nextTimer, ctx.getCurrentKey());
            cacheTimer.update(nextTimer);
            ctx.timerService().registerProcessingTimeTimer(nextTimer);
        }
        // add data to cache state
        cache.add(element);
        // cache size max than batch Size
        if (cacheSize.value() >= batchSize) {
            // remove next timer
            long nextTimer = cacheTimer.value();
            LOG.debug("{} remove timer, key : {}", nextTimer, ctx.getCurrentKey());
            ctx.timerService().deleteProcessingTimeTimer(nextTimer);
            // flush data to down stream
            flushData(out);
        }
    }
    /**
     * flush data to down stream
     */
    private void flushData(Collector<List<CdcRecord>> out) throws Exception {
        List<CdcRecord> tmpCache = new ArrayList<>();
        Iterator<CdcRecord> it = cache.get().iterator();
        while (it.hasNext()) {
            tmpCache.add(it.next());
        }
        if (tmpCache.size() > 0) {
            out.collect(tmpCache);
            // finish flush all cache data, clear state
            cache.clear();
            cacheSize.clear();
            cacheTimer.clear();
        }
    }
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.OnTimerContext ctx, Collector<List<CdcRecord>> out) throws Exception {
        LOG.info("{} trigger timer to flush data", ctx.getCurrentKey(), timestamp);
        // batch interval trigger flush data
        flushData(out);
    }
    @Override
    public void close() throws Exception {
    }
}

输出函数 StarRocksSink

StarRocksSink 稍微复杂一点,需要基于数据中的表名,去目标数据库中获取对应的表结构(为了避免每次查询数据库,将获取到的表结构存到内存中),基于目标表的字段顺序从数据中获取对应列的值,拼接上数据的操作类型。

  • StarRocksSink 在往 StarRocks 写数据的时候,实现了 upsert 和 delete 操作,需要在数据中拼接上 0/1,0 代表 UPSERT 操作,1 代表 DELETE 操作
  • 见参考文档1

invoke 方法

StarRocksSink 的核心逻辑都在 invoke 方法中,逻辑如下:

  1. 从数据中获取数据库和表,拼接成 key
  2. 获取目标表的 schema(整库映射,源端和目标端表名一致),先从缓存中获取,如果不存在就从数据库中获取
  3. 组装数据
  4. 拼接 load url
  5. 用 http 方式往 StarRocks 写数据

loadTargetTableSchema 方法

执行 desc db.table 获取目标表的表结构,组装成两种结果:将所有列名用 "," 拼接成字符串,再拼接 "__op" 用于 http header 请求中标识数据的列;将所有列按顺序添加到 list 中,用于从源数据中获取对应列的数据

parseUploadData 方法

用目标表列顺序,从数据中获取对应列的值,使用列分隔符拼接数据,最后基于操作类型拼接 0/1,删除拼接 1,其他类型拼接 0

doHttp 方法

用 http 的方式往 StarRocks 中写数据,没什么特别的,忽略

import org.apache.commons.codec.binary.Base64;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpException;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StarRocksSink extends RichSinkFunction<List<CdcRecord>> {
    private final static Logger LOG = LoggerFactory.getLogger(StarRocksSink.class);
    public final static String COL_SEP = "\\\\x01";
    public final static String ROW_SEP = "\\\\x02";
    public final static String NULL_COL = "\\N";
    private String ip;
    private String port;
    private String loadPort;
    private String user;
    private String pass;
    private String db;
    private Connection connection;
    private Map<String, String> spliceColumnMap = new HashMap<>();
    private Map<String, List<String>> columnMap = new HashMap<>();
    public StarRocksSink() {
    }
    public StarRocksSink(String ip, String port, String loadPort, String user, String pass, String db) {
        this.ip = ip;
        this.port = port;
        this.loadPort = loadPort;
        this.user = user;
        this.pass = pass;
        this.db = db;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        reConnect();
    }
    @Override
    public void invoke(List<CdcRecord> element, Context context) throws Exception {
        LOG.info("write batch size: " + element.size());
        if(element == null || element.size() ==0){
            LOG.info("ignore empty element");
            return;
        }
        // use StarRocks db name
//        String db = cache.get(0).getDb();
        String table = element.get(0).getTable();
        String key = db + "_" + table;
        // get table schema
        List<String> columnList;
        if (!columnMap.containsKey(key)) {
            // db.table is first coming, load column, put to spliceColumnMap & columnMap
            loadTargetTableSchema(key, db, table);
        }
        String columns = spliceColumnMap.get(key);
        columnList = columnMap.get(key);
        if (columnList.size() == 0) {
            LOG.info("{}.{} not exists in target starrocks, ingore data change", db, table);
        }
        // make up data
        String data = parseUploadData(element, columnList);
        final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load", ip, loadPort, db, table);
        String label = db + "_" + table + "_" + System.currentTimeMillis();
        // send data to starrocks
        doHttp(loadUrl, data, label, columns);
    }
    /**
     * http send data to starrocks
     */
    private void doHttp(String loadUrl, String data, String label, String columns) throws IOException, SQLException {
        final HttpClientBuilder httpClientBuilder = HttpClients
                .custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                })
                .addInterceptorFirst(new ContentLengthHeaderRemover());
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            StringEntity entity = new StringEntity(data, "UTF-8");
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(user, pass));
            // the label header is optional, not necessary
            // use label header can ensure at most once semantics
            put.setHeader("label", label);
            put.setHeader("columns", columns);
            put.setHeader("row_delimiter", ROW_SEP);
            put.setHeader("column_separator", COL_SEP);
            put.setEntity(entity);
            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();
                // statusCode 200 just indicates that starrocks be service is ok, not stream load
                // you should see the output content to find whether stream load is success
                if (statusCode != 200) {
                    throw new IOException(
                            String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
                }
            }
        }
    }
    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }
    private String parseUploadData(List<CdcRecord> cache, List<String> columnList) {
        StringBuilder builder = new StringBuilder();
        for (CdcRecord element : cache) {
            Map<String, String> data = element.getData();
            for (String column : columnList) {
                if (data.containsKey(column)) {
                    builder.append(data.get(column)).append(COL_SEP);
                } else {
                    // if target column not exists in source data, set as null
                    builder.append(NULL_COL).append(COL_SEP);
                }
            }
            // add __op
            if ("d".equals(element.getOp())) {
                // delete
                builder.append("1");
            } else {
                // upsert
                builder.append("0");
            }
            // add row separator
            builder.append(ROW_SEP);
        }
        // remove last row sep
        builder = builder.delete(builder.length() - 5, builder.length());
        String data = builder.toString();
        return data;
    }
    /**
     * load table schema, parse to http column and column list for load source data
     */
    private void loadTargetTableSchema(String key, String db, String table) throws SQLException {
        List<String> columnList = new ArrayList<>();
        StringBuilder builer = new StringBuilder();
        try {
            // load table schema
            PreparedStatement insertPS = connection.prepareStatement("desc " + db + "." + table);
            ResultSet result = insertPS.executeQuery();
            while (result.next()) {
                String column = result.getString(1);
                builer.append(column).append(",");
                columnList.add(column);
            }
        } catch (SQLException e) {
            LOG.warn("load {}.{} schema error. {}", db, table, e.getStackTrace());
        }
        builer.append("__op");
        String columns = builer.toString();
        spliceColumnMap.put(key, columns);
        columnMap.put(key, columnList);
    }
    /**
     * reconnect to starrocks
     *
     * @throws SQLException
     */
    private void reConnect() throws SQLException {
        String driver = "jdbc:mysql://" + ip + ":" + port;
        if (connection == null || connection.isClosed()) {
            connection = DriverManager.getConnection(driver, user, pass);
        }
    }
    @Override
    public void finish() throws Exception {
        LOG.info("finish");
    }
    @Override
    public void close() throws Exception {
        LOG.info("close...");
        connection.close();
    }
    private static class ContentLengthHeaderRemover implements HttpRequestInterceptor {
        @Override
        public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
            // fighting org.apache.http.protocol.RequestContent's ProtocolException("Content-Length header already present");
            request.removeHeaders(HTTP.CONTENT_LEN);
        }
    }
}

局限性:

  1. 还未实现 starrocks 端表结构跟随 源端表结构同步变更
  2. 为了保证效率,仅会在每一个表第一次来的时候判断目标段是否存在该表,如果已经判定该表不存在,后续直接忽略该表的数据变更
  3. 部分不导入的表,只在sink 的时候做了过滤,前面的操作还是要继续,可以考虑在 反序列化活map中过滤掉目标库中不存在的表数据


flink cdc到table store

package name.lijiaqi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class MysqlToHudiExample {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // 数据源表
    String sourceDDL =
            "CREATE TEMPORARY TABLE ods_lineitem (\n" +
            " l_orderkey INT NOT NULL,\n" +
            " l_partkey INT NOT NULL,\n" +
            " l_suppkey INT NOT NULL,\n" +
            " l_linenumber INT NOT NULL,\n" +
            " l_quantity DECIMAL(15, 2) NOT NULL,\n" +
            " l_extendedprice DECIMAL(15, 2) NOT NULL,\n" +
            " l_discount DECIMAL(15, 2) NOT NULL,\n" +
            " l_tax DECIMAL(15, 2) NOT NULL,\n" +
            " l_returnflag CHAR(1) NOT NULL,\n" +
            " l_linestatus CHAR(1) NOT NULL,\n" +
            " l_shipdate DATE NOT NULL,\n" +
            " l_commitdate DATE NOT NULL,\n" +
            " l_receiptdate DATE NOT NULL,\n" +
            " l_shipinstruct CHAR(25) NOT NULL,\n" +
            " l_shipmode CHAR(10) NOT NULL,\n" +
            " l_comment VARCHAR(44) NOT NULL,\n" +
            " PRIMARY KEY (l_orderkey, l_linenumber) NOT ENFORCED \n" +
            ") WITH (\n" +
            "  'connector' = 'mysql-cdc',\n" +
            "  'hostname' = '127.0.0.1',\n" + -- 如果想使用 host,可以修改宿主机 /etc/hosts 加入 127.0.0.1 mysql.docker.internal
            "  'port' = '3307',\n" +
            "  'username' = 'flink',\n" +
            "  'password' = 'flink',\n" +
            "  'database-name' = 'tpch_s10',\n" +
            "  'table-name' = 'lineitem' \n" +
            ");"  
    String tableSourceDDL=
      "CREATE CATALOG `table_store` WITH (\n" +
      "'type' = 'table-store',\n" +
      "'warehouse' = '/tmp/table-store-101'\n" +
      ");\n" +
      "USE CATALOG `table_store`;"
        // 输出目标表
        String sinkDWDDDL =
            "CREATE TABLE IF NOT EXISTS dwd_lineitem (\n" +
        " l_orderkey INT NOT NULL,\n" +
        " l_partkey INT NOT NULL,\n" +
        " l_suppkey INT NOT NULL,\n" +
        " l_linenumber INT NOT NULL,\n" +
        " l_quantity DECIMAL(15, 2) NOT NULL,\n" +
        " l_extendedprice DECIMAL(15, 2) NOT NULL,\n" +
        " l_discount DECIMAL(15, 2) NOT NULL,\n" +
        " l_tax DECIMAL(15, 2) NOT NULL,\n" +
        " l_returnflag CHAR(1) NOT NULL,\n" +
        " l_linestatus CHAR(1) NOT NULL,\n" +
        " l_shipdate DATE NOT NULL,\n" +
        " l_commitdate DATE NOT NULL,\n" +
        " l_receiptdate DATE NOT NULL,\n" +
        " l_shipinstruct CHAR(25) NOT NULL,\n" +
        " l_shipmode CHAR(10) NOT NULL,\n" +
        " l_comment VARCHAR(44) NOT NULL,\n" +
        " l_year BIGINT NOT NULL,\n" +
        " l_month BIGINT NOT NULL,\n" +
        " PRIMARY KEY (l_orderkey, l_linenumber, l_year, l_month) NOT ENFORCED \n" +
        " ) PARTITIONED BY (l_year, l_month) WITH (
        " -- 每个 partition 下设置 2 个 bucket
        " 'bucket' = '2',\n" +
        " -- 设置 changelog-producer 为 'input',这会使得上游 CDC Source 不丢弃 update_before,并且下游消费 dwd_lineitem 时没有 changelog-normalize 节点
        " 'changelog-producer' = 'input' \n" +
        " );"
         // 输出目标表
        String sinkADSDDL =
      "CREATE  TABLE IF NOT EXISTS ads_pricing_summary (\n" +
      "l_returnflag CHAR(1) NOT  NULL,\n" +
      "l_linestatus CHAR(1) NOT  NULL,\n" +
      "sum_quantity DOUBLE  NOT NULL,\n" +
      "sum_base_price DOUBLE  NOT NULL,\n" +
      "sum_discount_price DOUBLE  NOT NULL,\n" +
      "sum_charge_vat_inclusive DOUBLE  NOT NULL,\n" +
      "avg_quantity DOUBLE  NOT NULL,\n" +
      "avg_base_price DOUBLE  NOT NULL,\n" +
      "avg_discount DOUBLE  NOT NULL,\n" +
      "count_order BIGINT  NOT NULL \n" +
      ") WITH ( \n" +
      "'bucket' = '2'\n" +
      ");"
        // 简单的聚合处理
        String transformSQL1 =
                "INSERT  INTO dwd_lineitem SELECT l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,\n" +
                 "l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,\n" +
          "YEAR(l_shipdate) AS l_year,MONTH(l_shipdate) AS l_month FROM ods_lineitem;"
    String transformSQL2=
               "INSERT  INTO ads_pricing_summary SELECT l_returnflag,l_linestatus,SUM(l_quantity) AS sum_quantity,\n" +
               "SUM(l_extendedprice) AS sum_base_price,SUM(l_extendedprice * (1-l_discount)) AS sum_discount_price,\n" + 
                "SUM(l_extendedprice * (1-l_discount) * (1+l_tax)) AS sum_charge_vat_inclusive,AVG(l_quantity) AS avg_quantity,\n" + 
                "AVG(l_extendedprice) AS avg_base_price,AVG(l_discount) AS avg_discount,COUNT(*) AS count_order FROM dwd_lineitem \n" + 
                "WHERE (l_year < 1998 OR (l_year = 1998 AND l_month<= 9)) AND l_shipdate <= DATE '1998-12-01' - INTERVAL '90' DAY \n" + 
                "GROUP BY  l_returnflag,l_linestatus;"    
        // 插入hudi表
        tableEnv.executeSql(tableSourceDDL);
        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDWDDDL);
        tableEnv.executeSql(sinkADSDDL);
        TableResult result = tableEnv.executeSql(transformSQL1);
        TableResult result = tableEnv.executeSql(transformSQL2);
        env.execute("mysql-to-tableSource");
    }
}

详细请参阅:

https://github.com/LadyForest/flink-table-store-101/blob/master/real-time-update/README.zh.md


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
3天前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
21 9
|
2月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
551 1
Flink CDC:新一代实时数据集成框架
|
2月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
524 14
Flink CDC 在货拉拉的落地与实践
|
3月前
|
Oracle 关系型数据库 新能源
Flink CDC 在新能源制造业的实践
本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。
441 13
Flink CDC 在新能源制造业的实践
|
3月前
|
SQL 数据库 流计算
Flink CDC数据读取问题之一致性如何解决
Flink CDC 使用Change Data Capture (CDC)技术从数据库捕获变更事件,并利用Flink的流处理能力确保数据读取一致性。相较于传统工具,它具备全增量一体化数据集成能力,满足实时性需求。在实践中解决了高效数据同步、稳定同步大量表数据等问题。应用场景包括实时数据同步、实时数据集成等。快速上手需学习基本概念与实践操作。未来发展方向包括提升效率与稳定性,并依据用户需求持续优化。
114 1
|
12月前
|
SQL 关系型数据库 MySQL
Flink教程(16)- Flink Table与SQL
Flink教程(16)- Flink Table与SQL
308 0
|
6月前
|
SQL Apache 流计算
Flink table&SQL 的使用
Flink table&SQL 的使用
53 0
|
SQL IDE Java
【Flink】(十)Flink Table API 和 Flink SQL 入门
【Flink】(十)Flink Table API 和 Flink SQL 入门
323 0
|
SQL API 流计算
Flink-Table-&-SQL
简介 Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。
1180 0

热门文章

最新文章