局部事务介绍
表格存储提供的局部事务也可以称为是分区键事务:可以指定某个分区键下的操作是原子的,要么全部成功要么全部失败,并且所提供的隔离级别为串行化。也就是说表格存储的局部事务可以防止以下问题
- 脏读:事务之外的操作读到了尚未提交的写入
- 脏写:事务之外的写入覆盖了本事务尚未提交的写入
- 不可重复读:在事务中的多次对同一行数据的读操作读到了不同的值
- 更新丢失:本事务提交已提交之后被其他并行执行的事务所覆盖(与脏写不同,脏写是两个事务都没有提交时发生的)
局部事务基本使用流程如下图所示
Tablestore的局部事务在启动事务时或首先获取到分区键下的锁,所有后续对该分区键的写操作与启动事务操作都会被阻塞至原事务提交或者超时以保证操作的隔离性,有如下的一些特性:
- 在事务提交或者中止之前,不能有另外一个事务在同分区键下启动事务
- 在事务提交或中止之前,非本事务的写入将被阻塞或超时失败
- 在事务提交和中止之前,非本事务的读取操作无法读取到事务中未提交的写入,而本事务的读操作可以获取到本事务中的写入
局部事务的使用
事务的启动、提交与中止
启动事务
// 局部事务需要指定一个分区键(第一列主键)
PrimaryKey transactionPK = new PrimaryKey(Collections.singletonList(
new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId))
));
StartLocalTransactionRequest startTransactionRequest = new StartLocalTransactionRequest(TABLE_NAME, transactionPK);
StartLocalTransactionResponse startTransactionResponse = syncClient.startLocalTransaction(startTransactionRequest);
// 启动事务成功后会返回一个transactionId,任意实现了TxnRequest抽象类的请求都可以通过setTransactionId方法指定事务
final String transactionId = startTransactionResponse.getTransactionID();
提交事务
// 提交事务,所有与该transactionId相关的写入操作将被永久地写入到Tablestore中
syncClient.commitTransaction(new CommitTransactionRequest(transactionId));
中止事务
// 中止事务,所有与改transactionId相关的写入操作将被回滚
syncClient.abortTransaction(new AbortTransactionRequest(transactionId));
基本用法
分别用同一个transactionId两次写入数据并提交,要么全部失败,要么全部失败
写入第一行数据
PutRowRequest typeAPutRequest = new PutRowRequest(new RowPutChange(
TABLE_NAME,
new PrimaryKey(Arrays.asList(
new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA))
))
).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_a")));
// 设置事务ID,由启动事务返回
typeAPutRequest.setTransactionId(transactionId);
syncClient.putRow(typeAPutRequest);
通过事务ID读取上面写入的数据
GetRowRequest getRowRequest = new GetRowRequest();
SingleRowQueryCriteria singleRowQueryCriteria = new SingleRowQueryCriteria(
TABLE_NAME,
new PrimaryKey(Arrays.asList(
new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA))
))
);
singleRowQueryCriteria.setMaxVersions(1);
getRowRequest.setRowQueryCriteria(singleRowQueryCriteria);
// 这边需要设置事务ID
getRowRequest.setTransactionId(transactionId);
GetRowResponse getRowResponse = syncClient.getRow(getRowRequest);
// 可以正常获取到数据
Assert.assertNotNull(getRowResponse.getRow());
不通过事务ID读取
GetRowRequest getRowRequest = new GetRowRequest();
SingleRowQueryCriteria singleRowQueryCriteria = new SingleRowQueryCriteria(
TABLE_NAME,
new PrimaryKey(Arrays.asList(
new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA))
))
);
singleRowQueryCriteria.setMaxVersions(1);
getRowRequest.setRowQueryCriteria(singleRowQueryCriteria);
GetRowResponse getRowResponse = syncClient.getRow(getRowRequest);
// 不设置事务ID获取,由于Tablestore提供的隔离级别为串行化,这边不能读取到未提交的写入
Assert.assertNull(getRowResponse.getRow());
写入第二行数据
PutRowRequest typeBPutRequest = new PutRowRequest(new RowPutChange(
TABLE_NAME,
new PrimaryKey(Arrays.asList(
new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeB))
))
).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_b")));
typeBPutRequest.setTransactionId(transactionId);
syncClient.putRow(typeBPutRequest);
提交
// 提交事务,提交事务后,其他读操作就可以获取到之前的写入
syncClient.commitTransaction(new CommitTransactionRequest(transactionId));
中止事务
// 中止事务,所有与改transactionId相关的写入操作将被回滚
syncClient.abortTransaction(new AbortTransactionRequest(transactionId));
启动事务后使用后其他操作非本事务的操作尝试写入
本示例展示的是一个在事务执行期间有另外一个同分区键的写入时的场景,由于在分区键下启动事务会直接锁定分区键下所有的写操作,在事务执行期间任何向同分区下的写入操作将被阻塞至事务提交或超时。下面的流程图展示了两个线程在通过事务写入的一些情景:
写入第一行
// put row A with transactionID
PutRowRequest typeAPutRequest = new PutRowRequest(new RowPutChange(
TABLE_NAME,
new PrimaryKey(Arrays.asList(
new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA))
))
).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_a")));
// set transactionId in startTransactionResponse
typeAPutRequest.setTransactionId(transactionId);
syncClient.putRow(typeAPutRequest);
写入第二行时不带事务ID,模拟非本事务的操作尝试写入,写入失败,返回的错误码为OTSRowOperationConflict
// put row B without transactionID
PutRowRequest typeBPutRequest = new PutRowRequest(new RowPutChange(
TABLE_NAME,
new PrimaryKey(Arrays.asList(
new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeB))
))
).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_b")));
// put without transactionId, due to PK_USE_ID is locked by another transaction, this action will fail
try {
syncClient.putRow(typeBPutRequest);
Assert.fail();
} catch (TableStoreException e) {
// ok
Assert.assertEquals("OTSRowOperationConflict", e.getErrorCode());
}
Batch写入
使用batch写入第一行和第二行
BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList(
new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA))
))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_a")));
batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList(
new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeB))
))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_b")));
batchWriteRowRequest.setTransactionId(transactionId);
syncClient.batchWriteRow(batchWriteRowRequest);
使用batch写入第三行和第四行
BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList(
new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeC))
))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_c")));
batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList(
new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)),
new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeD))
))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_d")));
batchWriteRowRequest.setTransactionId(transactionId);
syncClient.batchWriteRow(batchWriteRowRequest);