【万字长文】Flink cdc源码精讲(推荐收藏)(四)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
简介: 【万字长文】Flink cdc源码精讲(推荐收藏)

我们进入fetcherTask中,只看task逻辑即可

class FetcherTask{
 @Override
    public boolean run() throws IOException {
        try {
            if (!isWakenUp() && lastRecords == null) {
              // 返回的是该对象 public final class MySqlRecords implements RecordsWithSplitIds<SourceRecord>
              // 调用了我们在创建fetcherTask的时候传入的splitReader对象,实际上还是调用reader的fetch方法来真正的获取数据
                lastRecords = splitReader.fetch();
            }
            if (!isWakenUp()) {
                // The order matters here. We must first put the last records into the queue.
                // This ensures the handling of the fetched records is atomic to wakeup.
               // 将读取的数据放入到队列汇总
                if (elementsQueue.put(fetcherIndex, lastRecords)) {
                    if (!lastRecords.finishedSplits().isEmpty()) {
                        // The callback does not throw InterruptedException.
                        splitFinishedCallback.accept(lastRecords.finishedSplits());
                    }
                    lastRecords = null;
                }
            }
        } catch (InterruptedException e) {
            throw new IOException("Source fetch execution was interrupted", e);
            if (isWakenUp()) {
                wakeup = false;
            }
        }
        return true;
    }
}

通过上面我们基本上已经清楚了在flink层面是怎么最终调用了cdc读取数据的代码,现在我们根据主要的读取代码看看是怎么样子实现的

currentReader.pollSplitRecords() ,我们简单介绍一下currentReader(BinlogSplitReader/SnapshotSplitReader)主要两种实现,大概的思路这里面会根据不同的性质区分进行读取数据,在submitSplit的时候会创建readRask读取指定split的数据,结果会放入StatefulTaskContext的queue中,在fetch方法会先提交split,让其执行read数据,然后通过pollSplitRecords方法在调用queue.poll拉取数据,这是一个阻塞的操作,如果超时则抛出中断异常

public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
    private final Queue<MySqlSplit> splits;
    private final MySqlSourceConfig sourceConfig;
    private final int subtaskId;
    @Nullable private DebeziumReader<SourceRecord, MySqlSplit> currentReader;
    @Nullable private String currentSplitId;
    @Override
    public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
        // 执行fetch的时候提前检查一下currentReader,并根据不同的split创建不同的对应的reader,binlog/snapshot
        checkSplitOrStartNext();
        Iterator<SourceRecord> dataIt = null;
        try {
           // 调用具体的debeziumReader执行任务
           // 在reader中会调用StatefulTaskContext的queue的poll方法拉取数据,该方法会阻塞(也可以根据时间阻塞),如果时间间隔内没有返回数据则被中断,抛出InterruptedException
            dataIt = currentReader.pollSplitRecords(); 
        } catch (InterruptedException e) {
            LOG.warn("fetch data failed.", e);
            throw new IOException(e);
        }
        return dataIt == null
                ? finishedSnapshotSplit() // 如果没有读取到数据则返回一个空的,该方法执行后会将currentSplitId置位null,表示已经该split执行完成
                : MySqlRecords.forRecords(currentSplitId, dataIt);
    }
    @Override
    public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
        if (!(splitsChanges instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "The SplitChange type of %s is not supported.",
                            splitsChanges.getClass()));
        }
        LOG.debug("Handling split change {}", splitsChanges);
        splits.addAll(splitsChanges.splits());
    }
    private void checkSplitOrStartNext() throws IOException {
        // the binlog reader should keep alive
        if (currentReader instanceof BinlogSplitReader) {
            return;
        }
        if (canAssignNextSplit()) {
            final MySqlSplit nextSplit = splits.poll();
            if (nextSplit == null) {
                throw new IOException("Cannot fetch from another split - no split remaining");
            }
            currentSplitId = nextSplit.splitId();
            if (nextSplit.isSnapshotSplit()) {
                if (currentReader == null) {
                    final MySqlConnection jdbcConnection =
                            createMySqlConnection(sourceConfig.getDbzConfiguration());
                    final BinaryLogClient binaryLogClient =
                            createBinaryClient(sourceConfig.getDbzConfiguration());
                    final StatefulTaskContext statefulTaskContext =
                            new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
                    currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
                }
            } else {
                // point from snapshot split to binlog split
                if (currentReader != null) {
                    LOG.info("It's turn to read binlog split, close current snapshot reader");
                    currentReader.close();
                }
                final MySqlConnection jdbcConnection =
                        createMySqlConnection(sourceConfig.getDbzConfiguration());
                final BinaryLogClient binaryLogClient =
                        createBinaryClient(sourceConfig.getDbzConfiguration());
                final StatefulTaskContext statefulTaskContext =
                        new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
                currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
                LOG.info("BinlogSplitReader is created.");
            }
           // 提交一个split到reader,reader会在在submitSplit方法创建ReadTask对象,进行读取数据,将数据放入StatefulTaskContext的queue中,readTask放入线程池执行任务
            currentReader.submitSplit(nextSplit);
        }
    }
    private boolean canAssignNextSplit() {
        return currentReader == null || currentReader.isFinished();
    }
}


上面基本运行流程已经走通了现在就差实际读取数据的阶段了,现在我们直接跟着代码一点一点走 看看实际的执行逻辑是什么样子的,怎么读取数据的,由于cdc的代码比较多,我们就过滤掉了,binlogReadTask太繁琐,我们就不一步一步讲解了,后面可以简单介绍一下流程看看

640.png


我们一次看看对于reader对应的处理逻辑

1.checkSplitWithSplitIds方法

在这个方法中最主要的是调用了submitSplit开始我们下面的读取数据的一个流程

// -------------------------   SnapshotSplitReader.submitSplit方法  ------------------------------------------
public void submitSplit(MySqlSplit mySqlSplit) {
        this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
        statefulTaskContext.configure(currentSnapshotSplit);
     // 拿到context的queue,在pollSplitSrecords的时候需要
        this.queue = statefulTaskContext.getQueue();
        this.nameAdjuster = statefulTaskContext.getSchemaNameAdjuster();
        this.hasNextElement.set(true);
        this.reachEnd.set(false);
     // 主要读取逻辑在readTask中
        this.splitSnapshotReadTask =
                new MySqlSnapshotSplitReadTask(
                        statefulTaskContext.getConnectorConfig(),
                        statefulTaskContext.getOffsetContext(),
                        statefulTaskContext.getSnapshotChangeEventSourceMetrics(),
                        statefulTaskContext.getDatabaseSchema(),
                        statefulTaskContext.getConnection(),
                        statefulTaskContext.getDispatcher(),
                        statefulTaskContext.getTopicSelector(),
                        StatefulTaskContext.getClock(),
                        currentSnapshotSplit);
     // 提交一个runnable到线程中,主要是执行readTask的execute方法
        executor.submit(
                () -> {
                    try {
                        currentTaskRunning = true;
                       // 自己实现的contextImpl 主要记录高水位和低水位用
                        final SnapshotSplitChangeEventSourceContextImpl sourceContext =
                                new SnapshotSplitChangeEventSourceContextImpl();
                       // 执行readTask
                        SnapshotResult snapshotResult =
                                splitSnapshotReadTask.execute(sourceContext);
                        final MySqlBinlogSplit backfillBinlogSplit =
                                createBackfillBinlogSplit(sourceContext);
                        // optimization that skip the binlog read when the low watermark equals high
                        // watermark
                       // 如由于snapshot是并行读取的,所以当该读取该split的数据,低水位和高水位相同,说明在read数据中没有出现其他操作,所以可以退出binlog优化阶段,可以认为该split范围的数据没有变更,不需要在snapshot之后进行binlog的read
                        final boolean binlogBackfillRequired =
                                backfillBinlogSplit
                                        .getEndingOffset()
                                        .isAfter(backfillBinlogSplit.getStartingOffset());
                        if (!binlogBackfillRequired) {
                            dispatchHighWatermark(backfillBinlogSplit);
                            currentTaskRunning = false;
                            return;
                        }
                        // snapshot执行完成后,开始binlogReadTask的读取操作
                        if (snapshotResult.isCompletedOrSkipped()) {
                           // 根据snapshot read task读取结束后,会记录高低水位,水位线作为参数构建binlog read task
                            final MySqlBinlogSplitReadTask backfillBinlogReadTask =
                                    createBackfillBinlogReadTask(backfillBinlogSplit);
                           // 执行binlog read task,由于里面的处理逻辑太复杂了,我们就不直接进行阅读了
                           // 我这里直接简单介绍一下流程,就是拿到snapshot的高水位,作为endOffset,在binlog read task中,会
                           // 以endOffset作为结束条件,小宇endOffset的数据都会被read,并发送下游
                            backfillBinlogReadTask.execute(
                                    new SnapshotBinlogSplitChangeEventSourceContextImpl());
                        } else {
                            readException =
                                    new IllegalStateException(
                                            String.format(
                                                    "Read snapshot for mysql split %s fail",
                                                    currentSnapshotSplit));
                        }
                    } catch (Exception e) {
                        currentTaskRunning = false;
                        LOG.error(
                                String.format(
                                        "Execute snapshot read task for mysql split %s fail",
                                        currentSnapshotSplit),
                                e);
                        readException = e;
                    }
                });
    }
