Let's step into FetcherTask; we only need to look at the task logic.
```java
class FetcherTask {
    @Override
    public boolean run() throws IOException {
        try {
            if (!isWakenUp() && lastRecords == null) {
                // fetch() returns a MySqlRecords, which implements
                // RecordsWithSplitIds<SourceRecord>. splitReader is the object we passed
                // in when creating the FetcherTask, so this ultimately calls the
                // reader's fetch method to actually read the data
                lastRecords = splitReader.fetch();
            }
            if (!isWakenUp()) {
                // The order matters here. We must first put the last records into the queue.
                // This ensures the handling of the fetched records is atomic to wakeup.
                // Put the fetched records into the elements queue
                if (elementsQueue.put(fetcherIndex, lastRecords)) {
                    if (!lastRecords.finishedSplits().isEmpty()) {
                        // The callback does not throw InterruptedException.
                        splitFinishedCallback.accept(lastRecords.finishedSplits());
                    }
                    lastRecords = null;
                }
            }
        } catch (InterruptedException e) {
            throw new IOException("Source fetch execution was interrupted", e);
        } finally {
            if (isWakenUp()) {
                wakeup = false;
            }
        }
        return true;
    }
}
```
With the above we now have a clear picture of how Flink ultimately invokes the CDC data-reading code. Next, let's follow the main reading code and see how it is implemented.

currentReader.pollSplitRecords(): first, a brief introduction to currentReader. It has two main implementations, BinlogSplitReader and SnapshotSplitReader, which read data differently depending on the nature of the split. When submitSplit is called, a readTask is created to read the data of the given split, and the results are put into the queue held by StatefulTaskContext. The fetch method first submits the split so that the read task starts reading, then pollSplitRecords calls queue.poll to pull the data. This is a blocking operation; if it times out, an interrupted exception is thrown.
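The submit-then-poll handoff described above can be sketched with a plain blocking queue. This is a minimal sketch, not the real CDC classes: `MiniReader` is hypothetical, standing in for SnapshotSplitReader/BinlogSplitReader, and the `String` records stand in for SourceRecords.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: submitSplit starts an async read task that fills a
// queue; pollSplitRecords blocks on that queue with a timeout, mirroring the
// StatefulTaskContext queue handoff described in the text.
class MiniReader {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // submitSplit: hand the split to a background "read task"
    void submitSplit(List<String> splitData) {
        executor.submit(() -> queue.addAll(splitData));
    }

    // pollSplitRecords: block until data arrives, or fail after a timeout
    String pollSplitRecords() throws InterruptedException {
        String record = queue.poll(1, java.util.concurrent.TimeUnit.SECONDS);
        if (record == null) {
            throw new InterruptedException("no data within timeout");
        }
        return record;
    }

    void close() {
        executor.shutdown();
    }
}
```

The key point the sketch shows is the decoupling: the reading thread never hands records directly to the fetcher; it only ever talks to the queue.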
```java
public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
    private final Queue<MySqlSplit> splits;
    private final MySqlSourceConfig sourceConfig;
    private final int subtaskId;
    @Nullable private DebeziumReader<SourceRecord, MySqlSplit> currentReader;
    @Nullable private String currentSplitId;

    @Override
    public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
        // Before fetching, check currentReader and create the matching reader
        // (binlog/snapshot) for the next split if necessary
        checkSplitOrStartNext();
        Iterator<SourceRecord> dataIt = null;
        try {
            // Delegate to the concrete DebeziumReader.
            // Inside, the reader calls poll on the StatefulTaskContext queue, which
            // blocks (optionally with a timeout); if no data arrives within the
            // interval it is interrupted and throws InterruptedException
            dataIt = currentReader.pollSplitRecords();
        } catch (InterruptedException e) {
            LOG.warn("fetch data failed.", e);
            throw new IOException(e);
        }
        return dataIt == null
                // If no data was read, return an empty result; this also sets
                // currentSplitId to null, marking the split as finished
                ? finishedSnapshotSplit()
                : MySqlRecords.forRecords(currentSplitId, dataIt);
    }

    @Override
    public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
        if (!(splitsChanges instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "The SplitChange type of %s is not supported.",
                            splitsChanges.getClass()));
        }
        LOG.debug("Handling split change {}", splitsChanges);
        splits.addAll(splitsChanges.splits());
    }

    private void checkSplitOrStartNext() throws IOException {
        // the binlog reader should keep alive
        if (currentReader instanceof BinlogSplitReader) {
            return;
        }
        if (canAssignNextSplit()) {
            final MySqlSplit nextSplit = splits.poll();
            if (nextSplit == null) {
                throw new IOException("Cannot fetch from another split - no split remaining");
            }
            currentSplitId = nextSplit.splitId();
            if (nextSplit.isSnapshotSplit()) {
                if (currentReader == null) {
                    final MySqlConnection jdbcConnection =
                            createMySqlConnection(sourceConfig.getDbzConfiguration());
                    final BinaryLogClient binaryLogClient =
                            createBinaryClient(sourceConfig.getDbzConfiguration());
                    final StatefulTaskContext statefulTaskContext =
                            new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
                    currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
                }
            } else {
                // point from snapshot split to binlog split
                if (currentReader != null) {
                    LOG.info("It's turn to read binlog split, close current snapshot reader");
                    currentReader.close();
                }
                final MySqlConnection jdbcConnection =
                        createMySqlConnection(sourceConfig.getDbzConfiguration());
                final BinaryLogClient binaryLogClient =
                        createBinaryClient(sourceConfig.getDbzConfiguration());
                final StatefulTaskContext statefulTaskContext =
                        new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
                currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
                LOG.info("BinlogSplitReader is created.");
            }
            // Submit a split to the reader. In submitSplit the reader creates a
            // ReadTask that reads the data into the StatefulTaskContext queue;
            // the read task itself runs in a thread pool
            currentReader.submitSplit(nextSplit);
        }
    }

    private boolean canAssignNextSplit() {
        return currentReader == null || currentReader.isFinished();
    }
}
```
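The reader-switching rule in checkSplitOrStartNext (a binlog reader, once created, is kept alive forever; snapshot splits are drained one at a time; switching to the binlog split closes the snapshot reader first) can be distilled into a few lines. This is a simplified sketch with reader types reduced to strings; `ReaderSwitch` is a hypothetical name, not a real CDC class.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of the checkSplitOrStartNext decision: binlog reader keeps alive,
// snapshot splits are consumed one by one, and the snapshot-to-binlog switch
// replaces (closes) the current reader.
class ReaderSwitch {
    private final Queue<String> splits = new ArrayDeque<>();
    String currentReader; // "snapshot" or "binlog", null before the first split

    void addSplit(String split) {
        splits.add(split);
    }

    void checkSplitOrStartNext() {
        if ("binlog".equals(currentReader)) {
            return; // the binlog reader should keep alive
        }
        String next = splits.poll();
        if (next == null) {
            throw new IllegalStateException("no split remaining");
        }
        if (next.startsWith("snapshot")) {
            if (currentReader == null) {
                currentReader = "snapshot"; // create the snapshot reader once
            }
        } else {
            // in the real code the snapshot reader is closed here first
            currentReader = "binlog";
        }
    }
}
```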
The basic execution flow is now clear; all that remains is the stage that actually reads the data. Let's follow the code step by step and see what the actual execution logic looks like and how data gets read. Since the CDC codebase is large, we filter out the less relevant parts; the binlog read task is too intricate to walk through line by line, so we will only sketch its flow briefly later.

Let's look at the reader's processing logic in order.

1. The checkSplitOrStartNext method

The most important thing this method does is call submitSplit, which kicks off the data-reading flow below.
```java
// ------------------------- SnapshotSplitReader.submitSplit ------------------------------------------
public void submitSplit(MySqlSplit mySqlSplit) {
    this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
    statefulTaskContext.configure(currentSnapshotSplit);
    // Grab the context's queue; pollSplitRecords needs it later
    this.queue = statefulTaskContext.getQueue();
    this.nameAdjuster = statefulTaskContext.getSchemaNameAdjuster();
    this.hasNextElement.set(true);
    this.reachEnd.set(false);
    // The main reading logic lives in the read task
    this.splitSnapshotReadTask =
            new MySqlSnapshotSplitReadTask(
                    statefulTaskContext.getConnectorConfig(),
                    statefulTaskContext.getOffsetContext(),
                    statefulTaskContext.getSnapshotChangeEventSourceMetrics(),
                    statefulTaskContext.getDatabaseSchema(),
                    statefulTaskContext.getConnection(),
                    statefulTaskContext.getDispatcher(),
                    statefulTaskContext.getTopicSelector(),
                    StatefulTaskContext.getClock(),
                    currentSnapshotSplit);
    // Submit a runnable to the executor; it mainly runs the read task's execute method
    executor.submit(
            () -> {
                try {
                    currentTaskRunning = true;
                    // A custom context impl, mainly used to record the low and high watermarks
                    final SnapshotSplitChangeEventSourceContextImpl sourceContext =
                            new SnapshotSplitChangeEventSourceContextImpl();
                    // Run the read task
                    SnapshotResult snapshotResult = splitSnapshotReadTask.execute(sourceContext);
                    final MySqlBinlogSplit backfillBinlogSplit =
                            createBackfillBinlogSplit(sourceContext);
                    // optimization that skip the binlog read when the low watermark equals high
                    // watermark
                    // Snapshot splits are read in parallel. If the low and high watermarks
                    // recorded around the read are equal, no other operation happened while this
                    // split was being read, so its data range has not changed and the binlog
                    // backfill after the snapshot can be skipped
                    final boolean binlogBackfillRequired =
                            backfillBinlogSplit
                                    .getEndingOffset()
                                    .isAfter(backfillBinlogSplit.getStartingOffset());
                    if (!binlogBackfillRequired) {
                        dispatchHighWatermark(backfillBinlogSplit);
                        currentTaskRunning = false;
                        return;
                    }
                    // After the snapshot completes, start the binlog read task
                    if (snapshotResult.isCompletedOrSkipped()) {
                        // The snapshot read task recorded the low/high watermarks when it
                        // finished; they are passed in to build the binlog read task
                        final MySqlBinlogSplitReadTask backfillBinlogReadTask =
                                createBackfillBinlogReadTask(backfillBinlogSplit);
                        // Run the binlog read task. Its internals are too involved to read
                        // through here, so a short summary: it takes the snapshot's high
                        // watermark as the endOffset; every event below endOffset is read
                        // and sent downstream, and the task stops once endOffset is reached
                        backfillBinlogReadTask.execute(
                                new SnapshotBinlogSplitChangeEventSourceContextImpl());
                    } else {
                        readException =
                                new IllegalStateException(
                                        String.format(
                                                "Read snapshot for mysql split %s fail",
                                                currentSnapshotSplit));
                    }
                } catch (Exception e) {
                    currentTaskRunning = false;
                    LOG.error(
                            String.format(
                                    "Execute snapshot read task for mysql split %s fail",
                                    currentSnapshotSplit),
                            e);
                    readException = e;
                }
            });
}

// ------------------------- MySqlSnapshotSplitReadTask.execute ------------------------------------------
@Override
public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
    SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset); // just news one up
    final SnapshotContext ctx;
    try {
        ctx = prepare(context); // news up another context object, not very useful
    } catch (Exception e) {
        LOG.error("Failed to initialize snapshot context.", e);
        throw new RuntimeException(e);
    }
    try {
        // The code above is mostly boilerplate; doExecute holds the main logic,
        // so let's step into it
        return doExecute(context, ctx, snapshottingTask);
    } catch (InterruptedException e) {
        LOG.warn("Snapshot was interrupted before completion");
        throw e;
    } catch (Exception t) {
        throw new DebeziumException(t);
    }
}

// ------------------------- MySqlSnapshotSplitReadTask.doExecute ------------------------------------------
@Override
protected SnapshotResult doExecute(
        ChangeEventSourceContext context,
        SnapshotContext snapshotContext,
        SnapshottingTask snapshottingTask)
        throws Exception {
    final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
            (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
    ctx.offset = offsetContext;
    // A dispatcher used to record watermark events; data is later emitted through
    // this dispatcher (via an emitter, of course)
    final SignalEventDispatcher signalEventDispatcher =
            new SignalEventDispatcher(
                    offsetContext.getPartition(),
                    topicSelector.topicNameFor(snapshotSplit.getTableId()),
                    dispatcher.getQueue());

    // The log output below already makes the steps quite clear.
    // Record the low watermark
    final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
    LOG.info(
            "Snapshot step 1 - Determining low watermark {} for split {}",
            lowWatermark,
            snapshotSplit);
    ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
            .setLowWatermark(lowWatermark);
    signalEventDispatcher.dispatchWatermarkEvent(
            snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);

    LOG.info("Snapshot step 2 - Snapshotting data");
    // Read the data -- the key method, introduced in detail below
    createDataEvents(ctx, snapshotSplit.getTableId());

    // Record the high watermark
    final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
    LOG.info(
            "Snapshot step 3 - Determining high watermark {} for split {}",
            highWatermark,
            snapshotSplit);
    signalEventDispatcher.dispatchWatermarkEvent(
            snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
    ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
            .setHighWatermark(highWatermark);
    return SnapshotResult.completed(ctx.offset);
}

// Let's look at the createDataEvents call chain
private void createDataEvents(
        RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
        TableId tableId)
        throws Exception {
    EventDispatcher.SnapshotReceiver snapshotReceiver =
            dispatcher.getSnapshotChangeEventReceiver();
    LOG.debug("Snapshotting table {}", tableId);
    createDataEventsForTable(
            snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
    // We won't read the receiver's code; a short summary instead:
    // when the receiver gets a record via changeRecord, it is controlled through a
    // member field (bufferedEvent): if the field is non-null, its value is enqueued,
    // and then a new SourceRecord is buffered, until all data has been read. After
    // the last record is buffered, changeRecord is never called again, which means
    // the member field always ends up holding the final record.
    // completeSnapshot below checks bufferedEvent: if it is non-null, it does some
    // completion work and enqueues it. If this method were not called, the snapshot
    // phase of the current split would be exactly one record short
    snapshotReceiver.completeSnapshot();
}

// createDataEvents calls createDataEventsForTable in the same class,
// which is where the concrete reading logic starts
private void createDataEventsForTable(
        RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
        EventDispatcher.SnapshotReceiver snapshotReceiver,
        Table table)
        throws InterruptedException {
    long exportStart = clock.currentTimeInMillis();
    LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());
    // Build the SQL
    final String selectSql =
            StatementUtils.buildSplitScanQuery(
                    snapshotSplit.getTableId(),
                    snapshotSplit.getSplitKeyType(),
                    snapshotSplit.getSplitStart() == null,
                    snapshotSplit.getSplitEnd() == null);
    LOG.info(
            "For split '{}' of table {} using select statement: '{}'",
            snapshotSplit.splitId(),
            table.id(),
            selectSql);
    try (PreparedStatement selectStatement =
                    StatementUtils.readTableSplitDataStatement(
                            // Create the statement, then run the query
                            jdbcConnection,
                            selectSql,
                            snapshotSplit.getSplitStart() == null,
                            snapshotSplit.getSplitEnd() == null,
                            snapshotSplit.getSplitStart(),
                            snapshotSplit.getSplitEnd(),
                            snapshotSplit.getSplitKeyType().getFieldCount(),
                            connectorConfig.getQueryFetchSize());
            // The query results are then wrapped into SourceRecords and sent downstream
            ResultSet rs = selectStatement.executeQuery()) {
        ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
        long rows = 0;
        Threads.Timer logTimer = getTableScanLogTimer();
        while (rs.next()) {
            rows++;
            final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
            for (int i = 0; i < columnArray.getColumns().length; i++) {
                Column actualColumn = table.columns().get(i);
                row[columnArray.getColumns()[i].position() - 1] =
                        readField(rs, i + 1, actualColumn, table);
            }
            if (logTimer.expired()) {
                long stop = clock.currentTimeInMillis();
                LOG.info(
                        "Exported {} records for split '{}' after {}",
                        rows,
                        snapshotSplit.splitId(),
                        Strings.duration(stop - exportStart));
                snapshotProgressListener.rowsScanned(table.id(), rows);
                logTimer = getTableScanLogTimer();
            }
            // This hands the row to the receiver, which in turn puts it into its
            // queue. No need to dig deeper; it is just heavily encapsulated and
            // therefore a bit hard to follow
            dispatcher.dispatchSnapshotEvent(
                    table.id(),
                    getChangeRecordEmitter(snapshotContext, table.id(), row), // just news one up
                    snapshotReceiver);
        }
        LOG.info(
                "Finished exporting {} records for split '{}', total duration '{}'",
                rows,
                snapshotSplit.splitId(),
                Strings.duration(clock.currentTimeInMillis() - exportStart));
    } catch (SQLException e) {
        throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
    }
}

// ------------------------- flow after dispatcher.dispatchSnapshotEvent ----------------------------------
// Step into EventDispatcher.dispatchSnapshotEvent
public void dispatchSnapshotEvent(
        T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver)
        throws InterruptedException {
    DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);
    if (dataCollectionSchema == null) {
        errorOnMissingSchema(dataCollectionId, changeRecordEmitter);
    }
    changeRecordEmitter.emitChangeRecords(
            dataCollectionSchema,
            new Receiver() {
                @Override
                public void changeRecord(
                        DataCollectionSchema schema,
                        Operation operation,
                        Object key,
                        Struct value,
                        OffsetContext offset,
                        ConnectHeaders headers)
                        throws InterruptedException {
                    eventListener.onEvent(dataCollectionSchema.id(), offset, key, value);
                    // The actual enqueueing logic is invoked here.
                    // receiver is the one we passed in, a BufferingSnapshotChangeRecordReceiver
                    receiver.changeRecord(
                            dataCollectionSchema, operation, key, value, offset, headers);
                }
            });
}

// BufferingSnapshotChangeRecordReceiver.changeRecord
// Its behavior was summarized above, so no further introduction is needed
@Override
public void changeRecord(
        DataCollectionSchema dataCollectionSchema,
        Operation operation,
        Object key,
        Struct value,
        OffsetContext offsetContext,
        ConnectHeaders headers)
        throws InterruptedException {
    Objects.requireNonNull(value, "value must not be null");
    LOGGER.trace("Received change record for {} operation on key {}", operation, key);
    if (bufferedEvent != null) {
        queue.enqueue(bufferedEvent.get());
    }
    Schema keySchema = dataCollectionSchema.keySchema();
    String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
    // the record is produced lazily, so to have the correct offset as per the
    // pre/post completion callbacks
    bufferedEvent =
            () -> {
                SourceRecord record =
                        new SourceRecord(
                                offsetContext.getPartition(),
                                offsetContext.getOffset(),
                                topicName,
                                null,
                                keySchema,
                                key,
                                dataCollectionSchema.getEnvelopeSchema().schema(),
                                value,
                                null,
                                headers);
                return changeEventCreator.createDataChangeEvent(record);
            };
}
```
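The buffer-one-behind pattern of BufferingSnapshotChangeRecordReceiver can be isolated into a tiny sketch. This is a simplified model with `String` records (the hypothetical class name `BufferingReceiver` is not the real one): each record is held back instead of enqueued immediately, the previously held record is enqueued when the next one arrives, and completeSnapshot() flushes the final record. Skip the flush and the split loses its last record.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of the bufferedEvent mechanism: changeRecord ships the *previous*
// record and buffers the newest one; completeSnapshot flushes the last record.
class BufferingReceiver {
    private final Queue<String> queue = new ArrayDeque<>();
    private String bufferedEvent;

    void changeRecord(String record) {
        if (bufferedEvent != null) {
            queue.add(bufferedEvent); // ship the previously buffered record
        }
        bufferedEvent = record; // hold the newest record back
    }

    void completeSnapshot() {
        if (bufferedEvent != null) {
            queue.add(bufferedEvent); // flush the final record
            bufferedEvent = null;
        }
    }

    Queue<String> drained() {
        return queue;
    }
}
```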
2. The pollSplitRecords method, which pulls data from the queue

Now that the flow of reading data and writing it into the queue is clear, let's see when the reader consumes the data from that queue.
```java
public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
    checkReadException();
    if (hasNextElement.get()) {
        // data input: [low watermark event][snapshot events][high watermark event][binlog
        // events][binlog-end event]
        // data output: [low watermark event][normalized events][high watermark event]
        boolean reachBinlogEnd = false;
        final List<SourceRecord> sourceRecords = new ArrayList<>();
        while (!reachBinlogEnd) {
            // Here we simply pull data with queue.poll. For each event we check
            // whether it has reached the ending watermark, which is in fact the
            // high-watermark position; once reached, we can stop
            List<DataChangeEvent> batch = queue.poll();
            for (DataChangeEvent event : batch) {
                sourceRecords.add(event.getRecord());
                if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
                    reachBinlogEnd = true;
                    break;
                }
            }
        }
        // snapshot split return its data once
        hasNextElement.set(false);
        return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
                .iterator();
    }
    // the data has been polled, no more data
    reachEnd.compareAndSet(false, true);
    return null;
}
```
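The drain-until-end-watermark loop above can be sketched on plain lists. Assumptions: the `END_WATERMARK` string stands in for RecordUtils.isEndWatermarkEvent(record), pre-built batches stand in for the blocking queue.poll calls, and `WatermarkDrain` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of pollSplitRecords' inner loop: collect every event across batches
// until the end-watermark marker appears, then stop (keeping the marker).
class WatermarkDrain {
    static final String END_WATERMARK = "END_WATERMARK";

    static List<String> drainUntilEnd(List<List<String>> batches) {
        List<String> out = new ArrayList<>();
        boolean reachEnd = false;
        for (List<String> batch : batches) {
            if (reachEnd) {
                break;
            }
            for (String event : batch) {
                out.add(event);
                if (END_WATERMARK.equals(event)) {
                    reachEnd = true;
                    break; // anything after the end watermark is ignored
                }
            }
        }
        return out;
    }
}
```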
Through the reading above we have now seen the whole journey from reading the data to putting it into a queue. A note on the queues: two queues appear in this walkthrough, `queue` and `elementsQueue`. The difference is that `queue` is filled during the reading stage, while the FetcherTask calls the reader's fetch method, pulls the data out of `queue`, and adds it to `elementsQueue`.

With the data now sitting in `elementsQueue`, let's see when it is sent downstream.

For this we need to go back to MySqlSource and look at a diagram.

When creating the reader we passed in a MySqlRecordEmitter, and it is this class that later sends the data.

The logic for sending data downstream lives in MySqlSourceReader's parent class (SourceReaderBase), but the actual emission is done by the emitter.

Since the relevant methods are invoked by the upper layers, we won't dig into them; in short, the framework calls SourceReaderBase.pollNext(), which triggers the collect operation that sends the data to the downstream operators.

Let's read the MySqlRecordEmitter source directly and look at its emission logic. By this point the principle is essentially the same as the SourceFunction-based implementation, so a quick pass is enough.
```java
public final class MySqlRecordEmitter<T>
        implements RecordEmitter<SourceRecord, T, MySqlSplitState> {

    private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
            new FlinkJsonTableChangeSerializer();
    private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
    private final boolean includeSchemaChanges;
    private final OutputCollector<T> outputCollector;

    public MySqlRecordEmitter(
            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
            MySqlSourceReaderMetrics sourceReaderMetrics,
            boolean includeSchemaChanges) {
        // The deserialization object for the data; it is the same class used by the
        // single-parallelism implementation -- feel free to explore its internals
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
        this.sourceReaderMetrics = sourceReaderMetrics;
        this.includeSchemaChanges = includeSchemaChanges;
        this.outputCollector = new OutputCollector<>();
    }

    @Override
    public void emitRecord(SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
            throws Exception {
        // Check the event type of the message: a data-change record is sent
        // downstream; other event types get their corresponding handling
        if (isWatermarkEvent(element)) {
            BinlogOffset watermark = getWatermark(element);
            if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
                splitState.asSnapshotSplitState().setHighWatermark(watermark);
            }
        } else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {
            HistoryRecord historyRecord = getHistoryRecord(element);
            Array tableChanges =
                    historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
            TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
            for (TableChanges.TableChange tableChange : changes) {
                splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
            }
            if (includeSchemaChanges) {
                emitElement(element, output);
            }
        } else if (isDataChangeRecord(element)) {
            if (splitState.isBinlogSplitState()) {
                BinlogOffset position = getBinlogPosition(element);
                splitState.asBinlogSplitState().setStartingOffset(position);
            }
            reportMetrics(element);
            emitElement(element, output);
        } else {
            // unknown element
            LOG.info("Meet unknown element {}, just skip.", element);
        }
    }

    private void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
        outputCollector.output = output;
        // Call the configured DeserializationSchema.deserialize to convert the data
        // and send it downstream
        debeziumDeserializationSchema.deserialize(element, outputCollector);
    }

    private static class OutputCollector<T> implements Collector<T> {
        private SourceOutput<T> output;

        @Override
        public void collect(T record) {
            output.collect(record);
        }
    }
}
```
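The branching in emitRecord can be boiled down to a small dispatcher. This is a hypothetical sketch: string prefixes stand in for the real isWatermarkEvent/isSchemaChangeEvent/isDataChangeRecord checks on SourceRecord, and `MiniEmitter` is not a real CDC class.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the emitRecord dispatch: watermark events only update split state,
// schema-change events update schema bookkeeping (forwarded only if
// configured), data-change events go downstream, anything else is skipped.
class MiniEmitter {
    final List<String> output = new ArrayList<>();
    String highWatermark;

    void emitRecord(String element) {
        if (element.startsWith("WATERMARK:")) {
            // state update only, nothing emitted
            highWatermark = element.substring("WATERMARK:".length());
        } else if (element.startsWith("SCHEMA:")) {
            // record the schema change; forward only when includeSchemaChanges is set
        } else if (element.startsWith("DATA:")) {
            output.add(element.substring("DATA:".length()));
        }
        // unknown elements are skipped
    }
}
```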
4. Aside: Table-related content
```java
// The DeserializationSchema for RowData, corresponding to the one used above --
// a brief introduction
public final class RowDataDebeziumDeserializeSchema
        implements DebeziumDeserializationSchema<RowData> {

    @Override
    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
        // For each operation type we need the corresponding row image:
        // "after" is the row after the change, "before" the row before it.
        // Only an update needs both before and after (think about why!).
        // Each operation is tagged with a RowKind; at the SQL layer the framework
        // handles the row according to this tag (insert, update, and so on).
        // So for the Table layer we only need to convert the data into the matching
        // RowData and set its RowKind; the framework then performs the corresponding
        // operation at sink time, with no extra code on our side.
        // In other words, using CDC in SQL only requires attaching the RowKind;
        // everything after that is handled for us.
        // As for the json format used in SQL, I haven't looked at that code path
        // since I wasn't particularly interested -- readers who care can explore
        // the concrete implementation themselves
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            // The method that builds the RowData is omitted here -- it is fairly
            // tedious. Roughly: each field is converted into the matching Java type,
            // or rather the matching Flink type, since Flink wraps every type it
            // uses in another layer so it can support all the types in the database
            GenericRowData insert = extractAfterRow(value, valueSchema);
            validator.validate(insert, RowKind.INSERT);
            insert.setRowKind(RowKind.INSERT);
            // The concrete downstream emission method
            emit(record, insert, out);
        } else if (op == Envelope.Operation.DELETE) {
            GenericRowData delete = extractBeforeRow(value, valueSchema);
            validator.validate(delete, RowKind.DELETE);
            delete.setRowKind(RowKind.DELETE);
            emit(record, delete, out);
        } else {
            GenericRowData before = extractBeforeRow(value, valueSchema);
            validator.validate(before, RowKind.UPDATE_BEFORE);
            before.setRowKind(RowKind.UPDATE_BEFORE);
            emit(record, before, out);
            GenericRowData after = extractAfterRow(value, valueSchema);
            validator.validate(after, RowKind.UPDATE_AFTER);
            after.setRowKind(RowKind.UPDATE_AFTER);
            emit(record, after, out);
        }
    }
}
```
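The operation-to-RowKind mapping above is worth isolating, because it answers the "why does update need both before and after" question: an update fans out into two rows. A sketch with local stand-in enums (`RowKindMapper` is hypothetical; the enums mirror Debezium's Envelope.Operation and Flink's RowKind):

```java
import java.util.List;

// Sketch of the RowKind mapping: CREATE/READ -> one INSERT row,
// DELETE -> one DELETE row, UPDATE -> an UPDATE_BEFORE row (old image)
// followed by an UPDATE_AFTER row (new image).
class RowKindMapper {
    enum Operation { CREATE, READ, UPDATE, DELETE }

    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static List<RowKind> kindsFor(Operation op) {
        switch (op) {
            case CREATE:
            case READ:
                return List.of(RowKind.INSERT);
            case DELETE:
                return List.of(RowKind.DELETE);
            default: // UPDATE needs both the before and the after image
                return List.of(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER);
        }
    }
}
```

This is why an upsert-capable sink can retract the old row before applying the new one: the two-row fan-out carries both images downstream.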
To implement a table source ourselves, we need to implement DynamicTableSourceFactory with the methods below, then load it dynamically via SPI:

- createDynamicTableSource : creates the concrete source
- factoryIdentifier : can be thought of as the name describing the connector, e.g. kafka
- requiredOptions : the parameters that must appear after WITH, e.g. username, password; an exception is thrown if any are missing
- optionalOptions : optional configuration items after WITH, which may or may not be present
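How requiredOptions feeds into validation can be sketched without the Flink API. This is a hypothetical, self-contained analog: `MiniFactory` is not a real Flink class (the real contract is DynamicTableSourceFactory, discovered via Java SPI by its factoryIdentifier), and the option keys are illustrative.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a factory exposes its identifier and required WITH
// options, and validation fails fast when a required option is missing.
class MiniFactory {
    String factoryIdentifier() {
        return "mysql-cdc"; // the connector name used in WITH ('connector' = ...)
    }

    Set<String> requiredOptions() {
        // illustrative keys; the real set comes from the connector's factory
        return Set.of("hostname", "username", "password", "database-name", "table-name");
    }

    void validate(Map<String, String> withOptions) {
        for (String key : requiredOptions()) {
            if (!withOptions.containsKey(key)) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
    }
}
```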