ThreadPoolTaskExecutor @Async调用方法

简介: ThreadPoolTaskExecutor @Async调用方法

SpringBoot ThreadPoolTaskExecutor @Async

SpringBoot项目中,异步线程池的使用,参数设置,队列拒绝策略;以及对比ForkJoinPool各场景下的性能。

环境:jdk8、springboot 2.1.6

线程池注入(一)

多线程池注入,用于多个业务场景,避免各业务之间相互影响

package com.mpos.mnp.web.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * @className: BeanConfig
 * @author: www.wityx.com
 */
@Configuration
@EnableAsync
public class BeanConfig {
  @Bean("taskExecutor")
  public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // 设置核心线程数
    executor.setCorePoolSize(50);
    // 设置最大线程数
    executor.setMaxPoolSize(200);
    // 设置队列容量
    executor.setQueueCapacity(20000);
    // 设置线程活跃时间(秒)
    executor.setKeepAliveSeconds(60);
    // 设置默认线程名称
    executor.setThreadNamePrefix("mnp-send");
    // 设置拒绝策略
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    return executor;
  }
 
  @Bean("jobExecutor")
  public TaskExecutor jobExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // 设置核心线程数
    executor.setCorePoolSize(50);
    // 设置最大线程数
    executor.setMaxPoolSize(200);
    // 设置队列容量
    executor.setQueueCapacity(500);
    // 设置线程活跃时间(秒)
    executor.setKeepAliveSeconds(60);
    // 设置默认线程名称
    executor.setThreadNamePrefix("mnp-job");
    // 设置拒绝策略
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    return executor;
  }
}

参数解释

  • corePoolSize:核心线程数
  • 核心线程会一直存活,及时没有任务需要执行
  • 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
  • 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
  • queueCapacity:任务队列容量(阻塞队列)
  • 当核心线程数达到最大时,新任务会放在队列中排队等待执行 ,队列存放的数据大小跟分配的内存有关
  • maxPoolSize:最大线程数
  • 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
  • 当线程数=maxPoolSize,且任务队列已满时,线程池会跟根据拒绝策略进行相应的处理
  • keepAliveTime:线程空闲时间(s)
  • 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
  • 如果allowCoreThreadTimeout=true,则会直到线程数量=0
  • threadNamePrefix:设置默认线程名称
  • 线程前缀名称,有助于区分不同线程池之间的线程
  • rejectedExecutionHandler:任务拒绝处理器
  • 两种情况会拒绝处理任务:
  1. 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
  2. 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
  • 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
  • ThreadPoolExecutor类有几个内部实现类来处理这类情况:
  1. AbortPolicy 丢弃任务,抛运行时异常
  2. CallerRunsPolicy 执行任务 P:当线程池满以后,队列达到最大值时,异步先会变为同步执行,影响主线程性能,请结合业务场景,具体分析使用(本人亲测)
  3. DiscardPolicy 忽视,什么都不会发生
  4. DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务

以下参数请慎用, 线程一直再运行时,无法关闭程序,需kill进程。并丢失队列中的数据。

    // 等待所有任务结束后再关闭线程池
    executor.setWaitForTasksToCompleteOnShutdown(true);

异步使用(二)

异步实现类

package com.mpos.mnp.app.impl;
 
import com.mpos.mnp.dao.entity.AgencyNoticeMessage;
import com.mpos.mnp.dao.entity.AppNoticeMessage;
import com.mpos.mnp.service.job.MessageJobService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
 
/**
 * @author: http://www.wityx.com
 * @link:http://www.wityx.com/post/822_1_1.html
 */
@Component
@Slf4j
public class AsyncSendMessage {
 
  /**
  * 业务类Service
  */
    @Resource
    private MessageJobService messageJobService;
 
    /**
     * 异步发送
     *
     * @param id
     */
    @Async("taskExecutor")
    public void sendAgencyMessage(String id) {
        log.info("异步通知");
        // 参数传递用id,使用更小的字节放入缓存队列
        messageJobService.sendAgencyMessage(null,id);
    }
    /**
     * 异步发送
     *
     * @param id
     */
    @Async("jobExecutor")
    public void sendAppMessage(String id) {
        log.info("异步推送消息");
        // 参数传递用id,使用更小的字节放入缓存队列
        messageJobService.sendAppPushMessage(null,id);
    }
}
 

