优雅代码】04-1行代码完成多线程,别再写runnable了

简介: java8提供的CompletableFuture以及匿名函数可以让我们一行代码完成多线程

【优雅代码】04-1行代码完成多线程,别再写runnable了

欢迎关注b站账号/公众号【六边形战士夏宁】,一个要把各项指标拉满的男人。该文章已在 github目录收录。
屏幕前的 大帅比大漂亮如果有帮助到你的话请顺手点个赞、加个收藏这对我真的很重要。别下次一定了,都不关注上哪下次一定。

1.背景介绍

java8提供的CompletableFuture以及匿名函数可以让我们一行代码完成多线程

2.建立相关类

2.1.ThreadEntity

用于多线程测试的实体类

public class ThreadEntity {
    private int num;
    private int price;
    public int countPrice(){
        price = RandomUtils.nextInt();
        try {
            System.out.println(num);
            // 随机等待1~10秒
            Thread.sleep(RandomUtils.nextInt(1, 10) * 1000);
            System.out.println(num);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return price;
    }
}

2.2.ThreadPoolManager

/**
 * tasks 每秒的任务数,默认200,依据访问量及使用线程池的地方进行计算
 * taskCost:每个任务花费时间,默认0.1s
 * responseTime:最大响应时间,默认为1s,一般用户最大忍受时间为3秒
 *
 * @author seal email:876651109@qq.com
 * @date 2020/5/30 10:08 AM
 */
@Data
@Slf4j
@Configuration
public class ThreadPoolManager {
    /**
     * 平均响应时间默认2秒
     */
    private static final float ALL_COST_AVG = 2F;
    /**
     * 平均IO时间默认1.5秒
     */
    private static final float IO_COST_AVG = 1.5F;
    /**
     * 服务器核数
     */
    private static final int SIZE_PROCESSOR = Runtime.getRuntime().availableProcessors();
    /**
     * https://www.cnblogs.com/dennyzhangdd/p/6909771.html?utm_source=itdadao&utm_medium=referral
     * 阻塞系数=阻塞时间/(阻塞时间+计算时间)
     * 线程数=核心数/(1-阻塞系数)
     * 等同于CPU核心数*cpu使用率*(1+等待时间与计算时间的比率)
     * N+1通常为最优效率
     * <p>
     * https://blog.51cto.com/13527416/2056080
     */
    private static final int SIZE_CORE_POOL = SIZE_PROCESSOR + 1;

    /**
     * 线程池维护最大数量,默认会与核心线程数一致无意义,保守情况取2cpu
     * 或者使用简单计算 线程池大小 = ((线程 IO time + 线程 CPU time )/线程 CPU time ) CPU数目**
     * 请求所消耗的时间 /(请求所消耗的时间-DB处理)*CPU数目,重点在于cpu等待时间,通常为数据库DB时间
     * 按照通常2秒展示界面,数据库运算1.5秒则(2/0.5)*n,其实就是优化等待时间
     * <p>
     * 默认4n即8核32线程
     */
    private static final int SIZE_MAX_POOL = (int) (SIZE_PROCESSOR * (ALL_COST_AVG / (ALL_COST_AVG - IO_COST_AVG)));
    /**
     * 线程池队列长度,默认为integer最大值,Dubbo使用1000,无限大会引起用户用户的任务一直排队,应选择适当性丢弃,
     * 可忍受时间6其它的则抛弃
     * SIZE_MAX_POOL/IO_COST_AVG=每秒可处理任务数,默认为
     * 可忍受时间6*每秒可处理任务数=X队列数
     */
    private static final int SIZE_QUEUE = (int) (6 * (SIZE_MAX_POOL / IO_COST_AVG));
    /**
     * 线程池具体类
     * LinkedBlockingDeque常用于固定线程,SynchronousQueue常用于cache线程池
     * Executors.newCachedThreadPool()常用于短期任务
     * <p>
     * 线程工厂选择,区别不大
     * 有spring的CustomizableThreadFactory,new CustomizableThreadFactory("springThread-pool-")
     * guava的ThreadFactoryBuilder,new ThreadFactoryBuilder().setNameFormat("retryClient-pool-").build();
     * apache-lang的BasicThreadFactory,new BasicThreadFactory.Builder().namingPattern("basicThreadFactory-").build()
     * <p>
     * 队列满了的策略默认AbortPolicy
     */
    private static ThreadPoolManager threadPoolManager = new ThreadPoolManager();

    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            SIZE_CORE_POOL,
            SIZE_MAX_POOL,
            30L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(SIZE_QUEUE),
            new CustomizableThreadFactory("springThread-pool-"),
            new ThreadPoolExecutor.AbortPolicy()
    );

