Tablestore入门手册--局部事务

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介:

局部事务介绍

表格存储提供的局部事务也可以称为是分区键事务:可以指定某个分区键下的操作是原子的,要么全部成功要么全部失败,并且所提供的隔离级别为串行化。也就是说表格存储的局部事务可以防止以下问题

  1. 脏读:事务之外的操作读到了尚未提交的写入
  2. 脏写:事务之外的写入覆盖了本事务尚未提交的写入
  3. 不可重复读:在事务中的多次对同一行数据的读操作读到了不同的值
  4. 更新丢失:本事务提交已提交之后被其他并行执行的事务所覆盖(与脏写不同,脏写是两个事务都没有提交时发生的)

局部事务基本使用流程如下图所示
image

Tablestore的局部事务在启动事务时或首先获取到分区键下的锁,所有后续对该分区键的写操作与启动事务操作都会被阻塞至原事务提交或者超时以保证操作的隔离性,有如下的一些特性:

  1. 在事务提交或者中止之前,不能有另外一个事务在同分区键下启动事务
  2. 在事务提交或中止之前,非本事务的写入将被阻塞或超时失败
  3. 在事务提交和中止之前,非本事务的读取操作无法读取到事务中未提交的写入,而本事务的读操作可以获取到本事务中的写入

局部事务的使用

事务的启动、提交与中止

启动事务

// 局部事务需要指定一个分区键(第一列主键)
PrimaryKey transactionPK = new PrimaryKey(Collections.singletonList(
    new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId))
));
StartLocalTransactionRequest startTransactionRequest = new StartLocalTransactionRequest(TABLE_NAME, transactionPK);
StartLocalTransactionResponse startTransactionResponse = syncClient.startLocalTransaction(startTransactionRequest);
// 启动事务成功后会返回一个transactionId,任意实现了TxnRequest抽象类的请求都可以通过setTransactionId方法指定事务
final String transactionId = startTransactionResponse.getTransactionID();

提交事务

// 提交事务,所有与该transactionId相关的写入操作将被永久地写入到Tablestore中
syncClient.commitTransaction(new CommitTransactionRequest(transactionId));

中止事务

// 中止事务,所有与改transactionId相关的写入操作将被回滚
syncClient.abortTransaction(new AbortTransactionRequest(transactionId));

基本用法

分别用同一个transactionId两次写入数据并提交,要么全部失败,要么全部失败
写入第一行数据

PutRowRequest typeAPutRequest = new PutRowRequest(new RowPutChange(
    TABLE_NAME,
    new PrimaryKey(Arrays.asList(
        new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
        new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA))
    ))
).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_a")));
// 设置事务ID,由启动事务返回
typeAPutRequest.setTransactionId(transactionId);
syncClient.putRow(typeAPutRequest);

通过事务ID读取上面写入的数据

GetRowRequest getRowRequest = new GetRowRequest();
SingleRowQueryCriteria singleRowQueryCriteria = new SingleRowQueryCriteria(
    TABLE_NAME,
    new PrimaryKey(Arrays.asList(
        new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
        new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA))
    ))
);
singleRowQueryCriteria.setMaxVersions(1);
getRowRequest.setRowQueryCriteria(singleRowQueryCriteria);
// 这边需要设置事务ID
getRowRequest.setTransactionId(transactionId);
GetRowResponse getRowResponse = syncClient.getRow(getRowRequest);
// 可以正常获取到数据
Assert.assertNotNull(getRowResponse.getRow());

不通过事务ID读取

GetRowRequest getRowRequest = new GetRowRequest();
SingleRowQueryCriteria singleRowQueryCriteria = new SingleRowQueryCriteria(
    TABLE_NAME,
    new PrimaryKey(Arrays.asList(
        new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
        new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA))
    ))
);
singleRowQueryCriteria.setMaxVersions(1);
getRowRequest.setRowQueryCriteria(singleRowQueryCriteria);
GetRowResponse getRowResponse = syncClient.getRow(getRowRequest);
// 不设置事务ID获取,由于Tablestore提供的隔离级别为串行化,这边不能读取到未提交的写入
Assert.assertNull(getRowResponse.getRow());

写入第二行数据

PutRowRequest typeBPutRequest = new PutRowRequest(new RowPutChange(
    TABLE_NAME,
    new PrimaryKey(Arrays.asList(
        new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
        new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeB))
    ))
).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_b")));
typeBPutRequest.setTransactionId(transactionId);
syncClient.putRow(typeBPutRequest);

提交

