【万字长文】Flink cdc源码精讲(推荐收藏)(一)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 【万字长文】Flink cdc源码精讲(推荐收藏)

前言

flink-cdc源码地址 : https://github.com/ververica/flink-cdc-connectors

flink-cdc不再flink项目中,在flink1.11之后flink引入cdc功能,下面我们以源码深入了解flink-cdc实现原理,

我们主要以flink-cdc-mysql为主,其余代码基本差不太多

事先需要先简单了解一下debezium相关原理,flink-cdc是基于debezium实现的

一点建议 :

  • 在阅读源码的时候,我们应该带着问题去思考,然后一步一步去阅读源码,在阅读源码的过程中,不要被一些不重要的点给占用过多的时间精力,并且一遍两遍是不会让我有一个清晰的印象的,毕竟别人多少年多少人的开发,看一两遍就可以理解的,在阅读某个框架源码之前,我们应该已经对该框架原理有一定的理解,然后根据我们的理解去验证他是代码实现的样子,或者带着思考去阅读,为什么这么实现,这么实现的好处是什么等.,其实代码都是一样的,只不过是每个人的实现方式不同,考虑的问题不同而已,
  • 要有一定的java基础,熟悉多线程,了解开发使用的相关接口(或者自己看了介绍之后很容易理解),如果基础不牢,更多的是建议先从基础学习,然后写一写代码测试,比如多线程的时候怎么做交互等,自己写一写,在后面阅读源码的时候会更容理解里面内容
  • 该内容要首先对cdc有一定的了解,知道cdc的相关原理,flink-cdc的实现基于debezium实现,该框架是开源的,可以先去了解一下,这样对于我们后面内容会更容易理解

     谨记: 阅读的时候抓住重点!!!!!!!! , 不要被不重要的内容占用时间


一.项目结构(mysql-cdc为主)


640.png


1. 目录结构

  • 带有test项目都是用于测试的项目
  • 后缀带有cdc的表示一个database的连接器,区分sql与api形式
  • flink-format-changelog-json : 用于解析json成RowData的模块
  • flink-connector-debezium : 该模块封装debezium以及相关核心代码实现,并且修改了debezium的部分源码
  • 每个项目中都有test目录,里面有相关的测试代码,可以自行测试代码debug

2. mysql项目源码包结构


640.png


  • debezium : debezium用到的相关类
  • schema :  mysql schema(表结构)相关代码
  • source : mysql-cdc source实现代码,包括全量读mysql,分割器,读取器等相关
  • table : cdc table实现代码主要以table dynamic factory的实现
  • resrouces : 该目录用与spi方式动态加载table factory,用于sql创建table找到对应的工厂类


二.mysql-cdc源码-SourceFucntion的单并行度的实现

  • 基于RichSourceFunction的,单并行读取 1,11之前的source接口,已被标记Deprecated
  • 基于Source的,多并行度,1.11之后新出的srouce接口,实现要更复杂

我们主要根据单并行度源码基于讲解,这样更方便理解

具体入手我们可以根据文档中的创建source的类来一点一点走

