Java Interrupt

简介: 本文中,我们主要介绍一下 Java 提供的 Interrupt 机制的常见使用方法,以及 Interrupt 的实现。

引言

本文中,我们主要介绍一下 Java 提供的 Interrupt 机制的常见使用方法,以及 Interrupt 的实现。所有关于 Java 并发的文章均收录于<Java并发系列文章>

Interrupt

在 Java 中,如果我们想要操作一个线程的执行状态,可以直接调用 Thread::stop 停止它的运行,但是这样显然有些粗暴,如果线程正处于临界区进行数据修改,可能会导致数据的错乱。同样,我们也可以通过 Thread::suspendThread::resume 来暂停和恢复一个线程的运行。

但是上述的这些方式,都是在比较下层的方式来控制一个线程的运行,不够优雅不说,还存在潜在风险。所以从 JDK1.2 开始这些方法就已经被废弃了。取而代之的,是一个新的 Thread::runterupt 接口,它是一个交互式的接口,为什么这么说呢?因为它没有 Thread::stop 那么强硬直接从底层销毁线程,相反的它以类似于通知的方式告诉线程,现在希望你停止下来,具体停不停,以什么方式停下来,都由线程方自行决定。

你既可以完全忽略中断通知:

while (true) {
    try {
        // do some thing
    } catch (InterruptedException ignored) {
    }
}

也可以接收到中断请求后,立马停止。

while (!Thread.interrupted()) {
    try {
        // do some thing
    } catch (InterruptedException ignored) {
        break;
    }
}

使用过 Thread::interrupt 可能会发现一个“怪”现象,当一个线程被中断时,有的时候线程内会产生 InterruptedException,有的时候啥都没有,这是为什么呢?这是因为,当我们执行不同的代码时,interrupt 的处理结果并不相同:

  • 对于大部分阻塞线程的方法,使用Thread.interrupt(),可以立刻退出等待,抛出InterruptedException 这些方法包括Object.wait(), Thread.join(),Thread.sleep(),以及各种AQS衍生类:Lock.lockInterruptibly()等任何显示声明throws InterruptedException的方法。
  • 被阻塞的nio Channel也会响应interrupt(),抛出ClosedByInterruptException,相应nio通道需要实现java.nio.channels. InterruptibleChannel接口。
  • 如果使用的是传统IO(非Channel,如ServerSocket.accept),所在线程被interrupt时不会抛出ClosedByInterruptException,只会修改 Thread 的 interrupt 标志位。。但可以使用流的close方法实现退出阻塞。
  • 还有一些阻塞方法不会响应interrupt,如等待进入synchronized段、Lock.lock()。他们不能被动的退出阻塞状态,只会修改 Thread 的 interrupt 标志位。
  • 一般的数字运算,内存操作也不会抛出 InterruptException,只会修改 Thread 的 interrupt 标志位。

看到这大家肯定有疑问,怎么规则这么多啊,interrupt 到底是怎么实现的啊。带着这些疑问,我们深入到 Thread::interrupt 的源码中。

//Class java.lang.Thread
public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();
    synchronized (blockerLock) {
        Interruptible b = blocker;   //中断触发器
        if (b != null) {
            interrupt0();
            b.interrupt(this);   //触发回调接口
            return;
        }
    }
    interrupt0();
}

private native void interrupt0();

从代码中可以看到,在执行 interrupt 时,首先会进行权限检查,如果被中断的线程属于系统线程组(即JVM线程),checkAccess()方法会使用系统的System.getSecurityManager()来判断权限。由于Java默认没有开启安全策略,此方法其实会跳过安全检查。

然后,如果中断触发器 broker 不为空,则会先执行 interrupt0 这个原生函数,然后触发 broker 的回调函数。这里我们先介绍一下 blocker 可以用来干什么。前面我们说了,被阻塞的nio Channel也会响应interrupt(),抛出ClosedByInterruptException。这个就是通过 broker 实现的。具体的实现在InterruptibleChannel接口的抽象实现类AbstractInterruptibleChannel的方法begin()中:

