java线程阻塞中断和LockSupport的常见问题

简介:

上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下Thread.interrupt和java 5之后的LockSupport的实现。

 

在介绍之前,先抛几个问题。

 

  1. Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常?
  2. Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?
  3. 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?
  4. LockSupport.park()和unpark(),与object.wait()和notify()的区别?
  5. LockSupport.park(Object blocker)传递的blocker对象做什么用?
  6. LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?
  7. Thread.interrupt()处理是否有对应的回调函数?类似于钩子调用?
如果你都都能很明确的答上来了,说明你已经完全懂Thread.interrupt,可以不用往下看那了。

那如果不清楚的,带着这几个问题,一起来梳理下。
Thread的interrupt处理的几个方法:
  • public void interrupt() :  执行线程interrupt事件
  • public boolean isInterrupted() : 检查当前线程是否处于interrupt
  • public static boolean interrupted() : check当前线程是否处于interrupt,并重置interrupt信息。类似于resetAndGet()

理解:
1. 每个线程都有一个interrupt status标志位,用于表明当前线程是否处于中断状态
2. 一般调用Thread.interrupt()会有两种处理方式
  • 遇到一个低优先级的block状态时,比如object.wait(),object.sleep(),object.join()。它会立马触发一个unblock解除阻塞,并throw一个InterruptedException。
  • 其他情况,Thread.interrupt()仅仅只是更新了status标志位。然后你的工作线程通过Thread.isInterrrupted()进行检查,可以做相应的处理,比如也throw InterruptedException或者是清理状态,取消task等。
在interrupt javadoc中描述:



最佳实践
IBM上有篇文章写的挺不错。 Java theory and practice: Dealing with InterruptedException , 里面提到了Interrupt处理的几条最佳实践。
  1. Don't swallow interrupts (别吃掉Interrupt,一般是两种处理:  继续throw InterruptedException异常。  另一种就是继续设置Thread.interupt()异常标志位,让更上一层去进行相应处理。
    1.public class TaskRunner implements Runnable {  
    2.    private BlockingQueue<Task> queue;  
    3.  
    4.    public TaskRunner(BlockingQueue<Task> queue) {   
    5.        this.queue = queue;   
    6.    }  
    7.  
    8.    public void run() {   
    9.        try {  
    10.             while (true) {  
    11.                 Task task = queue.take(10, TimeUnit.SECONDS);  
    12.                 task.execute();  
    13.             }  
    14.         }  
    15.         catch (InterruptedException e) {   
    16.             // Restore the interrupted status  
    17.             Thread.currentThread().interrupt();  
    18.         }  
    19.    }  
    20.}  
    

  1. Implementing cancelable tasks with Interrupt (使用Thread.interrupt()来设计和支持可被cancel的task)
    1.public class PrimeProducer extends Thread {  
    2.    private final BlockingQueue<BigInteger> queue;  
    3.  
    4.    PrimeProducer(BlockingQueue<BigInteger> queue) {  
    5.        this.queue = queue;  
    6.    }  
    7.  
    8.    public void run() {  
    9.        try {  
    10.            BigInteger p = BigInteger.ONE;  
    11.            while (!Thread.currentThread().isInterrupted())  
    12.                queue.put(p = p.nextProbablePrime());  
    13.        } catch (InterruptedException consumed) {  
    14.            /* Allow thread to exit */  
    15.        }  
    16.    }  
    17.  
    18.    public void cancel() { interrupt(); } // 发起中断  
    19.}<span style="white-space: normal;"> </span>  
    

注册Interrupt处理事件(非正常用法)

一般正常的task设计用来处理cancel,都是采用主动轮询的方式检查Thread.isInterrupt(),对业务本身存在一定的嵌入性,还有就是存在延迟,你得等到下一个检查点(谁知道下一个检查点是在什么时候,特别是进行一个socket.read时,遇到过一个HttpClient超时的问题)。

 