// -------------------------   MySqlSnapshotSplitReadTask.execute(sourceContext)方法  ------------------------------------------
 @Override
    public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
        SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset);//就是new了一个
        final SnapshotContext ctx;
        try {
            ctx = prepare(context); //重新new了一个 context对象,比较无用
        } catch (Exception e) {
            LOG.error("Failed to initialize snapshot context.", e);
            throw new RuntimeException(e);
        }
        try {
           // 上面都是无用代码,这里直接调用了doExecute方法,我们进入该方法看主要逻辑即可
            return doExecute(context, ctx, snapshottingTask);
        } catch (InterruptedException e) {
            LOG.warn("Snapshot was interrupted before completion");
            throw e;
        } catch (Exception t) {
            throw new DebeziumException(t);
        }
    }
// -------------------------   MySqlSnapshotSplitReadTask.doExecute(sourceContext)方法  ------------------------------------------
 @Override
    protected SnapshotResult doExecute(
            ChangeEventSourceContext context,
            SnapshotContext snapshotContext,
            SnapshottingTask snapshottingTask)
            throws Exception {
        final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
                (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
        ctx.offset = offsetContext;
       // 一个dispatcher,用于记录水位线事件,后面会通过该dispatcher发射数据,当然是通过emitter发射了
        final SignalEventDispatcher signalEventDispatcher =
                new SignalEventDispatcher(
                        offsetContext.getPartition(),
                        topicSelector.topicNameFor(snapshotSplit.getTableId()),
                        dispatcher.getQueue());
    // 其实log输出的日志就已经很清晰了
       // 记录低水位
        final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 1 - Determining low watermark {} for split {}",
                lowWatermark,
                snapshotSplit);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setLowWatermark(lowWatermark);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);
        LOG.info("Snapshot step 2 - Snapshotting data");
       // 读取数据  主要方法重点介绍的地方
        createDataEvents(ctx, snapshotSplit.getTableId());
    // 记录高水位
        final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 3 - Determining high watermark {} for split {}",
                highWatermark,
                snapshotSplit);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setHighWatermark(highWatermark);
        return SnapshotResult.completed(ctx.offset);
    }
