AKKA中的事件流

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: AKKA中的事件流

在《企业应用集成模式》一书中,定义了许多与消息处理有关的模式,其中运用最为广泛的模式为Publisher-Subscriber模式,尤其是在异步处理场景下。


基于Publisher-Subscriber模式,还可以根据不同的场景衍生出特殊的模式,例如针对一个Publisher和多个Subscriber,演化为Broadcast模式和Message Router模式。前者会将消息同时发送给所有的Subscriber,实现分布式的并行处理。例如针对订单处理的场景,当顾客下订单后,既需要生成订单,又需要通知库存准备发货,还需要通知卖方和买方。这些任务虽然存在事务的一致性,但基于BASE原则,可以通过补偿机制实现事务的最终一致性。于是,设计时可以将这些任务交给不同的Subscriber,当接收到消息后,同时对订单进行处理。至于Message Router,则需要引入的Router对传入的消息作出智能判断,从而将消息传递给真正感兴趣的Subscriber。这就好像发布者同时发布了不同的刊物,订阅者只订阅自己喜欢的刊物。


而消息总线(message bus)则通过引入总线来彻底解除Publisher与Subscriber之间的耦合,类似设计模式中的Mediator模式。总线就是Mediator,用以协调Publisher与Subscriber之间的关系。或者,我们也可以认为是两个Publisher-Subscriber的组合。对于Publisher而言,总线就是Subscriber;对于Subscriber而言,总线则成了Publisher。


AKKA提供的事件总线(Event Bus)可以看做是一种运用于特殊场景的消息总线,此时事件即为消息。它可以看做是Message Router模式的实现,提供了向多个Actor发送消息的基础设施,内含的Classifier作为分类器,用于分发消息时选择Subscriber,扮演了Message Router的角色。


在AKKA中,Event Bus被定义为trait,定义了基本的订阅、取消订阅、发布等对应的方法,代码如下所示:

trait EventBus {
  type Event
  type Classifier
  type Subscriber  
  def subscribe(subscriber: Subscriber, to: Classifier): Boolean
  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean
  def unsubscribe(subscriber: Subscriber): Unit
  def publish(event: Event): Unit
}


根据AKKA官方文档的描述,Event为所有发布到该总线上的事件类型,Classifier是选择订阅者的分类器,Subscriber就是注册到该总线上的订阅者。它们均被定义为抽象的type,使得EventBus拥有最大的开放性。我们视情况而定去具体实现特定类型。例如针对Actor的EventBus,订阅者被指定为ActorRef类型:

trait ActorEventBus extends EventBus {
  type Subscriber = ActorRef
  protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b
}


AKKA运用Cake Pattern,以trait类型定义了一些特化的分类器。例如:

trait ActorClassifier {
 this: EventBus ⇒  
 type Classifier = ActorRef
}
trait PredicateClassifier {
 this: EventBus ⇒  
 type Classifier = Event ⇒ Boolean
}


除此之外,还有诸如SubchannelClassification、ScanningClassification、ActorClassification等分类器,都采用了同样的方式定义为trait。这样就便于继承EventBus的类进行trait的混入,例如EventStream的定义:

class EventStream(private val debug: Boolean = false) extends LoggingBus with SubchannelClassification {
  type Event = AnyRef
  type Classifier = Class[_]
}


由于LoggingBus继承自ActorEventBus,故而EventStream中的Subscriber类型被定义为ActorRef。然后在EventStream中又重写了Event和Classfier类型,分别为AnyRef和Class,这说明任何Java引用对象都可以作为事件,而分类的依据则为Event的类型。

type Subscriber = ActorRef
type Event = AnyRef
type Classifier = Class[_]

EventStream继承了SubchannelClassification。在其中维持了订阅者列表,虽然该订阅列表类型为SubclassifiedIndex,不过我们可以将其简单地视为一个Map(实际情况更复杂,因为它实际上维护了分类的层级):

trait SubchannelClassification {
  this: EventBus ⇒
  private lazy val subscriptions = new SubclassifiedIndex[Classifier, Subscriber]()
}

由于此时的Classifier在EventStream中被定义为Class[_],Subscriber为ActorRef,因此subscripions相当于被定义为:

private lazy val subscriptions = new SubclassifiedIndex[Class[_], ActorRef]