特别注意@Async的使用,不可在同一类使用@Async,否则不生效。

其他类使用使用异步处理

  // 建议使用此注解,避免使用@Autowired,具体原因参考官方说明
  @Resource
    private AsyncSendMessage asyncSendMessage;
 
  /**
     * 数据保存
     * @author: www.wityx.com
     * @param param
     */
    private void saveAppMessageData(AppMessageParam param) {
 
        String uniqNo = "MNP" + param.getReqSource() + param.getReqNo();
        try {
            //防止重复提交判断 Redis锁 默认60秒有效期
            boolean lockFlag = redisUtil.lock(uniqNo, uniqNo, 60);
            if (!lockFlag) {
                log.warn("【移动推送】请求单号:{}重复提交!", param.getReqNo());
                throw new BusinessException(ResultCode.COMMON_DULIICATE_SUBMIT.getCode(), "请求号:" + param.getReqNo());
            }
            // 异步处理
            asyncSendMessage.sendAppMessage(message.getId());
        } catch (BusinessException e) {
            throw e;
        } catch (Throwable e) {
            throw new BusinessException(ResultCode.SAVE_DB_ERROR);
        } finally {
            // 解锁
            redisUtil.unlock(uniqNo, uniqNo);
        }
    }

在使用异步时,捕获异常建议使用Throwable,Exception的父类。

springboot启动类打开异步

package com.mpos.mnp.web;
import com.alibaba.dubbo.config.spring.context.annotation.EnableDubboConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
 
/**
 * Description: web 启动类
 * @author: www.wityx.com
 */
@SpringBootApplication
@EnableSwagger2
@ComponentScan(basePackages = {"com.wityx.www","com.gpay.user"})
@EnableMongoRepositories(basePackages = "com.wityx.www.dao.repository")
@EnableDubboConfig
@EnableScheduling
@EnableAsync
public class MnpWeb {
    public static void main(String[] args){
        SpringApplication.run(MnpWeb.class, args);
    }
}
 


相关文章
|
JavaScript 前端开发 开发工具
基于 Vue3.0 和 Ant Design Vue ,高颜值管理后台UI框架vue-vben-admin运行
基于 Vue3.0 和 Ant Design Vue ,高颜值管理后台UI框架vue-vben-admin运行
669 1
|
Java 测试技术 Docker
Spring Boot 学习研究笔记(十九)-docker部署SpringBoot 日志目录挂载
Spring Boot 学习研究笔记(十九)-docker部署SpringBoot 日志目录挂载
791 0
|
4天前
|
存储 缓存 算法
深入理解JVM《JVM内存区域详解 - 世界的基石》
Java代码从编译到执行需经javac编译为.class字节码,再由JVM加载运行。JVM内存分为线程私有(程序计数器、虚拟机栈、本地方法栈)和线程共享(堆、方法区)区域,其中堆是GC主战场,方法区在JDK 8+演变为使用本地内存的元空间,直接内存则用于提升NIO性能,但可能引发OOM。
|
5月前
|
SQL Java 数据安全/隐私保护
发现问题:Mybatis-plus的分页总数为0,分页功能失效,以及多租户插件的使用。
总的来说,使用 Mybatis-plus 确实可以极大地方便我们的开发,但也需要我们理解其工作原理,掌握如何合适地使用各种插件。分页插件和多租户插件是其中典型,它们的运用可以让我们的代码更为简洁、高效,理解和掌握好它们的用法对我们的开发过程有着极其重要的意义。
557 15
|
应用服务中间件 nginx
nginx更改配置文件后重启
nginx更改配置文件后重启
582 1
|
Java Spring 容器
Springboot中的@Import注解~
Springboot中的@Import注解~
227 0
|
移动开发 JavaScript
简单介绍下阿里云的H5滑动验证+H5示例源码
简单介绍下阿里云的H5滑动验证+H5示例源码
325 0
|
Java 数据库连接 数据库
Java线程池七大参数详解和配置(2)
Java线程池七大参数详解和配置(2)
908 0
|
前端开发 架构师 程序员
盘点13个值得程序员逛的论坛
盘点13个我自己比较经常使用的、有用的网站,包括资源、论坛网站等。
2339 0
盘点13个值得程序员逛的论坛
|
Web App开发 移动开发 前端开发
PDF 预览和下载你是怎么实现的?(下)
PDF 预览和下载你是怎么实现的?
887 0