java中ThreadPool的介绍和使用

简介: java中ThreadPool的介绍和使用

文章目录



java中ThreadPool的介绍和使用


Thread Pool简介


在Java中,threads是和系统的threads相对应的,用来处理一系列的系统资源。不管在windows和linux下面,能开启的线程个数都是有限的,如果你在java程序中无限制的创建thread,那么将会遇到无线程可创建的情况。


CPU的核数是有限的,如果同时有多个线程正在运行中,那么CPU将会根据线程的优先级进行轮循,给每个线程分配特定的CPU时间。所以线程也不是越多越好。


在java中,代表管理ThreadPool的接口有两个:ExecutorService和Executor。


我们运行线程的步骤一般是这样的:1. 创建一个ExecutorService。 2.将任务提交给ExecutorService。3.ExecutorService调度线程来运行任务。


画个图来表示:


image.png


下面我讲一下,怎么在java中使用ThreadPool。


Executors, Executor 和 ExecutorService


Executors 提供了一系列简便的方法,来帮助我们创建ThreadPool。


Executor接口定义了一个方法:


public interface Executor {
    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}


ExecutorService继承了Executor,提供了更多的线程池的操作。是对Executor的补充。


根据接口实现分离的原则,我们通常在java代码中使用ExecutorService或者Executor,而不是具体的实现类。


我们看下怎么通过Executors来创建一个Executor和ExecutorService:


Executor executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> log.info("in Executor"));
        ExecutorService executorService= Executors.newCachedThreadPool();
        executorService.submit(()->log.info("in ExecutorService"));
        executorService.shutdown();


关于ExecutorService的细节,我们这里就多讲了,感兴趣的朋友可以参考之前我写的ExecutorService的详细文章。


ThreadPoolExecutor


ThreadPoolExecutor是ExecutorService接口的一个实现,它可以为线程池添加更加精细的配置,具体而言它可以控制这三个参数:corePoolSize, maximumPoolSize, 和 keepAliveTime。


PoolSize就是线程池里面的线程个数,corePoolSize表示的是线程池里面初始化和保持的最小的线程个数。


如果当前等待线程太多,可以设置maximumPoolSize来提供最大的线程池个数,从而线程池会创建更多的线程以供任务执行。


keepAliveTime是多余的线程未分配任务将会等待的时间。超出该时间,线程将会被线程池回收。


我们看下怎么创建一个ThreadPoolExecutor:


ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>());
        threadPoolExecutor.submit(()->log.info("submit through threadPoolExecutor"));
        threadPoolExecutor.shutdown();


上面的例子中我们通过ThreadPoolExecutor的构造函数来创建ThreadPoolExecutor。


通常来说Executors已经内置了ThreadPoolExecutor的很多实现,我们来看下面的例子:


ThreadPoolExecutor executor1 =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        executor1.submit(() -> {
            Thread.sleep(1000);
            return null;
        });
        executor1.submit(() -> {
            Thread.sleep(1000);
            return null;
        });
        executor1.submit(() -> {
            Thread.sleep(1000);
            return null;
        });
        log.info("executor1 poolsize {}",executor1.getPoolSize());
        log.info("executor1 queuesize {}", executor1.getQueue().size());
        executor1.shutdown();


上的例子中我们Executors.newFixedThreadPool(2)来创建一个ThreadPoolExecutor。


上面的例子中我们提交了3个task。但是我们pool size只有2。所以还有一个1个不能立刻被执行,需要在queue中等待。


我们再看一个例子:


ThreadPoolExecutor executor2 =
                (ThreadPoolExecutor) Executors.newCachedThreadPool();
        executor2.submit(() -> {
            Thread.sleep(1000);
            return null;
        });
        executor2.submit(() -> {
            Thread.sleep(1000);
            return null;
        });
        executor2.submit(() -> {
            Thread.sleep(1000);
            return null;
        });
        log.info("executor2 poolsize {}", executor2.getPoolSize());
        log.info("executor2 queue size {}", executor2.getQueue().size());
        executor2.shutdown();


上面的例子中我们使用Executors.newCachedThreadPool()来创建一个ThreadPoolExecutor。 运行之后我们可以看到poolsize是3,而queue size是0。这表明newCachedThreadPool会自动增加pool size。


如果thread在60秒钟之类没有被激活,则会被收回。


这里的Queue是一个SynchronousQueue,因为插入和取出基本上是同时进行的,所以这里的queue size基本都是0.


ScheduledThreadPoolExecutor