// 提交事务,提交事务后,其他读操作就可以获取到之前的写入
syncClient.commitTransaction(new CommitTransactionRequest(transactionId));

中止事务

// 中止事务,所有与改transactionId相关的写入操作将被回滚
syncClient.abortTransaction(new AbortTransactionRequest(transactionId));

启动事务后使用后其他操作非本事务的操作尝试写入

本示例展示的是一个在事务执行期间有另外一个同分区键的写入时的场景,由于在分区键下启动事务会直接锁定分区键下所有的写操作,在事务执行期间任何向同分区下的写入操作将被阻塞至事务提交或超时。下面的流程图展示了两个线程在通过事务写入的一些情景:
image

写入第一行

// put row A with transactionID
PutRowRequest typeAPutRequest = new PutRowRequest(new RowPutChange(
    TABLE_NAME,
    new PrimaryKey(Arrays.asList(
        new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
        new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA))
    ))
).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_a")));
// set transactionId in startTransactionResponse
typeAPutRequest.setTransactionId(transactionId);
syncClient.putRow(typeAPutRequest);

写入第二行时不带事务ID,模拟非本事务的操作尝试写入,写入失败,返回的错误码为OTSRowOperationConflict

// put row B without transactionID
PutRowRequest typeBPutRequest = new PutRowRequest(new RowPutChange(
    TABLE_NAME,
    new PrimaryKey(Arrays.asList(
        new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
        new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeB))
    ))
).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_b")));
// put without transactionId, due to PK_USE_ID is locked by another transaction, this action will fail
try {
    syncClient.putRow(typeBPutRequest);
    Assert.fail();
} catch (TableStoreException e) {
    // ok
    Assert.assertEquals("OTSRowOperationConflict", e.getErrorCode());
}

Batch写入

使用batch写入第一行和第二行

BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList(
    new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
    new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA))
))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_a")));
batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList(
    new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
    new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeB))
))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_b")));
batchWriteRowRequest.setTransactionId(transactionId);
syncClient.batchWriteRow(batchWriteRowRequest);

使用batch写入第三行和第四行

BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList(
    new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
    new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeC))
))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_c")));
batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList(
    new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
    new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeD))
))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_d")));
batchWriteRowRequest.setTransactionId(transactionId);
syncClient.batchWriteRow(batchWriteRowRequest);
相关实践学习
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
存储 SQL 移动开发
Tablestore入门手册-UpdateRow接口详解| 1月14号云栖号夜读
今天的首篇文章,讲述了:表格存储Tablestore入门手册系列主要介绍表格存储的各个功能接口和适用场景,帮助客户了解和使用表格存储Tablestore。本文对表格存储Tablestore的UpdateRow接口进行介绍,包括其参数、功能示例、使用场景等。
2741 0
|
SQL 存储 NoSQL
Tablestore入门手册-条件更新
功能说明 条件更新功能只有在满足条件时才对表中的数据进行更改,当不满足条件时更新失败。 比如有如下场景,初始化数据,当数据字段A为-1时,将A的值更新为指定的内容。比如更新为12;如果不是-1则更新失败。   条件更新支持两个维度。分别是行的存在性检查和列值的条件判断。 第一个维度是行的条件检查,包括如下三种条件: IGNORE:忽略,不做存在
1162 0
|
存储 NoSQL JavaScript
Tablestore入门手册-数据管理-GetRow
GetRow接口概述     GetRow接口用于读取一行数据,是Tablestore最基础的API之一。官方提供了Java、Go、Node.js、Python、PHP、C#、C++ SDK。     本文以Java代码为例,对GetRow接口进行详细说明。 基本使用说明 参数说明 参数名称 是否必填 参数说
982 0
|
存储 NoSQL JavaScript
Tablestore入门手册--表(Table)管理
表管理接口概述 API 描述 createTable 创建表 deleteTable 删除表 listTable 列出实例下的所有表 updateTable 更新表(在表被创建之后,动态的更改表的配置或预留吞吐量)
1835 0
|
存储 NoSQL Java
Tablestore入门手册--数据表meta信息介绍
若有幸这篇文章也许会是您准备上手实践Tablestore的第一篇帮助文档,本文将通过介绍Tablestore的基础模型使大家理解Tablestore的数据表结构。主要讲如何设置数据表的Meta信息,建一张最适合自己业务的表,祝阅有所得,漫步云端。
1348 0
Tablestore入门手册--数据表meta信息介绍
|
6月前
|
存储 索引
表格存储根据多元索引查询条件直接更新数据
表格存储是否可以根据多元索引查询条件直接更新数据?
62 3