集合与事务
在我们努力学习这些示例的过程中,很容易就会忘记我们所要处理的值都必须是不可变的。只有实体才是可变的,而状态值则是不可变的。虽然STM已经为我们减轻了很多负担,但如果想要在维护不可变性的同时还要兼顾性能的话,对我们来说也将是一个非常严峻的挑战。
为了保证不可变性,我们采取的第一个步骤是将单纯用来保存数据的类(value classes)及其内部所有成员字段都置为final(在Scala中是val)。然后,我们需要传递地保证我们自己定义的类里面的字段所使用的类也都是不可变的。可以说,将字段和类的定义置为final这一步是整个过程的基础,这同时也是避免并发问题的第一步。
虽说不可变性可以使代码变得又好又安全,但是由于性能问题,程序员们还是不大愿意使用这一特性。其症结在于,为了维护不可变性,我们可能在数据没发生任何变动的情况下也要进行拷贝操作,而这种无谓的拷贝对性能伤害很大。为了解决这个问题,我们在3.6节中曾经讨论过持久化数据结构以及如何使用这类数据结构来减轻程序在性能方面的负担。而在持久化数据结构的实现方面,已经有很多现成的第三方库可供使用,而Scala本身也提供了这类数据结构。由于Java也有实现好的持久化数据结构可用,所以我们就无需专门为使用这个特性而去换用自己不熟悉的语言。
除了不可变性之外,我们还希望能获得一些事务运行所需要的数据结构——这些数据结构的值是不可变的,但其实体可以在托管事务中被改变。Akka提供了两种托管数据结构——TransactionalVector和TransactionalMap。这两种数据结构源自于高效的Scala数据结构,其工作原理和Java的list、map类似。下面就让我们一起来学习如何在Java和Scala中使用TransactionalMap
在Java中使用事务集合类
在Java中使用TransactionalMap是非常简单的。例如,下面我们一起来写一个为运动员们记录得分的程序,其中对于得分的更新操作是并发执行的。这里我们将不采用同步或锁的方式,而是把所有更新操作都放在事务中处理。示例代码如下所示:
02 |
final private TransactionalMap<String, Integer> scoreValues = |
03 |
new TransactionalMap<String, Integer>(); |
04 |
final private Ref<Long> updates = new Ref<Long>(0L); |
05 |
public void updateScore( final String name, final int score) { |
07 |
public Object atomically() { |
08 |
scoreValues.put(name, score); |
09 |
updates.swap(updates.get() + 1 ); |
11 |
throw new RuntimeException( "Reject this score" ); |
16 |
public Iterable<String> getNames() { |
17 |
return asJavaIterable(scoreValues.keySet()); |
19 |
public long getNumberOfUpdates() { return updates.get(); } |
20 |
public int getScore( final String name) { |
21 |
return scoreValues.get(name).get(); |
在updateScore()函数中,我们把设置某个运动员的得分以及增加更新次数的操作都收敛到一个事务里面,该事务中所用到的TransactionalMap类型的scoreValue字段以及Ref类型updates字段都是托管类型。其中TransactionalMap支持普通Map的所有函数,只不过这些函数都是事务性的——即一旦事务回滚,我们对其进行的任何变更都将被丢弃。为了能够观察到实际的效果,我们人为地设置了一个回滚条件,即当得分为13的时,我们会先完成变更操作,然后抛异常令事务回滚。
在Java中,如果集合类实现了Iterable接口的话,我们就可以使用像for(String name: collectionOfNames)这样的for-each语句。但TransactionalMap是一个Scala集合类,并且没有直接支持这个接口。别担心——Scala提供了一个叫做javaConversions的门面(façade设计模式——译者注),该门面提供了很多方便的函数来获取我们想要的Java接口。例如,我们可以使用asJavaIterable()函数来获取原本需要使用getNames()函数才能拿到的接口。
至此我们已经完成了Scores类的全部功能,接下来我们还需要写一个测试用例来检验Scores类所实现的这些功能:
01 |
package com.agiledeveloper.pcj; |
02 |
public class UseScores { |
03 |
public static void main( final String[] args) { |
04 |
final Scores scores = new Scores(); |
05 |
scores.updateScore( "Joe" , 14 ); |
06 |
scores.updateScore( "Sally" , 15 ); |
07 |
scores.updateScore( "Bernie" , 12 ); |
08 |
System.out.println( "Number of updates: " + scores.getNumberOfUpdates()); |
10 |
scores.updateScore( "Bill" , 13 ); |
11 |
} catch (Exception ex) { |
12 |
System.out.println( "update failed for score 13" ); |
14 |
System.out.println( "Number of updates: " + scores.getNumberOfUpdates()); |
15 |
for (String name : scores.getNames()) { |
17 |
String.format( "Score for %s is %d" , name, scores.getScore(name))); |
上例中,我们先是添加了三个正常的运动员成绩,随后又增加了一个可以导致事务回滚的成绩。但由于事务的存在,所以最后一个成绩更新操作最终是无效的。而在代码的最后,我们会遍历并输出事务性map里面的所有数据。下面让我们观察一下这段代码的输出结果:
Number of updates: 3
update failed for score 13
Number of updates: 3
Score for Joe is 14
Score for Bernie is 12
Score for Sally is 15
在Scala中使用事务集合类
在Scala中,我们可以用与Java类似的方式来使用事务集合类。只不过由于这次是在Scala中,所以这里我们需要使用Scala的内部迭代器而不是javaConversions门面(facade)。下面让我们把Scores类翻译成Scala代码:
02 |
private val scoreValues = new TransactionalMap[String, Int]() |
03 |
private val updates = Ref( 0 L) |
04 |
def updateScore(name : String, score : Int) = { |
06 |
scoreValues.put(name, score) |
07 |
updates.swap(updates.get() + 1 ) |
08 |
if (score == 13 ) throw new RuntimeException( "Reject this score" ) |
11 |
def foreach(codeBlock : ((String, Int)) = > Unit) = |
12 |
scoreValues.foreach(codeBlock) |
13 |
def getNumberOfUpdates() = updates.get() |
如上所示,updateScore()函数与Java版本基本是相同的。唯一有点区别的地方是,我们去掉了getNames()函数和getScore()函数,并为foreach()提供了内部迭代器来遍历map中的数据。我们在下面所列出了Scala版UseScores类的实现,这段代码是其Java版代码的直译:
01 |
package com.agiledeveloper.pcj |
03 |
def main(args : Array[String]) : Unit = { |
04 |
val scores = new Scores() |
05 |
scores.updateScore( "Joe" , 14 ) |
06 |
scores.updateScore( "Sally" , 15 ) |
07 |
scores.updateScore( "Bernie" , 12 ) |
08 |
println( "Number of updates: " + scores.getNumberOfUpdates()) |
10 |
scores.updateScore( "Bill" , 13 ) |
12 |
case ex = > println( "update failed for score 13" ) |
14 |
println( "Number of updates: " + scores.getNumberOfUpdates()) |
15 |
scores.foreach { mapEntry = > |
16 |
val (name, score) = mapEntry |
17 |
println( "Score for " + name + " is " + score) |
不出所料,测试用例的输出结果也与Java版代码如出一辙:
Number of updates: 3
update failed for score 13
Number of updates: 3
Score for Joe is 14
Score for Bernie is 12
Score for Sally is 15