Java并发编程学习12-任务取消(上)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 【5月更文挑战第6天】本篇介绍了取消策略、线程中断、中断策略 和 响应中断的内容

java-concurrency-logo.png

引言

《任务取消》由于篇幅较多,拆分了两篇来介绍各种实现取消和中断的机制,以及如何编写任务和服务,使它们能对取消请求做出响应。

如何理解任务是可取消的 ?

如果外部代码能在某个任务正常完成之前将其置入 “完成” 状态,那么这个任务就被认为是可取消的。

大多数任务,我们都希望让它们运行直到结束,或者让它们自行停止。但是也有很多原因,导致我们需要取消这些任务,如下所示:

  • 用户请求取消。用户点击图形界面程序中的 “取消” 按钮,或者通过管理接口来发出取消请求,例如 JMXJava Management Extensions,即 Java 管理扩展)。
  • 有时间限制的操作。某个应用程序需要在有限时间内搜索问题空间,并在这个时间内选择最佳的解决方案。当计时器超时时,需要取消所有正在搜索的任务。
  • 应用程序事件。应用程序对某个问题空间进行分解并搜索,从而使不同的任务可以搜索问题空间中的不同区域。当其中一个任务找到了解决方案时,所有其他仍在搜索的任务都将被取消。
  • 错误。网页爬虫程序搜索相关的页面,并将页面或摘要数据保存到硬盘。当一个爬虫任务发生错误时(例如,磁盘空间已满),那么所有搜索任务都会取消,此时可能会记录它们的当前状态,以便稍后重新启动。
  • 关闭。当一个程序或服务关闭时,必须对正在处理和等待处理的工作执行某种操作。在平缓的关闭过程中,当前正在执行的任务将继续执行直到完成,而在立即关闭过程中,当前的任务则可能取消。

主要内容

1. 取消策略

当我们需要取消任务时,该怎么操作呢?

Java 中没有一种安全的抢占式方式来停止线程,因此也就没有安全的抢占式方法来停止任务。只有一种 协作机制,使请求取消的任务和代码都遵循一种协商好的协议。

下面我们来看一下如下示例【使用 volatile 类型的域来保存取消状态】:

@ThreadSafe
public class PrimeGenerator implements Runnable {
   
   
    @GuardedBy("this")
    private final List<BigInteger> primes = new ArrayList<BigInteger>();
    // 为了使这个过程能可靠地工作,标志 cancelled 必须为 volatile 类型
    private volatile boolean cancelled;

    public void run() {
   
   
        BigInteger p = BigInteger.ONE;
        while(!cancelled) {
   
   
            LOGGER.debug("before = {}", p);
            p = p.nextProbablePrime();
            LOGGER.debug("prime = {}", p);
            synchronized (this) {
   
   
                primes.add(p);
            }
        }
    }

    public void cancel() {
   
   
        cancelled = true;
    }

    public synchronized List<BigInteger> get() {
   
   
        return new ArrayList<BigInteger>(primes);
    }
}

如上示例 PrimeGenerator 将会持续地枚举素数,直到它被取消。它使用了一种协作机制,cancel 方法将设置 cancelledtrue,任务在 搜索下一个素数之前 会检查这个标志,如果标志为 true,则任务将会自行结束。

下面我们再来看一下如下示例【让上面的素数生成器运行 1 秒钟后取消】:

public class PrimeTest {
   
   

    private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(PrimeTest.class);

    @Test
    public void primeTest() throws InterruptedException {
   
   
        LOGGER.debug("Primes = {}", aSecondOfPrimes());
    }

    /**
     * 一个仅运行一秒钟的素数生成器
     */
    public List<BigInteger> aSecondOfPrimes() throws InterruptedException {
   
   
        PrimeGenerator generator = new PrimeGenerator();
        new Thread(generator).start();
        try {
   
   
            LOGGER.debug("sleep start");
            SECONDS.sleep(1);
        } finally {
   
   
            generator.cancel();
        }
        return generator.get();
    }
}

实际运行中,上述代码并不会刚好在运行 1 秒后停止,因为在请求取消的时刻和 run 方法中循环执行下一次检查之间可能存在延迟。cancel 方法由 finally 块调用,从而确保即使在调用 sleep 时被中断也能取消素数生成器的执行。如果 cancel 没有被调用,那么搜索素数的线程将永远运行下去。

