【万字长文】Flink cdc源码精讲(推荐收藏)(四)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,1000CU*H 3个月
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 【万字长文】Flink cdc源码精讲(推荐收藏)

我们进入fetcherTask中,只看task逻辑即可

class FetcherTask{
 @Override
    public boolean run() throws IOException {
        try {
            if (!isWakenUp() && lastRecords == null) {
              // 返回的是该对象 public final class MySqlRecords implements RecordsWithSplitIds<SourceRecord>
              // 调用了我们在创建fetcherTask的时候传入的splitReader对象,实际上还是调用reader的fetch方法来真正的获取数据
                lastRecords = splitReader.fetch();
            }
            if (!isWakenUp()) {
                // The order matters here. We must first put the last records into the queue.
                // This ensures the handling of the fetched records is atomic to wakeup.
               // 将读取的数据放入到队列汇总
                if (elementsQueue.put(fetcherIndex, lastRecords)) {
                    if (!lastRecords.finishedSplits().isEmpty()) {
                        // The callback does not throw InterruptedException.
                        splitFinishedCallback.accept(lastRecords.finishedSplits());
                    }
                    lastRecords = null;
                }
            }
        } catch (InterruptedException e) {
            throw new IOException("Source fetch execution was interrupted", e);
            if (isWakenUp()) {
                wakeup = false;
            }
        }
        return true;
    }
}

通过上面我们基本上已经清楚了在flink层面是怎么最终调用了cdc读取数据的代码,现在我们根据主要的读取代码看看是怎么样子实现的

currentReader.pollSplitRecords() ,我们简单介绍一下currentReader(BinlogSplitReader/SnapshotSplitReader)主要两种实现,大概的思路这里面会根据不同的性质区分进行读取数据,在submitSplit的时候会创建readRask读取指定split的数据,结果会放入StatefulTaskContext的queue中,在fetch方法会先提交split,让其执行read数据,然后通过pollSplitRecords方法在调用queue.poll拉取数据,这是一个阻塞的操作,如果超时则抛出中断异常

public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
    private final Queue<MySqlSplit> splits;
    private final MySqlSourceConfig sourceConfig;
    private final int subtaskId;
    @Nullable private DebeziumReader<SourceRecord, MySqlSplit> currentReader;
    @Nullable private String currentSplitId;
    @Override
    public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
        // 执行fetch的时候提前检查一下currentReader,并根据不同的split创建不同的对应的reader,binlog/snapshot
        checkSplitOrStartNext();
        Iterator<SourceRecord> dataIt = null;
        try {
           // 调用具体的debeziumReader执行任务
           // 在reader中会调用StatefulTaskContext的queue的poll方法拉取数据,该方法会阻塞(也可以根据时间阻塞),如果时间间隔内没有返回数据则被中断,抛出InterruptedException
            dataIt = currentReader.pollSplitRecords(); 
        } catch (InterruptedException e) {
            LOG.warn("fetch data failed.", e);
            throw new IOException(e);
        }
        return dataIt == null
                ? finishedSnapshotSplit() // 如果没有读取到数据则返回一个空的,该方法执行后会将currentSplitId置位null,表示已经该split执行完成
                : MySqlRecords.forRecords(currentSplitId, dataIt);
    }
    @Override
    public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
        if (!(splitsChanges instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "The SplitChange type of %s is not supported.",
                            splitsChanges.getClass()));
        }
        LOG.debug("Handling split change {}", splitsChanges);
        splits.addAll(splitsChanges.splits());
    }
    private void checkSplitOrStartNext() throws IOException {
        // the binlog reader should keep alive
        if (currentReader instanceof BinlogSplitReader) {
            return;
        }
        if (canAssignNextSplit()) {
            final MySqlSplit nextSplit = splits.poll();
            if (nextSplit == null) {
                throw new IOException("Cannot fetch from another split - no split remaining");
            }
            currentSplitId = nextSplit.splitId();
            if (nextSplit.isSnapshotSplit()) {
                if (currentReader == null) {
                    final MySqlConnection jdbcConnection =
                            createMySqlConnection(sourceConfig.getDbzConfiguration());
                    final BinaryLogClient binaryLogClient =
                            createBinaryClient(sourceConfig.getDbzConfiguration());
                    final StatefulTaskContext statefulTaskContext =
                            new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
                    currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
                }
            } else {
                // point from snapshot split to binlog split
                if (currentReader != null) {
                    LOG.info("It's turn to read binlog split, close current snapshot reader");
                    currentReader.close();
                }
                final MySqlConnection jdbcConnection =
                        createMySqlConnection(sourceConfig.getDbzConfiguration());
                final BinaryLogClient binaryLogClient =
                        createBinaryClient(sourceConfig.getDbzConfiguration());
                final StatefulTaskContext statefulTaskContext =
                        new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
                currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
                LOG.info("BinlogSplitReader is created.");
            }
           // 提交一个split到reader,reader会在在submitSplit方法创建ReadTask对象,进行读取数据,将数据放入StatefulTaskContext的queue中,readTask放入线程池执行任务
            currentReader.submitSplit(nextSplit);
        }
    }
    private boolean canAssignNextSplit() {
        return currentReader == null || currentReader.isFinished();
    }
}


