【万字长文】Flink cdc源码精讲(推荐收藏)(一)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 【万字长文】Flink cdc源码精讲(推荐收藏)

前言

flink-cdc源码地址 : https://github.com/ververica/flink-cdc-connectors

flink-cdc不再flink项目中,在flink1.11之后flink引入cdc功能,下面我们以源码深入了解flink-cdc实现原理,

我们主要以flink-cdc-mysql为主,其余代码基本差不太多

事先需要先简单了解一下debezium相关原理,flink-cdc是基于debezium实现的

一点建议 :

  • 在阅读源码的时候,我们应该带着问题去思考,然后一步一步去阅读源码,在阅读源码的过程中,不要被一些不重要的点给占用过多的时间精力,并且一遍两遍是不会让我有一个清晰的印象的,毕竟别人多少年多少人的开发,看一两遍就可以理解的,在阅读某个框架源码之前,我们应该已经对该框架原理有一定的理解,然后根据我们的理解去验证他是代码实现的样子,或者带着思考去阅读,为什么这么实现,这么实现的好处是什么等.,其实代码都是一样的,只不过是每个人的实现方式不同,考虑的问题不同而已,
  • 要有一定的java基础,熟悉多线程,了解开发使用的相关接口(或者自己看了介绍之后很容易理解),如果基础不牢,更多的是建议先从基础学习,然后写一写代码测试,比如多线程的时候怎么做交互等,自己写一写,在后面阅读源码的时候会更容理解里面内容
  • 该内容要首先对cdc有一定的了解,知道cdc的相关原理,flink-cdc的实现基于debezium实现,该框架是开源的,可以先去了解一下,这样对于我们后面内容会更容易理解

     谨记: 阅读的时候抓住重点!!!!!!!! , 不要被不重要的内容占用时间


一.项目结构(mysql-cdc为主)


640.png


1. 目录结构

  • 带有test项目都是用于测试的项目
  • 后缀带有cdc的表示一个database的连接器,区分sql与api形式
  • flink-format-changelog-json : 用于解析json成RowData的模块
  • flink-connector-debezium : 该模块封装debezium以及相关核心代码实现,并且修改了debezium的部分源码
  • 每个项目中都有test目录,里面有相关的测试代码,可以自行测试代码debug

2. mysql项目源码包结构


640.png


  • debezium : debezium用到的相关类
  • schema :  mysql schema(表结构)相关代码
  • source : mysql-cdc source实现代码,包括全量读mysql,分割器,读取器等相关
  • table : cdc table实现代码主要以table dynamic factory的实现
  • resrouces : 该目录用与spi方式动态加载table factory,用于sql创建table找到对应的工厂类


二.mysql-cdc源码-SourceFucntion的单并行度的实现

  • 基于RichSourceFunction的,单并行读取 1,11之前的source接口,已被标记Deprecated
  • 基于Source的,多并行度,1.11之后新出的srouce接口,实现要更复杂

我们主要根据单并行度源码基于讲解,这样更方便理解

具体入手我们可以根据文档中的创建source的类来一点一点走

