【万字长文】Flink cdc源码精讲(推荐收藏)(二)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
简介: 【万字长文】Flink cdc源码精讲(推荐收藏)

三.mysql-cdc源码-新Source接口的实现


1.11版本之后flink提供了新的source接口,可以提前预习一波https://issues.apache.org/jira/browse/FLINK-10740

简单介绍一下

SourceReader  : 对split的的数据进行读取操作,比如: 读取一个分区,一个块等,当然不只局限与一个分区,根据自己的实现来

SplitEnumerator : 负责对数据源进行切分或者发现分区等,比如: 发现kafka的分区,对文件划分块等

上述的比较简单,实际上比这复杂一点,所以在新的source接口实现一个source是比较难的事情,不过熟悉之后都一样


提前说明 :

一个split我们可以认为是一个切片,mysql-cdc中, 假想情况下 :  一张的一部分中, 比如 开始主键 1 到 结束主键 10 ,那么该split就表示这些数据,在具体读取数据的时候是有readTask来去读,那么他就会通过split标记的点位来进行数据的读取,当然一个readTask不止会执行一个split;

snapshot表示的是读取数据库的历史全量数据

binlog 表示当我们snapshot阶段结束后开始binlog阶段,即我们开始读取的binlog数据了

先执行snapshot阶段,后执行binlog阶段



代码的生成和旧版是相同的,只不过是内部执行的逻辑存在变化,新的source接口实现的cdc代码比较复杂,涉及的内容比较多,可能比较晕,后面自己可以根据源码debug走一走

由于代码过多,主要讲解重点的内容,不重要的跳过了

// 实现了两个接口 source,和 resultTypeQueryable(比较简单就一个获取结果类型信息的接口) , 主要代码还是在source接口的实现
// T 为输出类型,MySqlSplit是mysql的分割器,PendingSplitsState表示Enumerator的状态对象
public class MySqlSource<T>
       implements Source<T, MySqlSplit, PendingSplitsState>, ResultTypeQueryable<T> {
   private final MySqlSourceConfigFactory configFactory;
   private final DebeziumDeserializationSchema<T> deserializationSchema;
   /* 通过构造者模式构建source所需要的参数,简单说明一下,里面的参数,通过MySqlSourceConfigFactory添加参数,在build方法中,将factory作为参数构建出MySqlSource
    -------------------------------------讲解一下对应关系------------------------------------------------
  MySqlSourceConfigFactory 可以根据不同的subtask创建对应的MySqlSourceConfig
  MySqlSourceConfig 可以构建 MySqlConnectorConfig
  MySqlConnection 通过 DebeziumUtil.createMySqlConnection(mySqlSourceConfig.getDbzConfiguration())方法构建
     上面的一个config比较混乱,名字也比较不容易理解,后面用到的时候会简单提一下,这里主要是有一个印象,不要被一些配置搞混
   */
   public static <T> MySqlSourceBuilder<T> builder() {
       return new MySqlSourceBuilder<>();
  }
 // 由MySqlSourceBuilder.build方法创建
   MySqlSource(
           MySqlSourceConfigFactory configFactory,
           DebeziumDeserializationSchema<T> deserializationSchema // 与老版source的deserialization一样
  ) {
       this.configFactory = configFactory;
       this.deserializationSchema = deserializationSchema;
  }
   @Override  // 流批一体的source,表示有界性,新source接口的特性
   public Boundedness getBoundedness() {return Boundedness.CONTINUOUS_UNBOUNDED; }
  /*构建sourceReader */
   @Override
   public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
           throws Exception {
    // 前面提到了,根据subtask索引创建对应的config
       MySqlSourceConfig sourceConfig =
               configFactory.createConfig(readerContext.getIndexOfSubtask());
       // 一个阻塞队列,多线程交互用的,不必深入
       FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
               new FutureCompletingBlockingQueue<>();
    // metric相关
       final MySqlSourceReaderMetrics sourceReaderMetrics =
               new MySqlSourceReaderMetrics(readerContext.metricGroup());
       sourceReaderMetrics.registerMetrics();
    // 通过supplier函数构建一个SplitReader,解耦的作用,主要看里面的MySqlSplitReader实现即可
       Supplier<MySqlSplitReader> splitReaderSupplier =
        // 拿到每个reader的config和对应的subtask index
              () -> new MySqlSplitReader(sourceConfig, readerContext.getIndexOfSubtask());
    // 构建了一个具体的sourceReader
       return new MySqlSourceReader<>(
               elementsQueue,
               splitReaderSupplier,
               new MySqlRecordEmitter<>(
                       deserializationSchema,
                       sourceReaderMetrics,
                       sourceConfig.isIncludeSchemaChanges()),
               readerContext.getConfiguration(),
               readerContext,
               sourceConfig);
  }
   @Override
   public SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator(
           SplitEnumeratorContext<MySqlSplit> enumContext) {
    // 因为只会生成一次所以生成一个sourceConfig即可
       MySqlSourceConfig sourceConfig = configFactory.createConfig(0);
// 检验mysql
       final MySqlValidator validator = new MySqlValidator(sourceConfig);
       validator.validate();
       final MySqlSplitAssigner splitAssigner;
    // 判断开始条件如果是initial则先读取mysql table的数据(代码中叫做snapshot),然后再继续读取binlog的数据,如果不是initial状态,则直接从binlog开始读取
       if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) {
           try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
               final List<TableId> remainingTables = discoverCapturedTables(jdbc, sourceConfig);
               boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
               splitAssigner =
                // 里面包含 snapshot和binlog的split逻辑
                       new MySqlHybridSplitAssigner(
                               sourceConfig,
                               enumContext.currentParallelism(),
                               remainingTables,
                               isTableIdCaseSensitive);
          } catch (Exception e) {
               throw new FlinkRuntimeException(
                       "Failed to discover captured tables for enumerator", e);
          }
      } else {
        // 之有binlog的split逻辑
           splitAssigner = new MySqlBinlogSplitAssigner(sourceConfig);
      }
