如何优雅的使用和理解线程池(下)

简介: 平时接触过多线程开发的童鞋应该都或多或少了解过线程池,之前发布的《阿里巴巴 Java 手册》里也有一条

SpringBoot 使用线程池


2018 年了,SpringBoot 盛行;来看看在 SpringBoot 中应当怎么配置和使用线程池。


既然用了 SpringBoot ,那自然得发挥 Spring 的特性,所以需要 Spring 来帮我们管理线程池:


@Configuration
public class TreadPoolConfig {
    /**
     * 消费队列线程
     * @return
     */
    @Bean(value = "consumerQueueThreadPool")
    public ExecutorService buildConsumerQueueThreadPool(){
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("consumer-queue-thread-%d").build();
        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5),namedThreadFactory,new ThreadPoolExecutor.AbortPolicy());
        return pool ;
    }
}


使用时:


@Resource(name = "consumerQueueThreadPool")
    private ExecutorService consumerQueueThreadPool;
    @Override
    public void execute() {
        //消费队列
        for (int i = 0; i < 5; i++) {
            consumerQueueThreadPool.execute(new ConsumerQueueThread());
        }
    }


其实也挺简单,就是创建了一个线程池的 bean,在使用时直接从 Spring 中取出即可。


监控线程池


谈到了 SpringBoot,也可利用它 actuator 组件来做线程池的监控。


线程怎么说都是稀缺资源,对线程池的监控可以知道自己任务执行的状况、效率等。


关于 actuator 就不再细说了,感兴趣的可以看看这篇,有详细整理过如何暴露监控端点。


其实 ThreadPool 本身已经提供了不少 api 可以获取线程状态:



很多方法看名字就知道其含义,只需要将这些信息暴露到 SpringBoot 的监控端点中,我们就可以在可视化页面查看当前的线程池状态了。


甚至我们可以继承线程池扩展其中的几个函数来自定义监控逻辑:



看这些名称和定义都知道,这是让子类来实现的。


可以在线程执行前、后、终止状态执行自定义逻辑。


线程池隔离


线程池看似很美好,但也会带来一些问题。


如果我们很多业务都依赖于同一个线程池,当其中一个业务因为各种不可控的原因消耗了所有的线程,导致线程池全部占满。


这样其他的业务也就不能正常运转了,这对系统的打击是巨大的。


比如我们 Tomcat 接受请求的线程池,假设其中一些响应特别慢,线程资源得不到回收释放;线程池慢慢被占满,最坏的情况就是整个应用都不能提供服务。


所以我们需要将线程池进行隔离


通常的做法是按照业务进行划分:


比如下单的任务用一个线程池,获取数据的任务用另一个线程池。这样即使其中一个出现问题把线程池耗尽,那也不会影响其他的任务运行。


hystrix 隔离


这样的需求 Hystrix 已经帮我们实现了。


Hystrix 是一款开源的容错插件,具有依赖隔离、系统容错降级等功能。


下面来看看 Hystrix 简单的应用:


首先需要定义两个线程池,分别用于执行订单、处理用户。


/**
 * Function:订单服务
 *
 * @author crossoverJie
 *         Date: 2018/7/28 16:43
 * @since JDK 1.8
 */
public class CommandOrder extends HystrixCommand<String> {
    private final static Logger LOGGER = LoggerFactory.getLogger(CommandOrder.class);
    private String orderName;
    public CommandOrder(String orderName) {
        super(Setter.withGroupKey(
                //服务分组
                HystrixCommandGroupKey.Factory.asKey("OrderGroup"))
                //线程分组
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("OrderPool"))
                //线程池配置
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                        .withCoreSize(10)
                        .withKeepAliveTimeMinutes(5)
                        .withMaxQueueSize(10)
                        .withQueueSizeRejectionThreshold(10000))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
        )
        ;
        this.orderName = orderName;
    }
    @Override
    public String run() throws Exception {
        LOGGER.info("orderName=[{}]", orderName);
        TimeUnit.MILLISECONDS.sleep(100);
        return "OrderName=" + orderName;
    }
}
/**
 * Function:用户服务
 *
 * @author crossoverJie
 *         Date: 2018/7/28 16:43
 * @since JDK 1.8
 */
public class CommandUser extends HystrixCommand<String> {
    private final static Logger LOGGER = LoggerFactory.getLogger(CommandUser.class);
    private String userName;
    public CommandUser(String userName) {
        super(Setter.withGroupKey(
                //服务分组
                HystrixCommandGroupKey.Factory.asKey("UserGroup"))
                //线程分组
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("UserPool"))
                //线程池配置
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                        .withCoreSize(10)
                        .withKeepAliveTimeMinutes(5)
                        .withMaxQueueSize(10)
                        .withQueueSizeRejectionThreshold(10000))
                //线程池隔离
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
        )
        ;
        this.userName = userName;
    }
    @Override
    public String run() throws Exception {
        LOGGER.info("userName=[{}]", userName);
        TimeUnit.MILLISECONDS.sleep(100);
        return "userName=" + userName;
    }
}


api 特别简洁易懂,具体详情请查看官方文档。

然后模拟运行:

public static void main(String[] args) throws Exception {
        CommandOrder commandPhone = new CommandOrder("手机");
        CommandOrder command = new CommandOrder("电视");
        //阻塞方式执行
        String execute = commandPhone.execute();
        LOGGER.info("execute=[{}]", execute);
        //异步非阻塞方式
        Future<String> queue = command.queue();
        String value = queue.get(200, TimeUnit.MILLISECONDS);
        LOGGER.info("value=[{}]", value);
        CommandUser commandUser = new CommandUser("张三");
        String name = commandUser.execute();
        LOGGER.info("name=[{}]", name);
    }
复制代码


运行结果:



可以看到两个任务分成了两个线程池运行,他们之间互不干扰。


获取任务任务结果支持同步阻塞和异步非阻塞方式,可自行选择。


它的实现原理其实容易猜到:


利用一个 Map 来存放不同业务对应的线程池。


通过刚才的构造函数也能证明:



还要注意的一点是:


自定义的 Command 并不是一个单例,每次执行需要 new 一个实例,不然会报 This instance can only be executed once. Please instantiate a new instance. 异常。


总结


池化技术确实在平时应用广泛,熟练掌握能提高不少效率。


文末的 hystrix 源码:


github.com/crossoverJi…


相关文章
|
2月前
|
Java
线程池的实现
线程池的实现
22 0
|
12月前
|
缓存 Java
线程池简单总结
线程池简单总结
|
9月前
|
Java
6. 实现简单的线程池
6. 实现简单的线程池
40 0
|
监控 Java
线程池的讲解和实现
线程池的讲解和实现
|
前端开发 Java 调度
你了解线程池吗
你了解线程池吗
65 0
|
存储 Java 调度
线程池使用
线程池使用
|
Java 数据库连接 容器
关于线程池
关于线程池
73 0
|
存储 缓存 Java
理解与实现线程池
理解与实现线程池
123 0
KeyAffinityExecutor 线程池
KeyAffinityExecutor 线程池
|
监控 Java
关于线程池,你需要了解这些
1、 降低资源消耗;提高线程利用率,降低创建和销毁线程的消耗。 2、 提高响应速度;任务来了,直接有线程可用可执行,而不是先创建线程,再执行。 3、 提高线程的可管理性;线程是稀缺资源,使用线程池可以统一分配调优监控。
132 0