MySqlSource通过构建者模式(23种设计模式)构建,我们只需要知道我们可以设置哪些参数即可,这个比较容易理解
 // 通过构建者方式配置任务启动时候所需要的参数
  public static class Builder<T> {// MysqlSourcen内部类
       private int port = 3306; // default 3306 port
       private String hostname;
       private String[] databaseList;
       private String username;
       private String password;
       private Integer serverId;
       private String serverTimeZone; //时区
       private String[] tableList;
       private Properties dbzProperties; // 传入的dbz引擎所需的属性
       private StartupOptions startupOptions = StartupOptions.initial(); // 用于控制开始binlog开始消费位置的参数
       private DebeziumDeserializationSchema<T> deserializer; // 用于对数据解析成什么样子如json,String等,定义序列化方式
     //上面参数配置完成,通过build构建sourceFunctionn,主要将配置信息封装带properties中,这里面的参数主要是debezium所需要启动参数,配置信息等,如果想要了解可以去debezium官网查看参数的具体细节
    public DebeziumSourceFunction<T> build() {
           Properties props = new Properties();
           props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
           // hard code server name, because we don't need to distinguish it, docs:
           // Logical name that identifies and provides a namespace for the particular MySQL
           // database
           // server/cluster being monitored. The logical name should be unique across all other
           // connectors,
           // since it is used as a prefix for all Kafka topic names emanating from this connector.
           // Only alphanumeric characters and underscores should be used.
           props.setProperty("database.server.name", DATABASE_SERVER_NAME);
           props.setProperty("database.hostname", checkNotNull(hostname));
           props.setProperty("database.user", checkNotNull(username));
           props.setProperty("database.password", checkNotNull(password));
           props.setProperty("database.port", String.valueOf(port));
           props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
           // debezium use "long" mode to handle unsigned bigint by default,
           // but it'll cause lose of precise when the value is larger than 2^63,
           // so use "precise" mode to avoid it.
           props.put("bigint.unsigned.handling.mode", "precise");
           if (serverId != null) { props.setProperty("database.server.id", String.valueOf(serverId)); }
           if (databaseList != null) {  props.setProperty("database.whitelist", String.join(",", databaseList)); }
           if (tableList != null) { props.setProperty("table.whitelist", String.join(",", tableList));}
           if (serverTimeZone != null) {  props.setProperty("database.serverTimezone", serverTimeZone);   }
      // 判断开始消费位置,在sqlSourceBuilder中构建的参数,没有则为null
           DebeziumOffset specificOffset = null;
           switch (startupOptions.startupMode) {
               case INITIAL:
                   props.setProperty("snapshot.mode", "initial");
                   break;
               case EARLIEST_OFFSET:
                   props.setProperty("snapshot.mode", "never");
                   break;
               case LATEST_OFFSET:
                   props.setProperty("snapshot.mode", "schema_only");
                   break;
               case SPECIFIC_OFFSETS:      
                   props.setProperty("snapshot.mode", "schema_only_recovery");
                   specificOffset = new DebeziumOffset();
                   Map<String, String> sourcePartition = new HashMap<>();
                   sourcePartition.put("server", DATABASE_SERVER_NAME);
                   specificOffset.setSourcePartition(sourcePartition);
                   Map<String, Object> sourceOffset = new HashMap<>();
                   sourceOffset.put("file", startupOptions.specificOffsetFile);
                   sourceOffset.put("pos", startupOptions.specificOffsetPos);
                   specificOffset.setSourceOffset(sourceOffset);
                   break;
               case TIMESTAMP:
                   checkNotNull(deserializer);
                   props.setProperty("snapshot.mode", "never");
                   deserializer =
                           new SeekBinlogToTimestampFilter<>(
                                   startupOptions.startupTimestampMillis, deserializer);
                   break;
               default:
                   throw new UnsupportedOperationException();
          }
           if (dbzProperties != null) {
               props.putAll(dbzProperties);
               // Add default configurations for compatibility when set the legacy mysql connector
               // implementation
               if (LEGACY_IMPLEMENTATION_VALUE.equals(
                       dbzProperties.get(LEGACY_IMPLEMENTATION_KEY))) {
                   props.put("transforms", "snapshotasinsert");
                   props.put(
                           "transforms.snapshotasinsert.type",
                           "io.debezium.connector.mysql.transforms.ReadToInsertEvent");
              }
          }
       // 构建通用的cdc sourceFunction --> 基于richSourceFunction
           return new DebeziumSourceFunction<>(
                   deserializer, props, specificOffset,
             new MySqlValidator(props) // mysql校验器,版本信息,binlog是否为row等
          );
      }
  }

上面内容主要是以构建source所需要的参数为主,具体我们进入到DebeziumSourceFunction中看看具体实现

// source代码,用于读取binlog,logminer等
// 实现richSourceFuntion完成source端代码的编写,实现ChecckpointFunction用于保证容错相关的内容,实现checkpointListener监听checkpoint的完成状态
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
       implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
 // ------------------------------列出一些比较重要的成员变量,不重要的忽略了------------------------------------------
   // ----------------------------------State-------------------------------------------------
/* 主要用于状态的维护,当任务出现问题重启/手动重启后,维护的一些schema(record中的结构) 未消费的records(在queue中,后面会看到) offset等信息 */
   private transient volatile String restoredOffsetState;
   private transient ListState<byte[]> offsetState;
   private transient ListState<String> schemaRecordsState;
   // -----------------------------------Worker-----------------------------------------------
