JAVA线程&线程池&异步编排

简介: JAVA线程&线程池&异步编排


异步和线程池

初始化线程的方式

继承Thread

主线程

public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 继承Thread
        System.out.println("主方法1.。start。。。");
        Thread01 thread01 = new Thread01();
        thread01.start();
        System.out.println("主方法1.。end。。。");
    }
/**
 * 继承Thread 重新run方法
 */
public static class Thread01 extends Thread{
    @Override
    public void run() {
        System.out.println("当前线程:"+ Thread.currentThread().getId());
        System.out.println("i = " + 10 / 2);
    }
}

输出结果

主方法1.。start。。。
当前线程:12
i = 5
主方法1.。end。。。
实现Runnable接口

主线程

public static void main(String[] args) throws ExecutionException, InterruptedException {
        //实现Runnable接口的方式
        System.out.println("主方法1.。start。。。");
        new Thread(new Runnable01()).start();
        System.out.println("主方法1.。end。。。");
    }
/**
     * 实现Runnable接口的方式
     */
    public static class Runnable01 implements Runnable{
        @Override
        public void run() {
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            System.out.println("i = " + 10 / 2);
        }
    }

结果

主方法1.。start。。。
当前线程:12
i = 5
主方法1.。end。。。
实现Callable接口 + FutureTask (可以拿到返回值,可以处理异常)
public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主方法1.。start。。。");
        FutureTask futureTask = new FutureTask<Integer>(new Callable01());
        new Thread(futureTask).start();
        //阻塞等待整个线程完成,获取返回结果
        Integer result = (Integer) futureTask.get();
        System.out.println("异步Callable执行后的返回结果是:"+ result);
        System.out.println("主方法1.。end。。。");
    }
/**
     * 实现Callable接口
     */
    public static class Callable01 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            System.out.println("i = " + 10 / 2);
            return 10 / 2;
        }
    }

输出

主方法1.。start。。。
当前线程:12
i = 5
异步Callable执行后的返回结果是:5
主方法1.。end。。。
线程池

一般在业务中以上三个方法都不推荐使用,一般是给线程池提交任务就可以了。

主方法

public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
        //线程池的使用
        System.out.println("主方法1.。start。。。");
        ExecutorService service = Executors.newFixedThreadPool(10);
        Future<?> submit = service.submit(new Runnable01());
        submit.get();
        System.out.println("主方法1.。end。。。");
    }

可以向线程池中直接放入Runnable 或者Callable的任务即可

详解
/**
 * 线程池七大参数:
 *  corePoolSize 初始化的线程数量,等待接收异步请求
 *  maximumPoolSize 最大线程数量,可以控制资源
 *  keepAliveTime  存活时间,如果当前线程数量大于核心数量,那么就会释放空闲的线程,只要线程空闲大于执行的空闲时间
 *  unit  执行线程存活的时间单位
 *  BlockingQueue<Runnable> workQueue 阻塞队列,如果任务有很多,就会将目前多的任务放在队列中,只要有线程空闲
 *  就会去队列中取出新的任务继续执行
 *  threadFactory 线程的创建工厂
 *  RejectedExecutionHandler handler 如果队列满了,按照我们的策略拒绝执行的任务
 *
 *  工作顺序:
 *  1)当线程进来后会直接分配给核心线程中,也就是corePoolSize初始化的线程数
 *  2)当corePoolSize满了以后,默认将线程放到workQueue中等待核心线程调用
 *  3)如果workQueue满了就会再启动新的线程,但是最大数量不能超过maximumPoolSize
 *  4)maximumPoolSize满了如果还有新的线程任务就会调用 handler丢弃策略,
 *  5)如果maximumPoolSize中任务都执行完毕,那么就会等待keepAliveTime指定的空闲
 *  时间后进行销毁。
 *
 */
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        10,
        100000,
        3,
        TimeUnit.MINUTES,
        new LinkedBlockingDeque<>(100000), //默认是Integer的最大值所以需要修改
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.execute(new Runnable01());
总结

在实际的开发过程中要使用线程池进行异步任务的完成,因为用Thread Runnable Callable这些都不能做到对线程的管理,很容易就导致资源浪费和消耗。

使用线程池的好处:

  • 降低资源的消耗
  • 提高响应速度
  • 提高线程的可管理性

CompletableFuture异步编排

需求场景:面对复杂关系的异步任务进行编排处理

