《重学Java高并发》线程之间如何协作(父子线程如何优雅交互)

简介: 《重学Java高并发》线程之间如何协作(父子线程如何优雅交互)

如果大家从事的是业务开发,在工作中如果涉及到并发,通常是引入线程池来实现并发。

但如果从事的是基础框架的开发,通常并不会直接使用线程池,会按需创建单个线程,并且为了职责的单一与提升性能,通常单个线程只会负责一个流程中的部分功能,多个线程紧密配合。


那线程与线程之间如何协作呢?如果子线程出现异常,如何通知主线程呢?


可以明确的告诉大家,子线程抛出异常,主线程是无法捕获到异常的。


请带着上述问题,开始本文的学习。


1、两个线程如何协作


这里我想拿RocketMQ中的一个场景与大家来分享,通常这也是两个线程进行协作十分经典的使用示例


在RocketMQ消费端,PUSH模式消费者在消费之前需要提前做好准备:


  • 队列负载均衡
  • 消息拉取


这里简单介绍一下相关实现:在RocketMQ消费者PUSH模式启动后,消费组中的成员发生了变化,需要进行重平衡,即进行队列重新负载,主要的依据是查询当前队列的总个数与当前消费者个数,然后使用负载均衡算法(例如平均分配),各个消费者获取分配后的队列后,依次向Broker服务端拉取消息,然后提交消费线程池消费。


单一指责设计原则出发,根据负载均衡、消息拉取两个不通的职责来看,需要设计两个线程分别处理,并且这两个线程必须相互协作,因为负责队列负载的线程需要指引消息拉取线程具体拉取哪些队列。

76a87ea2a45e0047774f06fb18a4c709.png

从上述流程图中可以看出,RocketMQ的给出的实践要点如下:


  • 引入一个阻塞队列(多线程安全),充当任务队列,两个线程都可以访问。
  • PullMessageService通过调用阻塞队列的take()方法获取任务,如果阻塞队列(任务队列)中没有待处理任务,线程则阻塞,非常关键,避免线程空轮询,造成CPU飙升。
  • RebalanceService线程的职责是根据任务负载算法生成任务,放入任务队列中,从而能够唤醒PullMessageService线程。

大家有没有发现,这不就是典型的生产者-消费者模式吗?没错,这就是。


介绍了线程之间如何协作,接下来再介绍一下如何复用一个线程。


我们还是以RocketMQ中PullMessageService线程的实现为例进行阐述。

d68fe1a23f684edf6bdff37d559373d3.png

实现经典步骤:


  • 使用 while() 进行循环,这里通常有两种实现方法:
  • while(Thread.interrupted()),即检测当前线程的中断状态,如果要停止该线程,通常是在其他线程中调用要中断线程的interrupt()方法。
  • while(!this.stoped) stoped会使用volatile关键字首饰,如果要停止该线程,并且需要暴露一个方法,用于将stoped设置为true,从而跳出循环。
  • 在while方法中从一个阻塞队列中获取待执行的任务,没有任务执行时当前线程阻塞,不消耗CPU资源。


2、如何处理异常情况


上述线程的协作方式非常的优雅,但细想一下,其实也存在问题。


如果“生产者”线程(RebalacneService)出现异常而提停止,PullMessageServicve线程可能会由于一直没有任务执行而一直阻塞,进入假死异常。


这样的问题在上述场景中或许不那么明显,笔者举一个工作中遇到的场景。


我在负责数据同步产品时,需要增量将MySQL数据库中的数据同步到mq、es等其他目标端,就按照数据拉取、数据解析、数据传输等职责设计了多个线程,其线程模型如下图所示:

06acc83f7659e21839a84662e5339ee7.png

如果解析线程出现异常(数据同步场景,碰到的最常见的异常:表结构变更引起的异常),导致解析线程不再从任务队列中获取解析任务,然后解析队列将被填满,导致MySQLBinlogPullThread线程的无限阻塞,整个数据同步流程处于假死,问题很严重,如果不及时处理,数据会积压而造成生产故障,那如何处理呢?


不做任何处理的情况下,一个线程是无法感知另外一个线程的异常状态的,从名称来看,调用栈的作用域是线程。


