请问有大佬用过mysqlcdc的这个createReader 吗?

请问有大佬用过mysqlcdc的这个createReader 吗? 52adbe39a415dc7d514cc6f94cb5673b.png

展开
收起
十一0204 2023-04-06 12:13:02 198 分享 版权
2 条回答
写回答
取消 提交回答
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    MySQLCDC 是阿里云消息队列(MQ)提供的一种数据同步工具,它可以实时捕获 MySQL 数据库的变更,并将变更数据同步到消息队列中。在使用 MySQLCDC 进行数据同步时,确实需要使用到 createReader 函数。

    createReader 函数是 MySQLCDC 中的一个重要函数,用于创建一个基于 Canal 协议的数据读取器,该读取器可以对 MySQL 数据库进行实时的变更数据捕获和推送。

    2023-04-26 09:50:26
    赞同 展开评论
  •   /*构建sourceReader */
       @Override
       public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
               throws Exception {
        // 前面提到了,根据subtask索引创建对应的config
           MySqlSourceConfig sourceConfig =
                   configFactory.createConfig(readerContext.getIndexOfSubtask());
           // 一个阻塞队列,多线程交互用的,不必深入
           FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
                   new FutureCompletingBlockingQueue<>();
        // metric相关
           final MySqlSourceReaderMetrics sourceReaderMetrics =
                   new MySqlSourceReaderMetrics(readerContext.metricGroup());
           sourceReaderMetrics.registerMetrics();
        // 通过supplier函数构建一个SplitReader,解耦的作用,主要看里面的MySqlSplitReader实现即可
           Supplier<MySqlSplitReader> splitReaderSupplier =
            // 拿到每个reader的config和对应的subtask index
                  () -> new MySqlSplitReader(sourceConfig, readerContext.getIndexOfSubtask());
       
        // 构建了一个具体的sourceReader
           return new MySqlSourceReader<>(
                   elementsQueue,
                   splitReaderSupplier,
                   new MySqlRecordEmitter<>(
                           deserializationSchema,
                           sourceReaderMetrics,
                           sourceConfig.isIncludeSchemaChanges()),
                   readerContext.getConfiguration(),
                   readerContext,
                   sourceConfig);
      }
       @Override
    
    2023-04-07 08:32:59
    赞同 展开评论

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系列产品 Serverless 化。RocketMQ 中文社区:https://rocketmq-learning.com/

收录在圈子:
+ 订阅
阿里云 云原生应用平台 肩负阿里巴巴集团基础设施云化以及核心技术互联网化的重要职责,致力于打造稳定、标准、先进的云原生产品,成为云原生时代的引领者,推动行业全面想云原生的技术升级,成为阿里云新增长引擎。商业化产品包括容器、云原生中间件、函数计算等。
还有其他疑问?
咨询AI助理