java中有界队列的饱和策略(reject policy)

简介: java中有界队列的饱和策略(reject policy)

文章目录



java中有界队列的饱和策略(reject policy)


我们在使用ExecutorService的时候知道,在ExecutorService中有个一个Queue来保存提交的任务,通过不同的构造函数,我们可以创建无界的队列

(ExecutorService.newCachedThreadPool)和有界的队列(ExecutorService newFixedThreadPool(int nThreads))。


无界队列很好理解,我们可以无限制的向ExecutorService提交任务。那么对于有界队列来说,如果队列满了该怎么处理呢?


今天我们要介绍一下java中ExecutorService的饱和策略(reject policy)。


以ExecutorService的具体实现ThreadPoolExecutor来说,它定义了4种饱和策略。分别是AbortPolicy,DiscardPolicy,DiscardOldestPolicy和CallerRunsPolicy。


如果要在ThreadPoolExecutor中设定饱和策略可以调用setRejectedExecutionHandler方法,如下所示:


ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(20));
        threadPoolExecutor.setRejectedExecutionHandler(
                new ThreadPoolExecutor.AbortPolicy()
        );


上面的例子中我们定义了一个初始5个,最大10个工作线程的Thread Pool,并且定义其中的Queue的容量是20。如果提交的任务超出了容量,则会使用AbortPolicy策略。


AbortPolicy


AbortPolicy意思是如果队列满了,最新的提交任务将会被拒绝,并抛出RejectedExecutionException异常:


public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }
        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }


上面的代码中,rejectedExecution方法中我们直接抛出了RejectedExecutionException异常。


DiscardPolicy


DiscardPolicy将会悄悄的丢弃提交的任务,而不报任何异常。


public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }
        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }


DiscardOldestPolicy


DiscardOldestPolicy将会丢弃最老的任务,保存最新插入的任务。


public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }
        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }


我们看到在rejectedExecution方法中,poll了最老的一个任务,然后使用

ThreadPoolExecutor提交了一个最新的任务。


CallerRunsPolicy


CallerRunsPolicy和其他的几个策略不同,它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务,从而降低调用者的调用速度。我们看下是怎么实现的:


public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }
        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }


在rejectedExecution方法中,直接调用了 r.run()方法,这会导致该方法直接在调用者的主线程中执行,而不是在线程池中执行。从而导致主线程在该任务执行结束之前不能提交任何任务。从而有效的阻止了任务的提交。


使用Semaphore


如果我们并没有定义饱和策略,那么有没有什么方法来控制任务的提交速度呢?考虑下之前我们讲到的Semaphore,我们可以指定一定的资源信号量来控制任务的提交,如下所示:


public class SemaphoreUsage {
    private final Executor executor;
    private final Semaphore semaphore;
    public SemaphoreUsage(Executor executor, int count) {
        this.executor = executor;
        this.semaphore = new Semaphore(count);
    }
    public void submitTask(final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            executor.execute(() -> {
                        try {
                            command.run();
                        } finally {
                            semaphore.release();
                        }
                    }
            );
        } catch (RejectedExecutionException e) {
            semaphore.release();
        }
    }
}


本文的例子可参考https://github.com/ddean2009/learn-java-concurrency/tree/master/rejectPolicy

相关文章
|
7天前
|
Java
java中的队列
这篇文章通过Java代码示例介绍了使用数组实现队列操作,包括队列的初始化、入队、出队、判断队列满和空以及遍历队列的方法。
java中的队列
|
12天前
|
Java
Java线程池如何执行?拒绝策略有哪些?
【8月更文挑战第8天】Java线程池如何执行?拒绝策略有哪些?
32 6
|
10天前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
32 3
|
7天前
|
安全 Java 网络安全
云计算时代下的网络安全挑战与应对策略Java编程中的异常处理:从基础到高级
在云服务不断深入各行各业的今天,网络安全问题也随之凸显。本文将探讨云计算环境下的安全风险,并提出相应的防护措施,以期为相关行业提供参考和指导。 在Java的世界里,异常处理是代码健壮性的守护神。它不仅保护程序免于意外崩溃,还提供了一种优雅的方式来响应错误。本文将带你领略异常处理的艺术,从简单的try-catch语句到复杂的自定义异常和finally块的神秘力量,我们将一起探索如何让Java程序在面对不确定性时,依然能够优雅地起舞。
|
1月前
|
Java Windows
Java演进问题之JVM在内存返还策略上会左右为难如何解决
Java演进问题之JVM在内存返还策略上会左右为难如何解决
|
1月前
|
监控 算法 Java
深入理解Java虚拟机:JVM调优的实用策略
在Java应用开发中,性能优化常常成为提升系统响应速度和处理能力的关键。本文将探讨Java虚拟机(JVM)调优的核心概念,包括垃圾回收、内存管理和编译器优化等方面,并提供一系列经过验证的调优技巧。通过这些实践指导,开发人员可以有效减少延迟,提高吞吐量,确保应用稳定运行。 【7月更文挑战第16天】
|
13天前
|
监控 算法 Java
Java中的垃圾回收机制及其优化策略
在Java编程世界中,垃圾回收(Garbage Collection, GC)是一块基石,它保证了内存管理的自动化与高效性。本文将探讨JVM的垃圾回收机制,分析其工作原理,并介绍如何通过不同的配置和实践来优化垃圾回收过程,以提升Java应用的性能。
|
1月前
|
设计模式 安全 Java
Java面试题:请解释Java中的线程池以及为什么要使用线程池?请解释Java中的内存模型以及如何避免内存泄漏?请解释Java中的并发工具包以及如何实现一个简单的线程安全队列?
Java面试题:请解释Java中的线程池以及为什么要使用线程池?请解释Java中的内存模型以及如何避免内存泄漏?请解释Java中的并发工具包以及如何实现一个简单的线程安全队列?
33 1
|
30天前
|
Java 编译器 C++
Java演进问题之HotSpot JVM中的分层编译策略如何解决
Java演进问题之HotSpot JVM中的分层编译策略如何解决
|
1月前
|
存储 Java 调度
线程操纵术并行策略问题之Java的并行编程优势问题如何解决
线程操纵术并行策略问题之Java的并行编程优势问题如何解决