MySqlSource通过构建者模式(23种设计模式)构建,我们只需要知道我们可以设置哪些参数即可,这个比较容易理解
 // 通过构建者方式配置任务启动时候所需要的参数
  public static class Builder<T> {// MysqlSourcen内部类
       private int port = 3306; // default 3306 port
       private String hostname;
       private String[] databaseList;
       private String username;
       private String password;
       private Integer serverId;
       private String serverTimeZone; //时区
       private String[] tableList;
       private Properties dbzProperties; // 传入的dbz引擎所需的属性
       private StartupOptions startupOptions = StartupOptions.initial(); // 用于控制开始binlog开始消费位置的参数
       private DebeziumDeserializationSchema<T> deserializer; // 用于对数据解析成什么样子如json,String等,定义序列化方式
     //上面参数配置完成,通过build构建sourceFunctionn,主要将配置信息封装带properties中,这里面的参数主要是debezium所需要启动参数,配置信息等,如果想要了解可以去debezium官网查看参数的具体细节
    public DebeziumSourceFunction<T> build() {
           Properties props = new Properties();
           props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
           // hard code server name, because we don't need to distinguish it, docs:
           // Logical name that identifies and provides a namespace for the particular MySQL
           // database
           // server/cluster being monitored. The logical name should be unique across all other
           // connectors,
           // since it is used as a prefix for all Kafka topic names emanating from this connector.
           // Only alphanumeric characters and underscores should be used.
           props.setProperty("database.server.name", DATABASE_SERVER_NAME);
           props.setProperty("database.hostname", checkNotNull(hostname));
           props.setProperty("database.user", checkNotNull(username));
           props.setProperty("database.password", checkNotNull(password));
           props.setProperty("database.port", String.valueOf(port));
           props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
           // debezium use "long" mode to handle unsigned bigint by default,
           // but it'll cause lose of precise when the value is larger than 2^63,
           // so use "precise" mode to avoid it.
           props.put("bigint.unsigned.handling.mode", "precise");
           if (serverId != null) { props.setProperty("database.server.id", String.valueOf(serverId)); }
           if (databaseList != null) {  props.setProperty("database.whitelist", String.join(",", databaseList)); }
           if (tableList != null) { props.setProperty("table.whitelist", String.join(",", tableList));}
           if (serverTimeZone != null) {  props.setProperty("database.serverTimezone", serverTimeZone);   }
      // 判断开始消费位置,在sqlSourceBuilder中构建的参数,没有则为null
           DebeziumOffset specificOffset = null;
           switch (startupOptions.startupMode) {
               case INITIAL:
                   props.setProperty("snapshot.mode", "initial");
                   break;
               case EARLIEST_OFFSET:
                   props.setProperty("snapshot.mode", "never");
                   break;
               case LATEST_OFFSET:
                   props.setProperty("snapshot.mode", "schema_only");
                   break;
               case SPECIFIC_OFFSETS:      
                   props.setProperty("snapshot.mode", "schema_only_recovery");
                   specificOffset = new DebeziumOffset();
                   Map<String, String> sourcePartition = new HashMap<>();
                   sourcePartition.put("server", DATABASE_SERVER_NAME);
                   specificOffset.setSourcePartition(sourcePartition);
                   Map<String, Object> sourceOffset = new HashMap<>();
                   sourceOffset.put("file", startupOptions.specificOffsetFile);
                   sourceOffset.put("pos", startupOptions.specificOffsetPos);
                   specificOffset.setSourceOffset(sourceOffset);
                   break;
               case TIMESTAMP:
                   checkNotNull(deserializer);
                   props.setProperty("snapshot.mode", "never");
                   deserializer =
                           new SeekBinlogToTimestampFilter<>(
                                   startupOptions.startupTimestampMillis, deserializer);
                   break;
               default:
                   throw new UnsupportedOperationException();
          }
           if (dbzProperties != null) {
               props.putAll(dbzProperties);
               // Add default configurations for compatibility when set the legacy mysql connector
               // implementation
               if (LEGACY_IMPLEMENTATION_VALUE.equals(
                       dbzProperties.get(LEGACY_IMPLEMENTATION_KEY))) {
                   props.put("transforms", "snapshotasinsert");
                   props.put(
                           "transforms.snapshotasinsert.type",
                           "io.debezium.connector.mysql.transforms.ReadToInsertEvent");
              }
          }
       // 构建通用的cdc sourceFunction --> 基于richSourceFunction
           return new DebeziumSourceFunction<>(
                   deserializer, props, specificOffset,
             new MySqlValidator(props) // mysql校验器,版本信息,binlog是否为row等
          );
      }
  }

上面内容主要是以构建source所需要的参数为主,具体我们进入到DebeziumSourceFunction中看看具体实现

// source代码,用于读取binlog,logminer等
// 实现richSourceFuntion完成source端代码的编写,实现ChecckpointFunction用于保证容错相关的内容,实现checkpointListener监听checkpoint的完成状态
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
       implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
 // ------------------------------列出一些比较重要的成员变量,不重要的忽略了------------------------------------------
   // ----------------------------------State-------------------------------------------------