// 创建对应发的SplitEnumerator,用于构建split给reader读取
       return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
  }
// 恢复SplitEnumerato,比如任务故障重启,会根据不同的checkpoint恢复SplitEnumerator,用于继续之前未完成的读取操作
   @Override
   public SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator(
           SplitEnumeratorContext<MySqlSplit> enumContext, PendingSplitsState checkpoint) {
       MySqlSourceConfig sourceConfig = configFactory.createConfig(0);
       final MySqlSplitAssigner splitAssigner;
       if (checkpoint instanceof HybridPendingSplitsState) {
           splitAssigner =
                   new MySqlHybridSplitAssigner(
                           sourceConfig,
                           enumContext.currentParallelism(),
                          (HybridPendingSplitsState) checkpoint);
      } else if (checkpoint instanceof BinlogPendingSplitsState) {
           splitAssigner =
                   new MySqlBinlogSplitAssigner( sourceConfig, (BinlogPendingSplitsState) checkpoint);
      } else {
           throw new UnsupportedOperationException( "Unsupported restored PendingSplitsState: " + checkpoint);
      }
       return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
  }
 // -----------------容错相关,不是重点-----------------
   @Override
   public SimpleVersionedSerializer<MySqlSplit> getSplitSerializer() { return MySqlSplitSerializer.INSTANCE; }
   @Override
   public SimpleVersionedSerializer<PendingSplitsState> getEnumeratorCheckpointSerializer() { return new PendingSplitsStateSerializer(getSplitSerializer());}
// 返回值类型的提取
   @Override
   public TypeInformation<T> getProducedType() {return deserializationSchema.getProducedType();}
}

上面的代码中我们可以看到source的实现,主要是构建sourceReader和splitEnumerator,以及容错内容,相关的处理逻辑也封装在相应的对象中,下面我们对其内部逐步剖析