    private void prepare() {
        if (pool.isShutdown() && !pool.prestartCoreThread()) {
            int coreSize = pool.prestartAllCoreThreads();
            System.out.println("当前线程池");
        }
    }


    public static ThreadPoolManager getInstance() {
        if (threadPoolManager != null) {
            ThreadPoolExecutor pool = threadPoolManager.pool;
        }
        return threadPoolManager;
    }
}

3.核心代码

3.1.并行流

parallel是并行核心可以发现内部是多线程运行,但是经过collect以后会排好序所以不用担心,小项目可以使用,大项目的话建议老老实实用自己的线程池,JDK自带的fork/join并不贴合业务

System.out.println(Stream.of(1, 2, 3, 4, 5, 6).parallel().map(l -> {
    System.out.println(l);
    return l;
}).collect(Collectors.toList()));

输出如下,因为多线程所以随机输出,但因为使用collect收集则最终结果并未发生改变

2
6
4
5
3
1
[1, 2, 3, 4, 5, 6]

3.2.同步代码

这个可以不用再去实现线程的接口,不过还是要考虑一下队列满了的丢弃情况

List<ThreadEntity> listEntity = IntStream.range(0, 10).mapToObj(x -> ThreadEntity.builder().num(x).build()).collect(Collectors.toList());
List<CompletableFuture<Integer>> listCompletableFuture = listEntity.stream().map(x -> {
    try {
        // 此处ThreadPoolManager.getInstance().getPool()如果不传该参数则使用默认commonPool,无特殊需求的话trycatch一般不写
        return CompletableFuture.supplyAsync(() -> x.countPrice(),
                ThreadPoolManager.getInstance().getPool());
    } catch (RejectedExecutionException e) {
        System.out.println("reject" + x);
        log.error("", e);
        return null;
    }
}).collect(Collectors.toList());
List<Integer> result = listCompletableFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());
System.out.println(result);
System.out.println(listEntity);

输出如下可以看到运行是以多线程的方式进行,但是结果和原先是保持一致的

start6
start9
start0
start3
start2
start1
start8
start5
start4
start7
end3
end8
end5
end7
end9
end1
end2
end6
end0
end4
[131523688, 1491605535, 222657954, 132274662, 1134597171, 2057763841, 1168687436, 1842194861, 1264173480, 56446450]
[ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@7d6f201, num=0, price=131523688), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@58e825f3, num=1, price=1491605535), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@d458bb1, num=2, price=222657954), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@7e26830, num=3, price=132274662), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@43a0a2b8, num=4, price=1134597171), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@7aa70ac1, num=5, price=2057763841), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@45a8d047, num=6, price=1168687436), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@6dcdb8e3, num=7, price=1842194861), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@4b59d119, num=8, price=1264173480), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@35d5d9e, num=9, price=56446450)]

如果IntStream.range(0, 10)改成(0, 1000)则会有如下拒绝报错

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@5af97850 rejected from java.util.concurrent.ThreadPoolExecutor@491666ad[Running, pool size = 64, active threads = 64, queued tasks = 256, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1618)
    at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1843)
    at com.example.demo.lesson.grace.thread.TestMain.lambda$threadEx1$2(TestMain.java:34)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at com.example.demo.lesson.grace.thread.TestMain.threadEx1(TestMain.java:41)
    at com.example.demo.lesson.grace.thread.TestMain.main(TestMain.java:26)
rejectThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@1a9, num=366)

3.3.异步代码

以下代码可以直接简写成一行,在处理异步任务变得异常方便
CompletableFuture.runAsync(() -> fun())

