3. mysql-cdc source code: implementing the new Source interface
Since version 1.11, Flink provides a new Source interface; you can read up on it in advance at https://issues.apache.org/jira/browse/FLINK-10740
A brief introduction:
SourceReader: reads the data of a split, e.g. a partition or a block; it is not limited to a single partition, that depends on your implementation
SplitEnumerator: responsible for splitting the data source and discovering partitions, e.g. discovering Kafka partitions or dividing files into blocks
The above is a simplification; reality is a bit more complex, so implementing a source against the new Source interface is fairly hard, though once you are familiar with it, it is all the same.
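The division of labor between the two roles can be sketched with a toy example. This is not the Flink API; all names here are hypothetical, only the shape is the point: an enumerator-like object discovers splits, and a reader-like object consumes them.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy illustration (not Flink API) of the enumerator/reader division of labor:
// one side discovers splits, the other side reads them one by one.
public class SourceShapeSketch {

    // "Enumerator": discovers splits; here simply file chunks named chunk-0..chunk-(n-1).
    static Queue<String> discoverSplits(int n) {
        Queue<String> queue = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            queue.add("chunk-" + i);
        }
        return queue;
    }

    // "Reader": takes the next split and processes it; here processing is just counting.
    static int readAll(Queue<String> splits) {
        int processed = 0;
        while (!splits.isEmpty()) {
            splits.poll();
            processed++;
        }
        return processed;
    }
}
```

In the real interface the two sides run on different machines (the enumerator on the JobManager, the readers in parallel subtasks) and communicate through assignment and event RPCs, which is where most of the complexity comes from.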
A few terms up front:
A split can be thought of as a slice of the data. In mysql-cdc, hypothetically, it covers part of a table, say from start primary key 1 to end primary key 10; the split then denotes that range of rows. The actual reading is done by a read task, which reads the data based on the positions recorded in the split. Of course, one read task may execute more than one split.
snapshot refers to reading the full historical data of the database
binlog refers to the phase that begins once the snapshot phase ends, i.e. when we start reading binlog data
The snapshot phase runs first, followed by the binlog phase.
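As a rough illustration of how a snapshot phase can carve a table into splits by primary key, here is a minimal sketch with hypothetical names; the real flink-cdc chunk splitter is more sophisticated (it samples the key distribution), but the idea is the same: fixed-size key ranges, with the last range open-ended.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not the actual flink-cdc splitter): carve a table's
// primary-key range into snapshot splits of a fixed chunk size.
public class SnapshotChunking {

    // A split is the half-open key range [start, end); a null end means "to the end of the table".
    public record KeyRange(Long start, Long end) {}

    public static List<KeyRange> split(long minKey, long maxKey, long chunkSize) {
        List<KeyRange> splits = new ArrayList<>();
        long start = minKey;
        while (start + chunkSize <= maxKey) {
            splits.add(new KeyRange(start, start + chunkSize));
            start += chunkSize;
        }
        splits.add(new KeyRange(start, null)); // the last split is open-ended
        return splits;
    }
}
```

With keys 1..10 and chunk size 3 this yields the ranges [1,4), [4,7), [7,10) and a final open-ended split starting at 10, matching the "start primary key to end primary key" idea above.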
Building the job code is the same as in the old version; only the internal execution logic changed. The cdc code implemented on the new Source interface is fairly complex and covers a lot of ground, so it can be confusing at first; you can debug through the source code yourself later.
Since there is a lot of code, we will explain the key parts and skip what is not important.
```java
// Implements two interfaces: Source and ResultTypeQueryable (the latter is simple, just one
// method returning the result type info); the main logic is in the Source implementation.
// T is the output type, MySqlSplit is the mysql split type, PendingSplitsState is the
// enumerator's state object.
public class MySqlSource<T>
        implements Source<T, MySqlSplit, PendingSplitsState>, ResultTypeQueryable<T> {

    private final MySqlSourceConfigFactory configFactory;
    private final DebeziumDeserializationSchema<T> deserializationSchema;

    /* The source's parameters are assembled via the builder pattern: parameters are added through
       MySqlSourceConfigFactory, and the build method constructs a MySqlSource from that factory.
       ------------------------- relationships between the configs -------------------------
       MySqlSourceConfigFactory creates a MySqlSourceConfig per subtask
       MySqlSourceConfig can build a MySqlConnectorConfig
       MySqlConnection is built via DebeziumUtils.createMySqlConnection(mySqlSourceConfig.getDbzConfiguration())
       The configs are a bit confusing and not well named; they will be mentioned again when used.
       For now just keep this picture in mind and don't get lost in the configuration. */
    public static <T> MySqlSourceBuilder<T> builder() {
        return new MySqlSourceBuilder<>();
    }

    // Created by MySqlSourceBuilder.build
    MySqlSource(
            MySqlSourceConfigFactory configFactory,
            DebeziumDeserializationSchema<T> deserializationSchema // same as the old source's deserialization
    ) {
        this.configFactory = configFactory;
        this.deserializationSchema = deserializationSchema;
    }

    // Unified batch/stream source: declares boundedness, a feature of the new Source interface
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    /* Build the SourceReader */
    @Override
    public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        // As mentioned above, create the config for this subtask index
        MySqlSourceConfig sourceConfig =
                configFactory.createConfig(readerContext.getIndexOfSubtask());
        // A blocking queue for hand-off between threads; no need to dig into it
        FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
                new FutureCompletingBlockingQueue<>();
        // metrics
        final MySqlSourceReaderMetrics sourceReaderMetrics =
                new MySqlSourceReaderMetrics(readerContext.metricGroup());
        sourceReaderMetrics.registerMetrics();
        // A supplier that builds a SplitReader, used for decoupling; the MySqlSplitReader
        // implementation is what matters
        Supplier<MySqlSplitReader> splitReaderSupplier =
                // each reader gets its own config and its subtask index
                () -> new MySqlSplitReader(sourceConfig, readerContext.getIndexOfSubtask());
        // Build the concrete SourceReader
        return new MySqlSourceReader<>(
                elementsQueue,
                splitReaderSupplier,
                new MySqlRecordEmitter<>(
                        deserializationSchema,
                        sourceReaderMetrics,
                        sourceConfig.isIncludeSchemaChanges()),
                readerContext.getConfiguration(),
                readerContext,
                sourceConfig);
    }

    @Override
    public SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator(
            SplitEnumeratorContext<MySqlSplit> enumContext) {
        // The enumerator is only created once, so one sourceConfig is enough
        MySqlSourceConfig sourceConfig = configFactory.createConfig(0);
        // validate mysql
        final MySqlValidator validator = new MySqlValidator(sourceConfig);
        validator.validate();

        final MySqlSplitAssigner splitAssigner;
        // If the startup mode is INITIAL, first read the mysql table data (called the snapshot in
        // the code), then continue with the binlog data; otherwise read the binlog directly
        if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) {
            try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
                final List<TableId> remainingTables = discoverCapturedTables(jdbc, sourceConfig);
                boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
                splitAssigner =
                        // contains the split logic for both snapshot and binlog
                        new MySqlHybridSplitAssigner(
                                sourceConfig,
                                enumContext.currentParallelism(),
                                remainingTables,
                                isTableIdCaseSensitive);
            } catch (Exception e) {
                throw new FlinkRuntimeException(
                        "Failed to discover captured tables for enumerator", e);
            }
        } else {
            // binlog-only split logic
            splitAssigner = new MySqlBinlogSplitAssigner(sourceConfig);
        }
        // Create the SplitEnumerator, which builds splits for the readers to read
        return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
    }

    // Restore the SplitEnumerator, e.g. after a failure restart: the enumerator is rebuilt from
    // the checkpoint so it can continue the unfinished reads
    @Override
    public SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator(
            SplitEnumeratorContext<MySqlSplit> enumContext, PendingSplitsState checkpoint) {
        MySqlSourceConfig sourceConfig = configFactory.createConfig(0);

        final MySqlSplitAssigner splitAssigner;
        if (checkpoint instanceof HybridPendingSplitsState) {
            splitAssigner =
                    new MySqlHybridSplitAssigner(
                            sourceConfig,
                            enumContext.currentParallelism(),
                            (HybridPendingSplitsState) checkpoint);
        } else if (checkpoint instanceof BinlogPendingSplitsState) {
            splitAssigner =
                    new MySqlBinlogSplitAssigner(
                            sourceConfig, (BinlogPendingSplitsState) checkpoint);
        } else {
            throw new UnsupportedOperationException(
                    "Unsupported restored PendingSplitsState: " + checkpoint);
        }
        return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
    }

    // ----------------- fault tolerance, not the focus -----------------
    @Override
    public SimpleVersionedSerializer<MySqlSplit> getSplitSerializer() {
        return MySqlSplitSerializer.INSTANCE;
    }

    @Override
    public SimpleVersionedSerializer<PendingSplitsState> getEnumeratorCheckpointSerializer() {
        return new PendingSplitsStateSerializer(getSplitSerializer());
    }

    // produced type extraction
    @Override
    public TypeInformation<T> getProducedType() {
        return deserializationSchema.getProducedType();
    }
}
```
From the code above we can see that the Source implementation mainly builds the SourceReader and the SplitEnumerator, plus the fault-tolerance pieces; the concrete processing logic is encapsulated in those objects, which we dissect step by step below.
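The per-subtask config creation seen above (configFactory.createConfig(subtaskIndex)) is a plain factory pattern: shared settings live in the factory, and each subtask derives its own config. A minimal sketch with hypothetical names (in the real MySqlSourceConfigFactory one per-subtask detail is, for example, the MySQL server id derived from a base value):

```java
// Hypothetical sketch of the factory pattern described above: shared connection settings
// live in the factory, each subtask gets its own derived config.
public class ConfigFactorySketch {

    public record ReaderConfig(String hostname, int port, int serverId) {}

    private final String hostname;
    private final int port;
    private final int serverIdBase;

    public ConfigFactorySketch(String hostname, int port, int serverIdBase) {
        this.hostname = hostname;
        this.port = port;
        this.serverIdBase = serverIdBase;
    }

    // one config per subtask, like MySqlSourceConfigFactory.createConfig(subtaskIndex)
    public ReaderConfig createConfig(int subtaskIndex) {
        return new ReaderConfig(hostname, port, serverIdBase + subtaskIndex);
    }
}
```

This is why the reader calls createConfig(readerContext.getIndexOfSubtask()) while the enumerator, which exists only once, simply calls createConfig(0).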
```java
/* Before looking at anything else, let's see how mysql is split. In the snapshot phase the
   split is done by primary key; the binlog phase simply consumes from the current offset.
   This is the hybrid assigner; there are also dedicated snapshot and binlog SplitAssigners,
   but here we mainly look at the overall logic; the specifics you can read on your own.
   To explain: first read the mysql history data (the snapshot phase), then consume from the
   current mysql binlog position. The point of the hybrid assigner is to read the full data
   first and then continue from the latest binlog, which completes the cdc read. */
public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {

    private final int splitMetaGroupSize;
    private boolean isBinlogSplitAssigned;
    private final MySqlSnapshotSplitAssigner snapshotSplitAssigner;

    public MySqlHybridSplitAssigner(
            MySqlSourceConfig sourceConfig,
            int currentParallelism,
            List<TableId> remainingTables,
            boolean isTableIdCaseSensitive) {
        this(
                // create the snapshot split assigner
                new MySqlSnapshotSplitAssigner(
                        sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive),
                false,
                sourceConfig.getSplitMetaGroupSize());
    }

    public MySqlHybridSplitAssigner(
            MySqlSourceConfig sourceConfig,
            int currentParallelism,
            HybridPendingSplitsState checkpoint) {
        this(
                new MySqlSnapshotSplitAssigner(
                        sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits()),
                checkpoint.isBinlogSplitAssigned(),
                sourceConfig.getSplitMetaGroupSize());
    }

    private MySqlHybridSplitAssigner(
            MySqlSnapshotSplitAssigner snapshotSplitAssigner,
            boolean isBinlogSplitAssigned,
            int splitMetaGroupSize) {
        this.snapshotSplitAssigner = snapshotSplitAssigner;
        this.isBinlogSplitAssigned = isBinlogSplitAssigned;
        this.splitMetaGroupSize = splitMetaGroupSize;
    }

    @Override
    public void open() {
        snapshotSplitAssigner.open();
    }

    // Returns the next split, or empty if there is none; Optional is a jdk8 class for avoiding
    // null pointers
    @Override
    public Optional<MySqlSplit> getNext() {
        // the methods below are self-explanatory
        if (snapshotSplitAssigner.noMoreSplits()) {
            if (isBinlogSplitAssigned) {
                return Optional.empty();
            } else if (snapshotSplitAssigner.isFinished()) {
                // once the snapshot is done, start the binlog split flow
                // we need to wait snapshot-assigner to be finished before
                // assigning the binlog split. Otherwise, records emitted from binlog split
                // might be out-of-order in terms of same primary key with snapshot splits.
                isBinlogSplitAssigned = true;
                return Optional.of(createBinlogSplit());
            } else {
                // binlog split is not ready by now
                return Optional.empty();
            }
        } else {
            // snapshot assigner still have remaining splits, assign split from it
            return snapshotSplitAssigner.getNext();
        }
    }

    // Whether the assigner is waiting for the finished-splits callback, i.e. onFinishedSplits
    @Override
    public boolean waitingForFinishedSplits() {
        return snapshotSplitAssigner.waitingForFinishedSplits();
    }

    // Get the finished splits with their metadata; a binlog split can be derived from each
    // finished snapshot split
    @Override
    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
        return snapshotSplitAssigner.getFinishedSplitInfos();
    }

    // Callback handling finished splits with their final binlog offsets; used to decide when to
    // create the binlog split and what it should contain
    @Override
    public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
        snapshotSplitAssigner.onFinishedSplits(splitFinishedOffsets);
    }

    // Add a set of splits back to this assigner; called when some splits failed and need to be
    // re-assigned
    @Override
    public void addSplits(Collection<MySqlSplit> splits) {
        List<MySqlSplit> snapshotSplits = new ArrayList<>();
        for (MySqlSplit split : splits) {
            if (split.isSnapshotSplit()) {
                snapshotSplits.add(split);
            } else {
                // we don't store the split, but will re-create binlog split later
                isBinlogSplitAssigned = false;
            }
        }
        snapshotSplitAssigner.addSplits(snapshotSplits);
    }

    // ---------------------------- checkpoint / fault tolerance ----------------------------
    @Override
    public PendingSplitsState snapshotState(long checkpointId) {
        return new HybridPendingSplitsState(
                snapshotSplitAssigner.snapshotState(checkpointId), isBinlogSplitAssigned);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public void close() {
        snapshotSplitAssigner.close();
    }

    // ------------------------------------ binlog split ------------------------------------
    // Build the binlog split from the already finished snapshot splits; the split classes are
    // simple and can be read on your own.
    // In short, this describes the binlog split plus the related snapshot split info (the
    // snapshot is split by primary key) as well as the table schema information.
    private MySqlBinlogSplit createBinlogSplit() {
        final List<MySqlSnapshotSplit> assignedSnapshotSplit =
                snapshotSplitAssigner.getAssignedSplits().values().stream()
                        .sorted(Comparator.comparing(MySqlSplit::splitId))
                        .collect(Collectors.toList());
        Map<String, BinlogOffset> splitFinishedOffsets =
                snapshotSplitAssigner.getSplitFinishedOffsets();
        final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();

        BinlogOffset minBinlogOffset = null;
        for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
            // find the min binlog offset
            BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
            if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
                minBinlogOffset = binlogOffset;
            }
            finishedSnapshotSplitInfos.add(
                    new FinishedSnapshotSplitInfo(
                            split.getTableId(),
                            split.splitId(),
                            split.getSplitStart(),
                            split.getSplitEnd(),
                            binlogOffset));
        }

        boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
        return new MySqlBinlogSplit(
                BINLOG_SPLIT_ID,
                minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
                BinlogOffset.NO_STOPPING_OFFSET,
                divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
                new HashMap<>(),
                finishedSnapshotSplitInfos.size());
    }
}
```
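The getNext() state machine above can be modeled with a toy sketch. This is not the real class; names and the string splits are hypothetical, but it shows the phase transition: hand out all snapshot splits first, return empty while waiting for them to be acknowledged as finished, then hand out exactly one binlog split.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical toy model of MySqlHybridSplitAssigner.getNext():
// snapshot splits first, then exactly one binlog split once the snapshot phase finished.
public class HybridAssignerSketch {
    private final Queue<String> snapshotSplits = new ArrayDeque<>();
    private boolean snapshotFinished = false;   // set once all snapshot splits are read and acked
    private boolean binlogSplitAssigned = false;

    public HybridAssignerSketch(List<String> splits) {
        snapshotSplits.addAll(splits);
    }

    public Optional<String> getNext() {
        if (!snapshotSplits.isEmpty()) {
            // snapshot phase: keep assigning snapshot splits
            return Optional.of(snapshotSplits.poll());
        }
        if (!snapshotFinished || binlogSplitAssigned) {
            // still waiting for the snapshot phase to finish, or nothing left to assign
            return Optional.empty();
        }
        binlogSplitAssigned = true;
        return Optional.of("binlog-split"); // the single, final binlog split
    }

    // in the real assigner this corresponds to onFinishedSplits having seen every split
    public void markSnapshotFinished() {
        snapshotFinished = true;
    }
}
```

Note the "wait" state in the middle: even after all snapshot splits are handed out, the binlog split must not be created until every snapshot split reported its finished offset, otherwise the minimum starting offset cannot be computed.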
Now let's look at the SourceReader and the SplitEnumerator.
SourceReader:
```java
/* Extends SingleThreadMultiplexSourceReaderBase */
public class MySqlSourceReader<T>
        extends SingleThreadMultiplexSourceReaderBase<SourceRecord, T, MySqlSplit, MySqlSplitState> {

    private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);

    private final MySqlSourceConfig sourceConfig;
    private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits;
    private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;
    private final int subtaskId;

    public MySqlSourceReader(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementQueue,
            Supplier<MySqlSplitReader> splitReaderSupplier,
            RecordEmitter<SourceRecord, T, MySqlSplitState> recordEmitter,
            Configuration config,
            SourceReaderContext context,
            MySqlSourceConfig sourceConfig) {
        super(
                elementQueue,
                // A single-threaded fetcher manager that performs the reads.
                // Rough flow: SingleThreadFetcherManager.createSplitFetcher builds a SplitFetcher
                // (implements Runnable); the SplitFetcher builds a fetcher task;
                // SplitFetcher.run loops calling this.runOnce(), which keeps invoking
                // fetcherTask.run() to read data; run() calls MySqlSplitReader.fetch and puts the
                // returned records into the elementQueue. As with most multi-threaded code, it is
                // somewhat hard to follow.
                new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get),
                recordEmitter,
                config,
                context);
        this.sourceConfig = sourceConfig;
        this.finishedUnackedSplits = new HashMap<>();
        this.uncompletedBinlogSplits = new HashMap<>();
        this.subtaskId = context.getIndexOfSubtask();
    }

    // Start the reader
    @Override
    public void start() {
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            // Send a split request to the SplitEnumerator; this ends up in
            // SplitEnumerator.handleSplitRequest(int, String) with the requesting reader's
            // subtask id and hostname
            context.sendSplitRequest();
        }
    }

    // When the reader is assigned a new split, initialize the split's state
    @Override
    protected MySqlSplitState initializedState(MySqlSplit split) {
        if (split.isSnapshotSplit()) {
            return new MySqlSnapshotSplitState(split.asSnapshotSplit());
        } else {
            return new MySqlBinlogSplitState(split.asBinlogSplit());
        }
    }

    // fault tolerance, skip
    @Override
    public List<MySqlSplit> snapshotState(long checkpointId) {
        // unfinished splits
        List<MySqlSplit> stateSplits = super.snapshotState(checkpointId);
        // add finished snapshot splits that didn't receive ack yet
        stateSplits.addAll(finishedUnackedSplits.values());
        // add binlog splits who are uncompleted
        stateSplits.addAll(uncompletedBinlogSplits.values());
        return stateSplits;
    }

    // Clean up the state of finished splits; not the focus
    @Override
    protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
        for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
            MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
            checkState(
                    mySqlSplit.isSnapshotSplit(),
                    String.format(
                            "Only snapshot split could finish, but the actual split is binlog split %s",
                            mySqlSplit));
            finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
        }
        reportFinishedSnapshotSplitsIfNeed();
        context.sendSplitRequest();
    }

    /* Add the splits this reader should read. Called when the SplitEnumerator assigns a split
       through the SplitEnumeratorContext, i.e. via context.assignSplit(SourceSplit, int) or
       context.assignSplits(SplitsAssignment). */
    @Override
    public void addSplits(List<MySqlSplit> splits) {
        List<MySqlSplit> unfinishedSplits = new ArrayList<>();
        for (MySqlSplit split : splits) {
            // snapshot split or binlog split?
            if (split.isSnapshotSplit()) {
                // if the split has already been fully read, put it into the finished set,
                // otherwise into the unfinished set
                MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
                if (snapshotSplit.isSnapshotReadFinished()) {
                    finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
                } else {
                    unfinishedSplits.add(split);
                }
            } else {
                if (!split.asBinlogSplit().isCompletedSplit()) {
                    // if the binlog split is not complete, add it to the uncompleted map and send
                    // a binlog split meta request event to the SplitEnumerator
                    uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
                    requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
                } else {
                    // remove it from the uncompleted map (entries there lack split meta info)
                    uncompletedBinlogSplits.remove(split.splitId());
                    // create a binlog split carrying the table schema information
                    MySqlBinlogSplit mySqlBinlogSplit =
                            discoverTableSchemasForBinlogSplit(split.asBinlogSplit());
                    // add it to the unfinished splits; it will be read later
                    unfinishedSplits.add(mySqlBinlogSplit);
                }
            }
        }
        // notify split enumerator again about the finished unacked snapshot splits
        reportFinishedSnapshotSplitsIfNeed();
        // add all un-finished splits (including binlog split) to SourceReaderBase
        // calling super.addSplits starts the fetcherManager, which begins reading data
        super.addSplits(unfinishedSplits);
    }

    private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {
        final String splitId = split.splitId();
        // fill in the table schemas if absent; if already present, return the split as-is
        if (split.getTableSchemas().isEmpty()) {
            try (MySqlConnection jdbc =
                    // static helper building a MySqlConnection, essentially a jdbc connection;
                    // no need to dig in
                    DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
                Map<TableId, TableChanges.TableChange> tableSchemas =
                        // static helper: from the database and table list given to the source
                        // builder, derive the TableId -> TableChange mapping needed later when
                        // reading; no need to dig into the utility class
                        TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
                LOG.info("The table schema discovery for binlog split {} success", splitId);
                // static helper building a MySqlBinlogSplit carrying the table schemas;
                // no need to dig in
                return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
            } catch (SQLException e) {
                LOG.error("Failed to obtains table schemas due to {}", e.getMessage());
                throw new FlinkRuntimeException(e);
            }
        } else {
            LOG.warn(
                    "The binlog split {} has table schemas yet, skip the table schema discovery",
                    split);
            return split;
        }
    }

    // Handle custom source events coming from the SplitEnumerator
    @Override
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
            FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
            LOG.debug(
                    "The subtask {} receives ack event for {} from enumerator.",
                    subtaskId,
                    ackEvent.getFinishedSplits());
            for (String splitId : ackEvent.getFinishedSplits()) {
                this.finishedUnackedSplits.remove(splitId);
            }
        } else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
            // report finished snapshot splits
            LOG.debug(
                    "The subtask {} receives request to report finished snapshot splits.",
                    subtaskId);
            reportFinishedSnapshotSplitsIfNeed();
        } else if (sourceEvent instanceof BinlogSplitMetaEvent) {
            LOG.debug(
                    "The subtask {} receives binlog meta with group id {}.",
                    subtaskId,
                    ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
            fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
        } else {
            super.handleSourceEvents(sourceEvent);
        }
    }

    // Send an event requesting the binlog split meta
    private void requestBinlogSplitMetaIfNeeded(MySqlBinlogSplit binlogSplit) {
        final String splitId = binlogSplit.splitId();
        if (!binlogSplit.isCompletedSplit()) {
            final int nextMetaGroupId =
                    getNextMetaGroupId(
                            binlogSplit.getFinishedSnapshotSplitInfos().size(),
                            sourceConfig.getSplitMetaGroupSize());
            BinlogSplitMetaRequestEvent splitMetaRequestEvent =
                    new BinlogSplitMetaRequestEvent(splitId, nextMetaGroupId);
            context.sendSourceEventToCoordinator(splitMetaRequestEvent);
        } else {
            LOG.info("The meta of binlog split {} has been collected success", splitId);
            this.addSplits(Arrays.asList(binlogSplit));
        }
    }

    // After the meta request event has been sent, the binlog split meta arrives and is filled
    // into the binlog split
    private void fillMetaDataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
        MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());
        if (binlogSplit != null) {
            final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
            final int expectedMetaGroupId =
                    getNextMetaGroupId(
                            binlogSplit.getFinishedSnapshotSplitInfos().size(),
                            sourceConfig.getSplitMetaGroupSize());
            if (receivedMetaGroupId == expectedMetaGroupId) {
                List<FinishedSnapshotSplitInfo> metaDataGroup =
                        metadataEvent.getMetaGroup().stream()
                                .map(FinishedSnapshotSplitInfo::deserialize)
                                .collect(Collectors.toList());
                uncompletedBinlogSplits.put(
                        binlogSplit.splitId(),
                        MySqlBinlogSplit.appendFinishedSplitInfos(binlogSplit, metaDataGroup));
                LOG.info("Fill meta data of group {} to binlog split", metaDataGroup.size());
            } else {
                LOG.warn(
                        "Received out of order binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it",
                        metadataEvent.getSplitId(),
                        receivedMetaGroupId,
                        expectedMetaGroupId);
            }
            // keep requesting meta events until the split is complete
            requestBinlogSplitMetaIfNeeded(binlogSplit);
        } else {
            LOG.warn(
                    "Received binlog meta event for split {}, but the uncompleted split map does not contain it",
                    metadataEvent.getSplitId());
        }
    }

    // Turn the mutable split state back into an immutable split
    @Override
    protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) {
        return splitState.toMySqlSplit();
    }
}
```
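The binlog split meta exchanged above is fetched in fixed-size groups, and getNextMetaGroupId derives the next group from how many FinishedSnapshotSplitInfo entries the split has already received. A hedged sketch of that arithmetic (my own simplified reading, not the exact library code; the name and formula here are assumptions):

```java
// Hedged sketch of the group arithmetic behind the binlog split meta exchange:
// with N finished-snapshot-split infos already received and a group size G, the
// next group to request is N / G (integer division), so groups 0, 1, 2, ... are
// requested in order until every group has arrived.
public class MetaGroupSketch {
    public static int nextMetaGroupId(int receivedSplitInfos, int metaGroupSize) {
        if (metaGroupSize <= 0) {
            throw new IllegalArgumentException("metaGroupSize must be > 0");
        }
        return receivedSplitInfos / metaGroupSize;
    }
}
```

This also explains the out-of-order check in fillMetaDataForBinlogSplit: a received group id is only applied when it equals the id computed from the infos already present, so a duplicated or stale meta event is simply ignored.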