/*在看其他内容之前,我们可以看看如何对mysql进行split操作,在snapshot是通过主键来split的,binlog的只从当前offset位置开始消费,
这里是混合的一个split,另外还存在binlog和snapshot的splitAssigner,不过我们根据主要看看大致逻辑,具体到某一直可以自己阅读理解,
解释一下 : 先读取mysql历史数据即snapshot阶段,然后再进行当前mysql-binlog的位置开始消费,所以这个混合的意义就是先读取全量数据,然后从最新的binlog开始读取,完成cdc读取数据的过程*/
public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
    private final int splitMetaGroupSize;
    private boolean isBinlogSplitAssigned;
    private final MySqlSnapshotSplitAssigner snapshotSplitAssigner;
    public MySqlHybridSplitAssigner(
            MySqlSourceConfig sourceConfig,
            int currentParallelism,
            List<TableId> remainingTables,
            boolean isTableIdCaseSensitive) {
        this(
             // 创建snapshot split
                new MySqlSnapshotSplitAssigner(
                        sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive),
                false,
                sourceConfig.getSplitMetaGroupSize());
    }
    public MySqlHybridSplitAssigner(
            MySqlSourceConfig sourceConfig,
            int currentParallelism,
            HybridPendingSplitsState checkpoint) {
        this(
                new MySqlSnapshotSplitAssigner(
                        sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits()),
                checkpoint.isBinlogSplitAssigned(),
                sourceConfig.getSplitMetaGroupSize());
    }
    private MySqlHybridSplitAssigner(
            MySqlSnapshotSplitAssigner snapshotSplitAssigner,
            boolean isBinlogSplitAssigned,
            int splitMetaGroupSize) {
        this.snapshotSplitAssigner = snapshotSplitAssigner;
        this.isBinlogSplitAssigned = isBinlogSplitAssigned;
        this.splitMetaGroupSize = splitMetaGroupSize;
    }
    @Override
    public void open() {
        snapshotSplitAssigner.open();
    }
  // 主要返回下一个split,没有则返回一个空, optional可以jdk8的新特性,用于解决空指针的一个类
    @Override
    public Optional<MySqlSplit> getNext() {
      // 下面的方法可以见名知意,自行理解即可
        if (snapshotSplitAssigner.noMoreSplits()) {
            if (isBinlogSplitAssigned) {
                return Optional.empty();
            } else if (snapshotSplitAssigner.isFinished()) { // 当snapshot完成后,开始binlog的split流程
                // we need to wait snapshot-assigner to be finished before
                // assigning the binlog split. Otherwise, records emitted from binlog split
                // might be out-of-order in terms of same primary key with snapshot splits.
                isBinlogSplitAssigned = true;
                return Optional.of(createBinlogSplit());
            } else {
                // binlog split is not ready by now
                return Optional.empty();
            }
        } else {
            // snapshot assigner still have remaining splits, assign split from it
            return snapshotSplitAssigner.getNext();
        }
    }
   // splitAssigner是否在等待已完成split回调,即onFinishedSplits
    @Override
    public boolean waitingForFinishedSplits() {
        return snapshotSplitAssigner.waitingForFinishedSplits();
    }
  // 获取已完成的split并且包含他的元数据,可以根据已经完成snapshot(snapshot的某一个split)生成对应binlog的split
    @Override
    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
        return snapshotSplitAssigner.getFinishedSplitInfos();
    }
  // 使用已完成的binlog偏移量来处理已完成的split,用于确定何时生成binlog split以及生成什么binlog split,就是回调
    @Override
    public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
        snapshotSplitAssigner.onFinishedSplits(splitFinishedOffsets);
    }
   // 向此splitAssigner添加一组spilt,当某些split处理失败,则需要重新添加分割时调用此方法
    @Override
    public void addSplits(Collection<MySqlSplit> splits) {
        List<MySqlSplit> snapshotSplits = new ArrayList<>();
        for (MySqlSplit split : splits) {
            if (split.isSnapshotSplit()) {
                snapshotSplits.add(split);
            } else {
                // we don't store the split, but will re-create binlog split later
                isBinlogSplitAssigned = false;
            }
        }
        snapshotSplitAssigner.addSplits(snapshotSplits);
    }
 // ----------------------------checkpoint 容错相关----------------------------------------
    @Override
    public PendingSplitsState snapshotState(long checkpointId) {
        return new HybridPendingSplitsState(
                snapshotSplitAssigner.snapshotState(checkpointId), isBinlogSplitAssigned);
    }
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
    }
    @Override
    public void close() {
        snapshotSplitAssigner.close();
    }
    // ------------------------------------binlog split部分-------------------------------------------
  // 构建biglog split, 就是根据已经完成snapshot split来构建binlog split的一个过程,split代码比较简单可以自行阅读
   // 简单介绍一下 就是描述binlog的split,snapshot的split相关内容,比如snapshot,会按照主键去做split,已经table的schemas相关信息
    private MySqlBinlogSplit createBinlogSplit() {
        final List<MySqlSnapshotSplit> assignedSnapshotSplit =
                snapshotSplitAssigner.getAssignedSplits().values().stream()
                        .sorted(Comparator.comparing(MySqlSplit::splitId))
                        .collect(Collectors.toList());
        Map<String, BinlogOffset> splitFinishedOffsets =
                snapshotSplitAssigner.getSplitFinishedOffsets();
        final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
        BinlogOffset minBinlogOffset = null;
        for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
            // find the min binlog offset
            BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
            if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
                minBinlogOffset = binlogOffset;
            }
            finishedSnapshotSplitInfos.add(
                    new FinishedSnapshotSplitInfo(
                            split.getTableId(),
                            split.splitId(),
                            split.getSplitStart(),
                            split.getSplitEnd(),
                            binlogOffset));
        }
        boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
        return new MySqlBinlogSplit(
                BINLOG_SPLIT_ID,
                minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
                BinlogOffset.NO_STOPPING_OFFSET,
                divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
                new HashMap<>(),
                finishedSnapshotSplitInfos.size());
    }
}