上面基本运行流程已经走通了现在就差实际读取数据的阶段了,现在我们直接跟着代码一点一点走 看看实际的执行逻辑是什么样子的,怎么读取数据的,由于cdc的代码比较多,我们就过滤掉了,binlogReadTask太繁琐,我们就不一步一步讲解了,后面可以简单介绍一下流程看看

640.png


我们一次看看对于reader对应的处理逻辑

1.checkSplitWithSplitIds方法

在这个方法中最主要的是调用了submitSplit开始我们下面的读取数据的一个流程

// -------------------------   SnapshotSplitReader.submitSplit方法  ------------------------------------------
public void submitSplit(MySqlSplit mySqlSplit) {
        this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
        statefulTaskContext.configure(currentSnapshotSplit);
     // 拿到context的queue,在pollSplitSrecords的时候需要
        this.queue = statefulTaskContext.getQueue();
        this.nameAdjuster = statefulTaskContext.getSchemaNameAdjuster();
        this.hasNextElement.set(true);
        this.reachEnd.set(false);
     // 主要读取逻辑在readTask中
        this.splitSnapshotReadTask =
                new MySqlSnapshotSplitReadTask(
                        statefulTaskContext.getConnectorConfig(),
                        statefulTaskContext.getOffsetContext(),
                        statefulTaskContext.getSnapshotChangeEventSourceMetrics(),
                        statefulTaskContext.getDatabaseSchema(),
                        statefulTaskContext.getConnection(),
                        statefulTaskContext.getDispatcher(),
                        statefulTaskContext.getTopicSelector(),
                        StatefulTaskContext.getClock(),
                        currentSnapshotSplit);
     // 提交一个runnable到线程中,主要是执行readTask的execute方法
        executor.submit(
                () -> {
                    try {
                        currentTaskRunning = true;
                       // 自己实现的contextImpl 主要记录高水位和低水位用
                        final SnapshotSplitChangeEventSourceContextImpl sourceContext =
                                new SnapshotSplitChangeEventSourceContextImpl();
                       // 执行readTask
                        SnapshotResult snapshotResult =
                                splitSnapshotReadTask.execute(sourceContext);
                        final MySqlBinlogSplit backfillBinlogSplit =
                                createBackfillBinlogSplit(sourceContext);
                        // optimization that skip the binlog read when the low watermark equals high
                        // watermark
                       // 如由于snapshot是并行读取的,所以当该读取该split的数据,低水位和高水位相同,说明在read数据中没有出现其他操作,所以可以退出binlog优化阶段,可以认为该split范围的数据没有变更,不需要在snapshot之后进行binlog的read
                        final boolean binlogBackfillRequired =
                                backfillBinlogSplit
                                        .getEndingOffset()
                                        .isAfter(backfillBinlogSplit.getStartingOffset());
                        if (!binlogBackfillRequired) {
                            dispatchHighWatermark(backfillBinlogSplit);
                            currentTaskRunning = false;
                            return;
                        }
                        // snapshot执行完成后,开始binlogReadTask的读取操作
                        if (snapshotResult.isCompletedOrSkipped()) {
                           // 根据snapshot read task读取结束后,会记录高低水位,水位线作为参数构建binlog read task
                            final MySqlBinlogSplitReadTask backfillBinlogReadTask =
                                    createBackfillBinlogReadTask(backfillBinlogSplit);
                           // 执行binlog read task,由于里面的处理逻辑太复杂了,我们就不直接进行阅读了
                           // 我这里直接简单介绍一下流程,就是拿到snapshot的高水位,作为endOffset,在binlog read task中,会
                           // 以endOffset作为结束条件,小宇endOffset的数据都会被read,并发送下游
                            backfillBinlogReadTask.execute(
                                    new SnapshotBinlogSplitChangeEventSourceContextImpl());
                        } else {
                            readException =
                                    new IllegalStateException(
                                            String.format(
                                                    "Read snapshot for mysql split %s fail",
                                                    currentSnapshotSplit));
                        }
                    } catch (Exception e) {
                        currentTaskRunning = false;
                        LOG.error(
                                String.format(
                                        "Execute snapshot read task for mysql split %s fail",
                                        currentSnapshotSplit),
                                e);
                        readException = e;
                    }
                });
    }
