如果大家从事的是业务开发,在工作中如果涉及到并发,通常是引入线程池来实现并发。
但如果从事的是基础框架的开发,通常并不会直接使用线程池,会按需创建单个线程,并且为了职责的单一与提升性能,通常单个线程只会负责一个流程中的部分功能,多个线程紧密配合。
那线程与线程之间如何协作呢?如果子线程出现异常,如何通知主线程呢?
可以明确的告诉大家,子线程抛出异常,主线程是无法捕获到异常的。
请带着上述问题,开始本文的学习。
1、两个线程如何协作
这里我想拿RocketMQ中的一个场景与大家来分享,通常这也是两个线程进行协作十分经典的使用示例。
在RocketMQ消费端,PUSH模式消费者在消费之前需要提前做好准备:
- 队列负载均衡
- 消息拉取
这里简单介绍一下相关实现:在RocketMQ消费者PUSH模式启动后,消费组中的成员发生了变化,需要进行重平衡,即进行队列重新负载,主要的依据是查询当前队列的总个数与当前消费者个数,然后使用负载均衡算法(例如平均分配),各个消费者获取分配后的队列后,依次向Broker服务端拉取消息,然后提交消费线程池消费。
从单一指责设计原则出发,根据负载均衡、消息拉取两个不通的职责来看,需要设计两个线程分别处理,并且这两个线程必须相互协作,因为负责队列负载的线程需要指引消息拉取线程具体拉取哪些队列。
从上述流程图中可以看出,RocketMQ的给出的实践要点如下:
- 引入一个阻塞队列(多线程安全),充当任务队列,两个线程都可以访问。
- PullMessageService通过调用阻塞队列的take()方法获取任务,如果阻塞队列(任务队列)中没有待处理任务,线程则阻塞,非常关键,避免线程空轮询,造成CPU飙升。
- RebalanceService线程的职责是根据任务负载算法生成任务,放入任务队列中,从而能够唤醒PullMessageService线程。
大家有没有发现,这不就是典型的生产者-消费者模式吗?没错,这就是。
介绍了线程之间如何协作,接下来再介绍一下如何复用一个线程。
我们还是以RocketMQ中PullMessageService线程的实现为例进行阐述。
实现经典步骤:
- 使用 while() 进行循环,这里通常有两种实现方法:
- while(Thread.interrupted()),即检测当前线程的中断状态,如果要停止该线程,通常是在其他线程中调用要中断线程的interrupt()方法。
- while(!this.stoped) stoped会使用volatile关键字首饰,如果要停止该线程,并且需要暴露一个方法,用于将stoped设置为true,从而跳出循环。
- 在while方法中从一个阻塞队列中获取待执行的任务,没有任务执行时当前线程阻塞,不消耗CPU资源。
2、如何处理异常情况
上述线程的协作方式非常的优雅,但细想一下,其实也存在问题。
如果“生产者”线程(RebalacneService)出现异常而提停止,PullMessageServicve线程可能会由于一直没有任务执行而一直阻塞,进入假死异常。
这样的问题在上述场景中或许不那么明显,笔者举一个工作中遇到的场景。
我在负责数据同步产品时,需要增量将MySQL数据库中的数据同步到mq、es等其他目标端,就按照数据拉取、数据解析、数据传输等职责设计了多个线程,其线程模型如下图所示:
如果解析线程出现异常(数据同步场景,碰到的最常见的异常:表结构变更引起的异常),导致解析线程不再从任务队列中获取解析任务,然后解析队列将被填满,导致MySQLBinlogPullThread线程的无限阻塞,整个数据同步流程处于假死,问题很严重,如果不及时处理,数据会积压而造成生产故障,那如何处理呢?
不做任何处理的情况下,一个线程是无法感知另外一个线程的异常状态的,从名称来看,调用栈的作用域是线程。
从上面的线程协作模型来看,可以按照职责分为生产者线程、消费者线程。
- 生产者线程,向任务队列中添加任务
- 消费者线程,从任务队列中获取任务并处理
2.1 生产者线程异常如何通知消费线程
如果是生产者线程出现异常,我们如何通知消费端线程呢?
笔者在实践过程中的处理方法:首先捕获生产者线程的异常,然后包装一个异常任务ErrorTask,放入到任务队列,然后消费者线程在处理任务时,首先判断任务的类型,如果是ErrorTask,则停止并退出。
给出的基本实现伪代码如下:
1、生产者线程代码
重点在线程内部需要捕获异常,然后将异常封装成一个Error任务,供下游感知。
2、消费者线程代码
2.2 消费端线程异常如何通知生产线程
消费端线程的角色主要是从任务队列中获取任务,但自身如果出现异常,只会阻塞生产端线程,要通知发送方线程异常,通常的方式是客户端需要持有发送方的对象,并通过其提供的方法进行通知。
发送者线程需要预留检测点与关闭方法,其代码如下:
关键点如下:
- run方法中使用while(!stoped)方式进行循环,内嵌一个阻塞队列。
- 线程提供一个改变线程状态运行的方法,shutdown,并传人是否是因为异常退出,供其他线程调用。
消费者线程的实现如下: