CDC(Change data capture)是Cassandra提供的一种用于捕获和归档数据写入操作的机制,这个功能在3.8以上版本支持。当对一个表设置了“cdc=true”属性之后,包含有这个表的数据的CommitLog在丢弃时会被移动到指定的目录中,用户可以自己编写程序消费(解析并删除)这些日志,实现诸如增量数据导出、备份等功能。本文介绍CDC功能的使用并分析其特点。
配置参数
cassandra.yaml中,如下参数和CDC功能有关:
cdc_enabled (默认: false)
true表示节点上开启cdc功能。
cdc_raw_directory (默认: $CASSANDRA_HOME/data/cdc_raw)
当一个Commitlog文件不再需要保留(相关的改动已经全部从memtable持久化到sstable)时,会把日志文件移动到这个目录下。
cdc_free_space_in_mb: (默认: min(4096,磁盘空间的1/8))
当cdc_raw_directory目录下的日志和当前正在写入的日志占用空间超过这个值时,会停止开启了cdc的表的写入。
cdc_free_space_check_interval_ms (默认: 250)
计算日志占用空间的周期。
开启表的CDC
通过如下语句可以在一个表上开启CDC:
CREATE TABLE foo (a int, b text, PRIMARY KEY(a)) WITH cdc=true;
ALTER TABLE foo WITH cdc=true;
消费日志
CDC日志的格式和Commitlog是一致的,在Cassandra中提供了解析Commitlog的类CommitLogReader,借助这个类,我们只需要实现自己的CommitLogReadHandler类来处理捕获的写入操作就可以实现对CDC日志的消费。下面的示例代码实现了一个简单的功能:解析日志并把日志中的写操作输出到屏幕上。
pom文件中增加如下依赖:
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>${cassandra.version}</version>
</dependency>
消费日志的代码如下:
DatabaseDescriptor.toolInitialization();
CommitLogReader reader = new CommitLogReader();
CommitLogReadHandler handler = new CommitLogReadHandler() {
@Override public boolean shouldSkipSegmentOnError(CommitLogReadException e)
throws IOException {
return false;
}
@Override public void handleUnrecoverableError(CommitLogReadException e) throws IOException {
}
@Override public void handleMutation(Mutation mutation, int i, int i1,
CommitLogDescriptor commitLogDescriptor) {
for (PartitionUpdate partition : mutation.getPartitionUpdates()) {
System.out.println(partition.toString());
}
}
};
File dir = new File($CDC日志目录);
while (true) {
String[] paths = dir.list();
if( paths.length == 0 ) {
Thread.sleep(1000);
}
for (String path : paths) {
File f = new File(dir, path);
reader.readAllFiles(handler, ArrayUtils.toArray(f));
f.delete();
}
}
运行时需要将Cassandra需要的路径加入到classpath里面(可以通过在脚本中调用bin/cassandra.in.sh实现),或者至少指定-Dcassandra.config和-Dcassandra.storagedir这两个路径作为启动参数。这是因为在解析日志时需要表的meta信息。
总结
CDC功能主要起到两个作用:
(1)能够将增量数据可靠的归档到CDC日志目录下面。在CDC出现之前,我们只能通过commitlog_archiving这样的功能归档增量日志,但是commitlog_archiving的机制是调用命令行,在一些异常情况下可能会出现归档失败。而CDC则可以保证日志在成功归档到CDC日志目录中之前绝对不会被删除。
(2)可以起到反压的作用,避免日志占用过多的磁盘空间。当日志量超过cdc_free_space_in_mb的配置后,开启CDC的表会拒绝写请求(抛出WriteTimeoutException)。
一些限制:
(1)通过CDC不能实时读取到增量数据。根据我们前面分析的机制,只有一个日志文件写满并且不需要保留时才会移动到CDC目录下面。
(2)CDC日志中不仅包含开启了cdc功能的表,如果业务场景只需要某个表的数据,需要自己过滤。
(3)在多副本的情况下,同一个(逻辑上的)写入会在多个节点都写入日志,所以如果业务上要求避免重复,需要自己做去重。
入群邀约
为了营造一个开放的 Cassandra 技术交流,我们建立了微信群公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。另外阿里云提供免费Cassandra试用:https://www.aliyun.com/product/cds
钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o
微信群公众号: