到目前为止我们所接触过的角色都是可以接收消息的,而消息的类型也是五花八门,如String、元组、case类/自定义消息等。然而发送消息的行为在感觉上与我们日常编程工作中所使用的常规函数调用还是有很大区别的,为了弥合二者之间的鸿沟,类型化角色(Typed Actor)就应运而生了。这种类型的角色可以将发送消息的动作在形式上伪装成常规的函数调用,而将消息传输动作隐藏在后台执行。我们可以将类型化角色想像成为一个活动的对象,该对象运行在一个属于自己的轻量消息驱动的线程里面,并且还带有一个用于将正常的函数调用转换成异步非阻塞消息的拦截代理。
由于类型化角色可以将普通函数调用在后台转换成消息,所以我们就可以最大程度地享受到静态类型所带来的好处。由于有了类型化角色,我们就无需在接收消息的角色里对收到的消息类型猜来猜去,同时也可以更好地利用如代码补全等这些由IDE所提供的编程支持。
如果要实现一个普通的角色,我只需简单地写一个类,并使其继承UntypedActor或Actor trait/抽象类即可。而如果我们要实现一个类型化的角色,则需要创建一个接口-实现对(在Scala中,我们可以不用写接口定义,而只需使用一个不含实现内容的trait即可)。
我们可以使用Actor类的actorOf()函数来实例化一个普通角色。而如果要实例化一个类型化的角色,则我们需要使用TypedActor的newInstance()函数才行。
我们从TypedActor中拿到的引用是一个可以将函数调用转换成异步消息的拦截代理。其中,返回值为void的函数将被转换成sendOneWay或!函数,返回普通类型内容的函数会被转换成sendRequestReply()或!!函数,而返回值类型为Future的函数则被转换成sendRequestReplyFuture()或!!!函数。
我们在第5章中用现代Java并发API和在6.7节中用STM重构的EnergySource类示例是我们演示类型化角色功能和用法的最佳选择。由于该示例中存在可变状态,所以我们可以用一个角色来对其进行隔离。又因为每个EnergySource实例都将各自运行在一个独立的线程中,所以其中不存在竞争条件的问题。当多个线程调用同一个EnergySource实例上的方法时,这些调用将会跳出线程并在该实例上顺序执行。请记住,角色是不会一直占住线程不放的,所以它们可以在各实例间共享线程资源并提供更大的吞吐量——类型化的角色同样也可以做到。
EnergySource其实本身没什么复杂的业务逻辑,只有查询当前的电量、使用数和消耗电量,以及可以在后台自动恢复电量这几个简单的功能。我们当然希望基于角色的版本也能实现上述全部功能,但请先不要着急,我们将采取递增式的构建方法以便可以每次只关注一件事情。示例展示顺序则还是沿用之前的老规矩:先构建Java版然后再构建Scala版。
在Java中使用类型化角色
类型化角色需要一个接口/实现对,所以下面让我们先从EnergySource的接口开始:
1 |
public interface EnergySource { |
2 |
long getUnitsAvailable(); |
4 |
void useEnergy( final long units); |
与上面这个接口相对应的实现类是EnergySourceImpl。该类与普通Java类的唯一区别是我们为了将其转换成一个活动对象而让其继承了TypedActor类:
01 |
public class EnergySourceImpl extends TypedActor implements EnergySource { |
02 |
private final long MAXLEVEL = 100L; |
03 |
private long level = MAXLEVEL; |
04 |
private long usageCount = 0L; |
05 |
public long getUnitsAvailable() { return level; } |
06 |
Using Typed Actors • 191 |
07 |
public long getUsageCount() { return usageCount; } |
08 |
public void useEnergy( final long units) { |
09 |
if (units > 0 && level - units >= 0 ) { |
11 |
"Thread in useEnergy: " + Thread.currentThread().getName()); |
TypedActor保证EnergySourceImpl里定义的所有函数都是互斥的;也就是说,在任意给定的实例上,每次只能有一个函数能被调用,所以在函数的实现中无需对任何成员字段的访问进行同步或加锁。为了能够感知到执行角色的线程的存在,我们会在示例代码中插入少量的打印语句。最后,为了验证类型化角色的实际效果,我们还需要实现测试用例UseEnergySouce类。
01 |
public class UseEnergySource { |
02 |
public static void main(final String[] args) |
03 |
throws InterruptedException { |
04 |
System.out.println("Thread in main: " + |
05 |
Thread.currentThread().getName()); |
06 |
final EnergySource energySource = |
07 |
TypedActor.newInstance(EnergySource.class, EnergySourceImpl.class); |
08 |
System.out.println("Energy units " + energySource.getUnitsAvailable()); |
09 |
System.out.println("Firing two requests for use energy"); |
10 |
energySource.useEnergy(10); |
11 |
energySource.useEnergy(10); |
12 |
System.out.println("Fired two requests for use energy"); |
14 |
System.out.println("Firing one more requests for use energy"); |
15 |
energySource.useEnergy(10); |
17 |
System.out.println("Energy units " + energySource.getUnitsAvailable()); |
18 |
System.out.println("Usage " + energySource.getUsageCount()); |
19 |
TypedActor.stop(energySource); |
首先,我们通过TypedActor的newInstance()函数创建一个类型化角色的实例。随后,我们调用了getUnitsAvailable()函数来获取电源当前电量值。请注意,此时我们的主调线程(即主线程)将会被阻塞,直至类型化角色响应为止。与getUnitsAvailable()所不同的是,由于返回值是void,所以对于useEnergy()的调用是非阻塞的,即接下来的连续两次useEnergy()函数调用都是立刻返回的。在经历一个短暂的延时之后,我们会再调用一次useEnergy()以研究角色和线程的行为。接下来,为了让异步消息能够被处理完,我们又插入了一个1秒的延时,并紧接着又再次查询电源的使用次数和当前电量。在程序结尾,我们关停了所有角色。下面让我们来看看这段代码的输出结果:
03 |
Firing two requests for use energy |
04 |
Fired two requests for use energy |
05 |
Thread in useEnergy: akka:event-driven:dispatcher:global-2 |
06 |
Thread in useEnergy: akka:event-driven:dispatcher:global-2 |
07 |
Firing one more requests for use energy |
08 |
Thread in useEnergy: akka:event-driven:dispatcher:global-3 |
由于类型化角色EnergySourceImpl每次只会执行一个函数,所以虽然前两次useEnergy()请求并未阻塞主线程,但这两个任务都是在角色线程中顺序执行的。上面的测试用例优雅地将执行线程在main函数的调用与角色里的函数之间进行了数次切换,这使得我们可以更清楚地观察到:当main()函数运行在主线程中时,角色中的函数也在另外一个由Akka管理的线程(global-2)中顺序执行着。此外,我们还注意到角色并没有占住执行线程不放,证据就是最后一个useEnergy()函数是在另外一个Akka管理的线程(global-3)中运行的。
电源的可变状态被隔离在EnergySourceImpl 角色中——我称之为隔离的,不仅是因为可变状态被封装在EnergySourceImpl类的定义代码中,而是由于在类型化角色的控制之下的任意时刻,最多只会有一个角色的执行线程可以访问该可变状态。
在Scala中使用类型化角色
前面我们已经看到,在Java中使用类型化角色需要一组接口/实现对。而在Scala中,我们不再创建接口,而是改为创建一个不含实现代码的trait。下面让我们将EnergySource用Scala改写为一个trait:
2 |
def getUnitsAvailable() : Long |
3 |
def getUsageCount() : Long |
4 |
def useEnergy(units : Long) : Unit |
EnergySourceImpl的实现完全是对其Java版本同名类的直接翻译。该类继承了TypedActor并附带了之前定义的EnergySource trait。
01 |
class EnergySourceImpl extends TypedActor with EnergySource { |
05 |
def getUnitsAvailable() = level |
06 |
def getUsageCount() = usageCount |
07 |
def useEnergy(units : Long) = { |
08 |
if (units > 0 && level - units > = 0 ) { |
09 |
println( "Thread in useEnergy: " + Thread.currentThread().getName()) |
最后,我们还需要实现一个对上面所有功能进行检验的测试用例UseEnergySource:
01 |
object UseEnergySource { |
02 |
def main(args : Array[String]) : Unit = { |
03 |
println( "Thread in main: " + Thread.currentThread().getName()) |
04 |
val energySource = TypedActor.newInstance( |
05 |
classOf[EnergySource], classOf[EnergySourceImpl]) |
06 |
println( "Energy units " + energySource.getUnitsAvailable) |
07 |
println( "Firing two requests for use energy" ) |
08 |
energySource.useEnergy( 10 ) |
09 |
energySource.useEnergy( 10 ) |
10 |
println( "Fired two requests for use energy" ) |
12 |
println( "Firing one more requests for use energy" ) |
13 |
energySource.useEnergy( 10 ) |
15 |
println( "Energy units " + energySource.getUnitsAvailable) |
16 |
println( "Usage " + energySource.getUsageCount) |
17 |
TypedActor.stop(energySource) |
下面让我们运行Scala版本的示例代码并观察其输出结果:
03 |
Firing two requests for use energy |
04 |
Fired two requests for use energy |
05 |
Thread in useEnergy: akka:event-driven:dispatcher:global-2 |
06 |
Thread in useEnergy: akka:event-driven:dispatcher:global-2 |
07 |
Firing one more requests for use energy |
08 |
Thread in useEnergy: akka:event-driven:dispatcher:global-3 |
与Java版本的实现相比,Scala版本的示例程序除了受益于活动对象之外,其代码简洁性也更胜一筹。