读完 RocketMQ 源码,我学会了如何优雅的创建线程

简介: RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时、高可靠的消息发布与订阅服务。这篇文章,笔者整理了 RocketMQ 源码中创建线程的几点技巧,希望大家读完之后,能够有所收获。

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时、高可靠的消息发布与订阅服务。

这篇文章,笔者整理了 RocketMQ 源码中创建线程的几点技巧,希望大家读完之后,能够有所收获。

1 创建单线程

首先我们先温习下常用的创建单线程的两种方式:

  • 实现 Runnable 接口
  • 继承 Thread 类

▍一、实现 Runnable 接口

图中,MyRunnable 类实现了 Runnable 接口的 run 方法,run 方法中定义具体的任务代码或处理逻辑,而Runnable 对象是作为线程构造函数的参数。

▍二、 继承 Thread 类

线程实现类直接继承 Thread ,本质上也是实现 Runnable 接口的 run 方法。

2 单线程抽象类

创建单线程的两种方式都很简单,但每次创建线程代码显得有点冗余,于是 RocketMQ 里实现了一个抽象类 ServiceThread 。

抽象类 ServiceThread

我们可以看到抽象类中包含了如下核心方法:

  1. 定义线程名;
  2. 启动线程;
  3. 关闭线程。

下图展示了 RocketMQ 众多的单线程实现类。

实现类的编程模版类似 :

我们仅仅需要继承抽象类,并实现 getServiceNamerun 方法即可。启动的时候,调用 start 方法 , 关闭的时候调用 shutdown 方法。

3 线程池原理

线程池是一种基于池化思想管理线程的工具,线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

JDK中提供的 ThreadPoolExecutor 类,是我们最常使用的线程池类。

ThreadPoolExecutor构造函数

参数名 作用
corePoolSize 队列没满时,线程最大并发数
maximumPoolSizes 队列满后线程能够达到的最大并发数
keepAliveTime 空闲线程过多久被回收的时间限制
unit keepAliveTime 的时间单位
workQueue 阻塞的队列类型
threadPoolFactory 改变线程的名称、线程组、优先级、守护进程状态
RejectedExecutionHandler 超出 maximumPoolSizes + workQueue 时,任务会交给RejectedExecutionHandler来处理

任务的调度通过执行 execute方法完成,方法的核心流程如下:

  1. 如果 workerCount < corePoolSize,创建并启动一个线程来执行新提交的任务。
  2. 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  3. 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  4. 如果 workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

4 线程池封装

在 RocketMQ 里 ,网络请求都会携带命令编码,每种命令映射对应的处理器,而处理器又会注册对应的线程池。

当服务端 Broker 接收到发送消息命令时,都会有单独的线程池 sendMessageExecutor 来处理这种命令请求。

基于 ThreadPoolExecutor 做了一个简单的封装 ,BrokerFixedThreadPoolExecutor 构造函数包含六个核心参数:

  1. 核心线程数和最大线程数相同 ,数量是:cpu核数和4比较后的最小值;
  2. 空闲线程的回收的时间限制,默认1分钟;
  3. 发送消息队列,有界队列,默认10000;
  4. 线程工厂 ThreadFactoryImpl ,定义了线程名前缀:SendMessageThread_ 。

RocketMQ 实现了一个简单的线程工厂:ThreadFactoryImpl,线程工厂可以定义线程名称,以及是否是守护线程 。

线程工厂

开源项目 Cobar ,Xmemcached,Metamorphosis 中都有类似线程工厂的实现 。

5 线程名很重要

线程名很重要,线程名很重要,线程名很重要 ,重要的事情说三遍。

我们看到 RocketMQ 中,无论是单线程抽象类还是多线程的封装都会配置线程名 ,因为通过线程名,非常容易定位问题,从而大大提升解决问题的效率。

定位的媒介常见有两种:日志文件堆栈记录

▍一、日志文件

经常处理业务问题的同学,一定都经常与日志打交道。

  • 查看 ERROR 日志,追溯到执行线程, 要是线程池隔离做的好,基本可以判断出哪种业务场景出了问题;
  • 通过查看线程打印的日志,推断线程调度是否正常,比如有的定时任务线程打印了开始,没有打印结束,推论当前线程可能已经挂掉或者阻塞。

▍二、堆栈记录

jstack 是 java 虚拟机自带的一种堆栈跟踪工具 ,主要用来查看 Java 线程的调用堆栈,线程快照包含当前 java 虚拟机内每一条线程正在执行的方法堆栈的集合,可以用来分析线程问题。

jstack -l 进程pid

笔者查看线程堆栈,一般关注如下几点:

  1. 当前 jvm 进程中的线程数量和线程分类是否在预期的范围内;
  2. 系统接口超时或者定时任务停止的异常场景下 ,分析堆栈中是否有锁未释放,或者线程一直等待网络通讯响应;
  3. 分析 jvm 进程中哪个线程占用的 CPU 最高。

6 总结

本文是RocketMQ 系列文章的开篇,和朋友们简单聊聊 RocketMQ 源码里创建线程的技巧。

  1. 单线程抽象类 ServiceThread

    使用者只需要实现业务逻辑以及定义线程名即可 ,不需要写冗余的代码。

  2. 线程池封装

    适当封装,定义线程工厂,并合理配置线程池参数。

  3. 线程名很重要

    文件日志,堆栈记录配合线程名能大大提升解决问题的效率。

RocketMQ 的多线程编程技巧很多,比如线程通讯,并发控制,线程模型等等,后续的文章会一一为大家展现。


如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
5月前
|
Java API 网络架构
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码
192 0
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
|
4月前
|
消息中间件 Apache 开发工具
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
38 0
|
5月前
|
消息中间件 Apache RocketMQ
电子好书发您分享《Apache RocketMQ 源码解析》
电子好书发您分享《Apache RocketMQ 源码解析》
123 1
|
5月前
|
消息中间件 中间件 Kafka
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
86 1
|
2月前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
13 1
|
2月前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
28 1
|
2月前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
53 1
|
5月前
|
消息中间件 负载均衡 算法
RocketMQ源码(三)简单探索Producer和Consumer与Queue之间的负载均衡策略
- Producer如何将消息负载均衡发送给queue? - Consumer如何通过负载均衡并发消费queue的消息?
471 0
|
5月前
|
消息中间件 Apache RocketMQ
电子好书发您分享《Apache RocketMQ 源码解析》
电子好书发您分享《Apache RocketMQ 源码解析》
78 1

热门文章

最新文章