单线程事件处理器ControllerEventManager

简介: 单线程事件处理器,Controller端定义的一个组件。该组件内置了一个专属线程,负责处理其他线程发送过来的Controller事件。还定义了一些管理方法,为专属线程输送待处理事件。

0 前言

单线程事件处理器,Controller端定义的一个组件。该组件内置了一个专属线程,负责处理其他线程发送过来的Controller事件。还定义了一些管理方法,为专属线程输送待处理事件。


0.11.0.0版本前,Controller组件源码复杂。集群元数据信息在程序中同时被多个线程访问,因此,源码里有大量Monitor锁、Lock锁或其他线程安全机制,导致晦涩难懂,改动困难

0.11.0.0版本开始,社区重构Controller代码结构。将多线程并发访问改为单线程的事件队列方式。并非说Controller只有一个线程,而是指对局部状态的访问限制在一个专属线程,即让这个特定线程排他操作Controller元数据信息

这就无需担心多线程访问引发的各种线程安全问题,简化Controller端代码。


1 基本概念


21.png

Controller单线程事件队列处理模型及其基础组件。

Controller端多线程向事件队列写入不同种类事件,如zk端注册的Watcher线程、KafkaRequestHandler线程、Kafka定时任务线程等。


事件队列的另一端,只有一个名为ControllerEventThread的线程,负责“消费”或处理队列中的事件。


即单线程事件队列模型。


2 相关类

2.1 ControllerEventProcessor

Controller端的事件处理器接口:

20.png


API

process

接收一个Controller事件,并进行普通处理。实现Controller事件处理的主要方法


preempt

接收一个Controller事件,并抢占队列之前的Controller事件,进行优先处理。Kafka用其实现某些高优先级事件的抢占处理,目前在源码中也只有两类事件(ShutdownEventThread和Expire)需抢占式处理


KafkaController类是Controller组件的功能实现类,ControllerEventProcessor的唯一实现类。

16.png


2.2 ControllerEvent

Controller事件,即事件队列中被处理的对象,对应ControllerEvent接口

15.png

每个ControllerEvent都定义了一个状态。Controller处理具体事件时,会更新状态。

状态由ControllerState定义:

14.png

每类ControllerState都定义一个value值,表示Controller状态的序号,从0开始。

rateAndTimeMetricName方法用于构造Controller状态速率的监控指标名称。


如TopicChange是一类ControllerState:主题总数发生变化。


监控这类状态变更速率,rateAndTimeMetricName方法会定义名为TopicChangeRateAndTimeMs的指标。

并非所有ControllerState都有对应速率监控指标,如表示空闲状态的Idle无对应指标。


Controller总共定义了25类事件和17种状态:

13.png


监控到某些Controller状态变更速率异常时,可通过该表,快速确定可能造成瓶颈的Controller事件,并定位处理函数,辅助排查。


多个ControllerEvent可能属相同的ControllerState。


2.3 ControllerEventManager

事件处理器,创建和管理ControllerEventThread。位于ControllerEventManager.scala,该文件的组成:


ControllerEventManager Object

保存一些字符串常量,如线程名称

12.png

ControllerEventProcessor

事件处理器接口,目前只有KafkaController实现该接口

QueuedEvent

事件队列上的事件对象

ControllerEventManager Class

ControllerEventManager的伴生类,主要创建和管理事件处理线程和事件队列。该类定义了ControllerEventThread线程类。

ControllerEventManager对象仅定义了3个公共变量。


QueuedEvent

// 每个QueuedEvent定义了两个字段

// event: ControllerEvent类,表示Controller事件

// enqueueTimeMs:表示Controller事件被放入到事件队列的时间戳

class QueuedEvent(val event: ControllerEvent,

                 val enqueueTimeMs: Long) {

 // 标识事件是否开始被处理

 val processingStarted = new CountDownLatch(1)

 

 // 标识事件是否被处理过

 val spent = new AtomicBoolean(false)

 

 // 处理事件

 def process(processor: ControllerEventProcessor): Unit = {

   if (spent.getAndSet(true))

     return

   processingStarted.countDown()

   processor.process(event)

 }

 

 // 抢占式处理事件

 def preempt(processor: ControllerEventProcessor): Unit = {

   if (spent.getAndSet(true))

     return

   processor.preempt(event)

 }

 

 // 阻塞等待事件被处理完成

 def awaitProcessing(): Unit = {

   processingStarted.await()

 }

 

 override def toString: String = {

   s"QueuedEvent(event=$event, enqueueTimeMs=$enqueueTimeMs)"

 }

}