使用

CompletableFuture.runAsync() 这个异步方法没有返回值,所以给的是Void类型

CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("i = " + i);
        },executor);

CompletableFuture.supplyAsync()这个异步方法有返回值,返回值是Integer因此可以调用get方法,等异步任务执行结束后获取到返回值即可。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("i = " + i);
            return i;
        },executor);
Integer i = future.get();

计算完成时使用函数回调whenComplete

还可以使用链式编程继续在后面追加方法

public class ThreadTest01 {
    //创建一个线程池
    public static ExecutorService executor = Executors.newFixedThreadPool(10);
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主方法1.。start。。。");
        //调用有返回值的异步
            CompletableFuture<Integer> future = CompletableFuture. (()->{
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("i = " + i);
            return i;
        },executor).whenComplete( (res,exc) -> {
                System.out.println("异步线程执行完毕返回的结果是:"+ res + ";抛出的异常是:" + exc);
            }).exceptionally( (exc) -> {
                //如果发生异常那么exceptionally可以直接得到异常信息还可以返回一个指定的异常信息
                // 这个返回的异常信息可以让future.get();这里拿到
                return 404;
            });
        // get一方面可以等待异步线程完成后再执行其他业务,还可以获取异步任务的返回结果
        Integer i = future.get();
        System.out.println("主方法1.。end。。。"+ i);
    }

计算完成时使用函数回调handle,handle和handleAsync可以对异步返回结果和异常进行处理

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
    System.out.println("当前线程:"+ Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("i = " + i);
    return i;
},executor).handle( (res,exc) -> {
    if (res != null){
        return res*2;
    }
    if (exc !=null){
        return 404;
    }
    return 0;
});
// get一方面可以等待异步线程完成后再执行其他业务,还可以获取异步任务的返回结果
Integer i = future.get();
System.out.println("主方法1.。end。。。"+ i);

输出

主方法1.。start。。。
当前线程:12
主方法1.。end。。。404
线程串行化方法

thenApply方法,当一个线程依赖另一个线程的时候,获取上一个任务的返回值并返回当前任务的返回值。

thenAccepte 消费上一个线程的处理结果,无返回结果

thenRun 当上一个线程处理完成后,执行thenRun的后续操作

带有Async都是异步的

获取返回值使用future.get()

两任务组合

thenCombine 两个任务都执行,第三个任务可以获取上两个任务的返回值,并处理并返回新的结果

thenAccpteBoth 两个任务都执行,并且第三个任务可以获取上两个任务的返回值,但是不能返回新的处理结果

thenRunBoth 两个任务执行后第三个任务开始执行,并且只能传入Runnable不能返回结果

runAfterEither 两个任务中任何一个任务执行完成后都立即执行第三个任务,因为第三个任务传入的是Runnable接口所以不能有返回值

acceptEither 可以接收两个任务中率先完成的任务的返回值并处理,但是第三个任务不能有返回值

applyToEither 可以接收两个任务中率先完成的任务的返回值并处理,第三个任务处理后可以有返回值

多任务组合

allOf 当所有的异步任务都执行完毕后才能继续往下执行 (消耗最少时间:用时最长的那个任务)

anyOf 当任何一个异步任务执行完毕后就会继续往下执行(消耗最少时间:用时最快的那个任务)

CompletableFuture<String> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future01");
            return "future01.jpg";
        }, executor);
        CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future02");
            return  "商品属性:黑色256";
        }, executor);
        CompletableFuture<String> future03 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future03");
            return "商品分类信息";
        }, executor);
        CompletableFuture allOf = CompletableFuture.allOf(future01,future02,future03);
        //CompletableFuture.anyOf(future01,future02,future03);
        allOf.get();
        System.out.println("end" + future01.get() + future02.get() + future03.get());

最佳实战