一个可取消的任务必须拥有取消策略,在该策略中需要详细定义取消操作的三步骤:

  • How。应用程序的其他代码如何(How)请求取消该任务。
  • When。任务在何时(When)检查是否已经请求了取消。
  • What。在响应取消请求时应该执行哪些(What)操作。

上述素数生成器 PrimeGenerator 就使用了一种简单的取消策略:

客户代码通过 调用 cancel 来请求取消,PrimeGenerator 在 每次搜索素数前 首先检查是否存在取消请求,如果存在则 退出

2. 线程中断

介绍中断之前,我们首先来分析一下,上述素数生成器使用的取消机制目前存在的问题:

  • 任务的退出过程仍然需要花费一定的时间。
  • 任务中如果调用了一个阻塞的方法(如 BlockingQueue.put),它可能永远不会检查取消方法,从而永远不会结束。

下面我们再来看一下如下示例:

public class BrokenPrimeProducer extends Thread {
   
   

    private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(BrokenPrimeProducer.class);

    private final BlockingQueue<BigInteger> queue;

    private volatile boolean cancelled = false;

    public BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
   
   
        this.queue = queue;
    }

    @Override
    public void run() {
   
   
        try {
   
   
            BigInteger p = BigInteger.ONE;
            while (!cancelled) {
   
   
                LOGGER.debug("before = {}", p);
                queue.put(p = p.nextProbablePrime());
                LOGGER.debug("prime = {}", p);
            }
        } catch (InterruptedException e) {
   
   
            LOGGER.error("InterruptedException");
        }
    }

    public void cancel() {
   
   
        cancelled = true;
        LOGGER.debug("cancel");
    }
}

public class PrimeConsumer {
   
   

    private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(PrimeConsumer.class);

    private static final int BOUND = 100;

    private int times = 0; // 消费次数

    public void consumePrimes() throws InterruptedException {
   
   
        BlockingQueue<BigInteger> primes = new LinkedBlockingQueue<>(BOUND);
        BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
        producer.start();
        try {
   
   
            while (needMorePrimes())
                consume(primes.take());
        } finally {
   
   
            producer.cancel();
        }
    }

    private void consume(BigInteger value) {
   
   
        times++;
        LOGGER.debug1(new Object() {
   
   }, "value = {}", value);
    }

    private boolean needMorePrimes() throws InterruptedException {
   
   
        return true;
    }
}

如上示例中,生产者线程 BrokenPrimeProducer 生成素数,并存放到阻塞队列 queue 中【其中 queue 由生产者的构造方法传入】。消费者 PrimeConsumerconsumePrimes 方法从生产者中获取素数并处理。

如果生产者的生产速度超过了消费者的处理速度,队列将被填满,其 put 方法也会一直阻塞下去。

我们来思考下遇到上述情况,如果消费者想取消生产者任务,又该怎么办呢?

消费者可以调用生产者的 cancel 方法来设置 cancelled 标志,但是因为消费者已经停止从队列中取素数,而阻塞队列 queue 的 put 方法将一直保持阻塞状态,导致生产者任务无法从阻塞的 put 方法中恢复过来,也就永远不会再检查 cancelled 标志,从而无法取消生产者任务。

下面我们来修改下消费者的 needMorePrimes 方法, 再用测试类 PrimeConsumerTest 来直观地演示下上述情况:

private boolean needMorePrimes() throws InterruptedException {
   
   
    Thread.sleep(1000); // 等待1s 再判断
    return times < 5;
}

public class PrimeConsumerTest {
   
   

    public static void main(String[] args) throws InterruptedException {
   
   
        PrimeConsumer consumer = new PrimeConsumer();
        consumer.consumePrimes();
    }
}

运行结果如下:

image.png

从上图可以看到,当消费次数达到 5 次后,消费者不再从队列中取素数并打印出来,从代码看它后面直接进入 finally 方法,并且调用生产者的 cancel 方法准备去取消生产者任务,但是生产者线程的打印日志除了 cancel 外,就一直保持 before 那,并且看程序也没有结束掉,说明此时生产者在 put 方法上一直阻塞着。

那么我们如何改造生产者,能够在 put 方法阻塞的情况下,支持消费者取消生产者任务呢?

想要实现这种功能,就不得不提到接下来要讲到的 线程中断