//Class java.nio.channels.spi.AbstractInterruptibleChannel
protected final void begin() {
    if (interruptor == null) {
        interruptor = new Interruptible() {
                public void interrupt(Thread target) {
                    synchronized (closeLock) {
                        if (!open)
                            return;
                        open = false;
                        interrupted = target;
                        try {//关闭io通道
                            AbstractInterruptibleChannel.this.implCloseChannel();
                        } catch (IOException x) { }
                    }
                }};
    }
    blockedOn(interruptor);//将interruptor设置为当前线程的blocker
    Thread me = Thread.currentThread();
    if (me.isInterrupted())
        interruptor.interrupt(me);
}

protected final void end(boolean completed) throws AsynchronousCloseException {
    blockedOn(null);
    Thread interrupted = this.interrupted;
    if (interrupted != null && interrupted == Thread.currentThread()) {
        interrupted = null;
        throw new ClosedByInterruptException();
    }
    if (!completed && !open)
        throw new AsynchronousCloseException();
}

AbstractInterruptibleChannel 有两个函数,begin 负责将 Interruptible 回调器注入到 Thread 中,当前线程被打断时直接断开连接。end 函数负责检查当前线程是否被中断,如果是的话,则抛出 ClosedByInterruptException 异常。

//Class java.nio.channels.Channels.ReadableByteChannelImpl
public int read(ByteBuffer dst) throws IOException {
    ......
    try {
        begin();
        bytesRead = in.read(buf, 0, bytesToRead);
    finally {
        end(bytesRead > 0);
    }
    ......
}

nio通道ReadableByteChannel每次执行阻塞方法read()前,都会执行begin(),把Interruptible回调接口注册到当前线程上,以实现能够响应其他线程的中断。当线程收到中断时,Thread.interrupt()触发回调接口,在回调接口Interruptible中关闭io通道并返回,最后在finally块中执行end(),end()方法会检查中断标记,抛出ClosedByInterruptException。

之前在介绍Thread的时候曾经提到,JavaThread有三个成员变量,这三个变量就是 interrupt 实现的关键:

//用于synchronized同步块和Object.wait()
ParkEvent * _ParkEvent ;
//用于Thread.sleep()
ParkEvent * _SleepEvent ;
//用于unsafe.park()/unpark(),供java.util.concurrent.locks.LockSupport调用,
//因此它支持了java.util.concurrent的各种锁、条件变量等线程同步操作,是concurrent的实现基础
Parker* _parker;

接下来,我们看一下原生方法 interrupt0 的实现。


JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
    JVMWrapper("JVM_Interrupt");
    // Ensure that the C++ Thread and OSThread structures aren't freed before we operate
    oop java_thread = JNIHandles::resolve_non_null(jthread);
    MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
    // We need to re-resolve the java_thread, since a GC might have happened during the
    // acquire of the lock
    // 当调用 Thread::start 之后 JavaThread实例才会被创建,所以如果 Interrupt 发生在thread start 之前,是不会起到中断效果的
    JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
    if (thr != NULL) {
        Thread::interrupt(thr);
    }
JVM_END

JVM_Interrupt对参数进行了校验,然后直接调用Thread::interrupt:

void os::interrupt(Thread* thread) {
    assert(Thread::current() == thread || Threads_lock->owned_by_self(),
        "possibility of dangling Thread pointer");
    //获取系统native线程对象
    OSThread* osthread = thread->osthread();
    if (!osthread->interrupted()) {
        osthread->set_interrupted(true);
        //内存屏障,使osthread的interrupted状态对其它线程立即可见
        OrderAccess::fence();
        //前文说过,_SleepEvent用于Thread.sleep,线程调用了sleep方法,则通过unpark唤醒
        ParkEvent * const slp = thread->_SleepEvent ;
        if (slp != NULL) slp->unpark() ;
    }
    //_parker用于concurrent相关的锁,此处同样通过unpark唤醒
    if (thread->is_Java_thread())
        ((JavaThread*)thread)->parker()->unpark();
    //synchronized同步块和Object.wait() 唤醒
    ParkEvent * ev = thread->_ParkEvent ;
    if (ev != NULL) ev->unpark() ;
}

简单地说,interrupt 底层不仅仅会修改线程的中断标志位 osthread->set_interrupted(true) ,当该线程处于等待状态时,还会试图唤醒该线程。无论是它在 sleep 中,在 park 中,还是在 wait 中,或者正在获取 monitor 锁。关于 sleep,park和wait,我们前面已经介绍过了,当被唤醒时,如果发现线程的 interrupt 标志位被置位,就会抛出异常。

那 monitor 锁也是阻塞在 park 函数上的,它被唤醒之后怎么不抛出异常并返回呢?实际上这是实现策略决定的,如果是synchronized等待事件,被唤醒后会尝试获取锁,如果失败则会通过循环继续park()等待,因此synchronized等待实际上是不会被interrupt()中断的。

和 monitor 锁类似,通过 AQS 实现的锁的 lock 接口也是使用了同样的策略。在检测到中断时,不抛出异常继续尝试获得锁。

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

但是,AQS 也提供了可中断的加锁 api doAcquireInterruptibly , 它的实现就是检测到中断,就抛出异常。

/**
 * Acquires in exclusive interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

关于 Interrupt 的实现,想必大家应该已经比较清楚了,那么我们应该如何正确的使用 Interrupt 接口,如果妥善的处理 InterruptedException 呢?

  • 如果自己很清楚当前线程被中断后的处理方式:则按自己的方式处理,通常是做好善后工作,主动退出线程
  • 自己不知道外层业务需要达到什么目的:直接在方法声明中throws InterruptedException,丢给上层处理,这种方式也很常见,将中断的处置权交给具体的业务来处理
  • 自己有需要清理的数据,同时外层函数可能也需要清理一些数据:通过 Thread::interrupted 清除标志位(如果捕获到 InterruptedException 说明标志位已经被清除,就不要调用 Thread::interrupted ),然后再抛出新的异常给上一层,或者通过 Thread::interupt 重新标记中断标志位等。

Thread 的 Interrupt 相关接口有3个,使用起来比较容易混淆,这里简单地帮大家区分一下:

  • interupt:中断指定线程。
  • isInterrupted:检查该线程是否被中断,但是不重置中断标志位。
  • interrupted:检查该线程是否被中断,同时重置中断标志位。

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析%E7%9B%B8%E5%85%B3%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/)
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

相关文章
|
监控 Java
一文了解JAVA线程的中断(Interrupt)机制
一文了解JAVA线程的中断(Interrupt)机制
858 0
一文了解JAVA线程的中断(Interrupt)机制
java中interrupt,interrupted和isInterrupted的区别
java中interrupt,interrupted和isInterrupted的区别
|
Java 调度
Java并发编程之线程生命周期、守护线程、优先级、关闭和join、sleep、yield、interrupt
Java并发编程中,其中一个难点是对线程生命周期的理解,和多种线程控制方法、线程沟通方法的灵活运用。这些方法和概念之间彼此联系紧密,共同构成了Java并发编程基石之一。 Java线程的生命周期 Java线程类定义了New、Runnable、Running Man、Blocked和Dead五种状态。
1021 0
|
Java
关于java线程中stop interrupt daemon wait notify
一。关于终止线程stop与interrupt   一般来说,线程执行结束后就变成消亡状态,乍看之下我们并不需要人为进行干预(人为停止线程),不过凡事都有例外吧,在服务器或者其他应用场景下,线程为了提供服务而一直在不停的运转,因此必要时刻我们还需“人为干涉的”。
1297 0
|
Java
java 结束线程 interrupt()
http://blog.csdn.net/wxwzy738/article/details/8516253 class Example3 extends Thread {   volatile boolean stop = false;   public static void main( String args[] ) throws Exception {    Exampl
1282 0
|
1天前
|
安全 Java
java多线程(一)(火车售票)
java多线程(一)(火车售票)
|
1天前
|
安全 Java 调度
Java并发编程:深入理解线程与锁
【4月更文挑战第18天】本文探讨了Java中的线程和锁机制,包括线程的创建(通过Thread类、Runnable接口或Callable/Future)及其生命周期。Java提供多种锁机制,如`synchronized`关键字、ReentrantLock和ReadWriteLock,以确保并发访问共享资源的安全。此外,文章还介绍了高级并发工具,如Semaphore(控制并发线程数)、CountDownLatch(线程间等待)和CyclicBarrier(同步多个线程)。掌握这些知识对于编写高效、正确的并发程序至关重要。