// -------------------------   MySqlSnapshotSplitReadTask.execute(sourceContext)方法  ------------------------------------------
 @Override
    public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
        SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset);//就是new了一个
        final SnapshotContext ctx;
        try {
            ctx = prepare(context); //重新new了一个 context对象,比较无用
        } catch (Exception e) {
            LOG.error("Failed to initialize snapshot context.", e);
            throw new RuntimeException(e);
        }
        try {
           // 上面都是无用代码,这里直接调用了doExecute方法,我们进入该方法看主要逻辑即可
            return doExecute(context, ctx, snapshottingTask);
        } catch (InterruptedException e) {
            LOG.warn("Snapshot was interrupted before completion");
            throw e;
        } catch (Exception t) {
            throw new DebeziumException(t);
        }
    }
// -------------------------   MySqlSnapshotSplitReadTask.doExecute(sourceContext)方法  ------------------------------------------
 @Override
    protected SnapshotResult doExecute(
            ChangeEventSourceContext context,
            SnapshotContext snapshotContext,
            SnapshottingTask snapshottingTask)
            throws Exception {
        final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
                (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
        ctx.offset = offsetContext;
       // 一个dispatcher,用于记录水位线事件,后面会通过该dispatcher发射数据,当然是通过emitter发射了
        final SignalEventDispatcher signalEventDispatcher =
                new SignalEventDispatcher(
                        offsetContext.getPartition(),
                        topicSelector.topicNameFor(snapshotSplit.getTableId()),
                        dispatcher.getQueue());
    // 其实log输出的日志就已经很清晰了
       // 记录低水位
        final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 1 - Determining low watermark {} for split {}",
                lowWatermark,
                snapshotSplit);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setLowWatermark(lowWatermark);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);
        LOG.info("Snapshot step 2 - Snapshotting data");
       // 读取数据  主要方法重点介绍的地方
        createDataEvents(ctx, snapshotSplit.getTableId());
    // 记录高水位
        final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 3 - Determining high watermark {} for split {}",
                highWatermark,
                snapshotSplit);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setHighWatermark(highWatermark);
        return SnapshotResult.completed(ctx.offset);
    }
