前言
flink-cdc源码地址 : https://github.com/ververica/flink-cdc-connectors
flink-cdc不再flink项目中,在flink1.11之后flink引入cdc功能,下面我们以源码深入了解flink-cdc实现原理,
我们主要以flink-cdc-mysql为主,其余代码基本差不太多
事先需要先简单了解一下debezium相关原理,flink-cdc是基于debezium实现的
一点建议 :
- 在阅读源码的时候,我们应该带着问题去思考,然后一步一步去阅读源码,在阅读源码的过程中,不要被一些不重要的点给占用过多的时间精力,并且一遍两遍是不会让我有一个清晰的印象的,毕竟别人多少年多少人的开发,看一两遍就可以理解的,在阅读某个框架源码之前,我们应该已经对该框架原理有一定的理解,然后根据我们的理解去验证他是代码实现的样子,或者带着思考去阅读,为什么这么实现,这么实现的好处是什么等.,其实代码都是一样的,只不过是每个人的实现方式不同,考虑的问题不同而已,
- 要有一定的java基础,熟悉多线程,了解开发使用的相关接口(或者自己看了介绍之后很容易理解),如果基础不牢,更多的是建议先从基础学习,然后写一写代码测试,比如多线程的时候怎么做交互等,自己写一写,在后面阅读源码的时候会更容理解里面内容
- 该内容要首先对cdc有一定的了解,知道cdc的相关原理,flink-cdc的实现基于debezium实现,该框架是开源的,可以先去了解一下,这样对于我们后面内容会更容易理解
谨记: 阅读的时候抓住重点!!!!!!!! , 不要被不重要的内容占用时间
一.项目结构(mysql-cdc为主)
1. 目录结构
- 带有test项目都是用于测试的项目
- 后缀带有cdc的表示一个database的连接器,区分sql与api形式
- flink-format-changelog-json : 用于解析json成RowData的模块
- flink-connector-debezium : 该模块封装debezium以及相关核心代码实现,并且修改了debezium的部分源码
- 每个项目中都有test目录,里面有相关的测试代码,可以自行测试代码debug
2. mysql项目源码包结构
- debezium : debezium用到的相关类
- schema : mysql schema(表结构)相关代码
- source : mysql-cdc source实现代码,包括全量读mysql,分割器,读取器等相关
- table : cdc table实现代码主要以table dynamic factory的实现
- resrouces : 该目录用与spi方式动态加载table factory,用于sql创建table找到对应的工厂类
二.mysql-cdc源码-SourceFucntion的单并行度的实现
- 基于RichSourceFunction的,单并行读取 1,11之前的source接口,已被标记Deprecated
- 基于Source的,多并行度,1.11之后新出的srouce接口,实现要更复杂
我们主要根据单并行度源码基于讲解,这样更方便理解
具体入手我们可以根据文档中的创建source的类来一点一点走
MySqlSource通过构建者模式(23种设计模式)构建,我们只需要知道我们可以设置哪些参数即可,这个比较容易理解 // 通过构建者方式配置任务启动时候所需要的参数 public static class Builder<T> {// MysqlSourcen内部类 private int port = 3306; // default 3306 port private String hostname; private String[] databaseList; private String username; private String password; private Integer serverId; private String serverTimeZone; //时区 private String[] tableList; private Properties dbzProperties; // 传入的dbz引擎所需的属性 private StartupOptions startupOptions = StartupOptions.initial(); // 用于控制开始binlog开始消费位置的参数 private DebeziumDeserializationSchema<T> deserializer; // 用于对数据解析成什么样子如json,String等,定义序列化方式 //上面参数配置完成,通过build构建sourceFunctionn,主要将配置信息封装带properties中,这里面的参数主要是debezium所需要启动参数,配置信息等,如果想要了解可以去debezium官网查看参数的具体细节 public DebeziumSourceFunction<T> build() { Properties props = new Properties(); props.setProperty("connector.class", MySqlConnector.class.getCanonicalName()); // hard code server name, because we don't need to distinguish it, docs: // Logical name that identifies and provides a namespace for the particular MySQL // database // server/cluster being monitored. The logical name should be unique across all other // connectors, // since it is used as a prefix for all Kafka topic names emanating from this connector. // Only alphanumeric characters and underscores should be used. props.setProperty("database.server.name", DATABASE_SERVER_NAME); props.setProperty("database.hostname", checkNotNull(hostname)); props.setProperty("database.user", checkNotNull(username)); props.setProperty("database.password", checkNotNull(password)); props.setProperty("database.port", String.valueOf(port)); props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true)); // debezium use "long" mode to handle unsigned bigint by default, // but it'll cause lose of precise when the value is larger than 2^63, // so use "precise" mode to avoid it. props.put("bigint.unsigned.handling.mode", "precise"); if (serverId != null) { props.setProperty("database.server.id", String.valueOf(serverId)); } if (databaseList != null) { props.setProperty("database.whitelist", String.join(",", databaseList)); } if (tableList != null) { props.setProperty("table.whitelist", String.join(",", tableList));} if (serverTimeZone != null) { props.setProperty("database.serverTimezone", serverTimeZone); } // 判断开始消费位置,在sqlSourceBuilder中构建的参数,没有则为null DebeziumOffset specificOffset = null; switch (startupOptions.startupMode) { case INITIAL: props.setProperty("snapshot.mode", "initial"); break; case EARLIEST_OFFSET: props.setProperty("snapshot.mode", "never"); break; case LATEST_OFFSET: props.setProperty("snapshot.mode", "schema_only"); break; case SPECIFIC_OFFSETS: props.setProperty("snapshot.mode", "schema_only_recovery"); specificOffset = new DebeziumOffset(); Map<String, String> sourcePartition = new HashMap<>(); sourcePartition.put("server", DATABASE_SERVER_NAME); specificOffset.setSourcePartition(sourcePartition); Map<String, Object> sourceOffset = new HashMap<>(); sourceOffset.put("file", startupOptions.specificOffsetFile); sourceOffset.put("pos", startupOptions.specificOffsetPos); specificOffset.setSourceOffset(sourceOffset); break; case TIMESTAMP: checkNotNull(deserializer); props.setProperty("snapshot.mode", "never"); deserializer = new SeekBinlogToTimestampFilter<>( startupOptions.startupTimestampMillis, deserializer); break; default: throw new UnsupportedOperationException(); } if (dbzProperties != null) { props.putAll(dbzProperties); // Add default configurations for compatibility when set the legacy mysql connector // implementation if (LEGACY_IMPLEMENTATION_VALUE.equals( dbzProperties.get(LEGACY_IMPLEMENTATION_KEY))) { props.put("transforms", "snapshotasinsert"); props.put( "transforms.snapshotasinsert.type", "io.debezium.connector.mysql.transforms.ReadToInsertEvent"); } } // 构建通用的cdc sourceFunction --> 基于richSourceFunction return new DebeziumSourceFunction<>( deserializer, props, specificOffset, new MySqlValidator(props) // mysql校验器,版本信息,binlog是否为row等 ); } }
上面内容主要是以构建source所需要的参数为主,具体我们进入到DebeziumSourceFunction中看看具体实现
// source代码,用于读取binlog,logminer等 // 实现richSourceFuntion完成source端代码的编写,实现ChecckpointFunction用于保证容错相关的内容,实现checkpointListener监听checkpoint的完成状态 public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> { // ------------------------------列出一些比较重要的成员变量,不重要的忽略了------------------------------------------ // ----------------------------------State------------------------------------------------- /* 主要用于状态的维护,当任务出现问题重启/手动重启后,维护的一些schema(record中的结构) 未消费的records(在queue中,后面会看到) offset等信息 */ private transient volatile String restoredOffsetState; private transient ListState<byte[]> offsetState; private transient ListState<String> schemaRecordsState; // -----------------------------------Worker----------------------------------------------- /* 一个单线程的线程池,一个debeziumEngine(一个runnable的实现类)用与读取binlog数据 TODO 所以设计到多线程的交互*/ private transient ExecutorService executor; private transient DebeziumEngine<?> engine; /* 一个consumer,用于从engine中读取数据的消费者,并将数据放入handover中 */ private transient DebeziumChangeConsumer changeConsumer; /* 用于从handover中拿取数据 */ private transient DebeziumChangeFetcher<T> debeziumChangeFetcher; /* 两个线程(source,engine)之间交互数据的一个桥梁 */ private transient Handover handover; // ----------------------------------------我们主要介绍srouce的run方法,其他方法主要用于容错相关-------------------------------------- @Override public void run(SourceContext<T> sourceContext) throws Exception { // TODO 用于engine执行的一些相关参数,不是终点内容,如果感兴趣可官网看看说明 properties.setProperty("name", "engine"); properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName()); if (restoredOffsetState != null) { properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState); } properties.setProperty("include.schema.changes", "false"); properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE)); properties.setProperty("tombstones.on.delete", "false"); if (engineInstanceName == null) { engineInstanceName = UUID.randomUUID().toString(); } properties.setProperty( FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName); properties.setProperty("database.history", determineDatabase().getCanonicalName()); String dbzHeartbeatPrefix = properties.getProperty( Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString()); this.debeziumChangeFetcher = new DebeziumChangeFetcher<>( sourceContext, deserializer, restoredOffsetState == null, // 是否是快照阶段或者state==null? dbzHeartbeatPrefix, handover); // 创建并配置engine相关参数 this.engine = DebeziumEngine.create(Connect.class) .using(properties)// 参数 .notifying(changeConsumer) // 配饰consumer消费 engine读取的数据(binlog/历史数据) .using(OffsetCommitPolicy.always()) // offset的提交策略 .using( (success, message, error) -> { if (success) { handover.close(); } else { handover.reportError(error); } }) .build(); // 将engine任务提交到线程池中执行 executor.execute(engine); debeziumStarted = true; // mertic相关配置i MetricGroup metricGroup = getRuntimeContext().getMetricGroup(); // .... // 启动fetcher,循环去hanover中拿取最新数据发送下游 debeziumChangeFetcher.runFetchLoop(); } }
上面我们已经看了source.run的基本实现,他的主要处理逻辑在DebeziumChangeConsumer,DebeziumChangeFetcher,Handover中
简单介绍三个类的作用和 主要 方法
和参数
DebeziumChangeConsumer : 用于消费engine读取的数据
/* 该类实现 DebeziumEngine.ChangeConsumer接口,实现handlerBatch方法 相对比较简单, 另外两个成员方法主要是offset相关,非重点内容*/ // engine线程会调用handleBatch方法出传递引擎消费到的数据 public class DebeziumChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> { @Override public void handleBatch( List<ChangeEvent<SourceRecord, SourceRecord>> events, RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) { try { currentCommitter = recordCommitter; // 间接调用到handover的produce方法,该方法是阻塞的 嘻嘻嘻(如果有历史records未被消费则wait) handover.produce(events); } catch (Throwable e) { // Hold this exception in handover and trigger the fetcher to exit handover.reportError(e); } } }
DebeziumChangeFetcher : 循环从handover中获取consumer从engine读取的最新数据
public class DebeziumChangeFetcher<T> { private final SourceFunction.SourceContext<T> sourceContext; /* 保证数据发送和状态更新的一把锁 */ private final Object checkpointLock; /* 用于将数据转化成我们自定义的类型,如json,string等 */ private final DebeziumDeserializationSchema<T> deserialization; /* 下面自定义的collector*/ private final DebeziumCollector debeziumCollector; /* 见名知意,很好理解 */ private final DebeziumOffset debeziumOffset; /* 用于存储在stateoffset的序列化器*/ private final DebeziumOffsetSerializer stateSerializer; /* 心跳相关*/ private final String heartbeatTopicPrefix; /* 是否恢复的状态,需要消费历史相关数据*/ private boolean isInDbSnapshotPhase; private final Handover handover; public void runFetchLoop() throws Exception { try { // 读取mysql历史的数据,不要被名字所迷惑 if (isInDbSnapshotPhase) { List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext(); synchronized (checkpointLock) { LOG.info( "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock."); handleBatch(events); // 这里防止snapshot数据无法一次读取完毕,必须保证snapshot数据读取完毕才进入binlog的读取 while (isRunning && isInDbSnapshotPhase) { handleBatch(handover.pollNext()); } } LOG.info("Received record from streaming binlog phase, released checkpoint lock."); } // 到这里表示snapshot的数据读取完毕,开始实时读取binlog数据 while (isRunning) { // 具体的处理数据逻辑 pollNext会阻塞 handleBatch(handover.pollNext()); } } catch (Handover.ClosedException e) { // ignore } private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents) throws Exception { if (CollectionUtils.isEmpty(changeEvents)) { return; } this.processTime = System.currentTimeMillis(); for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) { SourceRecord record = event.value(); // time相关基本都是metric相关内容,不必较真 updateMessageTimestamp(record); fetchDelay = processTime - messageTimestamp; // 通过心跳机制来更新offset if (isHeartbeatEvent(record)) { synchronized (checkpointLock) { debeziumOffset.setSourcePartition(record.sourcePartition()); debeziumOffset.setSourceOffset(record.sourceOffset()); } continue; } // 根据不同的deserialization对数据做转换,---> 可以看这个,比较容易理解StringDebeziumDeserializationSchema, 内部直接 record.toString即可,就是将debezium读取的record转换成我们想要的格式或者类型,debeziumCollector 就是下面自定义的collector,在deserialize中,会将转换完成的数据放入queue中 deserialization.deserialize(record, debeziumCollector); // 判断数据是否为snapshot的最后一条数据,如果是则在这条数据之后转换到binlog的streaming流程 if (!isSnapshotRecord(record)) { LOG.debug("Snapshot phase finishes."); isInDbSnapshotPhase = false;// runFetchLoop方法中使用 } // 具体发送数据 emitRecordsUnderCheckpointLock( debeziumCollector.records, record.sourcePartition(), record.sourceOffset()); } } private void emitRecordsUnderCheckpointLock( Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) { // 同步是保证数据的发送和offset的更新是安全,lock是可重入的(不懂可以百度,java基础内容) synchronized (checkpointLock) { T record; // 循环debeziumCollector的records队列,将队列中的数据依次发送到下游, while ((record = records.poll()) != null) { emitDelay = System.currentTimeMillis() - messageTimestamp; // 通过source的context对象将其发送到下游operator,这里转入了flink的处理逻辑,不再cdc代码之内 sourceContext.collect(record); } debeziumOffset.setSourcePartition(sourcePartition); debeziumOffset.setSourceOffset(sourceOffset); } } // 心跳机制 ,用于更新offset的机制 private boolean isHeartbeatEvent(SourceRecord record) { String topic = record.topic(); return topic != null && topic.startsWith(heartbeatTopicPrefix); } // --------------------------------自定义collector------------------------------------------------------- private class DebeziumCollector implements Collector<T> { private final Queue<T> records = new ArrayDeque<>(); @Override public void collect(T record) { // 将数据放入队列,queue会在别的地方进出列将数据发送下游 records.add(record); } } }
Handover : source线程和engine线程执行中数据交互桥梁
/* 这个类由两个线程访问, pollNext由debeziumFetcher调用,produce有debeziumConsumer调用,因为涉及多线程的调用,单纯的讲代码可能不容易理解,可以去复习一下java多线程知识内容,或者自己debug一下看看调用流程就比较容易理解了 */ @ThreadSafe //表示类是线程安全的,这类涉及engine和source线程两个线程操作,内部的实现保证了线程安全 public class Handover implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(Handover.class); private final Object lock = new Object(); @GuardedBy("lock") // 注解表示该变量受lock的保护, 不是重点勿关注 private List<ChangeEvent<SourceRecord, SourceRecord>> next; @GuardedBy("lock") private Throwable error; private boolean wakeupProducer; /* debeziumFetcher 调用,当没有数据的时候进入wait状态,wait状态的时候cpu是不会调用wait状态的线程,另一个线程就可以占用cpu的全部时间片*/ public List<ChangeEvent<SourceRecord, SourceRecord>> pollNext() throws Exception { // 同步代码块才可以使用wait和notifyAll,为什么使用这种方式,因为只有两个线程,所以这种方式实现简单,如果线程多可以通过juc的lock去做或者其他方式也可以 synchronized (lock) { // 没有数据没有异常则持续循环进入wait状态,为了防止虚假唤醒的情况 while (next == null && error == null) { lock.wait(); } List<ChangeEvent<SourceRecord, SourceRecord>> n = next; // 上面的循环可以退出的时候,说明一定是有数据或者有异常,不存在其他的情况 if (n != null) { // 将next置为null 下面会根据此条件作为判断条件 next = null; // 唤醒其他等待线程,当然只可能是engine线程 lock.notifyAll(); return n; } else { // 将异常抛出 ExceptionUtils.rethrowException(error, error.getMessage()); // 上面方法一定会抛出异常,改代码只是为了去掉编译警告... return Collections.emptyList(); } } } public void produce(final List<ChangeEvent<SourceRecord, SourceRecord>> element) throws InterruptedException { checkNotNull(element); synchronized (lock) { // next不等一直进入wait状态 while (next != null && !wakeupProducer) { lock.wait(); } wakeupProducer = false; // 有异常抛出异常,没异常将接受新数据,并唤醒fetcher线程 if (error != null) { ExceptionUtils.rethrow(error, error.getMessage()); } else { next = element; lock.notifyAll(); } } } }
上面代码即是基于RichSourceFunction实现的cdc主要
代码,其实不算难,但是前人写的代码是已经把很多问题已经考虑进入,对代码的抽象也很好,扩展起来很方便,api设计对与我们开发者来说很容易使用