现在我们开始介绍sourceReader和SplitEnumerator

sourceReader :

/* SingleThreadMultiplexSourceReaderBase */
public class MySqlSourceReader<T>
     extends SingleThreadMultiplexSourceReaderBase<SourceRecord, T, MySqlSplit, MySqlSplitState> {
 private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);
 private final MySqlSourceConfig sourceConfig;
 private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits;
 private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;
 private final int subtaskId;
 public MySqlSourceReader(
         FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementQueue,
         Supplier<MySqlSplitReader> splitReaderSupplier,
         RecordEmitter<SourceRecord, T, MySqlSplitState> recordEmitter,
         Configuration config,
         SourceReaderContext context,
         MySqlSourceConfig sourceConfig) {
     super(
             elementQueue,
          // 一个单线程的fetcher管理器,做一些读取操作
          // 简单描述一下流程 
          // SingleThreadFetcherManager.createSplitFetcher 构建一个SplitFetcher(实现了Runnable),在SplitFetcher中会构建一个fetcherTask,SplitFetcher.run方法中,循环调用this.runOnce(),this.runOnce()会持续调用fetcherTask.run()读取数据,run()会调用MySqlSplitReader.fetch方法,返回reader读取的数据,并将数据放入到elementQueue中,只要涉及都多线程的代码,都比较晦涩难懂
             new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get),
             recordEmitter,
             config,
             context);
     this.sourceConfig = sourceConfig;
     this.finishedUnackedSplits = new HashMap<>();
     this.uncompletedBinlogSplits = new HashMap<>();
     this.subtaskId = context.getIndexOfSubtask();
 }
  // 启动reader
 @Override
 public void start() {
     if (getNumberOfCurrentlyAssignedSplits() == 0) {
        // 发送split的请求到splitEnumerator,会调用到SplitEnumerator.handleSplitRequest(int, String)方法,会带这并行的reader的subtask id 和hostname
         context.sendSplitRequest();
     }
 }
  // 当reader分配到新的split的时候,会初始化一个split的state
 @Override
 protected MySqlSplitState initializedState(MySqlSplit split) {
     if (split.isSnapshotSplit()) {
         return new MySqlSnapshotSplitState(split.asSnapshotSplit());
     } else {
         return new MySqlBinlogSplitState(split.asBinlogSplit());
     }
 }
 @Override // 容错相关, skip
 public List<MySqlSplit> snapshotState(long checkpointId) {
     // unfinished splits
     List<MySqlSplit> stateSplits = super.snapshotState(checkpointId);
     // add finished snapshot splits that didn't receive ack yet
     stateSplits.addAll(finishedUnackedSplits.values());
     // add binlog splits who are uncompleted
     stateSplits.addAll(uncompletedBinlogSplits.values());
     return stateSplits;
 }
  // 清理处理已完成的split状态,非重点
 @Override
 protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
     for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
         MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
         checkState(
                 mySqlSplit.isSnapshotSplit(),
                 String.format(
                         "Only snapshot split could finish, but the actual split is binlog split %s",
                         mySqlSplit));
         finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
     }
     reportFinishedSnapshotSplitsIfNeed();
     context.sendSplitRequest();
 }
  /*添加此reader要read的split列表,当splitEnumerator通过splitEnumeratorContext分配一个splut时,将调用此方法
即调用 context.assignSplit(SourceSplit, int) 或者 context.assignSplits(SplitsAssignment).
 */
 @Override
 public void addSplits(List<MySqlSplit> splits) {
     List<MySqlSplit> unfinishedSplits = new ArrayList<>();
     for (MySqlSplit split : splits) {
       // 判断是否是snapshot还是binlog split
         if (split.isSnapshotSplit()) {
            // 如果split已经read完成放入完成集合,否则放入未完成的集合中
             MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
             if (snapshotSplit.isSnapshotReadFinished()) {
                 finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
             } else {
                 unfinishedSplits.add(split);
             }
         } else {
             if (!split.asBinlogSplit().isCompletedSplit()) {
                //如果binlog split未完成则加入未完成的列表中,并想spluitEnumerator发送请求binlog split meta的事件
                 uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
                 requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
             } else {
                // 未完成的split集合删除该split ,未完成的集合表示没有split meta信息
                 uncompletedBinlogSplits.remove(split.splitId());
                // 创建binlog split, 带有table schema信息
                 MySqlBinlogSplit mySqlBinlogSplit =
                         discoverTableSchemasForBinlogSplit(split.asBinlogSplit());
                // 添加到未完成的splits,后续会进行read操作
                 unfinishedSplits.add(mySqlBinlogSplit);
             }
         }
     }
     // notify split enumerator again about the finished unacked snapshot splits
     reportFinishedSnapshotSplitsIfNeed();
     // add all un-finished splits (including binlog split) to SourceReaderBase
     // TODO 当调用spuer.addSplits的时候,会启动fetcherManager,开始读取数据的操作
     super.addSplits(unfinishedSplits);
 }
 private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {
     final String splitId = split.splitId();
   // 如果tableSchema不存在则填充,如果已经存在,则直接返回split即可
     if (split.getTableSchemas().isEmpty()) {
         try (MySqlConnection jdbc =
              // 静态方法,构建一个mysqlConnection,可以认为就是一个jdbc连接 ,不必深入
                 DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
             Map<TableId, TableChanges.TableChange> tableSchemas =
                // 静态方法,根据我们sourceBuilder构建的时候给定的database和tablelist来构建对应的tableId和TableChange,然后我们在面read的时候需要, 不必深入工具类
              TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
             LOG.info("The table schema discovery for binlog split {} success", splitId);
            // 静态方法,构建一个带有tableSchema的MysqlBinlogSpilt,不必深入
             return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
         } catch (SQLException e) {
             LOG.error("Failed to obtains table schemas due to {}", e.getMessage());
             throw new FlinkRuntimeException(e);
         }
     } else {
         LOG.warn("The binlog split {} has table schemas yet, skip the table schema discovery",split);
         return split;
     }
 }