还有个很常用的ScheduledThreadPoolExecutor,它继承自ThreadPoolExecutor, 并且实现了ScheduledExecutorService接口。


public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService


我们看下怎么使用:


ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
        executor.schedule(() -> {
            log.info("Hello World");
        }, 500, TimeUnit.MILLISECONDS);


上面的例子中,我们定义了一个定时任务将会在500毫秒之后执行。


之前我们也讲到了ScheduledExecutorService还有两个非常常用的方法:


  • scheduleAtFixedRate - 以开始时间为间隔。
  • scheduleWithFixedDelay - 以结束时间为间隔。


CountDownLatch lock = new CountDownLatch(3);
        ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(5);
        ScheduledFuture<?> future = executor2.scheduleAtFixedRate(() -> {
            log.info("in ScheduledFuture");
            lock.countDown();
        }, 500, 100, TimeUnit.MILLISECONDS);
        lock.await(1000, TimeUnit.MILLISECONDS);
        future.cancel(true);


ForkJoinPool


ForkJoinPool是在java 7 中引入的新框架,我们将会在后面的文章中详细讲解。 这里做个简单的介绍。


ForkJoinPool主要用来生成大量的任务来做算法运算。如果用线程来做的话,会消耗大量的线程。但是在fork/join框架中就不会出现这个问题。


在fork/join中,任何task都可以生成大量的子task,然后通过使用join()等待子task结束。


这里我们举一个例子:


static class TreeNode {
    int value;
    Set<TreeNode> children;
    TreeNode(int value, TreeNode... children) {
        this.value = value;
        this.children = Sets.newHashSet(children);
    }
}


定义一个TreeNode,然后遍历所有的value,将其加起来:


public  class CountingTask extends RecursiveTask<Integer> {
    private final TreeNode node;
    public CountingTask(TreeNode node) {
        this.node = node;
    }
    @Override
    protected Integer compute() {
        return node.value + node.children.stream()
                .map(childNode -> new CountingTask(childNode).fork()).mapToInt(ForkJoinTask::join).sum();
    }
}


下面是调用的代码:


public static void main(String[] args) {
        TreeNode tree = new TreeNode(5,
                new TreeNode(3), new TreeNode(2,
                new TreeNode(2), new TreeNode(8)));
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        int sum = forkJoinPool.invoke(new CountingTask(tree));
    }
相关文章
|
4月前
|
Java 调度
Java多线程:什么是线程池(ThreadPool)?
Java多线程:什么是线程池(ThreadPool)?
48 0
|
安全 Java
java安全编码指南之:ThreadPool的使用
java安全编码指南之:ThreadPool的使用
|
Java Linux
Java ThreadPool 实现
前面我们已经介绍了 JDK 中常用的并发库(JUC)的使用方式, 本文我们着重介绍 JUC 中 ThreadPool 的实现方式。
|
Java 开发者
Java常见面试题:ThreadLocal和ThreadPool原理以及应用场景
线程池的概念在一些公司的笔试里面经常会出现,大家只需要清楚无限制增长、定长执行、定时操作、单线程池这四种线程池的操作就可以了。
Java常见面试题:ThreadLocal和ThreadPool原理以及应用场景
|
3天前
|
安全 Java 调度
Java线程:深入理解与实战应用
Java线程:深入理解与实战应用
20 0
|
21小时前
|
消息中间件 缓存 NoSQL
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
|
23小时前
|
数据采集 存储 Java
高德地图爬虫实践:Java多线程并发处理策略
高德地图爬虫实践:Java多线程并发处理策略
|
1天前
|
缓存 Java
【Java基础】简说多线程(上)
【Java基础】简说多线程(上)
5 0
|
2天前
|
并行计算 算法 安全
Java从入门到精通:2.1.3深入学习Java核心技术——掌握Java多线程编程
Java从入门到精通:2.1.3深入学习Java核心技术——掌握Java多线程编程
|
2天前
|
安全 Java 编译器
是时候来唠一唠synchronized关键字了,Java多线程的必问考点!
本文简要介绍了Java中的`synchronized`关键字,它是用于保证多线程环境下的同步,解决原子性、可见性和顺序性问题。从JDK1.6开始,synchronized进行了优化,性能得到提升,现在仍可在项目中使用。synchronized有三种用法:修饰实例方法、静态方法和代码块。文章还讨论了synchronized修饰代码块的锁对象、静态与非静态方法调用的互斥性,以及构造方法不能被同步修饰。此外,通过反汇编展示了`synchronized`在方法和代码块上的底层实现,涉及ObjectMonitor和monitorenter/monitorexit指令。
15 0