每个QueuedEvent对象实例都裹挟了一个ControllerEvent。

在QueuedEvent中,用CountDownLatch来做各种条件控制,比如用于侦测线程是否成功启动、成功关闭等。

QueuedEvent使用它的唯一目的,是确保Expire事件在建立ZooKeeper会话前被处理。

若不是在该场景,则代码就用spent来标识该事件是否已被处理:


若已被处理

再次调用process方法时就会直接返回

2.4 ControllerEventThread

专属的事件处理线程,唯一作用:处理不同种类的ControllEvent。

ControllerEventManager类内部定义的线程类。


消费QueuedEvent的ControllerEventThread类:

11.png

继承自ShutdownableThread:10.png

9.png


该类会循环执行doWork,具体实现则由子类完成。


作为Controller唯一的事件处理线程,需关注该线程运行状态。必须要知道该线程在JVM上的名字,后续就能对其监控。

线程名由ControllerEventManager Object中ControllerEventThreadName变量定义:

8.png

ControllerEventThread#doWork


override def doWork(): Unit = {

 // 从事件队列获取待处理的Controller事件(QueuedEvent对象实例),否则等待

 val dequeued = queue.take()

 dequeued.event match {

   // 若是关闭线程事件,啥都不做。关闭线程由外部来执行

   case ShutdownEventThread =>

   case controllerEvent =>

     _state = controllerEvent.state

     // 更新对应事件在队列中保存的时间

     eventQueueTimeHist.update(time.milliseconds() - dequeued.enqueueTimeMs)

     try {

       def process(): Unit = dequeued.process(processor)

       // 处理事件,同时计算处理速率

       rateAndTimeMetrics.get(state) match {

         case Some(timer) => timer.time { process() }

         case None => process()

       }

     } catch {

       case e: Throwable => error(s"Uncaught error processing event $controllerEvent", e)

     }

     _state = ControllerState.Idle

 }

}



首先是调用LinkedBlockingQueue#take,去。注意,这里用的是take方法,这说明,如果事件队列中没有QueuedEvent,那么,ControllerEventThread线程将一直处于阻塞状态,直到事件队列上插入了新的待处理事件。


一旦拿到QueuedEvent事件后,线程会判断是否是ShutdownEventThread事件。当ControllerEventManager关闭时,会显式地向事件队列中塞入ShutdownEventThread,表明要关闭ControllerEventThread线程。如果是该事件,那么ControllerEventThread什么都不用做,毕竟要关闭这个线程了。相反地,如果是其他的事件,就调用QueuedEvent的process方法执行对应的处理逻辑,同时计算事件被处理的速率。


该process方法底层调用ControllerEventProcessor#process:


def process(processor: ControllerEventProcessor): Unit = {

 // 若已经被处理过,直接返回

 if (spent.getAndSet(true))

   return

 processingStarted.countDown()

 // 调用ControllerEventProcessor的process方法处理事件

 processor.process(event)

}


方法首先判断该事件是否已被处理:


是,直接返回

不是,调用ControllerEventProcessor#process处理事件

每个ControllerEventProcessor#process都封装在KafkaController.scala文件。就是KafkaController类实现ControllerEventProcessor#process,部分代码:


override def process(event: ControllerEvent): Unit = {

   try {

     // 依次匹配ControllerEvent事件

     event match {

       case event: MockEvent =>

         event.process()

       case ShutdownEventThread =>

         error("Received a ShutdownEventThread event. This type of event is supposed to be handle by ControllerEventThread")

       case AutoPreferredReplicaLeaderElection =>

         processAutoPreferredReplicaLeaderElection()

       ......

     }

   } catch {

     // 如果Controller换成了别的Broker

     case e: ControllerMovedException =>

       info(s"Controller moved to another broker when processing $event.", e)

       // 执行Controller卸任逻辑

       maybeResign()

     case e: Throwable =>

       error(s"Error processing event $event", e)

   } finally {

     updateMetrics()

   }

}