其实在笔者前面的《阻塞队列》博文中,曾简单介绍了阻塞方法与中断方法,大家可以快速去回顾一下。

说到线程中断,就不得不提到 Thread 类,下面简单介绍下:

每个线程都有一个 boolean 类型的中断状态。

当中断线程时,该线程的中断状态将被设置为 true

线程中还包含了中断线程、查询线程中断状态的方法,如下所示:

  • interrupt 方法能中断目标线程。
  • isInterrupted 方法能返回目标线程的中断状态。
  • 静态的 interrupted 方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法。

注意:

  • 调用 interrupt 方法并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。
  • 因为静态的 interrupted 方法会清除当前线程的中断状态。如果调用它时返回了 true,那么除非你想屏蔽这个中断,否则必须对它进行处理----可以抛出 InterruptedException,或者通过再次调用 interrupt 来恢复中断状态【可以结合《阻塞队列》“阻塞方法与中断方法” 那块的内容进行思考】。

当线程在非阻塞状态下中断时,它的中断状态将被设置,然后根据将被取消的操作来检查中断状态以判断发生了中断。如果不触发 InterruptedException,那么中断状态将一直保持,直到明确地清除中断状态。

我们知道 Java 类库中的一些阻塞库方法支持中断。例如 Thread.sleepObject.wait 等,它们都会检查中断状态,并且在发现中断时提前返回。

它们在响应中断时执行的操作包括:

  • 清除中断状态
  • 抛出 InterruptedException,表示阻塞操作由于中断而提前结束。

如果任务代码能够响应中断,那么可以使用中断作为取消机制,并且利用 Java 类库中提供的中断支持。

通过上面的了解,我们现在可以使用中断来请求取消生产者任务,以解决 BrokenPrimeProducer 存在的问题。

下面来看一下改造后的生产者的代码示例:

public class PrimeProducer extends Thread {
   
   

    private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(PrimeProducer.class);

    private final BlockingQueue<BigInteger> queue;

    public PrimeProducer(BlockingQueue<BigInteger> queue) {
   
   
        this.queue = queue;
    }

    @Override
    public void run() {
   
   
        try {
   
   
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted()) {
   
   
                LOGGER.debug("before = {}", p);
                queue.put(p = p.nextProbablePrime());
                LOGGER.debug("prime = {}", p);
            }
        } catch (InterruptedException e) {
   
   
            // 允许线程退出
            LOGGER.error("InterruptedException");
        }
    }

    public void cancel() {
   
   
        interrupt(); // 中断线程
        LOGGER.debug("interrupt");
    }
}

public class PrimeConsumer {
   
   

    public void consumePrimes() throws InterruptedException {
   
   
        // 。。。
        PrimeProducer producer = new PrimeProducer(primes);
        producer.start();
        // 。。。
    }

    // 。。。
}

上述生产者 PrimeProducer 在每次循环中,都有两个位置可以检测出中断:

  1. 阻塞的 put 方法调用中。
  2. while 循环的判断条件中。

当然因为这里调用阻塞的 put 方法,while 循环条件中显式的检测也可以去掉。但如果加上这段的话,可以使 PrimeProducer 对中断具有更高的响应性。

然后我们用自测类 PrimeConsumerNewTest 来演示一下使用中断来请求取消:

public class PrimeConsumerNewTest {
   
   
    public static void main(String[] args) throws InterruptedException {
   
   
        PrimeConsumer consumer = new PrimeConsumer();
        consumer.consumePrimes();
    }
}

运行结果如下:

image.png

从上图可以看到,当消费次数达到 5 次后,消费者不再从队列中取素数并打印出来,从代码看它后面直接进入 finally 方法,并且调用生产者的 cancel 方法,这里可以看到生产者日志打印了 interrupt,发出了中断请求;在发出中断请求之前,我们也从日志中看到生产最后停在了 before 处,说明此时生产者在 put 方法上阻塞着;而发出中断请求之后,put 方法响应了中断,并抛出了 InterruptedException,从日志看就是打印了 InterruptedException

3. 中断策略

正如任务中应该包含取消策略一样,线程同样应该包含中断策略。

那该如何理解中断策略呢?

中断策略规定了当线程发现中断请求时,应该做哪些工作,哪些工作单元对于中断来说是原子操作,以及以多快的速度来响应中断。

