[翻译]AKKA笔记 - DEATHWATCH -7

简介: 当我们说Actor生命周期的时候,我们能看到Actor能被很多种方式停掉(用ActorSystem.stop或ActorContext.stop或发送一个PoisonPill - 也有一个kill和gracefulstop)。

当我们说Actor生命周期的时候,我们能看到Actor能被很多种方式停掉(用ActorSystem.stop或ActorContext.stop或发送一个PoisonPill - 也有一个killgracefulstop)。

无论Actor是怎么死的,有些情况一些系统中的其他actor想要知道。让我们举一个Actor与数据库交互的例子 - 我们叫它RepositoryActor。很明显,会有一些其他actor会向这个RepositoryActor发送消息。这些有“兴趣”的Actor很愿意留个eye或者看(watch)这个actor关闭时的消息。这个在Actor里面叫DeathWatch。这个用来watchunwatch的方法就是ActorContext.watchActorContext.unwatch。如果已经监视了,这些watcher会在Actor关闭时收到一个Terminated消息,并可以很舒服的加到他们的receive功能中。

不像Supervision有一个严格的父子继承关系,任何Actor都可以watch任何ActorSystem中的Actor。

这里写图片描述

让我们看下。

代码

QUOTEREPOSITORYACTOR

1.我们的QueryRepositoryActor格言查询Actor保存了一个quote的列表并且在收到一个QuoteRepositoryRequest时随机返回一条。

  1. 他记录了收到消息的个数,如果收到超过3个消息,他用PoisonPill把自己杀掉

这没啥神奇的。

package me.rerun.akkanotes.deathwatch

import akka.actor.{PoisonPill, Actor, ActorLogging, actorRef2Scala}  
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol._  
import scala.util.Random

class QuoteRepositoryActor() extends Actor with ActorLogging {

  val quotes = List(
    "Moderation is for cowards",
    "Anything worth doing is worth overdoing",
    "The trouble is you think you have time",
    "You never gonna know if you never even try")

  var repoRequestCount:Int=1

  def receive = {

    case QuoteRepositoryRequest => {

      if (repoRequestCount>3){
        self!PoisonPill
      }
      else {
        //Get a random Quote from the list and construct a response
        val quoteResponse = QuoteRepositoryResponse(quotes(Random.nextInt(quotes.size)))

        log.info(s"QuoteRequest received in QuoteRepositoryActor. Sending response to Teacher Actor $quoteResponse")
        repoRequestCount=repoRequestCount+1
        sender ! quoteResponse
      }

    }

  }

}

TEACHERACTORWATCHER

一样的,TeacherActorWatcher也没啥神奇的,除了他创建了一个QuoteRepositoryActor并且用context.watch观察。

package me.rerun.akkanotes.deathwatch

import akka.actor.{Terminated, Props, Actor, ActorLogging}  
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest  
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol.QuoteRepositoryRequest

class TeacherActorWatcher extends Actor with ActorLogging {

  val quoteRepositoryActor=context.actorOf(Props[QuoteRepositoryActor], "quoteRepositoryActor")
  context.watch(quoteRepositoryActor)


  def receive = {
    case QuoteRequest => {
      quoteRepositoryActor ! QuoteRepositoryRequest
    }
    case Terminated(terminatedActorRef)=>{
      log.error(s"Child Actor {$terminatedActorRef} Terminated")
    }
  }
}

测试CASE

这里会有点意思。我从来没想过这个可以被测试。akka-testkit。我们会分析下这三个测试CASE:

1. 断言如果观察到已经收到Terminated消息

QuoteRepositoryActor应该在收到第四条消息时给测试case发送一条Terminated消息。前三条应该是可以的。