// 我们看看createDataEvents 调用过程
private void createDataEvents(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
            TableId tableId)
            throws Exception {
        EventDispatcher.SnapshotReceiver snapshotReceiver =
                dispatcher.getSnapshotChangeEventReceiver();
        LOG.debug("Snapshotting table {}", tableId);
        createDataEventsForTable(
                snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
     // receiver的逻辑我们就不看了,我这里介绍一下就好
     // receiver通过changeRecord方法接收到数据后,通过一个成员变量(bufferedEvent)控制,如果!=null加入队列,然后创建一个新的SourceRecord,直到所有的数据读取完成,所以说最后一条数据创建成功之后,如果没有新的数据了,则不会调用changeRecord该方法,也就是说成员变量记录了最后一个record
     // 这里调用completeSnapshot方法的时候会对bufferedEvent变量进行判断,如果不等于null做一些complete相关的工作最后加入队列中,如果不调用该方法,则当前split的snapshot阶段读取的数据少了一条,嘻嘻嘻
        snapshotReceiver.completeSnapshot();
    }
// createDataEvents中调用到本类的createDataEventsForTable,也就是开始了具体读取逻辑
    private void createDataEventsForTable(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
            EventDispatcher.SnapshotReceiver snapshotReceiver,
            Table table)
            throws InterruptedException {
        long exportStart = clock.currentTimeInMillis();
        LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());
       // 构建sql
        final String selectSql =
                StatementUtils.buildSplitScanQuery(
                        snapshotSplit.getTableId(),
                        snapshotSplit.getSplitKeyType(),
                        snapshotSplit.getSplitStart() == null,
                        snapshotSplit.getSplitEnd() == null);
        LOG.info(
                "For split '{}' of table {} using select statement: '{}'",
                snapshotSplit.splitId(),
                table.id(),
                selectSql);
        try (PreparedStatement selectStatement =
                        StatementUtils.readTableSplitDataStatement( // 创建statement,然后查询sql
                                jdbcConnection,
                                selectSql,
                                snapshotSplit.getSplitStart() == null,
                                snapshotSplit.getSplitEnd() == null, snapshotSplit.getSplitStart(),
                                snapshotSplit.getSplitEnd(),
                                snapshotSplit.getSplitKeyType().getFieldCount(),
                                connectorConfig.getQueryFetchSize());
             // 然后对查询出来的数据进行封装成sourceRecord发送下游
                ResultSet rs = selectStatement.executeQuery()) {
            ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
            long rows = 0;
            Threads.Timer logTimer = getTableScanLogTimer();
            while (rs.next()) {
                rows++;
                final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
                for (int i = 0; i < columnArray.getColumns().length; i++) {
                    Column actualColumn = table.columns().get(i);
                    row[columnArray.getColumns()[i].position() - 1] =
                            readField(rs, i + 1, actualColumn, table);
                }
                if (logTimer.expired()) {
                    long stop = clock.currentTimeInMillis();
                    LOG.info(
                            "Exported {} records for split '{}' after {}",
                            rows,
                            snapshotSplit.splitId(),
                            Strings.duration(stop - exportStart));
                    snapshotProgressListener.rowsScanned(table.id(), rows);
                    logTimer = getTableScanLogTimer();
                }
                // 这里会将数据放入队列,通过receiver接收数据,然后再将数据放入其队列的一个过程,其实不必深入,就是封装的比较好,难以理解
                dispatcher.dispatchSnapshotEvent(
                        table.id(),
                        getChangeRecordEmitter(snapshotContext, table.id(), row),// 就是new了一个
                        snapshotReceiver);
            }
            LOG.info(
                    "Finished exporting {} records for split '{}', total duration '{}'",
                    rows,
                    snapshotSplit.splitId(),
                    Strings.duration(clock.currentTimeInMillis() - exportStart));
        } catch (SQLException e) {
            throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
        }
    }
