异步和线程池
初始化线程的方式
继承Thread
主线程
public static void main(String[] args) throws ExecutionException, InterruptedException { // 继承Thread System.out.println("主方法1.。start。。。"); Thread01 thread01 = new Thread01(); thread01.start(); System.out.println("主方法1.。end。。。"); }
/** * 继承Thread 重新run方法 */ public static class Thread01 extends Thread{ @Override public void run() { System.out.println("当前线程:"+ Thread.currentThread().getId()); System.out.println("i = " + 10 / 2); } }
输出结果
主方法1.。start。。。 当前线程:12 i = 5 主方法1.。end。。。
实现Runnable接口
主线程
public static void main(String[] args) throws ExecutionException, InterruptedException { //实现Runnable接口的方式 System.out.println("主方法1.。start。。。"); new Thread(new Runnable01()).start(); System.out.println("主方法1.。end。。。"); }
/** * 实现Runnable接口的方式 */ public static class Runnable01 implements Runnable{ @Override public void run() { System.out.println("当前线程:"+ Thread.currentThread().getId()); System.out.println("i = " + 10 / 2); } }
结果
主方法1.。start。。。 当前线程:12 i = 5 主方法1.。end。。。
实现Callable接口 + FutureTask (可以拿到返回值,可以处理异常)
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主方法1.。start。。。"); FutureTask futureTask = new FutureTask<Integer>(new Callable01()); new Thread(futureTask).start(); //阻塞等待整个线程完成,获取返回结果 Integer result = (Integer) futureTask.get(); System.out.println("异步Callable执行后的返回结果是:"+ result); System.out.println("主方法1.。end。。。"); }
/** * 实现Callable接口 */ public static class Callable01 implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("当前线程:"+ Thread.currentThread().getId()); System.out.println("i = " + 10 / 2); return 10 / 2; } }
输出
主方法1.。start。。。 当前线程:12 i = 5 异步Callable执行后的返回结果是:5 主方法1.。end。。。
线程池
一般在业务中以上三个方法都不推荐使用,一般是给线程池提交任务就可以了。
主方法
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException { //线程池的使用 System.out.println("主方法1.。start。。。"); ExecutorService service = Executors.newFixedThreadPool(10); Future<?> submit = service.submit(new Runnable01()); submit.get(); System.out.println("主方法1.。end。。。"); }
可以向线程池中直接放入Runnable 或者Callable的任务即可
详解
/** * 线程池七大参数: * corePoolSize 初始化的线程数量,等待接收异步请求 * maximumPoolSize 最大线程数量,可以控制资源 * keepAliveTime 存活时间,如果当前线程数量大于核心数量,那么就会释放空闲的线程,只要线程空闲大于执行的空闲时间 * unit 执行线程存活的时间单位 * BlockingQueue<Runnable> workQueue 阻塞队列,如果任务有很多,就会将目前多的任务放在队列中,只要有线程空闲 * 就会去队列中取出新的任务继续执行 * threadFactory 线程的创建工厂 * RejectedExecutionHandler handler 如果队列满了,按照我们的策略拒绝执行的任务 * * 工作顺序: * 1)当线程进来后会直接分配给核心线程中,也就是corePoolSize初始化的线程数 * 2)当corePoolSize满了以后,默认将线程放到workQueue中等待核心线程调用 * 3)如果workQueue满了就会再启动新的线程,但是最大数量不能超过maximumPoolSize * 4)maximumPoolSize满了如果还有新的线程任务就会调用 handler丢弃策略, * 5)如果maximumPoolSize中任务都执行完毕,那么就会等待keepAliveTime指定的空闲 * 时间后进行销毁。 * */ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 10, 100000, 3, TimeUnit.MINUTES, new LinkedBlockingDeque<>(100000), //默认是Integer的最大值所以需要修改 Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); threadPoolExecutor.execute(new Runnable01());
总结
在实际的开发过程中要使用线程池进行异步任务的完成,因为用Thread Runnable Callable这些都不能做到对线程的管理,很容易就导致资源浪费和消耗。
使用线程池的好处:
- 降低资源的消耗
- 提高响应速度
- 提高线程的可管理性
CompletableFuture异步编排
需求场景:面对复杂关系的异步任务进行编排处理
使用
CompletableFuture.runAsync() 这个异步方法没有返回值,所以给的是Void类型
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{ System.out.println("当前线程:"+ Thread.currentThread().getId()); int i = 10 / 2; System.out.println("i = " + i); },executor);
CompletableFuture.supplyAsync()这个异步方法有返回值,返回值是Integer因此可以调用get方法,等异步任务执行结束后获取到返回值即可。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{ System.out.println("当前线程:"+ Thread.currentThread().getId()); int i = 10 / 2; System.out.println("i = " + i); return i; },executor); Integer i = future.get();
计算完成时使用函数回调whenComplete
还可以使用链式编程继续在后面追加方法
public class ThreadTest01 { //创建一个线程池 public static ExecutorService executor = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主方法1.。start。。。"); //调用有返回值的异步 CompletableFuture<Integer> future = CompletableFuture. (()->{ System.out.println("当前线程:"+ Thread.currentThread().getId()); int i = 10 / 0; System.out.println("i = " + i); return i; },executor).whenComplete( (res,exc) -> { System.out.println("异步线程执行完毕返回的结果是:"+ res + ";抛出的异常是:" + exc); }).exceptionally( (exc) -> { //如果发生异常那么exceptionally可以直接得到异常信息还可以返回一个指定的异常信息 // 这个返回的异常信息可以让future.get();这里拿到 return 404; }); // get一方面可以等待异步线程完成后再执行其他业务,还可以获取异步任务的返回结果 Integer i = future.get(); System.out.println("主方法1.。end。。。"+ i); }
计算完成时使用函数回调handle,handle和handleAsync可以对异步返回结果和异常进行处理
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{ System.out.println("当前线程:"+ Thread.currentThread().getId()); int i = 10 / 0; System.out.println("i = " + i); return i; },executor).handle( (res,exc) -> { if (res != null){ return res*2; } if (exc !=null){ return 404; } return 0; }); // get一方面可以等待异步线程完成后再执行其他业务,还可以获取异步任务的返回结果 Integer i = future.get(); System.out.println("主方法1.。end。。。"+ i);
输出
主方法1.。start。。。 当前线程:12 主方法1.。end。。。404
线程串行化方法
thenApply方法,当一个线程依赖另一个线程的时候,获取上一个任务的返回值并返回当前任务的返回值。
thenAccepte 消费上一个线程的处理结果,无返回结果
thenRun 当上一个线程处理完成后,执行thenRun的后续操作
带有Async都是异步的
获取返回值使用future.get()
两任务组合
thenCombine 两个任务都执行,第三个任务可以获取上两个任务的返回值,并处理并返回新的结果
thenAccpteBoth 两个任务都执行,并且第三个任务可以获取上两个任务的返回值,但是不能返回新的处理结果
thenRunBoth 两个任务执行后第三个任务开始执行,并且只能传入Runnable不能返回结果
runAfterEither 两个任务中任何一个任务执行完成后都立即执行第三个任务,因为第三个任务传入的是Runnable接口所以不能有返回值
acceptEither 可以接收两个任务中率先完成的任务的返回值并处理,但是第三个任务不能有返回值
applyToEither 可以接收两个任务中率先完成的任务的返回值并处理,第三个任务处理后可以有返回值
多任务组合
allOf 当所有的异步任务都执行完毕后才能继续往下执行 (消耗最少时间:用时最长的那个任务)
anyOf 当任何一个异步任务执行完毕后就会继续往下执行(消耗最少时间:用时最快的那个任务)
CompletableFuture<String> future01 = CompletableFuture.supplyAsync(() -> { System.out.println("future01"); return "future01.jpg"; }, executor); CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> { System.out.println("future02"); return "商品属性:黑色256"; }, executor); CompletableFuture<String> future03 = CompletableFuture.supplyAsync(() -> { System.out.println("future03"); return "商品分类信息"; }, executor); CompletableFuture allOf = CompletableFuture.allOf(future01,future02,future03); //CompletableFuture.anyOf(future01,future02,future03); allOf.get(); System.out.println("end" + future01.get() + future02.get() + future03.get());
最佳实战
@Override public SkuItemVo item(Long skuId) { SkuItemVo skuItemVo = new SkuItemVo(); CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> { //1、sku基本信息获取 SkuInfoEntity skuInfoEntity = getById(skuId); skuItemVo.setInfo(skuInfoEntity); return skuInfoEntity; }, executor); CompletableFuture<Void> ImageFuture = infoFuture.thenRunAsync(() -> { //2、sku图片信息 List<SkuImagesEntity> skuImagesEntities = skuImagesService.list( new LambdaQueryWrapper<SkuImagesEntity>().eq(SkuImagesEntity::getSkuId, skuId)); skuItemVo.setImages(skuImagesEntities); }, executor); CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((skuInfoEntity) -> { Long spuId = skuInfoEntity.getSpuId(); //3、获取spu的销售属性组合 List<SkuItemVo.SkuItemSaleAttrVo> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(spuId); skuItemVo.setSaleAttr(saleAttr); }, executor); CompletableFuture<Void> spuInfoDescEntityFuture = infoFuture.thenAcceptAsync((skuInfoEntity) -> { Long spuId = skuInfoEntity.getSpuId(); //4、获取spu的介绍 SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(spuId); skuItemVo.setDesc(spuInfoDescEntity); }); CompletableFuture<Void> voidCompletableFuture = infoFuture.thenAcceptAsync((skuInfoEntity) -> { Long spuId = skuInfoEntity.getSpuId(); Long catalogId = skuInfoEntity.getCatalogId(); //5、获取spu规格参数信息 List<SkuItemVo.SpuItemAttrGroupVo> groupAttrs = attrGroupService.getGroupAttrsBySpuCataId(spuId, catalogId); skuItemVo.setGroupAttrs(groupAttrs); }); //等所有任务都做完然后再返回 CompletableFuture<Void> lastFuture = CompletableFuture.allOf(ImageFuture, saleAttrFuture, spuInfoDescEntityFuture, voidCompletableFuture); try { lastFuture.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return skuItemVo; }