其中最合理的中断策略是什么呢?

当线程发现中断请求后,就尽快退出,在必要时进行清理,并通知某个所有者该线程已经退出。

当然,除上外还可以建立其他的中断策略,如 暂停服务重新开始服务

我们知道任务不会在其自己拥有的线程中执行,而是在某个服务(例如线程池)拥有的线程中执行。对于非线程所有者的代码来说(例如,对于线程池而言,任何在线程池实现以外的代码),应该小心地保存中断状态,这样拥有线程的代码才能对中断做出响应,即使 “非所有者” 代码也可以做出响应。

这也就是为什么大多数可阻塞的库函数都只是抛出 InterruptedException 作为中断响应。它们永远不会在某个由自己拥有的线程中运行,因此它们为任务或库代码实现了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使调用栈中的上层代码可以采取进一步的操作

当检查到中断请求时,任务并不需要放弃所有的操作,它可以推迟处理中断请求,并直到某个更合适的时刻。因此就需要记住中断请求,并在完成当前任务后抛出 InterruptedException 或者 表示已收到中断请求。

无论任务把中断视为取消,还是其他某个中断响应操作,都应该小心地保存执行线程的中断状态。如果除了将 InterruptedException 传递给调用者外还需要下执行其他操作,那么应该在捕获 InterruptedException 之后恢复中断状态:

    Thread.currentThread().interrupt();

线程应该只能由其所有者中断,所有者可以将线程的中断策略信息封装到某个合适的取消机制中,例如 关闭方法

由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

4. 响应中断

在笔者前面的《阻塞队列》博文中,当调用可中断的阻塞函数时,例如 Thread.sleepBlockingQueue.put 等,有两种常见的方法可用于处理 InterruptedException :传递 InterruptedException 和 恢复中断。

不过需要注意的是,你不能在 catch 块中捕获到 InterruptedException 异常却不做任何处理,除非在你的代码中实现了线程的中断策略。

当然细心的小伙伴就会说了,上文中的生产者线程 PrimeProducer 不就捕获了 InterruptedException 异常却不做任何处理。这里需要解释下,虽然 PrimeProducer 屏蔽了中断,但因为它已经知道线程将要结束,并且在调用栈中已经没有上层代码需要知道中断信息。上述只是一类特殊的情况,由于大多数代码并不知道它们将在哪个线程中运行,因此应该保存中断状态。

只有实现了线程中断策略的代码才可以屏蔽中断请求。在常规的任务和库代码中都不应该屏蔽中断请求。

对于一些不支持取消但仍可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。在这种情况下,它们应该在本地保存中断状态,并在返回前恢复状态而不是在捕获 InterruptedException 时恢复状态,可参考如下示例:

    public Task getNextTask(BlockingQueue<Task> queue) {
   
   
        boolean interrupted = false;
        try {
   
   
            while (true) {
   
   
                try {
   
   
                    return queue.take();
                } catch (InterruptedException e) {
   
   
                    interrupted = true;
                    // 重新尝试
                }
            }
        } finally {
   
   
            if (interrupted) 
                Thread.currentThread().interrupt(); // 恢复中断
        }
    }

如果过早地设置中断状态,就可能引起无限循环,因为大多数可中断的阻塞方法都会在入口处检查中断状态,并且当发现该状态已被设置时会立即抛出 InterruptedException。(通常,可中断的方法会在阻塞或进行重要的工作前首先检查中断,从而尽快地响应中断)。

如果代码不会调用可中断的阻塞方法,那么仍然可以通过在任务代码中轮询当前线程的中断状态来响应中断。要选择合适的轮询频率,就需要在效率和响应性之间进行权衡。如果响应性要求较高,那么就不应该调用那些执行时间较长并且不响应中断的方法。

在取消过程中可能涉及除了中断状态之外的其他状态,中断可以用来获得线程的注意,并且由中断线程保存的信息,可以为中断的线程提供进一步的指示。(当访问这些信息时,要确保使用同步。)

例如,当一个由 ThreadPoolExecutor 拥有的工作者线程检测到中断时,它会检查线程池是否正在关闭。如果是,它会在结束之前执行一些线程池清理工作,否则它可能创建一个新线程将线程池恢复到合理的规模。

总结