来看一下,主动抛出InterruptedException异常的实现,借鉴于InterruptibleChannel的设计,比较取巧。


1.interface InterruptAble { // 定义可中断的接口  
2.  
3.    public void interrupt() throws InterruptedException;  
4.}  
5.  
6.abstract class InterruptSupport implements InterruptAble {  
7.  
8.    private volatile boolean interrupted = false;  
9.    private Interruptible    interruptor = new Interruptible() {  
10.  
11.                                             public void interrupt() {  
12.                                                 interrupted = true;  
13.                                                 InterruptSupport.this.interrupt(); // 位置3  
14.                                             }  
15.                                         };  
16.  
17.    public final boolean execute() throws InterruptedException {  
18.        try {  
19.            blockedOn(interruptor); // 位置1  
20.            if (Thread.currentThread().isInterrupted()) { // 立马被interrupted  
21.                interruptor.interrupt();  
22.            }  
23.            // 执行业务代码  
24.            bussiness();  
25.        } finally {  
26.            blockedOn(null);   // 位置2  
27.        }  
28.  
29.        return interrupted;  
30.    }  
31.  
32.    public abstract void bussiness() ;  
33.  
34.    public abstract void interrupt();  
35.  
36.    // -- sun.misc.SharedSecrets --  
37.    static void blockedOn(Interruptible intr) { // package-private  
38.        sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);  
39.    }  
40.}  

代码说明,几个取巧的点:

位置1:利用sun提供的blockedOn方法,绑定对应的Interruptible事件处理钩子到指定的Thread上。

位置2:执行完代码后,清空钩子。避免使用连接池时,对下一个Thread处理事件的影响。

位置3:定义了Interruptible事件钩子的处理方法,回调InterruptSupport.this.interrupt()方法,子类可以集成实现自己的业务逻辑,比如sock流关闭等等。

 

使用: 


1.class InterruptRead extends InterruptSupport {  
2.  
3.    private FileInputStream in;  
4.  
5.    @Override  
6.    public void bussiness() {  
7.        File file = new File("/dev/urandom"); // 读取linux黑洞,永远读不完  
8.        try {  
9.            in = new FileInputStream(file);  
10.            byte[] bytes = new byte[1024];  
11.            while (in.read(bytes, 0, 1024) > 0) {  
12.                // Thread.sleep(100);  
13.                // if (Thread.interrupted()) {// 以前的Interrupt检查方式  
14.                // throw new InterruptedException("");  
15.                // }  
16.            }  
17.        } catch (Exception e) {  
18.            throw new RuntimeException(e);  
19.        }  
20.    }  
21.  
22.    public FileInputStream getIn() {  
23.        return in;  
24.    }  
25.  
26.    @Override  
27.    public void interrupt() {  
28.        try {  
29.            in.getChannel().close();  
30.        } catch (IOException e) {  
31.            e.printStackTrace();  
32.        }  
33.    }  
34.  
35.}  
36.  
37.public static void main(String args[]) throws Exception {  
38.        final InterruptRead test = new InterruptRead();  
39.        Thread t = new Thread() {  
40.  
41.            @Override  
42.            public void run() {  
43.                long start = System.currentTimeMillis();  
44.                try {  
45.                    System.out.println("InterruptRead start!");  
46.                    test.execute();  
47.                } catch (InterruptedException e) {  
48.                    System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));  
49.                    e.printStackTrace();  
50.                }  
51.            }  
52.        };  
53.        t.start();  
54.        // 先让Read执行3秒  
55.        Thread.sleep(3000);  
56.        // 发出interrupt中断  
57.        t.interrupt();  
58.    }  

jdk源码介绍: 

1. sun提供的钩子可以查看System的相关代码, line : 1125