@Override
public SkuItemVo item(Long skuId) {
    SkuItemVo skuItemVo = new SkuItemVo();
    CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
        //1、sku基本信息获取
        SkuInfoEntity skuInfoEntity = getById(skuId);
        skuItemVo.setInfo(skuInfoEntity);
        return skuInfoEntity;
    }, executor);
    CompletableFuture<Void> ImageFuture = infoFuture.thenRunAsync(() -> {
        //2、sku图片信息
        List<SkuImagesEntity> skuImagesEntities =
                skuImagesService.list(
                        new LambdaQueryWrapper<SkuImagesEntity>().eq(SkuImagesEntity::getSkuId, skuId));
        skuItemVo.setImages(skuImagesEntities);
    }, executor);
    CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((skuInfoEntity) -> {
        Long spuId = skuInfoEntity.getSpuId();
        //3、获取spu的销售属性组合
        List<SkuItemVo.SkuItemSaleAttrVo> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(spuId);
        skuItemVo.setSaleAttr(saleAttr);
    }, executor);
    CompletableFuture<Void> spuInfoDescEntityFuture = infoFuture.thenAcceptAsync((skuInfoEntity) -> {
        Long spuId = skuInfoEntity.getSpuId();
        //4、获取spu的介绍
        SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(spuId);
        skuItemVo.setDesc(spuInfoDescEntity);
    });
    CompletableFuture<Void> voidCompletableFuture = infoFuture.thenAcceptAsync((skuInfoEntity) -> {
        Long spuId = skuInfoEntity.getSpuId();
        Long catalogId = skuInfoEntity.getCatalogId();
        //5、获取spu规格参数信息
        List<SkuItemVo.SpuItemAttrGroupVo> groupAttrs = attrGroupService.getGroupAttrsBySpuCataId(spuId, catalogId);
        skuItemVo.setGroupAttrs(groupAttrs);
    });
    //等所有任务都做完然后再返回
    CompletableFuture<Void> lastFuture =
            CompletableFuture.allOf(ImageFuture, saleAttrFuture, spuInfoDescEntityFuture, voidCompletableFuture);
    try {
        lastFuture.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    return skuItemVo;
}
相关文章
|
27天前
|
前端开发 JavaScript Java
使用Ajax进行异步交互:提升Java Web应用的用户体验
【4月更文挑战第3天】Ajax技术在Web开发中提升UX,通过与服务器异步交互实现页面局部更新,无需完整刷新。核心组件包括XMLHttpRequest、JavaScript、HTML/CSS及服务器端脚本。在Java Web应用中,可使用原生JavaScript或框架如jQuery、AngularJS实现Ajax请求。Ajax减少页面刷新,实现实时数据更新,即时表单验证和动态UI,显著改善用户体验,是现代Web开发不可或缺的一部分。
|
1月前
|
存储 安全 Java
【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理
【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理
14 0
|
2月前
|
Java 测试技术
|
3月前
|
NoSQL Java 关系型数据库
使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致
使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致
43 0
|
4月前
|
Java
Java异步、线程池解决方案
Java异步、线程池解决方案
45 0
|
5月前
|
Java 测试技术
Java8 异步非阻塞做法:CompletableFuture 两万字详解
CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利
|
7月前
|
消息中间件 Java UED
Java并发编程异步操作Future和FutureTask
生活是一个洗礼自己的过程,这个洗礼并不是传统意义上的洗礼,传统意义上的洗礼通常认为这个人的思想得到洗礼,灵魂得到洗礼,十分的清新脱俗,不世故,不圆滑,而现实的洗礼实则是让一个人褪去幼稚,褪去无知,让你变得点头哈腰,圆滑世故,我们都是动物,需要物质满足,更需要欲望填补,所以,变成自己小时候唾骂的对象也是可以理解,不过这是一个选择,你可以进行选择,只是在物欲横流的时代,多数人没有这种选择的权力!
57 0
|
8月前
|
Java 数据处理 Spring
异步操作轻松实现:探究 Java 中的 @Async 注解
在现代软件开发中,性能和并发性是至关重要的因素。而 Java 中的 `@Async` 注解则为开发人员提供了一种轻松的方式来实现异步操作,从而提升应用程序的性能和响应性。本文将带您深入探索 Java 中的 `@Async` 注解,揭示其作用、用法以及在实际开发中的应用场景。
Java 8 的异步利器:CompletableFuture源码解析(建议精读)
实现了俩接口,本身是个class。这个是Future的实现类,使用 completionStage 接口去支持完成时触发的函数和操作。
|
9月前
|
安全 Java API
Java NIO系列教程四【完】-管道-文件锁-异步写入
​ 到此位置NIO的所有的内容都结束了,对于NIO来说主要是各种概念需要大家去理解然后有很多的用法和api也需要大家去熟悉所以想把NIO学懂学好其实并不容易一定要多写案例去测试巩固,也预祝大家能把NIO的知识看懂理顺!!!
81 0