/* 主要用于状态的维护,当任务出现问题重启/手动重启后,维护的一些schema(record中的结构) 未消费的records(在queue中,后面会看到) offset等信息 */
   private transient volatile String restoredOffsetState;
   private transient ListState<byte[]> offsetState;
   private transient ListState<String> schemaRecordsState;
   // -----------------------------------Worker-----------------------------------------------
/* 一个单线程的线程池,一个debeziumEngine(一个runnable的实现类)用与读取binlog数据  
TODO 所以设计到多线程的交互*/
   private transient ExecutorService executor;
   private transient DebeziumEngine<?> engine;
   /* 一个consumer,用于从engine中读取数据的消费者,并将数据放入handover中 */
   private transient DebeziumChangeConsumer changeConsumer;
   /* 用于从handover中拿取数据 */
   private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
   /* 两个线程(source,engine)之间交互数据的一个桥梁 */
   private transient Handover handover;
 // ----------------------------------------我们主要介绍srouce的run方法,其他方法主要用于容错相关--------------------------------------
   @Override
   public void run(SourceContext<T> sourceContext) throws Exception {
      // TODO 用于engine执行的一些相关参数,不是终点内容,如果感兴趣可官网看看说明
       properties.setProperty("name", "engine");
       properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
       if (restoredOffsetState != null) {
           properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
      }
       properties.setProperty("include.schema.changes", "false");
       properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
       properties.setProperty("tombstones.on.delete", "false");
       if (engineInstanceName == null) {
           engineInstanceName = UUID.randomUUID().toString();
      }    
       properties.setProperty(
               FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);  
       properties.setProperty("database.history", determineDatabase().getCanonicalName());
       String dbzHeartbeatPrefix =
               properties.getProperty(
                       Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
                       Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
       this.debeziumChangeFetcher =
               new DebeziumChangeFetcher<>(
                       sourceContext,
                       deserializer,
                       restoredOffsetState == null, // 是否是快照阶段或者state==null?
                       dbzHeartbeatPrefix,
                       handover);
       // 创建并配置engine相关参数
       this.engine =
               DebeziumEngine.create(Connect.class)
                      .using(properties)// 参数
                      .notifying(changeConsumer) // 配饰consumer消费 engine读取的数据(binlog/历史数据)
                      .using(OffsetCommitPolicy.always()) // offset的提交策略
                      .using(
                              (success, message, error) -> {
                                   if (success) {
                                       handover.close();
                                  } else {
                                       handover.reportError(error);
                                  }
                              })
                      .build();
       // 将engine任务提交到线程池中执行
       executor.execute(engine);
       debeziumStarted = true;
       // mertic相关配置i
       MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
       // ....
       // 启动fetcher,循环去hanover中拿取最新数据发送下游
       debeziumChangeFetcher.runFetchLoop();
  }
}


上面我们已经看了source.run的基本实现,他的主要处理逻辑在DebeziumChangeConsumer,DebeziumChangeFetcher,Handover中

简单介绍三个类的作用和 主要 方法参数

DebeziumChangeConsumer : 用于消费engine读取的数据

/* 该类实现 DebeziumEngine.ChangeConsumer接口,实现handlerBatch方法 相对比较简单, 另外两个成员方法主要是offset相关,非重点内容*/
// engine线程会调用handleBatch方法出传递引擎消费到的数据
public class DebeziumChangeConsumer
   implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
@Override
public void handleBatch(
   List<ChangeEvent<SourceRecord, SourceRecord>> events,
   RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) {
try {
   currentCommitter = recordCommitter;
  // 间接调用到handover的produce方法,该方法是阻塞的 嘻嘻嘻(如果有历史records未被消费则wait)
   handover.produce(events);
} catch (Throwable e) {
   // Hold this exception in handover and trigger the fetcher to exit
   handover.reportError(e);
      }
 }
}

DebeziumChangeFetcher : 循环从handover中获取consumer从engine读取的最新数据