1.sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){  
2.            public sun.reflect.ConstantPool getConstantPool(Class klass) {  
3.                return klass.getConstantPool();  
4.            }  
5.            public void setAnnotationType(Class klass, AnnotationType type) {  
6.                klass.setAnnotationType(type);  
7.            }  
8.            public AnnotationType getAnnotationType(Class klass) {  
9.                return klass.getAnnotationType();  
10.            }  
11.            public <E extends Enum<E>>  
12.            E[] getEnumConstantsShared(Class<E> klass) {  
13.                return klass.getEnumConstantsShared();  
14.            }  
15.            public void blockedOn(Thread t, Interruptible b) {  
16.                t.blockedOn(b);  
17.            }  
18.        });  

 2. Thread.interrupt()


1.public void interrupt() {  
2.    if (this != Thread.currentThread())  
3.        checkAccess();  
4.  
5.    synchronized (blockerLock) {  
6.        Interruptible b = blocker;  
7.        if (b != null) {  
8.        interrupt0();       // Just to set the interrupt flag  
9.        b.interrupt(); //回调钩子  
10.        return;  
11.        }  
12.    }  
13.    interrupt0();  
14.    }  

更多

更多关于Thread.stop,suspend,resume,interrupt的使用注意点,可以看一下sun的文档,比如http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html

 

 

最后来解答一下之前的几个问题:

问题1: Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常? 

答: Thread.interrupt()只是在Object.wait() .Object.join(), Object.sleep()几个方法会主动抛出InterruptedException异常。而在其他的的block常见,只是通过设置了Thread的一个标志位信息,需要程序自我进行处理。


1.if (Thread.interrupted())  // Clears interrupted status!  
2.    throw new InterruptedException();  

问题2:Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?

答:Thread.interrupt设计的目的主要是用于处理线程处于block状态,比如wait(),sleep()状态就是个例子。但可以在程序设计时为支持task cancel,同样可以支持RUNNING状态。比如Object.join()和一些支持interrupt的一些nio channel设计。

 

问题3: 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?

答: interrupt用途: unBlock操作,支持任务cancel, 数据清理等。

 

问题4: LockSupport.park()和unpark(),与object.wait()和notify()的区别?

答:

1.  面向的主体不一样。LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。

2.  实现机制不同。虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。可以看下测试例子。object.notifyAll()不能唤醒LockSupport的阻塞Thread.

 

 

 

问题5: LockSupport.park(Object blocker)传递的blocker对象做什么用?

答: 对应的blcoker会记录在Thread的一个parkBlocker属性中,通过jstack命令可以非常方便的监控具体的阻塞对象.


1.public static void park(Object blocker) {  
2.        Thread t = Thread.currentThread();  
3.        setBlocker(t, blocker); // 设置Thread.parkBlocker属性的值  
4.        unsafe.park(false, 0L);  
5.        setBlocker(t, null);  // 清除Thread.parkBlocker属性的值  
6.    }  

 具体LockSupport的javadoc描述也比较清楚,可以看下:

 

 

问题6: LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?

答:能响应interrupt事件,但不会抛出InterruptedException异常。针对LockSupport对Thread.interrupte支持,也先看一下javadoc中的描述:


 

相关测试代码

