木洛 2016-05-23 8830浏览量
表格存储(原OTS)的一大特性是能够支撑海量数据的高并发、高吞吐率的写入,特别适合日志数据或物联网场景(例如轨迹追踪或溯源)数据的写入和存储。这些场景的特性是,会在短时间内产生大量的数据需要消化并写入数据库,需要数据库能够提供高并发、高吞吐率的写入性能,需要满足每秒上万行甚至上百万行的写入吞吐率。针对这些场景,我们在存储层做了很多的优化(本篇文章不赘述),同时在SDK接口层也做了一些优化,专门提供了一个简单易用、高性能的数据导入接口。
TableStoreWriter是基于Java SDK的异步接口,封装的一层专门用于高并发、高吞吐率数据导入的接口。本篇文章主要会介绍TableStoreWriter的适用场景、底层架构以及如何使用。
如果你的应用场景,满足以下特点,则可以考虑使用TableStoreWriter来作为数据写入的入口:
特点一: 高并发,对吞吐率要求很高
需要高并发的数据写入,非写入行的吞吐率要求很高。例如日志场景,需要分布式的采集日志,采集点可能很多;需要在短时间内将这些产生的日志消费掉,导入到数据库中,衡量导入性能的指标是每秒消费多少MB的日志数据。
特点二:对单条数据的写入延迟没有要求
应用场景需要的是高写入吞吐率,而不是单条数据的写入延迟。还是拿日志场景举例,日志场景对写入的要求是每秒能处理多少条日志,而不在乎一条日志从产生到最终写入的延迟。这是典型的离线和在线场景的区别,在线场景要求反馈是及时的。从延迟的量级上来讲,在线场景可能要求数据写入在毫秒级别,而离线场景可能可以接受数据写入延迟在百毫秒级别。
为啥TableStoreWriter要求应用对单行导入的延迟没有要求?这与TableStoreWriter内部优化写入吞吐率相关,为了最大化利用存储层写入的性能,TableStoreWriter内部会做数据缓冲,尽量发送大的数据包,而数据缓冲需要数据从写入到发送有一个暂缓。
特点三:写入可异步化(可采用生产者消费者模型)
TableStoreWriter为提高写入吞吐率,做的一个优化即异步化。异步化有很多的好处,包括数据写入可以更聚集,可以提供更高的写入并发等。
所以对于应用层,需要能够接受写入异步化。异步化代表的意思是,数据写入的触发线程,不需要同步的等待该行数据是否写入成功还是失败的反馈,数据写入失败或成功的处理可以被异步的执行。
类似的架构为:生产者将数据写入一个队列,而不用管该数据何时被消费,消费者异步的消费数据。
特点四:同一条数据可重复写入
TableStoreWriter无法避免一条数据可能被重复的写入,重复的原因有很多,例如网络超时重传等。在非事务的写入模式下,都很难保证一条数据不被重复写入,而如果带了事务的写入,则性能都不会好。TableStoreWriter重性能,所以需要应用能接受一条数据被重复的写入。
ClientConfiguration cc = new ClientConfiguration();
cc.setRetryStrategy(new DefaultRetryStrategy()); // 可定制重试策略,若需要保证数据写入成功率,可采用更激进的重试策略
AsyncClient asyncClient = new AsyncClient(endPoint, accessId, accessKey, instanceName, cc);
// 初始化
WriterConfig config = new WriterConfig();
config.setMaxBatchSize(4 * 1024 * 1024); // 配置一次批量导入请求的大小限制,默认是4MB
config.setMaxColumnsCount(128); // 配置一行的列数的上限,默认128列
config.setBufferSize(1024); // 配置内存中最多缓冲的数据行数,默认1024行,必须是2的指数倍
config.setMaxBatchRowsCount(100); // 配置一次批量导入的行数上限,默认100
config.setConcurrency(10); // 配置最大并发数,默认10
config.setMaxAttrColumnSize(2 * 1024 * 1024); // 配置属性列的值大小上限,默认是2MB
config.setMaxPKColumnSize(1024); // 配置主键列的值大小上限,默认1KB
config.setFlushInterval(10000); // 配置缓冲区flush的时间间隔,默认10s
// 配置一个callback,OTSWriter通过该callback反馈哪些导入成功,哪些行导入失败,该callback只简单的统计写入成功和失败的行数。
AtomicLong succeedCount = new AtomicLong();
AtomicLong failedCount = new AtomicLong();
TableStoreCallback<RowChange, ConsumedCapacity> callback = new SampleCallback(succeedCount, failedCount);
ExecutorService executor = Executors.newFixedThreadPool(2);
TableStoreWriter tablestoreWriter = new DefaultTableStoreWriter(asyncClient, tableName, config, callback, executor);
int start = id * rowsCount;
for (int i = 0; i < rowsCount; i++) {
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("gid", PrimaryKeyValue.fromLong(start + i))
.addPrimaryKeyColumn("uid", PrimaryKeyValue.fromLong(start + i)).build();
RowPutChange rowChange = new RowPutChange(tableName);
rowChange.setPrimaryKey(primaryKey);
rowChange.addColumn("col1", ColumnValue.fromBoolean(true));
rowChange.addColumn("col2", ColumnValue.fromLong(10));
rowChange.addColumn("col3", ColumnValue.fromString("Hello world."));
tablestoreWriter.addRowChange(rowChange);
}
private static class SampleCallback implements TableStoreCallback<RowChange, ConsumedCapacity> {
private AtomicLong succeedCount;
private AtomicLong failedCount;
public SampleCallback(AtomicLong succeedCount, AtomicLong failedCount) {
this.succeedCount = succeedCount;
this.failedCount = failedCount;
}
@Override
public void onCompleted(RowChange req, ConsumedCapacity res) {
succeedCount.incrementAndGet();
}
@Override
public void onFailed(RowChange req, Exception ex) {
ex.printStackTrace();
failedCount.incrementAndGet();
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
阿里云存储基于飞天盘古2.0分布式存储系统,产品多种多样,充分满足用户数据存储和迁移上云需求。