"A QuoteRepositoryActor" must {
    ...
    ...
    ...

    "send back a termination message to the watcher on 4th message" in {
      val quoteRepository=TestActorRef[QuoteRepositoryActor]

      val testProbe=TestProbe()
      testProbe.watch(quoteRepository) //Let's watch the Actor

      within (1000 millis) {
        var receivedQuotes = List[String]()
        (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
        receiveWhile() {
          case QuoteRepositoryResponse(quoteString) => {
            receivedQuotes = receivedQuotes :+ quoteString
          }
        }

        receivedQuotes.size must be (3)
        println(s"receiveCount ${receivedQuotes.size}")

        //4th message
        quoteRepository!QuoteRepositoryRequest
        testProbe.expectTerminated(quoteRepository)  //Expect a Terminated Message
      }
    }

2.如果没有观察(watched/unwatched)到则断言没收到Terminated消息

事实上,我们做这个只是演示下context.unwatch。如果我们移掉testProbe.watch和testProbe.unwatch这行,则测试case会运行的很正常。

    "not send back a termination message on 4th message if not watched" in {
      val quoteRepository=TestActorRef[QuoteRepositoryActor]

      val testProbe=TestProbe()
      testProbe.watch(quoteRepository) //watching

      within (1000 millis) {
        var receivedQuotes = List[String]()
        (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
        receiveWhile() {
          case QuoteRepositoryResponse(quoteString) => {
            receivedQuotes = receivedQuotes :+ quoteString
          }
        }

        testProbe.unwatch(quoteRepository) //not watching anymore
        receivedQuotes.size must be (3)
        println(s"receiveCount ${receivedQuotes.size}")

        //4th message
        quoteRepository!QuoteRepositoryRequest
        testProbe.expectNoMsg() //Not Watching. No Terminated Message
      }
    }

3. 在TeacherActorWatcher中断言收到了Terminated消息

我们订阅了EventStream并通过检查一个特殊的日志消息来断言termination。

   "end back a termination message to the watcher on 4th message to the TeacherActor" in {

      //This just subscribes to the EventFilter for messages. We have asserted all that we need against the QuoteRepositoryActor in the previous testcase
      val teacherActor=TestActorRef[TeacherActorWatcher]

      within (1000 millis) {
        (1 to 3).foreach (_=>teacherActor!QuoteRequest) //this sends a message to the QuoteRepositoryActor

        EventFilter.error (pattern="""Child Actor .* Terminated""", occurrences = 1).intercept{
          teacherActor!QuoteRequest //Send the dangerous 4th message
        }
      }
    }

EventFilter中的pattern属性,没啥奇怪的,需要一个正则表达式。正则pattern=”“”Child Actor .* Terminated”“”用来匹配一条格式是Child Actor {Actor[akka://TestUniversityMessageSystem/user/$$d/quoteRepositoryActor#-1905987636]} Terminated日志信息。

Github

与往常一样,代码在github。看下deathwatch的包。


文章来自微信平台「麦芽面包」(微信扫描二维码关注)。未经允许,禁止转载。

目录
相关文章
|
人工智能 JavaScript 前端开发
白嫖 DeepSeek ,低代码竟然会一键作诗?
宜搭低代码平台接入 DeepSeek AI 大模型能力竟然这么方便!本教程将揭秘宜搭如何快速接入 DeepSeek API,3 步打造专属作诗机器人,也许你还能开发出更多有意思的智能玩法,让创意在代码间自由生长。
2773 14
|
JSON 数据挖掘 API
唯品会按关键字搜索 VIP 商品 API 接口的开发应用与收益
在电商蓬勃发展的今天,精准的商品搜索功能至关重要。唯品会的按关键字搜索VIP商品API接口通过高效、精准的检索,提升了用户购物体验和商家销售业绩。该接口基于RESTful架构,采用JSON格式交互,支持唯品会APP内搜索、第三方平台合作及数据分析等场景,显著提升用户活跃度与忠诚度,拓展销售渠道,增加收入,并挖掘数据驱动的商业价值,助力唯品会持续发展。
322 4
|
机器学习/深度学习 算法 5G
|
存储 安全 Linux
OverTheWire Bandit 通关解析(上)
OverTheWire Bandit 通关解析(上)
|
编解码 测试技术 开发工具
绘梦有形,快手开源「可图 Kolors」,等你来玩
近期,快手开源了名为Kolors(可图)的文本到图像生成模型,该模型具有对英语和汉语的深刻理解,并能够生成高质量、逼真的图像。
|
监控 JavaScript Java
JVM源码级别分析G1发生FullGC元凶的是什么
线上系统遭遇频繁Old GC问题,监控显示出现多次“to-space exhausted”日志,这表明垃圾回收过程中因年轻代 Survivor 区或老年代空间不足导致对象晋升失败。通过 JVM 源码分析,此问题源于对象转移至老年代失败时,JVM 无法找到足够的空间存放存活对象。进一步排查发现大对象分配占用了预留空间,加剧了空间不足的情况。使用 JFR 分析工具定位到定期报表序列化导致大量大对象生成,通过改用堆外内存进行序列化输出,最终解决了频繁 Old GC 问题。
686 0
|
Ubuntu JavaScript 小程序
别用 VMware 了,这款虚拟机简单、轻量、好用还免费...
别用 VMware 了,这款虚拟机简单、轻量、好用还免费...
|
Web App开发 安全 Android开发
如何上架自己的应用到各大应用商店?
上架各大安卓应用商店(腾讯应用宝、阿里应用商店、百度手机助手、华为应用市场、小米应用商店)需要准备哪些材料?
1508 0
|
前端开发 开发者 CDN
又一款开源免费字体「霞鹜文楷」,原来还能这么用!
又一款开源免费字体「霞鹜文楷」,原来还能这么用!
|
存储 网络安全
【Openstack】Ceph 与Openstack存储对接
Ceph 与Openstack存储对接
5343 12
【Openstack】Ceph 与Openstack存储对接