本篇介绍了取消策略、线程中断、中断策略 和 响应中断的内容,下篇将要介绍如何编写任务和服务,使它们能对取消请求做出响应。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
4天前
|
安全 Java 调度
Java编程时多线程操作单核服务器可以不加锁吗?
Java编程时多线程操作单核服务器可以不加锁吗?
17 2
|
6天前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
16 5
|
7天前
|
Java 开发者
深入探索Java中的并发编程
本文将带你领略Java并发编程的奥秘,揭示其背后的原理与实践。通过深入浅出的解释和实例,我们将探讨Java内存模型、线程间通信以及常见并发工具的使用方法。无论是初学者还是有一定经验的开发者,都能从中获得启发和实用的技巧。让我们一起开启这场并发编程的奇妙之旅吧!
|
8天前
|
算法 安全 Java
JAVA并发编程系列(12)ThreadLocal就是这么简单|建议收藏
很多人都以为TreadLocal很难很深奥,尤其被问到ThreadLocal数据结构、以及如何发生的内存泄漏问题,候选人容易谈虎色变。 日常大家用这个的很少,甚至很多近10年资深研发人员,都没有用过ThreadLocal。本文由浅入深、并且才有通俗易懂方式全面分析ThreadLocal的应用场景、数据结构、内存泄漏问题。降低大家学习啃骨头的心理压力,希望可以帮助大家彻底掌握并应用这个核心技术到工作当中。
|
3天前
|
Java
JAVA并发编程系列(13)Future、FutureTask异步小王子
本文详细解析了Future及其相关类FutureTask的工作原理与应用场景。首先介绍了Future的基本概念和接口方法,强调其异步计算特性。接着通过FutureTask实现了一个模拟外卖订单处理的示例,展示了如何并发查询外卖信息并汇总结果。最后深入分析了FutureTask的源码,包括其内部状态转换机制及关键方法的实现原理。通过本文,读者可以全面理解Future在并发编程中的作用及其实现细节。
|
6天前
|
Java 数据处理 调度
Java中的多线程编程:从基础到实践
本文深入探讨了Java中多线程编程的基本概念、实现方式及其在实际项目中的应用。首先,我们将了解什么是线程以及为何需要多线程编程。接着,文章将详细介绍如何在Java中创建和管理线程,包括继承Thread类、实现Runnable接口以及使用Executor框架等方法。此外,我们还将讨论线程同步和通信的问题,如互斥锁、信号量、条件变量等。最后,通过具体的示例展示了如何在实际项目中有效地利用多线程提高程序的性能和响应能力。
|
7天前
|
安全 算法 Java
Java中的多线程编程:从基础到高级应用
本文深入探讨了Java中的多线程编程,从最基础的概念入手,逐步引导读者了解并掌握多线程开发的核心技术。无论是初学者还是有一定经验的开发者,都能从中获益。通过实例和代码示例,本文详细讲解了线程的创建与管理、同步与锁机制、线程间通信以及高级并发工具等主题。此外,还讨论了多线程编程中常见的问题及其解决方案,帮助读者编写出高效、安全的多线程应用程序。
|
2月前
|
算法 Java 开发者
Java 编程入门:从零到一的旅程
本文将带领读者开启Java编程之旅,从最基础的语法入手,逐步深入到面向对象的核心概念。通过实例代码演示,我们将一起探索如何定义类和对象、实现继承与多态,并解决常见的编程挑战。无论你是编程新手还是希望巩固基础的开发者,这篇文章都将为你提供有价值的指导和灵感。
|
2月前
|
机器学习/深度学习 Java TensorFlow
深度学习中的图像识别:从理论到实践Java中的多线程编程入门指南
【8月更文挑战第29天】本文将深入探讨深度学习在图像识别领域的应用,从基础理论到实际应用案例,带领读者一步步理解如何利用深度学习技术进行图像识别。我们将通过一个简单的代码示例,展示如何使用Python和TensorFlow库实现一个基本的图像识别模型。无论你是初学者还是有一定经验的开发者,都能从中获得启发和学习。 【8月更文挑战第29天】在Java世界里,线程是程序执行的最小单元,而多线程则是提高程序效率和响应性的关键武器。本文将深入浅出地引导你理解Java多线程的核心概念、创建方法以及同步机制,帮助你解锁并发编程的大门。
|
3月前
|
传感器 数据采集 监控
Java串口编程入门
Java串口编程入门
下一篇
无影云桌面