// -------------------------   dispatcher.dispatchSnapshotEvent方法之后的流程  ----------------------------------
 // 进入evnentDisptcher.dispatchSnapshotEvent方法
   public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException {
        DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);
        if (dataCollectionSchema == null) {
            errorOnMissingSchema(dataCollectionId, changeRecordEmitter);
        }
        changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() {
            @Override
            public void changeRecord(DataCollectionSchema schema,
                                     Operation operation,
                                     Object key, Struct value,
                                     OffsetContext offset,
                                     ConnectHeaders headers)
                    throws InterruptedException {
                eventListener.onEvent(dataCollectionSchema.id(), offset, key, value);
               // 真正的放入队列的逻辑在这里调用
               // receiver使我们传入的  对应BufferingSnapshotChangeRecordReceiver类
                receiver.changeRecord(dataCollectionSchema, operation, key, value, offset, headers);
            }
        });
    }
  // BufferingSnapshotChangeRecordReceiver的changeRecord方法
 // 前面简单介绍过他的处理逻辑了,就不必多做介绍了
  @Override
        public void changeRecord(DataCollectionSchema dataCollectionSchema,
                                 Operation operation,
                                 Object key, Struct value,
                                 OffsetContext offsetContext,
                                 ConnectHeaders headers)
                throws InterruptedException {
            Objects.requireNonNull(value, "value must not be null");
            LOGGER.trace("Received change record for {} operation on key {}", operation, key);
            if (bufferedEvent != null) {
                queue.enqueue(bufferedEvent.get());
            }
            Schema keySchema = dataCollectionSchema.keySchema();
            String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
            // the record is produced lazily, so to have the correct offset as per the pre/post completion callbacks
            bufferedEvent = () -> {
                SourceRecord record = new SourceRecord(
                        offsetContext.getPartition(),
                        offsetContext.getOffset(),
                        topicName, null,
                        keySchema, key,
                        dataCollectionSchema.getEnvelopeSchema().schema(), value,
                        null, headers);
                return changeEventCreator.createDataChangeEvent(record);
            };
        }
2.pollSplitRecords方法,这个方法是拉取queue中的数据

上面把数据读取后写入到queue的流程已经捋清楚了,我们现在看看reader是在什么时候读取了queue的数据

public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
        checkReadException();
        if (hasNextElement.get()) {
            // data input: [low watermark event][snapshot events][high watermark event][binlog
            // events][binlog-end event]
            // data output: [low watermark event][normalized events][high watermark event]
            boolean reachBinlogEnd = false;
            final List<SourceRecord> sourceRecords = new ArrayList<>();
            while (!reachBinlogEnd) {
               // 可以看到这里直接queue.poll直接拉取数据即可,在这里会判断一下当前evnet是否是到达了结束的水位线,实际上就是高水位的位置,到达结束水位线之后,我们就可以停止了
                List<DataChangeEvent> batch = queue.poll();
                for (DataChangeEvent event : batch) {
                    sourceRecords.add(event.getRecord());
                    if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
                        reachBinlogEnd = true;
                        break;
                    }
                }
            }
            // snapshot split return its data once
            hasNextElement.set(false);
            return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
                    .iterator();
        }
        // the data has been polled, no more data
        reachEnd.compareAndSet(false, true);
        return null;
    }



通过上面的阅读我们已经看到了数据从读取到,再到放入队列中的一个过程,这里对队列做一个说明,在上面的介绍中存在两个队列,一个queue,一个elementQueue,这个的区别在于,queue是在读取数据阶段,将数据放入queue,在FetcherTask中调用了reader的fetch方法,将queue中的数据拉取到,并将其加入到elementQueue

在上面的操作中数据已经放入了elementQueue中,现在我们看看elementQueue中的数据是在什么时候发送到下游的


这里我们需要重新回到MysqlSource中通过一张图来看看

640.png

我们在创建Reader的时候传入了一个MysqlRecordEmitter,在后面发送数据的时候是通过这个类

对于发送数据到下游的逻辑是在MysqlSourceReader的父类(SourceReaderBase)中,但是发送的类是有Emitter完成的

由于相关方法是有上层调用执行,我们就不多看了,就简单说明一下,系统调用SourceReaderBase.pollNext(),开始触发数据collect的操作,将其发送至下游节点