从上面的线程协作模型来看,可以按照职责分为生产者线程、消费者线程。


  • 生产者线程,向任务队列中添加任务
  • 消费者线程,从任务队列中获取任务并处理


2.1 生产者线程异常如何通知消费线程


如果是生产者线程出现异常,我们如何通知消费端线程呢?


笔者在实践过程中的处理方法:首先捕获生产者线程的异常,然后包装一个异常任务ErrorTask,放入到任务队列,然后消费者线程在处理任务时,首先判断任务的类型,如果是ErrorTask,则停止并退出。


给出的基本实现伪代码如下:


1、生产者线程代码

1329c9ed3c7dfaf608cfc1354b6342ed.png

重点在线程内部需要捕获异常,然后将异常封装成一个Error任务,供下游感知。


2、消费者线程代码

08291b7a7a2b9168e163b2bbbaa8898f.png


2.2 消费端线程异常如何通知生产线程


消费端线程的角色主要是从任务队列中获取任务,但自身如果出现异常,只会阻塞生产端线程,要通知发送方线程异常,通常的方式是客户端需要持有发送方的对象,并通过其提供的方法进行通知


发送者线程需要预留检测点与关闭方法,其代码如下:

b9e5be80351261a281bcecff92572164.png

关键点如下:


  • run方法中使用while(!stoped)方式进行循环,内嵌一个阻塞队列。
  • 线程提供一个改变线程状态运行的方法,shutdown,并传人是否是因为异常退出,供其他线程调用。


消费者线程的实现如下:

984f201ab47ab9fdb49f5e50fdfdf32f.png


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3天前
|
安全 Java 调度
Java线程:深入理解与实战应用
Java线程:深入理解与实战应用
23 0
|
1天前
|
消息中间件 缓存 NoSQL
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
|
1天前
|
数据采集 存储 Java
高德地图爬虫实践:Java多线程并发处理策略
高德地图爬虫实践:Java多线程并发处理策略
|
2天前
|
缓存 Java
【Java基础】简说多线程(上)
【Java基础】简说多线程(上)
6 0
|
3天前
|
SQL Java 数据库连接
Java从入门到精通:2.3.1数据库编程——学习JDBC技术,掌握Java与数据库的交互
ava从入门到精通:2.3.1数据库编程——学习JDBC技术,掌握Java与数据库的交互
|
3天前
|
并行计算 算法 安全
Java从入门到精通:2.1.3深入学习Java核心技术——掌握Java多线程编程
Java从入门到精通:2.1.3深入学习Java核心技术——掌握Java多线程编程
|
3天前
|
安全 Java 编译器
是时候来唠一唠synchronized关键字了,Java多线程的必问考点!
本文简要介绍了Java中的`synchronized`关键字,它是用于保证多线程环境下的同步,解决原子性、可见性和顺序性问题。从JDK1.6开始,synchronized进行了优化,性能得到提升,现在仍可在项目中使用。synchronized有三种用法:修饰实例方法、静态方法和代码块。文章还讨论了synchronized修饰代码块的锁对象、静态与非静态方法调用的互斥性,以及构造方法不能被同步修饰。此外,通过反汇编展示了`synchronized`在方法和代码块上的底层实现,涉及ObjectMonitor和monitorenter/monitorexit指令。
16 0
|
3天前
|
监控 安全 Java
在Java中如何优雅的停止一个线程?可别再用Thread.stop()了!
在Java中如何优雅的停止一个线程?可别再用Thread.stop()了!
11 2
|
3天前
|
Java 调度
Java面试必考题之线程的生命周期,结合源码,透彻讲解!
Java面试必考题之线程的生命周期,结合源码,透彻讲解!
29 1
|
3天前
|
安全 Java
Java基础教程(15)-多线程基础
【4月更文挑战第15天】Java内置多线程支持,通过Thread类或Runnable接口实现。线程状态包括New、Runnable、Blocked、Waiting、Timed Waiting和Terminated。启动线程调用start(),中断线程用interrupt(),同步用synchronized关键字。线程安全包如java.util.concurrent提供并发集合和原子操作。线程池如ExecutorService简化任务管理,Callable接口允许返回值,Future配合获取异步结果。Java 8引入CompletableFuture支持回调。