/* 一个单线程的线程池,一个debeziumEngine(一个runnable的实现类)用与读取binlog数据  
TODO 所以设计到多线程的交互*/
   private transient ExecutorService executor;
   private transient DebeziumEngine<?> engine;
   /* 一个consumer,用于从engine中读取数据的消费者,并将数据放入handover中 */
   private transient DebeziumChangeConsumer changeConsumer;
   /* 用于从handover中拿取数据 */
   private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
   /* 两个线程(source,engine)之间交互数据的一个桥梁 */
   private transient Handover handover;
 // ----------------------------------------我们主要介绍srouce的run方法,其他方法主要用于容错相关--------------------------------------
   @Override
   public void run(SourceContext<T> sourceContext) throws Exception {
      // TODO 用于engine执行的一些相关参数,不是终点内容,如果感兴趣可官网看看说明
       properties.setProperty("name", "engine");
       properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
       if (restoredOffsetState != null) {
           properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
      }
       properties.setProperty("include.schema.changes", "false");
       properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
       properties.setProperty("tombstones.on.delete", "false");
       if (engineInstanceName == null) {
           engineInstanceName = UUID.randomUUID().toString();
      }    
       properties.setProperty(
               FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);  
       properties.setProperty("database.history", determineDatabase().getCanonicalName());
       String dbzHeartbeatPrefix =
               properties.getProperty(
                       Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
                       Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
       this.debeziumChangeFetcher =
               new DebeziumChangeFetcher<>(
                       sourceContext,
                       deserializer,
                       restoredOffsetState == null, // 是否是快照阶段或者state==null?
                       dbzHeartbeatPrefix,
                       handover);
       // 创建并配置engine相关参数
       this.engine =
               DebeziumEngine.create(Connect.class)
                      .using(properties)// 参数
                      .notifying(changeConsumer) // 配饰consumer消费 engine读取的数据(binlog/历史数据)
                      .using(OffsetCommitPolicy.always()) // offset的提交策略
                      .using(
                              (success, message, error) -> {
                                   if (success) {
                                       handover.close();
                                  } else {
                                       handover.reportError(error);
                                  }
                              })
                      .build();
       // 将engine任务提交到线程池中执行
       executor.execute(engine);
       debeziumStarted = true;
       // mertic相关配置i
       MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
       // ....
       // 启动fetcher,循环去hanover中拿取最新数据发送下游
       debeziumChangeFetcher.runFetchLoop();
  }
}


上面我们已经看了source.run的基本实现,他的主要处理逻辑在DebeziumChangeConsumer,DebeziumChangeFetcher,Handover中

简单介绍三个类的作用和 主要 方法参数

DebeziumChangeConsumer : 用于消费engine读取的数据

/* 该类实现 DebeziumEngine.ChangeConsumer接口,实现handlerBatch方法 相对比较简单, 另外两个成员方法主要是offset相关,非重点内容*/
// engine线程会调用handleBatch方法出传递引擎消费到的数据
public class DebeziumChangeConsumer
   implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
@Override
public void handleBatch(
   List<ChangeEvent<SourceRecord, SourceRecord>> events,
   RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) {
try {
   currentCommitter = recordCommitter;
  // 间接调用到handover的produce方法,该方法是阻塞的 嘻嘻嘻(如果有历史records未被消费则wait)
   handover.produce(events);
} catch (Throwable e) {
   // Hold this exception in handover and trigger the fetcher to exit
   handover.reportError(e);
      }
 }
}

DebeziumChangeFetcher : 循环从handover中获取consumer从engine读取的最新数据

public class DebeziumChangeFetcher<T> {
private final SourceFunction.SourceContext<T> sourceContext;
/* 保证数据发送和状态更新的一把锁 */
private final Object checkpointLock;
/* 用于将数据转化成我们自定义的类型,如json,string等 */
private final DebeziumDeserializationSchema<T> deserialization;
/* 下面自定义的collector*/
private final DebeziumCollector debeziumCollector;
 /* 见名知意,很好理解 */
private final DebeziumOffset debeziumOffset;
/* 用于存储在stateoffset的序列化器*/
private final DebeziumOffsetSerializer stateSerializer;
/* 心跳相关*/
private final String heartbeatTopicPrefix;
/* 是否恢复的状态,需要消费历史相关数据*/
private boolean isInDbSnapshotPhase;
private final Handover handover;
public void runFetchLoop() throws Exception {
  try {
      // 读取mysql历史的数据,不要被名字所迷惑
      if (isInDbSnapshotPhase) {
          List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
             synchronized (checkpointLock) {
                 LOG.info(
                         "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");              
                 handleBatch(events);
                 // 这里防止snapshot数据无法一次读取完毕,必须保证snapshot数据读取完毕才进入binlog的读取
                 while (isRunning && isInDbSnapshotPhase) {
                     handleBatch(handover.pollNext());
                 }
             }
             LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
             }
         // 到这里表示snapshot的数据读取完毕,开始实时读取binlog数据
         while (isRunning) {
             // 具体的处理数据逻辑   pollNext会阻塞
             handleBatch(handover.pollNext());
         }
         } catch (Handover.ClosedException e) {
         // ignore
         }
 private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
        throws Exception {
    if (CollectionUtils.isEmpty(changeEvents)) {
        return;
    }
    this.processTime = System.currentTimeMillis();
     for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
         SourceRecord record = event.value();
         // time相关基本都是metric相关内容,不必较真
         updateMessageTimestamp(record);
         fetchDelay = processTime - messageTimestamp;
         // 通过心跳机制来更新offset
         if (isHeartbeatEvent(record)) {
             synchronized (checkpointLock) {
                 debeziumOffset.setSourcePartition(record.sourcePartition());
                 debeziumOffset.setSourceOffset(record.sourceOffset());
             }
             continue;
         }
               // 根据不同的deserialization对数据做转换,---> 可以看这个,比较容易理解StringDebeziumDeserializationSchema, 内部直接 record.toString即可,就是将debezium读取的record转换成我们想要的格式或者类型,debeziumCollector 就是下面自定义的collector,在deserialize中,会将转换完成的数据放入queue中
         deserialization.deserialize(record, debeziumCollector);
       // 判断数据是否为snapshot的最后一条数据,如果是则在这条数据之后转换到binlog的streaming流程
         if (!isSnapshotRecord(record)) {
             LOG.debug("Snapshot phase finishes.");
             isInDbSnapshotPhase = false;// runFetchLoop方法中使用
         }
         // 具体发送数据
         emitRecordsUnderCheckpointLock(
                 debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
       }
       }
 private void emitRecordsUnderCheckpointLock(
        Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
     // 同步是保证数据的发送和offset的更新是安全,lock是可重入的(不懂可以百度,java基础内容)
     synchronized (checkpointLock) {
         T record;
         // 循环debeziumCollector的records队列,将队列中的数据依次发送到下游,
         while ((record = records.poll()) != null) {
             emitDelay = System.currentTimeMillis() - messageTimestamp;
             // 通过source的context对象将其发送到下游operator,这里转入了flink的处理逻辑,不再cdc代码之内
             sourceContext.collect(record);
         }
         debeziumOffset.setSourcePartition(sourcePartition);
         debeziumOffset.setSourceOffset(sourceOffset);
     }
     }
   // 心跳机制 ,用于更新offset的机制
   private boolean isHeartbeatEvent(SourceRecord record) {
     String topic = record.topic();
     return topic != null && topic.startsWith(heartbeatTopicPrefix);
   }
 // --------------------------------自定义collector-------------------------------------------------------
 private class DebeziumCollector implements Collector<T> {
   private final Queue<T> records = new ArrayDeque<>();
   @Override
   public void collect(T record) {
     // 将数据放入队列,queue会在别的地方进出列将数据发送下游
       records.add(record);
   }
 }
 }

Handover : source线程和engine线程执行中数据交互桥梁

/* 这个类由两个线程访问, pollNext由debeziumFetcher调用,produce有debeziumConsumer调用,因为涉及多线程的调用,单纯的讲代码可能不容易理解,可以去复习一下java多线程知识内容,或者自己debug一下看看调用流程就比较容易理解了 */
@ThreadSafe //表示类是线程安全的,这类涉及engine和source线程两个线程操作,内部的实现保证了线程安全
public class Handover implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(Handover.class);
  private final Object lock = new Object();
  @GuardedBy("lock") // 注解表示该变量受lock的保护, 不是重点勿关注
  private List<ChangeEvent<SourceRecord, SourceRecord>> next;
  @GuardedBy("lock")
  private Throwable error;
  private boolean wakeupProducer;
  /* debeziumFetcher 调用,当没有数据的时候进入wait状态,wait状态的时候cpu是不会调用wait状态的线程,另一个线程就可以占用cpu的全部时间片*/
  public List<ChangeEvent<SourceRecord, SourceRecord>> pollNext() throws Exception {
     // 同步代码块才可以使用wait和notifyAll,为什么使用这种方式,因为只有两个线程,所以这种方式实现简单,如果线程多可以通过juc的lock去做或者其他方式也可以
      synchronized (lock) {
          // 没有数据没有异常则持续循环进入wait状态,为了防止虚假唤醒的情况
          while (next == null && error == null) {
              lock.wait();
          }
          List<ChangeEvent<SourceRecord, SourceRecord>> n = next;
          // 上面的循环可以退出的时候,说明一定是有数据或者有异常,不存在其他的情况
          if (n != null) {
            // 将next置为null 下面会根据此条件作为判断条件
              next = null;
            // 唤醒其他等待线程,当然只可能是engine线程
              lock.notifyAll();
              return n;
          } else {
            // 将异常抛出
              ExceptionUtils.rethrowException(error, error.getMessage());
         // 上面方法一定会抛出异常,改代码只是为了去掉编译警告...
              return Collections.emptyList();
          }
      }
  }
  public void produce(final List<ChangeEvent<SourceRecord, SourceRecord>> element)
          throws InterruptedException {
      checkNotNull(element);
      synchronized (lock) {
        // next不等一直进入wait状态
          while (next != null && !wakeupProducer) {
              lock.wait();
          }
          wakeupProducer = false;
          // 有异常抛出异常,没异常将接受新数据,并唤醒fetcher线程
          if (error != null) {
              ExceptionUtils.rethrow(error, error.getMessage());
          } else {
              next = element;
              lock.notifyAll();
          }
      }
  }
}