// 处理source自定义事件,接收来自splitEumumerator,与splitEumumerator类似
 @Override
 public void handleSourceEvents(SourceEvent sourceEvent) {
     if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
         FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
         LOG.debug(
                 "The subtask {} receives ack event for {} from enumerator.",
                 subtaskId,
                 ackEvent.getFinishedSplits());
         for (String splitId : ackEvent.getFinishedSplits()) {
             this.finishedUnackedSplits.remove(splitId);
         }
     } else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
         // report finished snapshot splits
         LOG.debug(
                 "The subtask {} receives request to report finished snapshot splits.",
                 subtaskId);
         reportFinishedSnapshotSplitsIfNeed();
     } else if (sourceEvent instanceof BinlogSplitMetaEvent) {
         LOG.debug(
                 "The subtask {} receives binlog meta with group id {}.",
                 subtaskId,
                 ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
         fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
     } else {
         super.handleSourceEvents(sourceEvent);
     }
 }
   // 发送请求binlogSplit meta的事件
 private void requestBinlogSplitMetaIfNeeded(MySqlBinlogSplit binlogSplit) {
     final String splitId = binlogSplit.splitId();
     if (!binlogSplit.isCompletedSplit()) {
         final int nextMetaGroupId =
                 getNextMetaGroupId(
                         binlogSplit.getFinishedSnapshotSplitInfos().size(),
                         sourceConfig.getSplitMetaGroupSize());
         BinlogSplitMetaRequestEvent splitMetaRequestEvent =
                 new BinlogSplitMetaRequestEvent(splitId, nextMetaGroupId);
         context.sendSourceEventToCoordinator(splitMetaRequestEvent);
     } else {
         LOG.info("The meta of binlog split {} has been collected success", splitId);
         this.addSplits(Arrays.asList(binlogSplit));
     }
 }
  // 我们发送了请求meta的event后,会收到binlog split meta,我们需要填充至binlogSplit中
 private void fillMetaDataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
     MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());
     if (binlogSplit != null) {
         final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
         final int expectedMetaGroupId =
                 getNextMetaGroupId(
                         binlogSplit.getFinishedSnapshotSplitInfos().size(),
                         sourceConfig.getSplitMetaGroupSize());
         if (receivedMetaGroupId == expectedMetaGroupId) {
             List<FinishedSnapshotSplitInfo> metaDataGroup =
                     metadataEvent.getMetaGroup().stream()
                             .map(FinishedSnapshotSplitInfo::deserialize)
                             .collect(Collectors.toList());
             uncompletedBinlogSplits.put(
                     binlogSplit.splitId(),
                     MySqlBinlogSplit.appendFinishedSplitInfos(binlogSplit, metaDataGroup));
             LOG.info("Fill meta data of group {} to binlog split", metaDataGroup.size());
         } else {
             LOG.warn("Received out of oder binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it",metadataEvent.getSplitId(), receivedMetaGroupId,expectedMetaGroupId);
         }
       // 继续发送请求meta event
         requestBinlogSplitMetaIfNeeded(binlogSplit);
     } else {
         LOG.warn( "Received binlog meta event for split {}, but the uncompleted split map does not contain it", metadataEvent.getSplitId());
     }
 }
   // state变成不可变的state
 @Override
 protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) { return splitState.toMySqlSplit(); }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
17天前
|
DataWorks 安全 关系型数据库
DataWorks产品使用合集之使用Flink CDC读取PostgreSQL数据时如何指定编码格式
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
12天前
|
Oracle 关系型数据库 Java
实时计算 Flink版产品使用问题之源码 deploy,生成带有时间戳的jar包,如何修改配置信息
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
SQL 监控 关系型数据库
实时计算 Flink版产品使用问题之使用mysql cdc配置StartupOptions.initial()全量之后就不增量了,是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 关系型数据库 MySQL
Apache Flink CDC 3.1.0 发布公告
Apache Flink 社区很高兴地宣布发布 Flink CDC 3.1.0!
583 1
Apache Flink CDC 3.1.0 发布公告
|
10天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之CDC任务在异常后整个record sent从0初始化开始,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
404 0
|
10天前
|
关系型数据库 数据库 流计算
实时计算 Flink版操作报错合集之在使用Flink CDC TiDB Connector时,无法获取到事件,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
345 0
|
10天前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之下载了mysql的cdc的demo,在本地调试时,报错:找不到这个包,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
105 0
|
1月前
|
SQL Java 数据处理
实时计算 Flink版产品使用合集之如何写一个opengauss的cdc
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
16天前
|
消息中间件 关系型数据库 MySQL
Flink CDC 最佳实践(以 MySQL 为例)
Flink CDC 最佳实践(以 MySQL 为例)
|
17天前
|
消息中间件 关系型数据库 MySQL