当Actor出现故障,从而使得消息被转发给dead letter时,我们可能需要侦听这些死信,并对它们进行处理。则AKKA的做法就是通过EventStream来进行订阅:

class DeadLetterListener extends Actor {
  def receive = {
    case DeadLetter(msg, from, to) =>
      println(s"${System.currentTimeMillis()}: from $from to $to with message $msg.")
  }
}
val listener = system.actorOf(Props[DeadLetterListener], "listener")
system.eventStream.subscribe(listener, classOf[DeadLetter])

结合SubchannelClassification,我们来分析其执行过程。

首先,它通过subscribe方法将DeadLetterListener的actor引用对象以及事件类型DeadLetter注册到SubchannelClassification中的subscriptions。当ActorSystem的任意actor发出DeadLetter时,就会触发EventStream的publish()方法:

class EventStream(private val debug: Boolean = false) extends LoggingBus with SubchannelClassification {
  protected def publish(event: AnyRef, subscriber: ActorRef) = {
    if (subscriber.isTerminated) unsubscribe(subscriber)
    else subscriber ! event
  }
}

此时就会通过subscriber(此时为前面定义的DeadLetterListener)发送event(此时为DeadLetter对象),从而进入到DeadLetterListener的receive方法中,打印出我想要的消息。

通过EventStream还可以处理日志消息。AKKA自身也提供了默认的处理器,可以配置在application.conf文件中:

akka {
  event-handlers = ["akka.event.Logging$DefaultLogger"]
}

这个默认的日志处理器会订阅高于配置级别的日志事件类,例如将日志级别配置为Debug:

system.eventStream.setLogLevel(Logging.DebugLevel)

通过这样的配置,所有低于Debug级别的日志事件发生时,都不会被EventStream分发。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
消息中间件 安全 网络协议
Akka事件驱动新选择
在高并发场景解决方案中,多从线程角度出发,以解决线程安全问题,锁范围又需要多业务场景考虑,何时上锁,何时解锁,何时自动过期等,而事件驱动是从执行什么操作驱动的,在软件系统的设计层面,两者关联性不大,一个强调安全,一个强调策略,那么有没有两者结合解决并发编程难的事件驱动解决方案呢?带着场景解决方案我们走进Akka。
402 0
Akka事件驱动新选择
|
3月前
|
Linux
vertx的学习总结4之异步数据和事件流
本文探讨了Vert.x中异步数据流的概念,包括事件流的抽象、背压机制的重要性,以及如何从流中解析协议数据,并通过文件流的示例代码展示了异步文件操作的处理方式。
59 3
vertx的学习总结4之异步数据和事件流
|
3月前
什么是事件流?
什么是事件流?
75 1
|
5月前
什么是事件流
什么是事件流
72 0
|
5月前
|
消息中间件 存储 Kafka
Kafka 与 SQS:事件流工具深入比较
【8月更文挑战第13天】
189 0
|
消息中间件 前端开发 Java
AKKA 的 Actor 模式介绍 | 学习笔记
快速学习 AKKA 的 Actor 模式介绍
AKKA 的 Actor 模式介绍 | 学习笔记
使用Akka持久化——消息发送与接收
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/53929751 前言 在《使用Akka持久化——持久化与快照》一文中介绍了如何使用Akka持久化消息及生成快照。
1102 0
|
缓存 前端开发 JavaScript
从 RxJS 到 Flink:如何处理数据流?
前端开发的本质是什么?响应式编程相对于 MVVM 或者 Redux 有什么优点?响应式编程的思想是否可以应用到后端开发中?本文以一个新闻网站为例,阐述在前端开发中如何使用响应式编程思想;再以计算电商平台双11每小时成交额为例,分享同样的思想在实时计算中的相同与不同之处。
从 RxJS 到 Flink:如何处理数据流?
|
消息中间件 Kubernetes Cloud Native
关于 Broker/Trigger 事件模型
Broker 提供一个事件集,可以通过属性选择该事件集。它负责接收事件并将其转发给由一个或多个匹配 Trigger 定义的订阅者。Trigger 描述基于事件属性的过滤器。同时可以根据需要创建多个 Trigger。本文将为大家详细讲解 Broker/Trigger 事件模型。
884 0
关于 Broker/Trigger 事件模型
|
流计算 Java 安全