// 我们看看createDataEvents 调用过程
private void createDataEvents(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
            TableId tableId)
            throws Exception {
        EventDispatcher.SnapshotReceiver snapshotReceiver =
                dispatcher.getSnapshotChangeEventReceiver();
        LOG.debug("Snapshotting table {}", tableId);
        createDataEventsForTable(
                snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
     // receiver的逻辑我们就不看了,我这里介绍一下就好
     // receiver通过changeRecord方法接收到数据后,通过一个成员变量(bufferedEvent)控制,如果!=null加入队列,然后创建一个新的SourceRecord,直到所有的数据读取完成,所以说最后一条数据创建成功之后,如果没有新的数据了,则不会调用changeRecord该方法,也就是说成员变量记录了最后一个record
     // 这里调用completeSnapshot方法的时候会对bufferedEvent变量进行判断,如果不等于null做一些complete相关的工作最后加入队列中,如果不调用该方法,则当前split的snapshot阶段读取的数据少了一条,嘻嘻嘻
        snapshotReceiver.completeSnapshot();
    }
// createDataEvents中调用到本类的createDataEventsForTable,也就是开始了具体读取逻辑
    private void createDataEventsForTable(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
            EventDispatcher.SnapshotReceiver snapshotReceiver,
            Table table)
            throws InterruptedException {
        long exportStart = clock.currentTimeInMillis();
        LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());
       // 构建sql
        final String selectSql =
                StatementUtils.buildSplitScanQuery(
                        snapshotSplit.getTableId(),
                        snapshotSplit.getSplitKeyType(),
                        snapshotSplit.getSplitStart() == null,
                        snapshotSplit.getSplitEnd() == null);
        LOG.info(
                "For split '{}' of table {} using select statement: '{}'",
                snapshotSplit.splitId(),
                table.id(),
                selectSql);
        try (PreparedStatement selectStatement =
                        StatementUtils.readTableSplitDataStatement( // 创建statement,然后查询sql
                                jdbcConnection,
                                selectSql,
                                snapshotSplit.getSplitStart() == null,
                                snapshotSplit.getSplitEnd() == null, snapshotSplit.getSplitStart(),
                                snapshotSplit.getSplitEnd(),
                                snapshotSplit.getSplitKeyType().getFieldCount(),
                                connectorConfig.getQueryFetchSize());
             // 然后对查询出来的数据进行封装成sourceRecord发送下游
                ResultSet rs = selectStatement.executeQuery()) {
            ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
            long rows = 0;
            Threads.Timer logTimer = getTableScanLogTimer();
            while (rs.next()) {
                rows++;
                final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
                for (int i = 0; i < columnArray.getColumns().length; i++) {
                    Column actualColumn = table.columns().get(i);
                    row[columnArray.getColumns()[i].position() - 1] =
                            readField(rs, i + 1, actualColumn, table);
                }
                if (logTimer.expired()) {
                    long stop = clock.currentTimeInMillis();
                    LOG.info(
                            "Exported {} records for split '{}' after {}",
                            rows,
                            snapshotSplit.splitId(),
                            Strings.duration(stop - exportStart));
                    snapshotProgressListener.rowsScanned(table.id(), rows);
                    logTimer = getTableScanLogTimer();
                }
                // 这里会将数据放入队列,通过receiver接收数据,然后再将数据放入其队列的一个过程,其实不必深入,就是封装的比较好,难以理解
                dispatcher.dispatchSnapshotEvent(
                        table.id(),
                        getChangeRecordEmitter(snapshotContext, table.id(), row),// 就是new了一个
                        snapshotReceiver);
            }
            LOG.info(
                    "Finished exporting {} records for split '{}', total duration '{}'",
                    rows,
                    snapshotSplit.splitId(),
                    Strings.duration(clock.currentTimeInMillis() - exportStart));
        } catch (SQLException e) {
            throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
        }
    }
// -------------------------   dispatcher.dispatchSnapshotEvent方法之后的流程  ----------------------------------
 // 进入evnentDisptcher.dispatchSnapshotEvent方法
   public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException {
        DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);
        if (dataCollectionSchema == null) {
            errorOnMissingSchema(dataCollectionId, changeRecordEmitter);
        }
        changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() {
            @Override
            public void changeRecord(DataCollectionSchema schema,
                                     Operation operation,
                                     Object key, Struct value,
                                     OffsetContext offset,
                                     ConnectHeaders headers)
                    throws InterruptedException {
                eventListener.onEvent(dataCollectionSchema.id(), offset, key, value);
               // 真正的放入队列的逻辑在这里调用
               // receiver使我们传入的  对应BufferingSnapshotChangeRecordReceiver类
                receiver.changeRecord(dataCollectionSchema, operation, key, value, offset, headers);
            }
        });
    }
  // BufferingSnapshotChangeRecordReceiver的changeRecord方法
 // 前面简单介绍过他的处理逻辑了,就不必多做介绍了
  @Override
        public void changeRecord(DataCollectionSchema dataCollectionSchema,
                                 Operation operation,
                                 Object key, Struct value,
                                 OffsetContext offsetContext,
                                 ConnectHeaders headers)
                throws InterruptedException {
            Objects.requireNonNull(value, "value must not be null");
            LOGGER.trace("Received change record for {} operation on key {}", operation, key);
            if (bufferedEvent != null) {
                queue.enqueue(bufferedEvent.get());
            }
            Schema keySchema = dataCollectionSchema.keySchema();
            String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
            // the record is produced lazily, so to have the correct offset as per the pre/post completion callbacks
            bufferedEvent = () -> {
                SourceRecord record = new SourceRecord(
                        offsetContext.getPartition(),
                        offsetContext.getOffset(),
                        topicName, null,
                        keySchema, key,
                        dataCollectionSchema.getEnvelopeSchema().schema(), value,
                        null, headers);
                return changeEventCreator.createDataChangeEvent(record);
            };
        }
2.pollSplitRecords方法,这个方法是拉取queue中的数据