上面代码即是基于RichSourceFunction实现的cdc主要代码,其实不算难,但是前人写的代码是已经把很多问题已经考虑进入,对代码的抽象也很好,扩展起来很方便,api设计对与我们开发者来说很容易使用


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
数据采集 SQL canal
Amoro + Flink CDC 数据融合入湖新体验
本文总结了货拉拉高级大数据开发工程师陈政羽在Flink Forward Asia 2024上的分享,聚焦Flink CDC在货拉拉的应用与优化。内容涵盖CDC应用现状、数据入湖新体验、入湖优化及未来规划。文中详细分析了CDC在多业务场景中的实践,包括数据采集平台化、稳定性建设,以及面临的文件碎片化、Schema演进等挑战。同时介绍了基于Apache Amoro的湖仓融合架构,通过自优化服务解决小文件问题,提升数据新鲜度与读写平衡。未来将深化Paimon与Amoro的结合,打造更高效的入湖生态与自动化优化方案。
217 1
Amoro + Flink CDC 数据融合入湖新体验
|
4月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
738 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
5月前
|
SQL API Apache
Dinky 和 Flink CDC 在实时整库同步的探索之路
本次分享围绕 Dinky 的整库同步技术演进,从传统数据集成方案的痛点出发,探讨了 Flink CDC Yaml 作业的探索历程。内容分为三个部分:起源、探索、未来。在起源部分,分析了传统数据集成方案中全量与增量割裂、时效性低等问题,引出 Flink CDC 的优势;探索部分详细对比了 Dinky CDC Source 和 Flink CDC Pipeline 的架构与能力,深入讲解了 YAML 作业的细节,如模式演变、数据转换等;未来部分则展望了 Dinky 对 Flink CDC 的支持与优化方向,包括 Pipeline 转换功能、Transform 扩展及实时湖仓治理等。
641 12
Dinky 和 Flink CDC 在实时整库同步的探索之路
|
3月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
6月前
|
Oracle 关系型数据库 Java
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
|
6月前
|
关系型数据库 MySQL 数据库
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
TIS 是一款基于Web-UI的开源大数据集成工具,通过与人大金仓Kingbase的深度整合,提供高效、灵活的实时数据集成方案。它支持增量数据监听和实时写入,兼容MySQL、PostgreSQL和Oracle模式,无需编写复杂脚本,操作简单直观,特别适合非专业开发人员使用。TIS率先实现了Kingbase CDC连接器的整合,成为业界首个开箱即用的Kingbase CDC数据同步解决方案,助力企业数字化转型。
985 5
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
|
6月前
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
481 1
Flink CDC + Hologres高性能数据同步优化实践
|
6月前
|
分布式计算 关系型数据库 MySQL
Flink CDC 3.3.0 发布公告
Flink CDC 3.3.0 发布公告
241 14
|
6月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
237 6
|
6月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
170 5

热门文章

最新文章