640.png

我们这里直接阅读MysqlRecordEmitter源码看看他发送数据的逻辑,其实到这里跟sourceFucntion实现的原理基本差不多了我们这里简单过一下即可

public final class MySqlRecordEmitter<T> implements RecordEmitter<SourceRecord, T, MySqlSplitState> {
    private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
            new FlinkJsonTableChangeSerializer();
    private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
    private final boolean includeSchemaChanges;
    private final OutputCollector<T> outputCollector;
    public MySqlRecordEmitter(
            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
            MySqlSourceReaderMetrics sourceReaderMetrics,
            boolean includeSchemaChanges) {
       // 对数据Deserialization的一个对象,与单并行度的类是同一个,具体内部逻辑可以自己看
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
        this.sourceReaderMetrics = sourceReaderMetrics;
        this.includeSchemaChanges = includeSchemaChanges;
        this.outputCollector = new OutputCollector<>();
    }
    @Override
    public void emitRecord(SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
            throws Exception {
        // 判断一下消息的事件类型,如果是一个sourceReocrd则发送下游,否则对应其事件的相关操作
        if (isWatermarkEvent(element)) {
            BinlogOffset watermark = getWatermark(element);
            if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
                splitState.asSnapshotSplitState().setHighWatermark(watermark);
            }
        } else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {
            HistoryRecord historyRecord = getHistoryRecord(element);
            Array tableChanges =
                    historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
            TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
            for (TableChanges.TableChange tableChange : changes) {
                splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
            }
            if (includeSchemaChanges) {
                emitElement(element, output);
            }
        } else if (isDataChangeRecord(element)) {
            if (splitState.isBinlogSplitState()) {
                BinlogOffset position = getBinlogPosition(element);
                splitState.asBinlogSplitState().setStartingOffset(position);
            }
            reportMetrics(element);
            emitElement(element, output);
        } else {
            // unknown element
            LOG.info("Meet unknown element {}, just skip.", element);
        }
    }
    private void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
        outputCollector.output = output;
       // 调用不同的Deserialization.deserialize方法完成数据的转换以及发送到下游
        debeziumDeserializationSchema.deserialize(element, outputCollector);
    }
    private static class OutputCollector<T> implements Collector<T> {
        private SourceOutput<T> output;
        @Override
        public void collect(T record) {
            output.collect(record);
        }
    }
}


四.题外话 <Table相关内容>

// RowData的DeserializeSchema,对应上面使用到的DeserializeSchema 做一个简单的介绍
public final class RowDataDebeziumDeserializeSchema
        implements DebeziumDeserializationSchema<RowData> {
      @Override
    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
       // 针对不同的操作类型,我们需要器对应的数据  
       // after表示更改之后的数据结果  before表示更改之前的数据
       // 只有update的时候才需要同时使用 before和after(动动聪明脑袋瓜为啥呢)
       // 针对不同的操作使用RowKind进行表示,在sql层面会根据数据的标识来进行对应处理,比如insert,比如update操作
        // 所以对于table内容,我们只需要将数据转换成对应的RowData类型,并对其表示RowKind类型,框架便会帮我在sink的时候
       // 做出对应的操作,我们无需编写相关代码来实现
       // 所以 sql中使用 cdc 我们需要将其加上RowKind,对于后面的操作我们就无需关心了
       // 我们在sql中 formt格式是json,实际上走的逻辑这里我也没看,因为我不太关心,如果大家关心的话或者想了解的,大家可以自己去看看
       // 具体的实现方式
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
           // 构建RowData的方法,我们有加进来,里面内容比较繁琐
           // 讲讲大概内容
           // 就是对于我们字段的类型,转换成对应的java中类型,当然这里面我说的是flink中,因为他对用到的类型都重新做了一层封装,这样才能支持我们db中的所有类型
            GenericRowData insert = extractAfterRow(value, valueSchema);
            validator.validate(insert, RowKind.INSERT);
            insert.setRowKind(RowKind.INSERT);
           // 具体的发送下游方法
            emit(record, insert, out);
        } else if (op == Envelope.Operation.DELETE) {
            GenericRowData delete = extractBeforeRow(value, valueSchema);
            validator.validate(delete, RowKind.DELETE);
            delete.setRowKind(RowKind.DELETE);
            emit(record, delete, out);
        } else {
            GenericRowData before = extractBeforeRow(value, valueSchema);
            validator.validate(before, RowKind.UPDATE_BEFORE);
            before.setRowKind(RowKind.UPDATE_BEFORE);
            emit(record, before, out);
            GenericRowData after = extractAfterRow(value, valueSchema);
            validator.validate(after, RowKind.UPDATE_AFTER);
            after.setRowKind(RowKind.UPDATE_AFTER);
            emit(record, after, out);
        }
    }
}

