SplitEnumerator:
- handles split requests coming from the SourceReader
- assigns splits to the SourceReader
```java
// Implements the SplitEnumerator interface and overrides its methods
public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, PendingSplitsState> {

    private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceEnumerator.class);
    private static final long CHECK_EVENT_INTERVAL = 30_000L;

    private final SplitEnumeratorContext<MySqlSplit> context;
    private final MySqlSourceConfig sourceConfig;
    private final MySqlSplitAssigner splitAssigner;

    // using TreeSet to prefer assigning binlog split to task-0 for easier debug
    private final TreeSet<Integer> readersAwaitingSplit;
    private List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta;

    public MySqlSourceEnumerator(
            SplitEnumeratorContext<MySqlSplit> context,
            MySqlSourceConfig sourceConfig,
            MySqlSplitAssigner splitAssigner) {
        // the context object passed in by Source#createEnumerator
        this.context = context;
        this.sourceConfig = sourceConfig;
        this.splitAssigner = splitAssigner;
        this.readersAwaitingSplit = new TreeSet<>();
    }

    @Override
    public void start() {
        // call the splitAssigner's open() method; see the individual MySqlSplitAssigner implementations for details
        splitAssigner.open();
        // register a Callable that runs periodically; its main job is to check for missed notification
        // events after a reader has a communication failure or restarts from a failure (not a key point here)
        this.context.callAsync(
                this::getRegisteredReader,
                this::syncWithReaders,
                CHECK_EVENT_INTERVAL,
                CHECK_EVENT_INTERVAL);
    }

    // Handles split requests. Called when the reader with the given subtask id invokes
    // SourceReaderContext.sendSplitRequest().
    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        if (!context.registeredReaders().containsKey(subtaskId)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }
        // put the requesting subtask id into the waiting list
        readersAwaitingSplit.add(subtaskId);
        // assign splits to the subtasks in the waiting list
        assignSplits();
    }

    // Adds splits back to the SplitEnumerator. This only happens for splits that were assigned after the
    // last successful checkpoint, meaning they have to be processed again.
    @Override
    public void addSplitsBack(List<MySqlSplit> splits, int subtaskId) {
        LOG.debug("MySQL Source Enumerator adds splits back: {}", splits);
        splitAssigner.addSplits(splits);
    }

    // Handles custom events sent by the SourceReader
    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        // SourceEvent the SourceReader sends to the SplitEnumerator to report that its snapshot splits
        // have been fully read and at which binlog offsets they finished
        if (sourceEvent instanceof FinishedSnapshotSplitsReportEvent) {
            LOG.info(
                    "The enumerator receives finished split offsets {} from subtask {}.",
                    sourceEvent,
                    subtaskId);
            FinishedSnapshotSplitsReportEvent reportEvent =
                    (FinishedSnapshotSplitsReportEvent) sourceEvent;
            Map<String, BinlogOffset> finishedOffsets = reportEvent.getFinishedOffsets();

            // covered in the splitAssigner section above
            splitAssigner.onFinishedSplits(finishedOffsets);

            // send an ACK event back to the reader to confirm the snapshot splits have been acknowledged
            FinishedSnapshotSplitsAckEvent ackEvent =
                    new FinishedSnapshotSplitsAckEvent(new ArrayList<>(finishedOffsets.keySet()));
            context.sendEventToSourceReader(subtaskId, ackEvent);
        }
        // SourceEvent the SourceReader sends to pull binlog split metadata, i.e. asking for a BinlogSplitMetaEvent
        else if (sourceEvent instanceof BinlogSplitMetaRequestEvent) {
            LOG.debug(
                    "The enumerator receives request for binlog split meta from subtask {}.",
                    subtaskId);
            // send the binlog meta
            sendBinlogMeta(subtaskId, (BinlogSplitMetaRequestEvent) sourceEvent);
        }
    }

    @Override
    public PendingSplitsState snapshotState(long checkpointId) {
        return splitAssigner.snapshotState(checkpointId);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        splitAssigner.notifyCheckpointComplete(checkpointId);
        // binlog split may be available after checkpoint complete
        assignSplits();
    }

    // ------------------------------------------------------------------------------------------

    // assign splits to the subtasks in the waiting list
    private void assignSplits() {
        // the TreeSet iterator is sorted, so waiting subtasks are handled in subtask-id order
        final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();

        while (awaitingReader.hasNext()) {
            int nextAwaiting = awaitingReader.next();
            // if the reader that requested another split has failed in the meantime,
            // remove it from the waiting list
            if (!context.registeredReaders().containsKey(nextAwaiting)) {
                awaitingReader.remove();
                continue;
            }

            Optional<MySqlSplit> split = splitAssigner.getNext();
            if (split.isPresent()) {
                final MySqlSplit mySqlSplit = split.get();
                // assign the split to the subtask
                context.assignSplit(mySqlSplit, nextAwaiting);
                awaitingReader.remove();
                LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
            } else {
                // there is no available splits by now, skip assigning
                // (the splitAssigner may return an empty Optional, as described earlier; it is filtered out here)
                break;
            }
        }
    }

    // send the binlog meta event to the reader
    private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEvent) {
        // if binlogSplitMeta == null, initialize the meta first
        if (binlogSplitMeta == null) {
            final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos =
                    splitAssigner.getFinishedSplitInfos();
            if (finishedSnapshotSplitInfos.isEmpty()) {
                LOG.error(
                        "The assigner offer empty finished split information, this should not happen");
                throw new FlinkRuntimeException(
                        "The assigner offer empty finished split information, this should not happen");
            }
            binlogSplitMeta =
                    Lists.partition(
                            finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
        }
        final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();

        if (binlogSplitMeta.size() > requestMetaGroupId) {
            // take the requested group of FinishedSnapshotSplitInfo, serialize it, and build the meta event
            List<FinishedSnapshotSplitInfo> metaToSend = binlogSplitMeta.get(requestMetaGroupId);
            BinlogSplitMetaEvent metadataEvent =
                    new BinlogSplitMetaEvent(
                            requestEvent.getSplitId(),
                            requestMetaGroupId,
                            metaToSend.stream()
                                    .map(FinishedSnapshotSplitInfo::serialize)
                                    .collect(Collectors.toList()));
            // send the generated meta event to the reader
            context.sendEventToSourceReader(subTask, metadataEvent);
        } else {
            LOG.error(
                    "Received invalid request meta group id {}, the valid meta group id range is [0, {}]",
                    requestMetaGroupId,
                    binlogSplitMeta.size() - 1);
        }
    }
}
```
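One detail worth calling out in sendBinlogMeta() is how the finished snapshot split infos are chunked into meta groups with Guava's Lists.partition and then served one group per BinlogSplitMetaRequestEvent. The standalone snippet below shows the grouping semantics; the plain string ids stand in for FinishedSnapshotSplitInfo objects and the group size is a made-up value, not anything from the connector's config.

```java
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;

public class BinlogMetaGroupSketch {
    public static void main(String[] args) {
        // Hypothetical ids standing in for FinishedSnapshotSplitInfo entries.
        List<String> finishedSplits =
                Arrays.asList("split-0", "split-1", "split-2", "split-3", "split-4");

        // Corresponds to sourceConfig.getSplitMetaGroupSize() in the enumerator.
        int splitMetaGroupSize = 2;

        // Same partitioning the enumerator performs once and caches in binlogSplitMeta.
        List<List<String>> metaGroups = Lists.partition(finishedSplits, splitMetaGroupSize);
        // -> [[split-0, split-1], [split-2, split-3], [split-4]]

        // A reader asks for one group at a time via its requestMetaGroupId.
        int requestMetaGroupId = 1;
        System.out.println(metaGroups.get(requestMetaGroupId)); // [split-2, split-3]
    }
}
```

Serving the metadata in groups like this keeps each event sent to the reader reasonably small even when a table was split into a large number of snapshot splits.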
In the two classes above we have not yet seen the actual data-reading logic. In fact, the work starts as soon as the framework calls addSplits(). Since the call chain is fairly long, I walk through it step by step here (originally shown as code screenshots) to show how the code enters the execution logic:
- The fetcherManager created in the SourceReader is stored in a member variable of the parent class.
- When addSplits() is called on the SourceReader, it invokes the parent class's addSplits() method,
- which in turn calls addSplits() on the fetcherManager we passed in.
- When the fetcherManager's addSplits() is called, the subclass does not override the method, so execution goes straight into the parent class's implementation; if no fetcher has been started yet, a fetcher (a Runnable object) is created and then submitted to a thread pool to execute the task (see the sketch below).
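To make that startup path concrete, here is a minimal, self-contained sketch of the pattern described in the list above. All names are hypothetical (this is not Flink's SplitFetcherManager); it only models the behavior: the first addSplits() call creates a fetcher Runnable and submits it to a thread pool, and later calls simply hand more splits to the already running fetcher.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model of the flow above: addSplits() lazily starts a fetcher Runnable on a
// thread pool; from then on it only pushes new splits into the fetcher's queue.
public class FetcherStartupSketch {
    private final ExecutorService fetcherPool = Executors.newSingleThreadExecutor();
    private final BlockingQueue<String> splitQueue = new LinkedBlockingQueue<>();
    private volatile boolean fetcherStarted = false;
    private volatile boolean closed = false;

    public synchronized void addSplits(List<String> splits) {
        splitQueue.addAll(splits);
        if (!fetcherStarted) {
            // "createFetcher + startFetcher": the fetcher is just a Runnable handed to the pool
            fetcherStarted = true;
            fetcherPool.submit(this::runFetcher);
        }
    }

    private void runFetcher() {
        try {
            while (!closed) {
                String split = splitQueue.poll(100, TimeUnit.MILLISECONDS);
                if (split != null) {
                    // the real fetcher would call the SplitReader here
                    System.out.println("fetcher reading " + split);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void close() {
        closed = true;
        fetcherPool.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        FetcherStartupSketch manager = new FetcherStartupSketch();
        manager.addSplits(Arrays.asList("snapshot-split-0", "snapshot-split-1"));
        Thread.sleep(300); // give the fetcher thread time to drain the queue
        manager.close();
    }
}
```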
As shown above, our fetcher has now been started, so let's look at what the fetcher actually does (keep in mind that a queue was passed in earlier; the data read by the fetcher is put into that queue). When createFetcher() is called, what actually gets created is a SplitFetcher, a class provided by Flink's new Source API.
```java
/*
 * Since SplitFetcher is a Runnable, we can go straight to its run() method to see what it does.
 * A quick overview of the flow first:
 * 1. When the fetcher is built, a splitReader is passed to the constructor; it is the component
 *    that actually reads the data (in our case the MySqlSplitReader).
 * 2. The constructor also builds a FetchTask, and once run() starts, task execution begins. Recall
 *    that after startFetcher() we called the fetcher's addSplits() method, which wraps the splits
 *    into tasks and puts them into the taskQueue.
 * 3. There is some idle / wake-up handling in between which I removed; it is not important and not
 *    CDC-specific, so do not spend too much time on it.
 */
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class);
    private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK");

    private final int id;
    private final BlockingDeque<SplitFetcherTask> taskQueue;
    // track the assigned splits so we can suspend the reader when there is no splits assigned.
    private final Map<String, SplitT> assignedSplits;
    private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
    private final SplitReader<E, SplitT> splitReader;
    private final Consumer<Throwable> errorHandler;
    private final Runnable shutdownHook;
    private final AtomicBoolean wakeUp;
    private final AtomicBoolean closed;
    private final FetchTask<E, SplitT> fetchTask;
    private volatile SplitFetcherTask runningTask = null;
    private volatile boolean isIdle;
    private final Object lock = new Object();

    SplitFetcher(
            int id,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitReader<E, SplitT> splitReader,
            Consumer<Throwable> errorHandler,
            Runnable shutdownHook,
            Consumer<Collection<String>> splitFinishedHook) {
        this.id = id;
        // the task queue; it holds WAKEUP_TASK (used to wake up the fetcher thread in certain cases)
        // as well as the tasks that do the actual reading
        this.taskQueue = new LinkedBlockingDeque<>();
        // the data that is read will be put into this queue
        this.elementsQueue = elementsQueue;
        this.assignedSplits = new HashMap<>();
        this.splitReader = splitReader;
        this.errorHandler = errorHandler;
        this.shutdownHook = shutdownHook;
        this.isIdle = true;
        this.wakeUp = new AtomicBoolean(false);
        this.closed = new AtomicBoolean(false);
        // wrap the given splitReader into a FetchTask so the task can be executed directly once it starts
        this.fetchTask =
                new FetchTask<>(
                        splitReader,
                        elementsQueue,
                        ids -> {
                            ids.forEach(assignedSplits::remove);
                            splitFinishedHook.accept(ids);
                            LOG.info("Finished reading from splits {}", ids);
                        },
                        id);
    }

    @Override
    public void run() {
        LOG.info("Starting split fetcher {}", id);
        try {
            while (!closed.get()) {
                // the concrete logic of each loop iteration
                runOnce();
            }
        } catch (Throwable t) {
            errorHandler.accept(t);
        } finally {
            try {
                splitReader.close();
            } catch (Exception e) {
                errorHandler.accept(e);
            }
            LOG.info("Split fetcher {} exited.", id);
            // This executes after possible errorHandler.accept(t). If these operations bear
            // a happens-before relation, then we can check the side effect of errorHandler.accept(t)
            // to know whether it happened after observing the side effect of shutdownHook.run().
            shutdownHook.run();
        }
    }

    /** Package private method to help unit test. */
    void runOnce() {
        try {
            if (shouldRunFetchTask()) {
                runningTask = fetchTask;
            } else {
                runningTask = taskQueue.take();
            }
            LOG.debug("Prepare to run {}", runningTask);
            // the task is executed here; below we go straight into the task to see its concrete logic
            if (!wakeUp.get() && runningTask.run()) {
                LOG.debug("Finished running task {}", runningTask);
                // the task has finished running. Set it to null so it won't be enqueued.
                runningTask = null;
                checkAndSetIdle();
            }
        } catch (Exception e) {
            throw new RuntimeException(
                    String.format(
                            "SplitFetcher thread %d received unexpected exception while polling the records",
                            id),
                    e);
        }
        // If the task is not null that means this task needs to be re-executed. This only
        // happens when the task is the fetching task or the task was interrupted.
        maybeEnqueueTask(runningTask);
        synchronized (wakeUp) {
            // Set the running task to null. It is necessary for the shutdown method to avoid
            // unnecessarily interrupting the running task.
            runningTask = null;
            // Set the wakeUp flag to false.
            wakeUp.set(false);
            LOG.debug("Cleaned wakeup flag.");
        }
    }

    /*
     * This method is called when the fetcher is created, or again after it is already running;
     * the flow was shown in the walkthrough above.
     */
    public void addSplits(List<SplitT> splitsToAdd) {
        enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
        wakeUp(true);
    }

    public void enqueueTask(SplitFetcherTask task) {
        synchronized (lock) {
            taskQueue.offer(task);
            isIdle = false;
        }
    }
}
```
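To tie the pieces together, here is a minimal, self-contained sketch of the handoff the fetcher performs. The names are hypothetical, and a plain BlockingQueue stands in for Flink's FutureCompletingBlockingQueue: the fetcher thread puts record batches into elementsQueue, and the SourceReader's main thread later polls that queue and emits the records downstream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Minimal model of the fetcher -> reader handoff (hypothetical types, not Flink's API):
// the fetcher thread pushes record batches into the queue, the reader side polls them.
public class ElementsQueueSketch {
    // Stand-in for RecordsWithSplitIds<E>: a batch of records tagged with its split id.
    static class RecordBatch {
        final String splitId;
        final List<String> records;
        RecordBatch(String splitId, List<String> records) {
            this.splitId = splitId;
            this.records = records;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<RecordBatch> elementsQueue = new ArrayBlockingQueue<>(16);

        // Fetcher side: conceptually what the fetch task does with the batches read by the split reader.
        Thread fetcher = new Thread(() -> {
            try {
                elementsQueue.put(new RecordBatch("split-0", Arrays.asList("row-1", "row-2")));
                elementsQueue.put(new RecordBatch("split-0", Arrays.asList("row-3")));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();

        // Reader side: the SourceReader drains the queue and emits the records downstream.
        for (int i = 0; i < 2; i++) {
            RecordBatch batch = elementsQueue.take();
            System.out.println("emit " + batch.records + " from " + batch.splitId);
        }
        fetcher.join();
    }
}
```

The point of this decoupling is that the split reader can block on database or binlog I/O inside the fetcher thread without ever blocking the reader's main thread.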