为了让表格存储 Tablestore 用户更好的了解产品数据管理能力以及SDK使用,本文将结合代码讲述数据管理的功能与使用方式。本文仅讲述主表数据的管理使用,并非Tablestore全部的数据管理能力。涉及到二级索引、多元索引查询能力的内容,将会在以后的文章中单独介绍。样例代码链接地址见:Tablestore-Examples项目。
数据管理概览
Tablestore的数据以PrimaryKey作为唯一行标识,表数据的管理也是基于PrimaryKey做相应的增、删、改、查。对于Tablestore的主表数据管理,按照操作数量分为:单行操作、批量操作两类。按照操作类型又分为:写、读两类。单行数据的增、删、改操作都实现了RowChange接口,因此在批量操作中,可以一次请求同时对数据发起增、删、改的操作请求。
数据管理使用
单行操作
数据插入
数据行级别插入,需要提供完整的PrimaryKey标识行的唯一ID,然后配置行的属性值。
注意事项:
- 主键必须完整,并且与表属性主键名、主键类型一致,否侧会有参数错误;
- 如果该行数据存在,默认覆盖原数据。如果不想覆盖,可通过设置condition做校验,若存在则会抛错。
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn(PK1, PrimaryKeyValue.fromLong(1L))
.addPrimaryKeyColumn(PK2, PrimaryKeyValue.fromString("string"))
.build();
RowPutChange rowChange = new RowPutChange(TABLE_NAME, primaryKey);
rowChange.addColumn("string", ColumnValue.fromString("string value"))
.addColumn("long", ColumnValue.fromLong(1L))
.addColumn("boolean", ColumnValue.fromBoolean(true))
.addColumn("double", ColumnValue.fromDouble(1.1))
.addColumn("binary", ColumnValue.fromBinary(new byte[]{1, 2, 3}));
PutRowRequest putRowRequest = new PutRowRequest(rowChange);
syncClient.putRow(putRowRequest);
数据插入-条件插入
条件写入是提供三种行级别条件检验,对写操作都支持,RowChange接口提供的API,主要包含如下行检验方式:
- IGNORE // 不做检查,默认值
- EXPECT_EXIST // 期望行存在,存在才会做写操作
- EXPECT_NOT_EXIST // 期望行不存在,不存在才会做写操作
// 若条件校验失败,则会抛错:OTSConditionCheckFail
rowChange.setCondition(new Condition(RowExistenceExpectation.EXPECT_NOT_EXIST));
数据插入-自增列
支持自增列数据插入,前提创建表格时相应的主键列已设置成自增列。数据插入时,该列主键设置为特殊的列值。
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn(PK1, PrimaryKeyValue.fromLong(1L))
.addPrimaryKeyColumn(PK2, PrimaryKeyValue.fromString("string"))
.addPrimaryKeyColumn(PK3, PrimaryKeyValue.AUTO_INCREMENT) // 自增列的特殊列值
.build();
RowPutChange rowChange = new RowPutChange(TABLE_NAME_INC, primaryKey);
rowChange.addColumn("string", ColumnValue.fromString("string value"));
rowChange.setReturnType(ReturnType.RT_PK); // 获取主键自增列实际插入的值
PutRowRequest putRowRequest = new PutRowRequest(rowChange);
PutRowResponse putRowResponse = syncClient.putRow(putRowRequest);
// 自增列的实际存储值只有在返回的Response中才能拿到
PrimaryKey returnPrimaryKey = putRowResponse.getRow().getPrimaryKey();
数据删除
基于给定的完整PrimaryKey删除相应行的数据,如果行不存在,删除操作也不会抛错。
注意事项:
- 主键必须完整,并且与表属性主键名、主键类型一致,否侧会有参数错误;
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn(PK1, PrimaryKeyValue.fromLong(1L))
.addPrimaryKeyColumn(PK2, PrimaryKeyValue.fromString("string"))
.build();
RowDeleteChange rowChange = new RowDeleteChange(TABLE_NAME, primaryKey);
DeleteRowRequest getRowRequest = new DeleteRowRequest(rowChange);
syncClient.deleteRow(getRowRequest);
数据更新
数据更新对指定行做指定字段做更新操作,涵盖列的增、删、改。
注意事项:
- 主键必须完整,并且与表属性主键名、主键类型一致,否侧会有参数错误;
-
支持条件更新:
- 支持单列值过滤(SingleColumnValueCondition)
- 组合多列值过滤(CompositeColumnValueCondition)单列值的与、或、非组合
-
操作可以是:
- 删除指定列的指定版本
- 删除指定列的所有版本
- 增加新列
- 更新已存在列
- 指定列(整数列)做原子加:原子加更新时,如果想要获取增加后的值,需要设置ReturnType
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn(PK1, PrimaryKeyValue.fromLong(1L))
.addPrimaryKeyColumn(PK2, PrimaryKeyValue.fromString("string"))
.build();
RowUpdateChange rowChange = new RowUpdateChange(TABLE_NAME, primaryKey);
rowChange
.deleteColumns("binary") // 删除指定列所有版本
.deleteColumn("double", timestamp) // 删除指定列指定版本
.put("string", ColumnValue.fromString("new string value")) // 更新已存在列
.put("newCol", ColumnValue.fromString("new col")) // 增加新列
.increment(new Column("long", ColumnValue.fromLong(100L))); // 原子加
rowChange.setReturnType(ReturnType.RT_AFTER_MODIFY); // 获取属性列原子加修改后的值
UpdateRowRequest updateRowRequest = new UpdateRowRequest(rowChange);
UpdateRowResponse updateRowResponse = syncClient.updateRow(updateRowRequest);
数据更新-条件更新
Tablestore支持行级别的条件更有新,用户可以提供属性列值的条件校验实现条件更新。条件更新也是RowChange接口提供的API。
条件更新支持单列条件校验、组合属性列条件校验两种,满足条件时才会完成数据更新,不满足是会抛错误。
单属性列条件校验
Condition condition = new Condition(RowExistenceExpectation.EXPECT_EXIST); // 条件更新
condition.setColumnCondition(
new SingleColumnValueCondition(
"boolean",
SingleColumnValueCondition.CompareOperator.EQUAL,
ColumnValue.fromBoolean(true)));
rowChange.setCondition(condition);
组合属性列条件校验(多列属性的与、或、非组合)
CompositeColumnValueCondition colCondition = new CompositeColumnValueCondition(CompositeColumnValueCondition.LogicOperator.AND);
ColumnCondition subColCondition1 = new SingleColumnValueCondition(
"boolean",
SingleColumnValueCondition.CompareOperator.EQUAL,
ColumnValue.fromBoolean(true));
ColumnCondition subColCondition2 = new SingleColumnValueCondition(
"double",
SingleColumnValueCondition.CompareOperator.GREATER_THAN,
ColumnValue.fromLong(0L));
colCondition.addCondition(subColCondition1).addCondition(subColCondition2);
Condition condition = new Condition();
condition.setColumnCondition(colCondition);
rowChange.setCondition(condition);
数据查询
说明:基于给定的完整PrimaryKey读取该行行数据。如果数据不存在,则Response.getRow()为空。
注意事项:
- 主键必须完整,并且与表属性主键名、主键类型一致,否侧会有参数错误;
- 默认返回所有属性列,如果指定属性列仅返回属性列值;
- 如果指定属性列,但该行数据没有指定的列属性(有其他属性列),则会返回null,不表示该行一定不存在;
- 多版本表可以通过设置MaxVersion返回列多个版本的数据;
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn(PK1, PrimaryKeyValue.fromLong(1L))
.addPrimaryKeyColumn(PK2, PrimaryKeyValue.fromString("string"))
.build();
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria(TABLE_NAME, primaryKey);
criteria.setMaxVersions(1);
GetRowRequest getRowRequest = new GetRowRequest(criteria);
GetRowResponse getRowResponse = syncClient.getRow(getRowRequest);
批量操作
数据写-增删改
批量数据操作与单行操作都是基于RowChange实现的请求构建。因此一个BatchWriteRowRequest中,可以包含增、删、改三种不同的操作。同时,表名参数包含在各自的行信息中,所以可以实现多表数据的混合写入操作。用户可以自己尝试。
注意事项:
- 请求失败,所有的行操作都失败
- 请求成功,不表示每一行都成功,需要判断失败列表的长度是否为零。对于失败行,可重新构建请求做重试。
- 批量写(增删改)操作构建的请求有限制:行数<=200,总大小<=4M,超过限制会抛错
BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
/**
* Multi Put Row
*/
for (long i = 1L; i <= 10L; i++) {
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn(PK1, PrimaryKeyValue.fromLong(i))
.addPrimaryKeyColumn(PK2, PrimaryKeyValue.fromString("string"))
.build();
RowPutChange rowChange = new RowPutChange(TABLE_NAME, primaryKey);
rowChange.addColumn("long", ColumnValue.fromLong(i));
batchWriteRowRequest.addRowChange(rowChange);
}
/**
* Multi Update Row
*/
for (long i = 11L; i <= 20L; i++) {
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn(PK1, PrimaryKeyValue.fromLong(i))
.addPrimaryKeyColumn(PK2, PrimaryKeyValue.fromString("string"))
.build();
RowUpdateChange rowChange = new RowUpdateChange(TABLE_NAME, primaryKey);
rowChange.put("long", ColumnValue.fromLong(i));
batchWriteRowRequest.addRowChange(rowChange);
}
/**
* Multi Delete Row
*/
for (long i = 21L; i <= 30L; i++) {
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn(PK1, PrimaryKeyValue.fromLong(i))
.addPrimaryKeyColumn(PK2, PrimaryKeyValue.fromString("string"))
.build();
RowDeleteChange rowChange = new RowDeleteChange(TABLE_NAME, primaryKey);
batchWriteRowRequest.addRowChange(rowChange);
}
BatchWriteRowResponse batchWriteRowResponse = syncClient.batchWriteRow(batchWriteRowRequest);
int totalRowCount = batchWriteRowResponse.getRowStatus(TABLE_NAME).size();
int failedRowCount = batchWriteRowResponse.getFailedRows().size();
批量写-失败行重试
批量写操作成功后,并不表示批请求的所有行操作都成功。行级别的状态需要单独判断。返回失败行,对失败行重新构建BatchWriteRow请求,重新发起请求。
if (batchWriteRowResponse.getFailedRows().size() > 0 ) {
BatchWriteRowRequest retryRequest = batchWriteRowRequest.createRequestForRetry(
batchWriteRowResponse.getFailedRows());
syncClient.batchWriteRow(retryRequest);
}
多行随机查询
批量读查询支持单表下,批量数据同时读的能力,用户构建请求时只需添加行的完整PrimaryKey即可。
注意事项:
- 批量读请求失败,每一行读都没有成功
- 批量读请求成功,并不能表示每一行请求都成功,需要判断失败列表的长度是否为零。
BatchGetRowRequest batchGetRowRequest = new BatchGetRowRequest();
MultiRowQueryCriteria criteria = new MultiRowQueryCriteria(TABLE_NAME);
for (long i = 1L; i <= 30L; i += 10) {
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn(PK1, PrimaryKeyValue.fromLong(i))
.addPrimaryKeyColumn(PK2, PrimaryKeyValue.fromString("string"))
.build();
criteria.addRow(primaryKey);
}
criteria.setMaxVersions(1);
batchGetRowRequest.addMultiRowQueryCriteria(criteria);
BatchGetRowResponse batchGetRowResponse = syncClient.batchGetRow(batchGetRowRequest);
int totalRowCount = batchGetRowResponse.getBatchGetRowResult(TABLE_NAME).size();
int failedRowCount = batchGetRowResponse.getFailedRows().size();
范围查询
表中数据基于PrimaryKey有序排列,表是一个特殊的基于主键的联合索引。因此,数据范围查询遵循联合索引的最左匹配原则。即:范围查询时如果某一列提供了具体的范围值(非MIN、MAX),则下一列的范围限制就被忽略,因为下一列的范围约束是无效的。具体参考下图:
注意事项:
- 结果与条件不一致时,请参考最左匹配原则
- 范围左开右闭,及包含start,但不包含end
- 正序时,start必须小于end;倒序时:start必须大于end
GetRangeRequest getRangeRequest = new GetRangeRequest();
PrimaryKey start = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn(PK1, PrimaryKeyValue.fromLong(2))
.addPrimaryKeyColumn(PK2, PrimaryKeyValue.fromString("5"))
.build();
PrimaryKey end = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn(PK1, PrimaryKeyValue.fromLong(4))
.addPrimaryKeyColumn(PK2, PrimaryKeyValue.fromString("4"))
.build();
RangeRowQueryCriteria criteria = new RangeRowQueryCriteria(TABLE_NAME);
criteria.setInclusiveStartPrimaryKey(start);
criteria.setExclusiveEndPrimaryKey(end);
criteria.setDirection(Direction.FORWARD); // 设置顺序
criteria.setMaxVersions(1); // 返回版本个数
criteria.setLimit(20); // 每页返回行数限制
getRangeRequest.setRangeRowQueryCriteria(criteria);
GetRangeResponse getRangeResponse = syncClient.getRange(getRangeRequest);
范围查询-连续翻页
当范围条件数据单次请求未拿到全量数据时,需要用户基于NextStartPrimaryKey做连续翻页,从而遍历全部命中数据。如果GetRangeResponse.getNextStartPrimaryKey()非空,则一定还有数据。通过将获取的PrimaryKey设置到原请求中重新构建,然后再次发起请求。
PrimaryKey nextStartPrimaryKey = null;
do {
GetRangeResponse getRangeResponse = syncClient.getRange(getRangeRequest);
List<Row> pageRowList = getRangeResponse.getRows();
nextStartPrimaryKey = getRangeResponse.getNextStartPrimaryKey();
if (nextStartPrimaryKey != null) { // 判断存在下一页,重构请求
criteria.setInclusiveStartPrimaryKey(nextStartPrimaryKey);
getRangeRequest.setRangeRowQueryCriteria(criteria);
}
} while (nextStartPrimaryKey != null); // 持续翻页,直到没有下一页
范围查询-连续翻页迭代器
为了方便用户遍历全量数据,我们提供了迭代器接口。用户无需关心请求构建、结果判断等逻辑,只需将异步Client与请求体作为参数构建GetRangeIterator即可。
注意事项:
- 迭代器使用的异步Client,可以重现构建也可以通过同步Client转换;
- 迭代器内部自动发起请求,消费完一批后自动发起下一批请求;
AsyncClient asyncClient = (AsyncClient) syncClient.asAsyncClient();
GetRangeIterator getRangeIterator = new GetRangeIterator(asyncClient, getRangeRequest);
while (getRangeIterator.hasNext()) {
Row row = getRangeIterator.next();
}
Tablestore 入门手册
本文结合Java SDK的接口调用代码,介绍了Tablestore在数据管理方面的基本功能与使用方式。代码已开源在Tablestore-Examples项目中,用户可以直接运行使用。基于样例代码与文章,新用户能更简单、更快速地上手Tablestore,欢迎新、老用户使用与建议。
通过对基础使用功能的持续输出,我们将整理出一套完整的入门手册(含可执行样例),敬请期待。
专家服务
如有疑问或者需要更好的在线支持,欢迎加入钉钉群:“表格存储公开交流群”。群内提供免费的在线专家服务,欢迎扫码加入,群号:23307953