JAVA线程&线程池&异步编排

简介: JAVA线程&线程池&异步编排


异步和线程池

初始化线程的方式

继承Thread

主线程

public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 继承Thread
        System.out.println("主方法1.。start。。。");
        Thread01 thread01 = new Thread01();
        thread01.start();
        System.out.println("主方法1.。end。。。");
    }
/**
 * 继承Thread 重新run方法
 */
public static class Thread01 extends Thread{
    @Override
    public void run() {
        System.out.println("当前线程:"+ Thread.currentThread().getId());
        System.out.println("i = " + 10 / 2);
    }
}

输出结果

主方法1.。start。。。
当前线程:12
i = 5
主方法1.。end。。。
实现Runnable接口

主线程

public static void main(String[] args) throws ExecutionException, InterruptedException {
        //实现Runnable接口的方式
        System.out.println("主方法1.。start。。。");
        new Thread(new Runnable01()).start();
        System.out.println("主方法1.。end。。。");
    }
/**
     * 实现Runnable接口的方式
     */
    public static class Runnable01 implements Runnable{
        @Override
        public void run() {
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            System.out.println("i = " + 10 / 2);
        }
    }

结果

主方法1.。start。。。
当前线程:12
i = 5
主方法1.。end。。。
实现Callable接口 + FutureTask (可以拿到返回值,可以处理异常)
public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主方法1.。start。。。");
        FutureTask futureTask = new FutureTask<Integer>(new Callable01());
        new Thread(futureTask).start();
        //阻塞等待整个线程完成,获取返回结果
        Integer result = (Integer) futureTask.get();
        System.out.println("异步Callable执行后的返回结果是:"+ result);
        System.out.println("主方法1.。end。。。");
    }
/**
     * 实现Callable接口
     */
    public static class Callable01 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            System.out.println("i = " + 10 / 2);
            return 10 / 2;
        }
    }

输出

主方法1.。start。。。
当前线程:12
i = 5
异步Callable执行后的返回结果是:5
主方法1.。end。。。
线程池

一般在业务中以上三个方法都不推荐使用,一般是给线程池提交任务就可以了。

主方法

public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
        //线程池的使用
        System.out.println("主方法1.。start。。。");
        ExecutorService service = Executors.newFixedThreadPool(10);
        Future<?> submit = service.submit(new Runnable01());
        submit.get();
        System.out.println("主方法1.。end。。。");
    }

可以向线程池中直接放入Runnable 或者Callable的任务即可

详解
/**
 * 线程池七大参数:
 *  corePoolSize 初始化的线程数量,等待接收异步请求
 *  maximumPoolSize 最大线程数量,可以控制资源
 *  keepAliveTime  存活时间,如果当前线程数量大于核心数量,那么就会释放空闲的线程,只要线程空闲大于执行的空闲时间
 *  unit  执行线程存活的时间单位
 *  BlockingQueue<Runnable> workQueue 阻塞队列,如果任务有很多,就会将目前多的任务放在队列中,只要有线程空闲
 *  就会去队列中取出新的任务继续执行
 *  threadFactory 线程的创建工厂
 *  RejectedExecutionHandler handler 如果队列满了,按照我们的策略拒绝执行的任务
 *
 *  工作顺序:
 *  1)当线程进来后会直接分配给核心线程中,也就是corePoolSize初始化的线程数
 *  2)当corePoolSize满了以后,默认将线程放到workQueue中等待核心线程调用
 *  3)如果workQueue满了就会再启动新的线程,但是最大数量不能超过maximumPoolSize
 *  4)maximumPoolSize满了如果还有新的线程任务就会调用 handler丢弃策略,
 *  5)如果maximumPoolSize中任务都执行完毕,那么就会等待keepAliveTime指定的空闲
 *  时间后进行销毁。
 *
 */
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        10,
        100000,
        3,
        TimeUnit.MINUTES,
        new LinkedBlockingDeque<>(100000), //默认是Integer的最大值所以需要修改
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.execute(new Runnable01());
总结

在实际的开发过程中要使用线程池进行异步任务的完成,因为用Thread Runnable Callable这些都不能做到对线程的管理,很容易就导致资源浪费和消耗。

使用线程池的好处:

  • 降低资源的消耗
  • 提高响应速度
  • 提高线程的可管理性

CompletableFuture异步编排

需求场景:面对复杂关系的异步任务进行编排处理

使用

CompletableFuture.runAsync() 这个异步方法没有返回值,所以给的是Void类型

CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("i = " + i);
        },executor);

CompletableFuture.supplyAsync()这个异步方法有返回值,返回值是Integer因此可以调用get方法,等异步任务执行结束后获取到返回值即可。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("i = " + i);
            return i;
        },executor);