public class DebeziumChangeFetcher<T> {
private final SourceFunction.SourceContext<T> sourceContext;
/* 保证数据发送和状态更新的一把锁 */
private final Object checkpointLock;
/* 用于将数据转化成我们自定义的类型,如json,string等 */
private final DebeziumDeserializationSchema<T> deserialization;
/* 下面自定义的collector*/
private final DebeziumCollector debeziumCollector;
 /* 见名知意,很好理解 */
private final DebeziumOffset debeziumOffset;
/* 用于存储在stateoffset的序列化器*/
private final DebeziumOffsetSerializer stateSerializer;
/* 心跳相关*/
private final String heartbeatTopicPrefix;
/* 是否恢复的状态,需要消费历史相关数据*/
private boolean isInDbSnapshotPhase;
private final Handover handover;
public void runFetchLoop() throws Exception {
  try {
      // 读取mysql历史的数据,不要被名字所迷惑
      if (isInDbSnapshotPhase) {
          List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
             synchronized (checkpointLock) {
                 LOG.info(
                         "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");              
                 handleBatch(events);
                 // 这里防止snapshot数据无法一次读取完毕,必须保证snapshot数据读取完毕才进入binlog的读取
                 while (isRunning && isInDbSnapshotPhase) {
                     handleBatch(handover.pollNext());
                 }
             }
             LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
             }
         // 到这里表示snapshot的数据读取完毕,开始实时读取binlog数据
         while (isRunning) {
             // 具体的处理数据逻辑   pollNext会阻塞
             handleBatch(handover.pollNext());
         }
         } catch (Handover.ClosedException e) {
         // ignore
         }
 private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
        throws Exception {
    if (CollectionUtils.isEmpty(changeEvents)) {
        return;
    }
    this.processTime = System.currentTimeMillis();
     for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
         SourceRecord record = event.value();
         // time相关基本都是metric相关内容,不必较真
         updateMessageTimestamp(record);
         fetchDelay = processTime - messageTimestamp;
         // 通过心跳机制来更新offset
         if (isHeartbeatEvent(record)) {
             synchronized (checkpointLock) {
                 debeziumOffset.setSourcePartition(record.sourcePartition());
                 debeziumOffset.setSourceOffset(record.sourceOffset());
             }
             continue;
         }
               // 根据不同的deserialization对数据做转换,---> 可以看这个,比较容易理解StringDebeziumDeserializationSchema, 内部直接 record.toString即可,就是将debezium读取的record转换成我们想要的格式或者类型,debeziumCollector 就是下面自定义的collector,在deserialize中,会将转换完成的数据放入queue中
         deserialization.deserialize(record, debeziumCollector);
       // 判断数据是否为snapshot的最后一条数据,如果是则在这条数据之后转换到binlog的streaming流程
         if (!isSnapshotRecord(record)) {
             LOG.debug("Snapshot phase finishes.");
             isInDbSnapshotPhase = false;// runFetchLoop方法中使用
         }
         // 具体发送数据
         emitRecordsUnderCheckpointLock(
                 debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
       }
       }
 private void emitRecordsUnderCheckpointLock(
        Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
     // 同步是保证数据的发送和offset的更新是安全,lock是可重入的(不懂可以百度,java基础内容)
     synchronized (checkpointLock) {
         T record;
         // 循环debeziumCollector的records队列,将队列中的数据依次发送到下游,
         while ((record = records.poll()) != null) {
             emitDelay = System.currentTimeMillis() - messageTimestamp;
             // 通过source的context对象将其发送到下游operator,这里转入了flink的处理逻辑,不再cdc代码之内
             sourceContext.collect(record);
         }
         debeziumOffset.setSourcePartition(sourcePartition);
         debeziumOffset.setSourceOffset(sourceOffset);
     }
     }
   // 心跳机制 ,用于更新offset的机制
   private boolean isHeartbeatEvent(SourceRecord record) {
     String topic = record.topic();
     return topic != null && topic.startsWith(heartbeatTopicPrefix);
   }
 // --------------------------------自定义collector-------------------------------------------------------
 private class DebeziumCollector implements Collector<T> {
   private final Queue<T> records = new ArrayDeque<>();
   @Override
   public void collect(T record) {
     // 将数据放入队列,queue会在别的地方进出列将数据发送下游
       records.add(record);
   }
 }
 }

Handover : source线程和engine线程执行中数据交互桥梁

/* 这个类由两个线程访问, pollNext由debeziumFetcher调用,produce有debeziumConsumer调用,因为涉及多线程的调用,单纯的讲代码可能不容易理解,可以去复习一下java多线程知识内容,或者自己debug一下看看调用流程就比较容易理解了 */
@ThreadSafe //表示类是线程安全的,这类涉及engine和source线程两个线程操作,内部的实现保证了线程安全
public class Handover implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(Handover.class);
  private final Object lock = new Object();
  @GuardedBy("lock") // 注解表示该变量受lock的保护, 不是重点勿关注
  private List<ChangeEvent<SourceRecord, SourceRecord>> next;
  @GuardedBy("lock")
  private Throwable error;
  private boolean wakeupProducer;
  /* debeziumFetcher 调用,当没有数据的时候进入wait状态,wait状态的时候cpu是不会调用wait状态的线程,另一个线程就可以占用cpu的全部时间片*/
  public List<ChangeEvent<SourceRecord, SourceRecord>> pollNext() throws Exception {
     // 同步代码块才可以使用wait和notifyAll,为什么使用这种方式,因为只有两个线程,所以这种方式实现简单,如果线程多可以通过juc的lock去做或者其他方式也可以
      synchronized (lock) {
          // 没有数据没有异常则持续循环进入wait状态,为了防止虚假唤醒的情况
          while (next == null && error == null) {
              lock.wait();
          }
          List<ChangeEvent<SourceRecord, SourceRecord>> n = next;
          // 上面的循环可以退出的时候,说明一定是有数据或者有异常,不存在其他的情况
          if (n != null) {
            // 将next置为null 下面会根据此条件作为判断条件
              next = null;
            // 唤醒其他等待线程,当然只可能是engine线程
              lock.notifyAll();
              return n;
          } else {
            // 将异常抛出
              ExceptionUtils.rethrowException(error, error.getMessage());
         // 上面方法一定会抛出异常,改代码只是为了去掉编译警告...
              return Collections.emptyList();
          }
      }
  }
  public void produce(final List<ChangeEvent<SourceRecord, SourceRecord>> element)
          throws InterruptedException {
      checkNotNull(element);
      synchronized (lock) {
        // next不等一直进入wait状态
          while (next != null && !wakeupProducer) {
              lock.wait();
          }
          wakeupProducer = false;
          // 有异常抛出异常,没异常将接受新数据,并唤醒fetcher线程
          if (error != null) {
              ExceptionUtils.rethrow(error, error.getMessage());
          } else {
              next = element;
              lock.notifyAll();
          }
      }
  }
}