这个process方法接收一个ControllerEvent实例,接着会判断它是哪类Controller事件,并调用相应的处理方法:


AutoPreferredReplicaLeaderElection,调processAutoPreferredReplicaLeaderElection

其他类型事件,调用process

put方法和clearAndPut方法也很重要。ControllerEventThread是读取队列事件,这两个方法就是向队列生产元素:


put把指定ControllerEvent插入事件队列

clearAndPut先执行高优先级的抢占式事件,之后清空队列所有事件,最后再插入指定事件

下面这两段源码分别对应这两个方法:


// put方法

def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {

 // 构建QueuedEvent实例

 val queuedEvent = new QueuedEvent(event, time.milliseconds())

 // 插入到事件队列

 queue.put(queuedEvent)

 // 返回新建QueuedEvent实例

 queuedEvent

}

// clearAndPut方法

def clearAndPut(event: ControllerEvent): QueuedEvent = inLock(putLock) {

 // 优先处理抢占式事件

 queue.forEach(_.preempt(processor))

 // 清空事件队列

 queue.clear()

 // 调用上面的put方法将给定事件插入到事件队列

 put(event)

}


中的put方法使用putLock对代码进行保护,我觉得这个putLock是不需要的,因为LinkedBlockingQueue数据结构本身就已线程安全。put方法只会与全局共享变量queue打交道,因此,它们的线程安全性完全可委托LinkedBlockingQueue实现。LinkedBlockingQueue内部已维护一个putLock和一个takeLock,专门保护读写操作。


当然,我同意在clearAndPut中使用锁,毕竟要保证,访问抢占式事件和清空操作构成一个原子操作。


3 总结

Controller端的单线程事件队列实现方式,即ControllerEventManager通过构建ControllerEvent、ControllerState和对应的ControllerEventThread线程,并且结合专属事件队列,共同实现事件处理。


ControllerEvent:定义Controller能够处理的各类事件名称,目前总共定义了25类事件。

ControllerState:定义Controller状态。ControllerEvent的上一级分类,因此,ControllerEvent和ControllerState是多对一。

ControllerEventManager:Controller定义的事件管理器,专门定义和维护专属线程以及对应的事件队列。

ControllerEventThread:事件管理器创建的事件处理线程。该线程排他性地读取事件队列并处理队列中的所有事件。

目录
相关文章
|
3天前
SDL事件处理以及线程使用(2)
SDL库中事件处理和多线程编程的基本概念和示例代码,包括如何使用SDL事件循环来处理键盘和鼠标事件,以及如何创建和管理线程、互斥锁和条件变量。
12 1
SDL事件处理以及线程使用(2)
|
2月前
|
数据采集 Java Python
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
|
3月前
|
设计模式 存储 安全
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
55 1
|
3月前
|
设计模式 存储 缓存
Java面试题:结合单例模式与Java内存模型,设计一个线程安全的单例类?使用内存屏障与Java并发工具类,实现一个高效的并发缓存系统?结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
Java面试题:结合单例模式与Java内存模型,设计一个线程安全的单例类?使用内存屏障与Java并发工具类,实现一个高效的并发缓存系统?结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
29 0
|
5月前
|
存储 JSON 运维
【运维】Powershell 服务器系统管理信息总结(进程、线程、磁盘、内存、网络、CPU、持续运行时间、系统账户、日志事件)
【运维】Powershell 服务器系统管理信息总结(进程、线程、磁盘、内存、网络、CPU、持续运行时间、系统账户、日志事件)
129 0
|
消息中间件 缓存 资源调度
【Java项目】使用Nacos实现动态线程池技术以及Nacos配置文件更新监听事件
【Java项目】使用Nacos实现动态线程池技术以及Nacos配置文件更新监听事件
374 0
|
数据采集 监控 NoSQL
一日一技:Python多线程的事件监控
一日一技:Python多线程的事件监控
150 0
|
数据采集 监控 NoSQL
一日一技:Python多线程的事件监控
一日一技:Python多线程的事件监控
225 0
|
消息中间件 Java Shell
spring学习笔记(二)spring中的事件及多线程
spring学习笔记(二)spring中的事件及多线程
217 0
spring学习笔记(二)spring中的事件及多线程
|
4天前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
17 1
C++ 多线程之初识多线程