Integer i = future.get();

计算完成时使用函数回调whenComplete

还可以使用链式编程继续在后面追加方法

public class ThreadTest01 {
    //创建一个线程池
    public static ExecutorService executor = Executors.newFixedThreadPool(10);
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主方法1.。start。。。");
        //调用有返回值的异步
            CompletableFuture<Integer> future = CompletableFuture. (()->{
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("i = " + i);
            return i;
        },executor).whenComplete( (res,exc) -> {
                System.out.println("异步线程执行完毕返回的结果是:"+ res + ";抛出的异常是:" + exc);
            }).exceptionally( (exc) -> {
                //如果发生异常那么exceptionally可以直接得到异常信息还可以返回一个指定的异常信息
                // 这个返回的异常信息可以让future.get();这里拿到
                return 404;
            });
        // get一方面可以等待异步线程完成后再执行其他业务,还可以获取异步任务的返回结果
        Integer i = future.get();
        System.out.println("主方法1.。end。。。"+ i);
    }

计算完成时使用函数回调handle,handle和handleAsync可以对异步返回结果和异常进行处理

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
    System.out.println("当前线程:"+ Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("i = " + i);
    return i;
},executor).handle( (res,exc) -> {
    if (res != null){
        return res*2;
    }
    if (exc !=null){
        return 404;
    }
    return 0;
});
// get一方面可以等待异步线程完成后再执行其他业务,还可以获取异步任务的返回结果
Integer i = future.get();
System.out.println("主方法1.。end。。。"+ i);

输出

主方法1.。start。。。
当前线程:12
主方法1.。end。。。404
线程串行化方法

thenApply方法,当一个线程依赖另一个线程的时候,获取上一个任务的返回值并返回当前任务的返回值。

thenAccepte 消费上一个线程的处理结果,无返回结果

thenRun 当上一个线程处理完成后,执行thenRun的后续操作

带有Async都是异步的

获取返回值使用future.get()

两任务组合

thenCombine 两个任务都执行,第三个任务可以获取上两个任务的返回值,并处理并返回新的结果

thenAccpteBoth 两个任务都执行,并且第三个任务可以获取上两个任务的返回值,但是不能返回新的处理结果

thenRunBoth 两个任务执行后第三个任务开始执行,并且只能传入Runnable不能返回结果

runAfterEither 两个任务中任何一个任务执行完成后都立即执行第三个任务,因为第三个任务传入的是Runnable接口所以不能有返回值

acceptEither 可以接收两个任务中率先完成的任务的返回值并处理,但是第三个任务不能有返回值

applyToEither 可以接收两个任务中率先完成的任务的返回值并处理,第三个任务处理后可以有返回值

多任务组合

allOf 当所有的异步任务都执行完毕后才能继续往下执行 (消耗最少时间:用时最长的那个任务)

anyOf 当任何一个异步任务执行完毕后就会继续往下执行(消耗最少时间:用时最快的那个任务)

CompletableFuture<String> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future01");
            return "future01.jpg";
        }, executor);
        CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future02");
            return  "商品属性:黑色256";
        }, executor);
        CompletableFuture<String> future03 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future03");
            return "商品分类信息";
        }, executor);
        CompletableFuture allOf = CompletableFuture.allOf(future01,future02,future03);
        //CompletableFuture.anyOf(future01,future02,future03);
        allOf.get();
        System.out.println("end" + future01.get() + future02.get() + future03.get());

最佳实战