1.package com.agapple.cocurrent;  
2.  
3.import java.io.File;  
4.import java.io.FileInputStream;  
5.import java.lang.reflect.Field;  
6.import java.util.concurrent.TimeUnit;  
7.import java.util.concurrent.locks.LockSupport;  
8.  
9.public class LockSupportTest {  
10.  
11.    private static LockSupportTest blocker = new LockSupportTest();  
12.  
13.    public static void main(String args[]) throws Exception {  
14.        lockSupportTest();  
15.        parkTest();  
16.        interruptParkTest();  
17.        interruptSleepTest();  
18.        interruptWaitTest();  
19.    }  
20.  
21.    /** 
22.     * LockSupport.park对象后,尝试获取Thread.blocker对象,调用其single唤醒 
23.     *  
24.     * @throws Exception 
25.     */  
26.    private static void lockSupportTest() throws Exception {  
27.        Thread t = doTest(new TestCallBack() {  
28.  
29.            @Override  
30.            public void callback() throws Exception {  
31.                // 尝试sleep 5s  
32.                System.out.println("blocker");  
33.                LockSupport.park(blocker);  
34.                System.out.println("wakeup now!");  
35.            }  
36.  
37.            @Override  
38.            public String getName() {  
39.                return "lockSupportTest";  
40.            }  
41.  
42.        });  
43.        t.start(); // 启动读取线程  
44.  
45.        Thread.sleep(150);  
46.        synchronized (blocker) {  
47.            Field field = Thread.class.getDeclaredField("parkBlocker");  
48.            field.setAccessible(true);  
49.            Object fBlocker = field.get(t);  
50.            System.out.println(blocker == fBlocker);  
51.            Thread.sleep(100);  
52.            System.out.println("notifyAll");  
53.            blocker.notifyAll();  
54.        }  
55.    }  
56.  
57.    /** 
58.     * 尝试去中断一个object.wait(),会抛出对应的InterruptedException异常 
59.     *  
60.     * @throws InterruptedException 
61.     */  
62.    private static void interruptWaitTest() throws InterruptedException {  
63.        final Object obj = new Object();  
64.        Thread t = doTest(new TestCallBack() {  
65.  
66.            @Override  
67.            public void callback() throws Exception {  
68.                // 尝试sleep 5s  
69.                obj.wait();  
70.                System.out.println("wakeup now!");  
71.            }  
72.  
73.            @Override  
74.            public String getName() {  
75.                return "interruptWaitTest";  
76.            }  
77.  
78.        });  
79.        t.start(); // 启动读取线程  
80.        Thread.sleep(2000);  
81.        t.interrupt(); // 检查下在park时,是否响应中断  
82.    }  
83.  
84.    /** 
85.     * 尝试去中断一个Thread.sleep(),会抛出对应的InterruptedException异常 
86.     *  
87.     * @throws InterruptedException 
88.     */  
89.    private static void interruptSleepTest() throws InterruptedException {  
90.        Thread t = doTest(new TestCallBack() {  
91.  
92.            @Override  
93.            public void callback() throws Exception {  
94.                // 尝试sleep 5s  
95.                Thread.sleep(5000);  
96.                System.out.println("wakeup now!");  
97.            }  
98.  
99.            @Override  
100.            public String getName() {  
101.                return "interruptSleepTest";  
102.            }  
103.  
104.        });  
105.        t.start(); // 启动读取线程  
106.        Thread.sleep(2000);  
107.        t.interrupt(); // 检查下在park时,是否响应中断  
108.    }  
109.  
110.    /** 
111.     * 尝试去中断一个LockSupport.park(),会有响应但不会抛出InterruptedException异常 
112.     *  
113.     * @throws InterruptedException 
114.     */  
115.    private static void interruptParkTest() throws InterruptedException {  
116.        Thread t = doTest(new TestCallBack() {  
117.  
118.            @Override  
119.            public void callback() {  
120.                // 尝试去park 自己线程  
121.                LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));  
122.                System.out.println("wakeup now!");  
123.            }  
124.  
125.            @Override  
126.            public String getName() {  
127.                return "interruptParkTest";  
128.            }  
129.  
130.        });  
131.        t.start(); // 启动读取线程  
132.        Thread.sleep(2000);  
133.        t.interrupt(); // 检查下在park时,是否响应中断  
134.    }  
135.  
136.    /** 
137.     * 尝试去中断一个LockSupport.unPark(),会有响应 
138.     *  
139.     * @throws InterruptedException 
140.     */  
141.    private static void parkTest() throws InterruptedException {  
142.        Thread t = doTest(new TestCallBack() {  
143.  
144.            @Override  
145.            public void callback() {  
146.                // 尝试去park 自己线程  
147.                LockSupport.park(blocker);  
148.                System.out.println("wakeup now!");  
149.            }  
150.  
151.            @Override  
152.            public String getName() {  
153.                return "parkTest";  
154.            }  
155.  
156.        });  
157.  
158.        t.start(); // 启动读取线程  
159.        Thread.sleep(2000);  
160.        LockSupport.unpark(t);  
161.        t.interrupt();  
162.    }  
163.  
164.    public static Thread doTest(final TestCallBack call) {  
165.        return new Thread() {  
166.  
167.            @Override  
168.            public void run() {  
169.                File file = new File("/dev/urandom"); // 读取linux黑洞  
170.                try {  
171.                    FileInputStream in = new FileInputStream(file);  
172.                    byte[] bytes = new byte[1024];  
173.                    while (in.read(bytes, 0, 1024) > 0) {  
174.                        if (Thread.interrupted()) {  
175.                            throw new InterruptedException("");  
176.                        }  
177.                        System.out.println(bytes[0]);  
178.                        Thread.sleep(100);  
179.                        long start = System.currentTimeMillis();  
180.                        call.callback();  
181.                        System.out.println(call.getName() + " callback finish cost : "  
182.                                           + (System.currentTimeMillis() - start));  
183.                    }  
184.                } catch (Exception e) {  
185.                    e.printStackTrace();  
186.                }  
187.            }  
188.  
189.        };  
190.    }  
191.}  
192.  
193.interface TestCallBack {  
194.  
195.    public void callback() throws Exception;  
196.  
197.    public String getName();  
198.}  