上面把数据读取后写入到queue的流程已经捋清楚了,我们现在看看reader是在什么时候读取了queue的数据

public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
        checkReadException();
        if (hasNextElement.get()) {
            // data input: [low watermark event][snapshot events][high watermark event][binlog
            // events][binlog-end event]
            // data output: [low watermark event][normalized events][high watermark event]
            boolean reachBinlogEnd = false;
            final List<SourceRecord> sourceRecords = new ArrayList<>();
            while (!reachBinlogEnd) {
               // 可以看到这里直接queue.poll直接拉取数据即可,在这里会判断一下当前evnet是否是到达了结束的水位线,实际上就是高水位的位置,到达结束水位线之后,我们就可以停止了
                List<DataChangeEvent> batch = queue.poll();
                for (DataChangeEvent event : batch) {
                    sourceRecords.add(event.getRecord());
                    if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
                        reachBinlogEnd = true;
                        break;
                    }
                }
            }
            // snapshot split return its data once
            hasNextElement.set(false);
            return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
                    .iterator();
        }
        // the data has been polled, no more data
        reachEnd.compareAndSet(false, true);
        return null;
    }



通过上面的阅读我们已经看到了数据从读取到,再到放入队列中的一个过程,这里对队列做一个说明,在上面的介绍中存在两个队列,一个queue,一个elementQueue,这个的区别在于,queue是在读取数据阶段,将数据放入queue,在FetcherTask中调用了reader的fetch方法,将queue中的数据拉取到,并将其加入到elementQueue

在上面的操作中数据已经放入了elementQueue中,现在我们看看elementQueue中的数据是在什么时候发送到下游的


这里我们需要重新回到MysqlSource中通过一张图来看看

640.png

我们在创建Reader的时候传入了一个MysqlRecordEmitter,在后面发送数据的时候是通过这个类

对于发送数据到下游的逻辑是在MysqlSourceReader的父类(SourceReaderBase)中,但是发送的类是有Emitter完成的

由于相关方法是有上层调用执行,我们就不多看了,就简单说明一下,系统调用SourceReaderBase.pollNext(),开始触发数据collect的操作,将其发送至下游节点

640.png

我们这里直接阅读MysqlRecordEmitter源码看看他发送数据的逻辑,其实到这里跟sourceFucntion实现的原理基本差不多了我们这里简单过一下即可

public final class MySqlRecordEmitter<T> implements RecordEmitter<SourceRecord, T, MySqlSplitState> {
    private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
            new FlinkJsonTableChangeSerializer();
    private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
    private final boolean includeSchemaChanges;
    private final OutputCollector<T> outputCollector;
    public MySqlRecordEmitter(
            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
            MySqlSourceReaderMetrics sourceReaderMetrics,
            boolean includeSchemaChanges) {
       // 对数据Deserialization的一个对象,与单并行度的类是同一个,具体内部逻辑可以自己看
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
        this.sourceReaderMetrics = sourceReaderMetrics;
        this.includeSchemaChanges = includeSchemaChanges;
        this.outputCollector = new OutputCollector<>();
    }
    @Override
    public void emitRecord(SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
            throws Exception {
        // 判断一下消息的事件类型,如果是一个sourceReocrd则发送下游,否则对应其事件的相关操作
        if (isWatermarkEvent(element)) {
            BinlogOffset watermark = getWatermark(element);
            if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
                splitState.asSnapshotSplitState().setHighWatermark(watermark);
            }
        } else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {
            HistoryRecord historyRecord = getHistoryRecord(element);
            Array tableChanges =
                    historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
            TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
            for (TableChanges.TableChange tableChange : changes) {
                splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
            }
            if (includeSchemaChanges) {
                emitElement(element, output);
            }
        } else if (isDataChangeRecord(element)) {
            if (splitState.isBinlogSplitState()) {
                BinlogOffset position = getBinlogPosition(element);
                splitState.asBinlogSplitState().setStartingOffset(position);
            }
            reportMetrics(element);
            emitElement(element, output);
        } else {
            // unknown element
            LOG.info("Meet unknown element {}, just skip.", element);
        }
    }
    private void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
        outputCollector.output = output;
       // 调用不同的Deserialization.deserialize方法完成数据的转换以及发送到下游
        debeziumDeserializationSchema.deserialize(element, outputCollector);
    }
    private static class OutputCollector<T> implements Collector<T> {
        private SourceOutput<T> output;
        @Override
        public void collect(T record) {
            output.collect(record);
        }
    }
}