上面代码即是基于RichSourceFunction实现的cdc主要代码,其实不算难,但是前人写的代码是已经把很多问题已经考虑进入,对代码的抽象也很好,扩展起来很方便,api设计对与我们开发者来说很容易使用


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
消息中间件 Java Kafka
Flink CDC 在外部查询某个 job 中的表数据
【2月更文挑战第27天】Flink CDC 在外部查询某个 job 中的表数据
44 5
|
1月前
|
关系型数据库 MySQL API
Flink CDC产品常见问题之mysql整库同步到starrock时任务挂掉如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
关系型数据库 MySQL API
Flink CDC产品常见问题之读取不到或读取不全消息如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
52 3
|
1月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之flink Oraclecdc 捕获19C数据时报错错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
自然语言处理 Java Scala
Flink CDC产品常见问题之大文件整库同步怎么解决
Flink CDC产品常见问题之大文件整库同步怎么解决
|
1月前
|
消息中间件 Kafka 流计算
如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
【2月更文挑战第30天】如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
20 2
|
1月前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1416 1
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
1月前
|
Java 关系型数据库 MySQL
Flink CDC有见这个报错不?
【2月更文挑战第29天】Flink CDC有见这个报错不?
22 2
|
1月前
|
关系型数据库 MySQL API
Flink CDC产品常见问题之mysql整库同步到starrock时任务挂掉如何解决
Flink CDC产品常见问题之mysql整库同步到starrock时任务挂掉如何解决