@Override
public SkuItemVo item(Long skuId) {
    SkuItemVo skuItemVo = new SkuItemVo();
    CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
        //1、sku基本信息获取
        SkuInfoEntity skuInfoEntity = getById(skuId);
        skuItemVo.setInfo(skuInfoEntity);
        return skuInfoEntity;
    }, executor);
    CompletableFuture<Void> ImageFuture = infoFuture.thenRunAsync(() -> {
        //2、sku图片信息
        List<SkuImagesEntity> skuImagesEntities =
                skuImagesService.list(
                        new LambdaQueryWrapper<SkuImagesEntity>().eq(SkuImagesEntity::getSkuId, skuId));
        skuItemVo.setImages(skuImagesEntities);
    }, executor);
    CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((skuInfoEntity) -> {
        Long spuId = skuInfoEntity.getSpuId();
        //3、获取spu的销售属性组合
        List<SkuItemVo.SkuItemSaleAttrVo> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(spuId);
        skuItemVo.setSaleAttr(saleAttr);
    }, executor);
    CompletableFuture<Void> spuInfoDescEntityFuture = infoFuture.thenAcceptAsync((skuInfoEntity) -> {
        Long spuId = skuInfoEntity.getSpuId();
        //4、获取spu的介绍
        SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(spuId);
        skuItemVo.setDesc(spuInfoDescEntity);
    });
    CompletableFuture<Void> voidCompletableFuture = infoFuture.thenAcceptAsync((skuInfoEntity) -> {
        Long spuId = skuInfoEntity.getSpuId();
        Long catalogId = skuInfoEntity.getCatalogId();
        //5、获取spu规格参数信息
        List<SkuItemVo.SpuItemAttrGroupVo> groupAttrs = attrGroupService.getGroupAttrsBySpuCataId(spuId, catalogId);
        skuItemVo.setGroupAttrs(groupAttrs);
    });
    //等所有任务都做完然后再返回
    CompletableFuture<Void> lastFuture =
            CompletableFuture.allOf(ImageFuture, saleAttrFuture, spuInfoDescEntityFuture, voidCompletableFuture);
    try {
        lastFuture.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    return skuItemVo;
}
相关文章
|
2天前
|
Java
Java一分钟之-并发编程:线程间通信(Phaser, CyclicBarrier, Semaphore)
【5月更文挑战第19天】Java并发编程中,Phaser、CyclicBarrier和Semaphore是三种强大的同步工具。Phaser用于阶段性任务协调,支持动态注册;CyclicBarrier允许线程同步执行,适合循环任务;Semaphore控制资源访问线程数,常用于限流和资源池管理。了解其使用场景、常见问题及避免策略,结合代码示例,能有效提升并发程序效率。注意异常处理和资源管理,以防止并发问题。
25 2
|
2天前
|
安全 Java 容器
Java一分钟之-并发编程:线程安全的集合类
【5月更文挑战第19天】Java提供线程安全集合类以解决并发环境中的数据一致性问题。例如,Vector是线程安全但效率低;可以使用Collections.synchronizedXxx将ArrayList或HashMap同步;ConcurrentHashMap是高效线程安全的映射;CopyOnWriteArrayList和CopyOnWriteArraySet适合读多写少场景;LinkedBlockingQueue是生产者-消费者模型中的线程安全队列。注意,过度同步可能影响性能,应尽量减少共享状态并利用并发工具类。
17 2
|
2天前
|
Java 程序员 调度
Java中的多线程编程:基础知识与实践
【5月更文挑战第19天】多线程编程是Java中的一个重要概念,它允许程序员在同一时间执行多个任务。本文将介绍Java多线程的基础知识,包括线程的创建、启动和管理,以及如何通过多线程提高程序的性能和响应性。
|
3天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【5月更文挑战第18天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将了解线程池的基本概念,应用场景,以及如何优化线程池的性能。通过实例分析,我们将看到线程池如何提高系统性能,减少资源消耗,并提高系统的响应速度。
13 5
|
3天前
|
消息中间件 安全 Java
理解Java中的多线程编程
【5月更文挑战第18天】本文介绍了Java中的多线程编程,包括线程和多线程的基本概念。Java通过继承Thread类或实现Runnable接口来创建线程,此外还支持使用线程池(如ExecutorService和Executors)进行更高效的管理。多线程编程需要注意线程安全、性能优化和线程间通信,以避免数据竞争、死锁等问题,并确保程序高效运行。
|
3天前
|
存储 Java
【Java】实现一个简单的线程池
,如果被消耗完了就说明在规定时间内获取不到任务,直接return结束线程。
11 0
|
3天前
|
安全 Java 容器
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第18天】随着多核处理器的普及,并发编程变得越来越重要。Java提供了丰富的并发编程工具,如synchronized关键字、显式锁Lock、原子类、并发容器等。本文将深入探讨Java并发编程的核心概念,包括线程安全、死锁、资源竞争等,并分享一些性能优化的技巧。
|
3天前
|
安全 Java 开发者
Java中的多线程编程:理解与实践
【5月更文挑战第18天】在现代软件开发中,多线程编程是提高程序性能和响应速度的重要手段。Java作为一种广泛使用的编程语言,其内置的多线程支持使得开发者能够轻松地实现并行处理。本文将深入探讨Java多线程的基本概念、实现方式以及常见的并发问题,并通过实例代码演示如何高效地使用多线程技术。通过阅读本文,读者将对Java多线程编程有一个全面的认识,并能够在实际开发中灵活运用。
|
4天前
|
Python
|
6天前
|
Java 数据库
【Java多线程】对线程池的理解并模拟实现线程池
【Java多线程】对线程池的理解并模拟实现线程池
17 1