List<ThreadEntity> listEntity = IntStream.range(0, 500).mapToObj(x -> ThreadEntity.builder().num(x).build()).collect(Collectors.toList());
List<CompletableFuture> listCompletableFuture = listEntity.stream().map(x -> {
    try {
        // 此处ThreadPoolManager.getInstance().getPool()如果不传该参数则使用默认commonPool,无特殊需求的话trycatch一般不写
        return CompletableFuture.runAsync(() -> x.countPrice(), ThreadPoolManager.getInstance().getPool());
    } catch (RejectedExecutionException e) {
        System.out.println("reject" + x);
        return null;
    }
}).collect(Collectors.toList());
listCompletableFuture.stream().map(CompletableFuture::join);
System.out.println("1234");
// 一行多线程异步执行写法
CompletableFuture.runAsync(() -> System.out.println(1));

输出如下,可以看到主线程已经结束了其它子线程才在输出,完全没有等待的多线程

1234
1
start7
start0
start6
start5
start4
start2
start8
start1
start9
start3
end8
end4
end9
end6
end2
end0
end1
end3
end5
end7
相关文章
|
3月前
|
Java 开发者
奇迹时刻!探索 Java 多线程的奇幻之旅:Thread 类和 Runnable 接口的惊人对决
【8月更文挑战第13天】Java的多线程特性能显著提升程序性能与响应性。本文通过示例代码详细解析了两种核心实现方式:Thread类与Runnable接口。Thread类适用于简单场景,直接定义线程行为;Runnable接口则更适合复杂的项目结构,尤其在需要继承其他类时,能保持代码的清晰与模块化。理解两者差异有助于开发者在实际应用中做出合理选择,构建高效稳定的多线程程序。
58 7
|
4月前
|
安全 Python
告别低效编程!Python线程与进程并发技术详解,让你的代码飞起来!
【7月更文挑战第9天】Python并发编程提升效率:**理解并发与并行,线程借助`threading`模块处理IO密集型任务,受限于GIL;进程用`multiprocessing`实现并行,绕过GIL限制。示例展示线程和进程创建及同步。选择合适模型,注意线程安全,利用多核,优化性能,实现高效并发编程。
74 3
|
20天前
|
安全 Java
在 Java 中使用实现 Runnable 接口的方式创建线程
【10月更文挑战第22天】通过以上内容的介绍,相信你已经对在 Java 中如何使用实现 Runnable 接口的方式创建线程有了更深入的了解。在实际应用中,需要根据具体的需求和场景,合理选择线程创建方式,并注意线程安全、同步、通信等相关问题,以确保程序的正确性和稳定性。
|
23天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
18 3
|
23天前
|
Java
在Java多线程编程中,实现Runnable接口通常优于继承Thread类
【10月更文挑战第20天】在Java多线程编程中,实现Runnable接口通常优于继承Thread类。原因包括:1) Java只支持单继承,实现接口不受此限制;2) Runnable接口便于代码复用和线程池管理;3) 分离任务与线程,提高灵活性。因此,实现Runnable接口是更佳选择。
30 2
|
23天前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
28 2
|
23天前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
28 1
|
3月前
|
Java Windows
【Azure Developer】Windows中通过pslist命令查看到Java进程和线程信息,但为什么和代码中打印出来的进程号不一致呢?
【Azure Developer】Windows中通过pslist命令查看到Java进程和线程信息,但为什么和代码中打印出来的进程号不一致呢?
|
3月前
|
Java 开发者
解锁Java并发编程的秘密武器!揭秘AQS,让你的代码从此告别‘锁’事烦恼,多线程同步不再是梦!
【8月更文挑战第25天】AbstractQueuedSynchronizer(AQS)是Java并发包中的核心组件,作为多种同步工具类(如ReentrantLock和CountDownLatch等)的基础。AQS通过维护一个表示同步状态的`state`变量和一个FIFO线程等待队列,提供了一种高效灵活的同步机制。它支持独占式和共享式两种资源访问模式。内部使用CLH锁队列管理等待线程,当线程尝试获取已持有的锁时,会被放入队列并阻塞,直至锁被释放。AQS的巧妙设计极大地丰富了Java并发编程的能力。
43 0
|
5月前
|
Java
Runnable+CountDownLatch多线程的等待和通知
Runnable+CountDownLatch多线程的等待和通知