四.题外话 <Table相关内容>

// RowData的DeserializeSchema,对应上面使用到的DeserializeSchema 做一个简单的介绍
public final class RowDataDebeziumDeserializeSchema
        implements DebeziumDeserializationSchema<RowData> {
      @Override
    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
       // 针对不同的操作类型,我们需要器对应的数据  
       // after表示更改之后的数据结果  before表示更改之前的数据
       // 只有update的时候才需要同时使用 before和after(动动聪明脑袋瓜为啥呢)
       // 针对不同的操作使用RowKind进行表示,在sql层面会根据数据的标识来进行对应处理,比如insert,比如update操作
        // 所以对于table内容,我们只需要将数据转换成对应的RowData类型,并对其表示RowKind类型,框架便会帮我在sink的时候
       // 做出对应的操作,我们无需编写相关代码来实现
       // 所以 sql中使用 cdc 我们需要将其加上RowKind,对于后面的操作我们就无需关心了
       // 我们在sql中 formt格式是json,实际上走的逻辑这里我也没看,因为我不太关心,如果大家关心的话或者想了解的,大家可以自己去看看
       // 具体的实现方式
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
           // 构建RowData的方法,我们有加进来,里面内容比较繁琐
           // 讲讲大概内容
           // 就是对于我们字段的类型,转换成对应的java中类型,当然这里面我说的是flink中,因为他对用到的类型都重新做了一层封装,这样才能支持我们db中的所有类型
            GenericRowData insert = extractAfterRow(value, valueSchema);
            validator.validate(insert, RowKind.INSERT);
            insert.setRowKind(RowKind.INSERT);
           // 具体的发送下游方法
            emit(record, insert, out);
        } else if (op == Envelope.Operation.DELETE) {
            GenericRowData delete = extractBeforeRow(value, valueSchema);
            validator.validate(delete, RowKind.DELETE);
            delete.setRowKind(RowKind.DELETE);
            emit(record, delete, out);
        } else {
            GenericRowData before = extractBeforeRow(value, valueSchema);
            validator.validate(before, RowKind.UPDATE_BEFORE);
            before.setRowKind(RowKind.UPDATE_BEFORE);
            emit(record, before, out);
            GenericRowData after = extractAfterRow(value, valueSchema);
            validator.validate(after, RowKind.UPDATE_AFTER);
            after.setRowKind(RowKind.UPDATE_AFTER);
            emit(record, after, out);
        }
    }
}

对于我们想实现一个tableSource的话,我们需要继承DynamicTableSourceFactory,实现下面的方法,然后通过spi的方法将其动态的加载


  • createDynamicTableSource  : 创建一个具体的source
  • factoryIdentifier : 可以认为是我们描述connector的名字,比如kafka
  • requiredOptions : with后面所必须要有参数,比如 username,password等,如果没有抛出异常
  • optionalOptions : with后面的配置项,这是可选择的可有可无

640.png


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
17天前
|
DataWorks 安全 关系型数据库
DataWorks产品使用合集之使用Flink CDC读取PostgreSQL数据时如何指定编码格式
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
12天前
|
Oracle 关系型数据库 Java
实时计算 Flink版产品使用问题之源码 deploy,生成带有时间戳的jar包,如何修改配置信息
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
SQL 监控 关系型数据库
实时计算 Flink版产品使用问题之使用mysql cdc配置StartupOptions.initial()全量之后就不增量了,是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 关系型数据库 MySQL
Apache Flink CDC 3.1.0 发布公告
Apache Flink 社区很高兴地宣布发布 Flink CDC 3.1.0!
583 1
Apache Flink CDC 3.1.0 发布公告
|
10天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之CDC任务在异常后整个record sent从0初始化开始,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
404 0
|
10天前
|
关系型数据库 数据库 流计算
实时计算 Flink版操作报错合集之在使用Flink CDC TiDB Connector时,无法获取到事件,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
345 0
|
10天前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之下载了mysql的cdc的demo,在本地调试时,报错:找不到这个包,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
105 0
|
1月前
|
SQL Java 数据处理
实时计算 Flink版产品使用合集之如何写一个opengauss的cdc
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
16天前
|
消息中间件 关系型数据库 MySQL
Flink CDC 最佳实践(以 MySQL 为例)
Flink CDC 最佳实践(以 MySQL 为例)
|
17天前
|
消息中间件 关系型数据库 MySQL