最后

发觉文章越写越长,那就索性发到了论坛,大家一起讨论下.毕竟文章中描述的都是一些使用层面的东东,并没有从操作系统或者sun native实现上去介绍Thread的一些机制,熟悉这块的大牛门也可以出来发表下高见.

 

本文仅当抛砖引玉,欢迎发言!

相关文章
|
14天前
|
Java 数据库 Android开发
【专栏】Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理
【4月更文挑战第27天】本文探讨了Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理。通过案例分析展示了网络请求、图像处理和数据库操作的优化实践。同时,文章指出并发编程的挑战,如性能评估、调试及兼容性问题,并强调了多线程优化对提升应用性能的重要性。开发者应持续学习和探索新的优化策略,以适应移动应用市场的竞争需求。
|
4月前
|
Java
JAVA JUC LockSupport 类
【1月更文挑战第5天】JAVA JUC LockSupport 类
|
5月前
|
Java
Java 中的中断
Java 中的中断
44 0
|
7月前
|
安全 Java
【JavaSE专栏76】三态和五态,线程的不同状态:新建、运行、状态、阻塞、等待、计时等待状态
【JavaSE专栏76】三态和五态,线程的不同状态:新建、运行、状态、阻塞、等待、计时等待状态
|
7月前
|
Java
如何用Java编写代码来中断一个线程??
如何用Java编写代码来中断一个线程??
28 0
|
4月前
|
存储 安全 算法
详解Java中HashMap、HashTable、ConcurrentHashMap常见问题
详解Java中HashMap、HashTable、ConcurrentHashMap常见问题
45 0
|
8月前
|
Arthas IDE Java
一种获取阻塞线程栈帧数据的思路
一种获取阻塞线程栈帧数据的思路
128 2
|
2月前
|
消息中间件 Linux 调度
【Linux 进程/线程状态 】深入理解Linux C++中的进程/线程状态:阻塞,休眠,僵死
【Linux 进程/线程状态 】深入理解Linux C++中的进程/线程状态:阻塞,休眠,僵死
79 0
|
4月前
|
Java 程序员
java常见问题(一)
java常见问题(一)
|
18天前
|
监控
写一个线程来监控各线程是否发生阻塞
写一个线程来监控各线程是否发生阻塞
21 0