请问有大佬用过mysqlcdc的这个createReader 吗?
MySQLCDC 是阿里云消息队列(MQ)提供的一种数据同步工具,它可以实时捕获 MySQL 数据库的变更,并将变更数据同步到消息队列中。在使用 MySQLCDC 进行数据同步时,确实需要使用到 createReader 函数。
createReader 函数是 MySQLCDC 中的一个重要函数,用于创建一个基于 Canal 协议的数据读取器,该读取器可以对 MySQL 数据库进行实时的变更数据捕获和推送。
/*构建sourceReader */
@Override
public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
throws Exception {
// 前面提到了,根据subtask索引创建对应的config
MySqlSourceConfig sourceConfig =
configFactory.createConfig(readerContext.getIndexOfSubtask());
// 一个阻塞队列,多线程交互用的,不必深入
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
new FutureCompletingBlockingQueue<>();
// metric相关
final MySqlSourceReaderMetrics sourceReaderMetrics =
new MySqlSourceReaderMetrics(readerContext.metricGroup());
sourceReaderMetrics.registerMetrics();
// 通过supplier函数构建一个SplitReader,解耦的作用,主要看里面的MySqlSplitReader实现即可
Supplier<MySqlSplitReader> splitReaderSupplier =
// 拿到每个reader的config和对应的subtask index
() -> new MySqlSplitReader(sourceConfig, readerContext.getIndexOfSubtask());
// 构建了一个具体的sourceReader
return new MySqlSourceReader<>(
elementsQueue,
splitReaderSupplier,
new MySqlRecordEmitter<>(
deserializationSchema,
sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges()),
readerContext.getConfiguration(),
readerContext,
sourceConfig);
}
@Override
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/