对于我们想实现一个tableSource的话,我们需要继承DynamicTableSourceFactory,实现下面的方法,然后通过spi的方法将其动态的加载


  • createDynamicTableSource  : 创建一个具体的source
  • factoryIdentifier : 可以认为是我们描述connector的名字,比如kafka
  • requiredOptions : with后面所必须要有参数,比如 username,password等,如果没有抛出异常
  • optionalOptions : with后面的配置项,这是可选择的可有可无

640.png


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
3月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
1757 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
6月前
|
数据采集 SQL canal
Amoro + Flink CDC 数据融合入湖新体验
本文总结了货拉拉高级大数据开发工程师陈政羽在Flink Forward Asia 2024上的分享,聚焦Flink CDC在货拉拉的应用与优化。内容涵盖CDC应用现状、数据入湖新体验、入湖优化及未来规划。文中详细分析了CDC在多业务场景中的实践,包括数据采集平台化、稳定性建设,以及面临的文件碎片化、Schema演进等挑战。同时介绍了基于Apache Amoro的湖仓融合架构,通过自优化服务解决小文件问题,提升数据新鲜度与读写平衡。未来将深化Paimon与Amoro的结合,打造更高效的入湖生态与自动化优化方案。
380 1
Amoro + Flink CDC 数据融合入湖新体验
|
6月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1187 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
7月前
|
SQL API Apache
Dinky 和 Flink CDC 在实时整库同步的探索之路
本次分享围绕 Dinky 的整库同步技术演进,从传统数据集成方案的痛点出发,探讨了 Flink CDC Yaml 作业的探索历程。内容分为三个部分:起源、探索、未来。在起源部分,分析了传统数据集成方案中全量与增量割裂、时效性低等问题,引出 Flink CDC 的优势;探索部分详细对比了 Dinky CDC Source 和 Flink CDC Pipeline 的架构与能力,深入讲解了 YAML 作业的细节,如模式演变、数据转换等;未来部分则展望了 Dinky 对 Flink CDC 的支持与优化方向,包括 Pipeline 转换功能、Transform 扩展及实时湖仓治理等。
892 12
Dinky 和 Flink CDC 在实时整库同步的探索之路
|
5月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
8月前
|
Oracle 关系型数据库 Java
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
|
8月前
|
关系型数据库 MySQL 数据库
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
TIS 是一款基于Web-UI的开源大数据集成工具,通过与人大金仓Kingbase的深度整合,提供高效、灵活的实时数据集成方案。它支持增量数据监听和实时写入,兼容MySQL、PostgreSQL和Oracle模式,无需编写复杂脚本,操作简单直观,特别适合非专业开发人员使用。TIS率先实现了Kingbase CDC连接器的整合,成为业界首个开箱即用的Kingbase CDC数据同步解决方案,助力企业数字化转型。
1794 5
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
|
8月前
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
671 1
Flink CDC + Hologres高性能数据同步优化实践
|
8月前
|
分布式计算 关系型数据库 MySQL
Flink CDC 3.3.0 发布公告
Flink CDC 3.3.0 发布公告
371 14
|
8月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
406 6