表格存储的 SDK 提供了 BatchGetRow、BatchWriteRow、GetRange 和 GetByIterator 等多行操作的接口。
批量读(BatchGetRow)
批量读取一个或多个表中的若干行数据。
BatchGetRow 操作可视为多个 GetRow 操作的集合,各个操作独立执行,独立返回结果,独立计算服务能力单元。
与执行大量的 GetRow 操作相比,使用 BatchGetRow 操作可以有效减少请求的响应时间,提高数据的读取速率。
接口/**
* 从多张表(Table)中获取多行数据。
*
* @param batchGetRowRequest 执行BatchGetRow操作所需参数的封装。
* @return BatchGetRow操作的响应内容。
* @throws OTSException OTS访问返回错误消息
* @throws ClientException 请求的返回结果无效, 或由于网络原因请求失败, 或访问超时。
*/
public BatchGetRowResult batchGetRow(BatchGetRowRequest batchGetRowRequest)
throws OTSException, ClientException;
示例
批量一次读 10 行。
try
{
// 构造BatchGetRow对象和参数
BatchGetRowRequest request = new BatchGetRowRequest();
MultiRowQueryCriteria tableRows = new MultiRowQueryCriteria(tableName);
for (int j = 0; j < 10; ++j) {
RowPrimaryKey primaryKeys = new RowPrimaryKey();
primaryKeys.addPrimaryKeyColumn("pk0", PrimaryKeyValue.fromLong(j));
primaryKeys.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(j)));
tableRows.addRow(primaryKeys);
}
// 只返回col0和col1列
tableRows.addColumnsToGet(new String[] {"col0", "col1"});
request.addMultiRowQueryCriteria(tableRows);
// 调用batchGetRow接口
BatchGetRowResult result = client.batchGetRow(request);
// 统计是否有某些行执行失败,如果有,则重试
BatchGetRowRequest failedOperations = null;
List<Row> succeedRows = new ArrayList<Row>();
int retryCount = 0;
do {
failedOperations = new BatchGetRowRequest();
Map<String, List<BatchGetRowResult.RowStatus>> status = result.getTableToRowsStatus();
for (Entry<String, List<BatchGetRowResult.RowStatus>> entry : status.entrySet()) {
tableName = entry.getKey();
tableRows = new MultiRowQueryCriteria(tableName);
List<BatchGetRowResult.RowStatus> statuses = entry.getValue();
for (int index = 0; index < statuses.size(); index++) {
BatchGetRowResult.RowStatus rowStatus = statuses.get(index);
if (!rowStatus.isSucceed()) {
// 操作失败, 需要放到重试列表中再次重试
// 需要重试的操作可以从request中获取参数
tableRows.addRow(request.getPrimaryKey(tableName, index));
} else {
succeedRows.add(rowStatus.getRow());
System.out.println("Read row:");
System.out.println("pk0:" + rowStatus.getRow().getColumns().get("pk0"));
System.out.println("pk1:" + rowStatus.getRow().getColumns().get("pk1"));
System.out.println("col0:" + rowStatus.getRow().getColumns().get("col0"));
}
}
if (!tableRows.getRowKeys().isEmpty()) {
tableRows.addColumnsToGet(new String[] {"col0", "col1"});
failedOperations.addMultiRowQueryCriteria(tableRows);
}
}
if (failedOperations.isEmpty() || ++retryCount > 3) {
// 如果所有操作都成功了或者重试次数达到上线, 则不再需要重试。
break;
}
// 如果有需要重试的操作, 则稍微等待一会后再次重试, 否则继续出错的概率很高。
try {
// 100ms后继续重试
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
request = failedOperations;
result = client.batchGetRow(request);
} while (true);
System.out.println(String.format("Get row finish, succeeded count:%d", succeedRows.size()));
} catch (ClientException ex) {
System.out.println("Batch get row failed.");
} catch (OTSException ex) {
System.out.println("Batch get row failed.");
}
提示:
/**
* 对多张表(Table)中的多行数据进行增加、删除或者更改操作。
*
* @param batchWriteRowRequest 执行BatchWriteRow操作所需参数的封装。
* @return BatchWriteRow操作的响应内容。
* @throws OTSException OTS访问返回错误消息
* @throws ClientException 请求的返回结果无效, 或由于网络原因请求失败, 或访问超时。
*/
public BatchWriteRowResult batchWriteRow(
BatchWriteRowRequest batchWriteRowRequest) throws OTSException,
ClientException;
try
{
BatchWriteRowRequest request = new BatchWriteRowRequest();
// 向3张表中插入数据, 每张表插入10行。
for (int i = 0; i < 3; ++i) {
for (int j = 0; j < 10; ++j) {
RowPutChange rowPutChange = new RowPutChange("SampleTable" + i);
RowPrimaryKey primaryKey = new RowPrimaryKey();
primaryKey.addPrimaryKeyColumn("pk0", PrimaryKeyValue.fromLong(i));
primaryKey.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(j));
rowPutChange.setPrimaryKey(primaryKey);
rowPutChange.addAttributeColumn("col0", ColumnValue.fromLong(4));
rowPutChange.addAttributeColumn("col2", ColumnValue.fromString("成杭京"));
request.addRowPutChange(rowPutChange);
}
}
// batchWriteRow接口会返回一个结果集, 结果集中包含的结果个数与插入的行数相同。 结果集中的结果不一定都是成功,用户需要自己对不成功的操作进行重试。
BatchWriteRowResult result = client.batchWriteRow(request);
// 重试逻辑类似于上面的batchGetRow示例中的逻辑,这里省略
} catch (ClientException ex) {
System.out.println("Batch write row failed.");
} catch (OTSException ex) {
System.out.println("Batch write row failed.");
}
/**
* 从表(Table)中查询一个范围内的多行数据。
*
* @param getRangeRequest 执行GetRange操作所需参数的封装。
* @return GetRange操作的响应内容。
* @throws OTSException OTS访问返回错误消息
* @throws ClientException 请求的返回结果无效, 或由于网络原因请求失败, 或访问超时。
*/
public GetRangeResult getRange(GetRangeRequest getRangeRequest)
throws OTSException, ClientException;
try
{
// 这里查找uid从1-4(左闭右开)的数据
RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("SampleTable");
RowPrimaryKey inclusiveStartKey = new RowPrimaryKey();
inclusiveStartKey.addPrimaryKeyColumn("pk0", PrimaryKeyValue.fromLong(1));
// 范围的边界需要提供完整的PK,若查询的范围不涉及到某一列值的范围,则需要将该列设置为无穷大或者无穷小
inclusiveStartKey.addPrimaryKeyColumn("pk1", PrimaryKeyValue.INF_MIN);
RowPrimaryKey exclusiveEndKey = new RowPrimaryKey();
exclusiveEndKey.addPrimaryKeyColumn("pk0", PrimaryKeyValue.fromLong(4));
// 范围的边界需要提供完整的PK,若查询的范围不涉及到某一列值的范围,则需要将该列设置为无穷大或者无穷小
exclusiveEndKey.addPrimaryKeyColumn("pk1", PrimaryKeyValue.INF_MAX);
criteria.setInclusiveStartPrimaryKey(inclusiveStartKey);
criteria.setExclusiveEndPrimaryKey(exclusiveEndKey);
// 前向查询
criteria.setDirection(Direction.FORWARD);
// 最多返回10行
criteria.setLimit(10);
// 构造RangeRequest对象
GetRangeRequest request = new GetRangeRequest();
request.setRangeRowQueryCriteria(criteria);
// 调用getRange接口
GetRangeResult result = client.getRange(request);
// 输出数据
List<Row> rows = result.getRows();
for (Row row : rows) {
System.out.println("col0:" + row.getColumns().get("col0"));
System.out.println("col1:" + row.getColumns().get("col1"));
System.out.println("col2:" + row.getColumns().get("col2"));
}
int consumedReadCU = result.getConsumedCapacity().getCapacityUnit().getReadCapacityUnit();
System.out.println("consumed capacityunit:" + consumedReadCU);
System.out.println("Get range succeeded");
} catch (ClientException ex) {
System.out.println("Get range failed.");
} catch (OTSException ex) {
System.out.println("Get range failed.");
}
/**
* 从表(Table)中迭代读取满足条件的数据
* @param rangeIteratorParameter
* 迭代读时的参数,包括开始,结束位置
* @return 迭代器
* @throws OTSException
* OTS访问返回错误消息
* @throws ClientException
* 请求的返回结果无效, 或由于网络原因请求失败, 或访问超时。
*/
public Iterator<Row> createRangeIterator(
RangeIteratorParameter rangeIteratorParameter)
throws OTSException, ClientException;
// 构造迭代读取的起始位置
RowPrimaryKey inclusiveStartKey = new RowPrimaryKey();
inclusiveStartKey.addPrimaryKeyColumn(COLUMN_GID_NAME,
PrimaryKeyValue.fromLong(1));
inclusiveStartKey.addPrimaryKeyColumn(COLUMN_UID_NAME,
PrimaryKeyValue.INF_MIN);
// 构造迭代读取的结束位置
RowPrimaryKey exclusiveEndKey = new RowPrimaryKey();
exclusiveEndKey.addPrimaryKeyColumn(COLUMN_GID_NAME,
PrimaryKeyValue.fromLong(4));
exclusiveEndKey.addPrimaryKeyColumn(COLUMN_UID_NAME,
PrimaryKeyValue.INF_MAX);
// 构造迭代读取的参数对象,并设置起始和结束的位置,包括起始位置的行,不包括结束位置的行
RangeIteratorParameter rangeIteratorParameter = new RangeIteratorParameter(tableName);
rangeIteratorParameter.setInclusiveStartPrimaryKey(inclusiveStartKey);
rangeIteratorParameter.setExclusiveEndPrimaryKey(exclusiveEndKey);
// 创建出迭代器,并迭代获取每一行数据
try {
Iterator<Row> iterator = client.createRangeIterator(rangeIteratorParameter);
while (iterator.hasNext()) {
Row row = iterator.next();
System.out.println("name:" + row.getColumns().get(COLUMN_NAME_NAME));
System.out.println("address" + row.getColumns().get(COLUMN_ADDRESS_NAME));
}
System.out.println("Iterator get succeeded.");
} catch (ClientException ex) {
System.out.println("Iterator get failed.");
} catch (OTSException ex) {
System.out.println("Iterator get failed.");
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。