表格存储的 SDK 提供了 BatchGetRow、BatchWriteRow 和 GetRange 等多行操作的接口。
批量读(BatchGetRow)
批量读取一个或多个表中的若干行数据。
BatchGetRow 操作可视为多个 GetRow 操作的集合,各个操作独立执行,独立返回结果,独立计算服务能力单元。
与执行大量的 GetRow 操作相比,使用 BatchGetRow 操作可以有效减少请求的响应时间,提高数据的读取速率。
接口"""
说明:批量获取多行数据。
request = BatchGetRowRequest()
request.add(TableInBatchGetRowItem(myTable0, primary_keys, column_to_get=None, column_filter=None))
request.add(TableInBatchGetRowItem(myTable1, primary_keys, column_to_get=None, column_filter=None))
request.add(TableInBatchGetRowItem(myTable2, primary_keys, column_to_get=None, column_filter=None))
request.add(TableInBatchGetRowItem(myTable3, primary_keys, column_to_get=None, column_filter=None))
response = client.batch_get_row(request)
``response``为返回的结果,类型为tablestore.metadata.BatchGetRowResponse
"""
def batch_get_row(self, request):
示例
批量一次读 3 行。
# 需要返回的列
columns_to_get = ['name', 'mobile', 'address', 'age']
# 读3行
rows_to_get = []
for i in range(0, 3):
primary_key = [('gid',i), ('uid',i+1)]
rows_to_get.append(primary_key)
# 过滤条件:name等于Johb,且 address等于China。
cond = CompositeColumnCondition(LogicalOperator.AND)
cond.add_sub_condition(SingleColumnCondition("name", "John", ComparatorType.EQUAL))
cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL))
# 构造批量读请求。
request = BatchGetRowRequest()
# 增加表:table_name中需要读取的行,最后一个参数1表示最读取最新的一个版本。
request.add(TableInBatchGetRowItem(table_name, rows_to_get, columns_to_get, cond, 1))
# 增加表:notExistTable中需要读取的行。
request.add(TableInBatchGetRowItem('notExistTable', rows_to_get, columns_to_get, cond, 1))
try:
result = client.batch_get_row(request)
print ('Result status: %s'%(result.is_all_succeed()))
table_result_0 = result.get_result_by_table(table_name)
table_result_1 = result.get_result_by_table('notExistTable')
print ('Check first table\'s result:')
for item in table_result_0:
if item.is_ok:
print ('Read succeed, PrimaryKey: %s, Attributes: %s' % (item.row.primary_key, item.row.attribute_columns))
else:
print ('Read failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
print ('Check second table\'s result:')
for item in table_result_1:
if item.is_ok:
print ('Read succeed, PrimaryKey: %s, Attributes: %s' % (item.row.primary_key, item.row.attribute_columns))
else:
print ('Read failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
# 客户端异常,一般为参数错误或者网络异常。
except OTSClientError as e:
print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
# 服务端异常,一般为参数错误或者流控错误。
except OTSServiceError as e:
print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())
提示:
详细代码:
BatchGetRow@GitHub。
批量写(BatchWriteRow)
批量插入、修改或删除一个或多个表中的若干行数据。
BatchWriteRow 操作可视为多个 PutRow、UpdateRow、DeleteRow 操作的集合,各个操作独立执行,独立返回结果,独立计算服务能力单元。
接口"""
说明:批量修改多行数据。
request = MiltiTableInBatchWriteRowItem()
request.add(TableInBatchWriteRowItem(table0, row_items))
request.add(TableInBatchWriteRowItem(table1, row_items))
response = client.batch_write_row(request)
``response``为返回的结果,类型为tablestore.metadata.BatchWriteRowResponse
"""
def batch_write_row(self, request):
示例
批量写数据。
put_row_items = []
## 增加PutRow的行。
for i in range(0, 10):
primary_key = [('gid',i), ('uid',i+1)]
attribute_columns = [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]
row = Row(primary_key, attribute_columns)
condition = Condition(RowExistenceExpectation.IGNORE)
item = PutRowItem(row, condition)
put_row_items.append(item)
## 增加UpdateRow的行
for i in range(10, 20):
primary_key = [('gid',i), ('uid',i+1)]
attribute_columns = {'put': [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]}
row = Row(primary_key, attribute_columns)
condition = Condition(RowExistenceExpectation.IGNORE, SingleColumnCondition("age", i, ComparatorType.EQUAL))
item = UpdateRowItem(row, condition)
put_row_items.append(item)
## 增加DeleteRow的行
delete_row_items = []
for i in range(10, 20):
primary_key = [('gid',i), ('uid',i+1)]
row = Row(primary_key)
condition = Condition(RowExistenceExpectation.IGNORE)
item = DeleteRowItem(row, condition)
delete_row_items.append(item)
# 构造批量写的请求。
request = BatchWriteRowRequest()
request.add(TableInBatchWriteRowItem(table_name, put_row_items))
request.add(TableInBatchWriteRowItem('notExistTable', delete_row_items))
# 调用batch_write_row 方法执行批量写,如果请求参数等有错误会抛异常;如果部分行失败,则不会抛异常,但是内部的Item会失败。
try:
result = client.batch_write_row(request)
print ('Result status: %s'%(result.is_all_succeed()))
## 检查Put行的结果。
print ('check first table\'s put results:')
succ, fail = result.get_put()
for item in succ:
print ('Put succeed, consume %s write cu.' % item.consumed.write)
for item in fail:
print ('Put failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
## 检查Update的行结果
print ('check first table\'s update results:')
succ, fail = result.get_update()
for item in succ:
print ('Update succeed, consume %s write cu.' % item.consumed.write)
for item in fail:
print ('Update failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
## 检查Delete行的结果。
print ('check second table\'s delete results:')
succ, fail = result.get_delete()
for item in succ:
print ('Delete succeed, consume %s write cu.' % item.consumed.write)
for item in fail:
print ('Delete failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
# 客户端异常,一般为参数错误或者网络异常。
except OTSClientError as e:
print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
# 服务端异常,一般为参数错误或者流控错误。
except OTSServiceError as e:
print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())
提示:
详细代码:
BatchWriteRow@GitHub。
范围读(GetRange)
读取指定主键范围内的数据。
接口"""
说明:根据范围条件获取多行数据。
``table_name``是对应的表名。
``direction``表示范围的方向,字符串格式,取值包括'FORWARD'和'BACKWARD'。
``inclusive_start_primary_key``表示范围的起始主键(在范围内)。
``exclusive_end_primary_key``表示范围的结束主键(不在范围内)。
``columns_to_get``是可选参数,表示要获取的列的名称列表,类型为list;如果不填,表示获取所有列。
``limit``是可选参数,表示最多读取多少行;如果不填,则没有限制。
``column_filter``是可选参数,表示读取指定条件的行
``max_version``是可选参数,表示返回的最大版本数目,与time_range必须存在一个。
``time_range``是可选参数,表示返回的版本的范围,于max_version必须存在一个。
``start_column``是可选参数,用于宽行读取,表示本次读取的起始列。
``end_column``是可选参数,用于宽行读取,表示本次读取的结束列。
``token``是可选参数,用于宽行读取,表示本次读取的起始列位置,内容被二进制编码,来源于上次请求的返回结果中。
返回:符合条件的结果列表。
``consumed``表示本次操作消耗的CapacityUnit,是tablestore.metadata.CapacityUnit类的实例。
``next_start_primary_key``表示下次get_range操作的起始点的主健列,类型为dict。
``row_list``表示本次操作返回的行数据列表,格式为:[Row, ...]。
"""
def get_range(self, table_name, direction,
inclusive_start_primary_key,
exclusive_end_primary_key,
columns_to_get=None,
limit=None,
column_filter=None,
max_version=None,
time_range=None,
start_column=None,
end_column=None,
token = None):
示例
范围读取。
# 范围查询的起始主键
inclusive_start_primary_key = [('uid',INF_MIN), ('gid',INF_MIN)]
# 范围查询的结束主键
exclusive_end_primary_key = [('uid',INF_MAX), ('gid',INF_MAX)]
# 查询所有列
columns_to_get = []
# 每次最多返回90,如果总共有100个结果,首次查询时指定limit=90,则第一次最多返回90,最少可能返回0个结果,但是next_start_primary_key不为None。
limit = 90
# 设置过滤器
cond = CompositeColumnCondition(LogicalOperator.AND)
cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL))
cond.add_sub_condition(SingleColumnCondition("age", 50, ComparatorType.LESS_THAN))
try:
# 调用get_range接口
consumed, next_start_primary_key, row_list, next_token = client.get_range(
table_name, Direction.FORWARD,
inclusive_start_primary_key, exclusive_end_primary_key,
columns_to_get,
limit,
column_filter = cond,
max_version = 1
)
all_rows = []
all_rows.extend(row_list)
# 当next_start_primary_key 不为空的时候,说明还有数据,继续循环读取。
while next_start_primary_key is not None:
inclusive_start_primary_key = next_start_primary_key
consumed, next_start_primary_key, row_list, next_token = client.get_range(
table_name, Direction.FORWARD,
inclusive_start_primary_key, exclusive_end_primary_key,
columns_to_get, limit,
column_filter = cond,
max_version = 1
)
all_rows.extend(row_list)
# 打印主键和属性列
for row in all_rows:
print (row.primary_key, row.attribute_columns)
print ('Total rows: ', len(all_rows))
# 客户端异常,一般为参数错误或者网络异常。
except OTSClientError as e:
print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
# 服务端异常,一般为参数错误或者流控错误。
except OTSServiceError as e:
print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())
提示:
详细代码:
GetRange@GitHub。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。