下面介绍取消线程常用的4中方式:
一、通过设置“cancelled requested”标志来中断线程
java中的任务取消实现:
是通过一个协作机制完成的,使用一个线程能够要求另一个线程停止当前的工作。
这样做的原因是:
如果直接要求一个任务、线程或者服务立即停止,可能会导致共享的数据结构处于不一致的状态。
当要求他们停止时,他们首先会清除当前进程中的工作,然后再终止。提供了更好的灵活性。因为
任务代码本身比发出取消请求的代码更明确应该清除什么。
典型的协作机制:
设置“cancelled requested”标志,任务会定期查看;
如果发现标志被设置过,任务就会提前结束。
一般为了让这个标志变量的状态更可靠一点,cancelled最好是volatile类型的。
取消策略:
说明关于取消的“how”,"when","what"————代码如何请求取消任务,任务在什么
时候检查取消的请求是否到达,响应取消请求的任务中应有的行为。
public class CancelThread { /** * 生成素数任务 * @author hadoop * */ class PrimeGenerator implements Runnable{ private final List<BigInteger> primes = new ArrayList<BigInteger>(); private volatile boolean cancelled; @Override public void run() { BigInteger p = BigInteger.ONE; while(!cancelled){ //得到下一个素数 p = p.nextProbablePrime(); synchronized (this) { primes.add(p); } } } public void cancel(){ cancelled = true; } public synchronized List<BigInteger> get(){ return new ArrayList<BigInteger>(primes); } } /** * 生成素数的程序运行1s钟后,取消任务 * @return * @throws InterruptedException */ List<BigInteger> aSecondOfPrimes() throws InterruptedException{ PrimeGenerator generator = new PrimeGenerator(); new Thread(generator).start(); try { TimeUnit.SECONDS.sleep(1); }finally{ generator.cancel(); } return generator.get(); } public static void main(String[] args) throws InterruptedException { CancelThread cancelThread = new CancelThread(); List<BigInteger> bList = cancelThread.aSecondOfPrimes(); for (BigInteger bigInteger : bList) { System.out.println(bigInteger); } } }
二、通过线程的interrupt()方法来中断线程
中断通常是实现取消最明智的选择
中断线程
中断:本身并不会真正中断一个正在运行的线程,仅仅发出中断请求,线程自己会在下一个方便的时刻中断
(这些时刻被称为取消点,cancellation point)
有些方法如wait,sleep和join方法,当接收一个中断请求时,会抛出一个异常,或者进入时中断状态就已经被设置了。
可阻塞的库函数,通过抛出InterruptedException作为中断的响应
常用的取消机制(设置“cancelled requested”标志)不能与可阻塞的库函数进行良好的互动。阻塞会导致中断无法立刻响应,或干脆无法响应。
如果任务代码响应中断,那么可以使用中断作为你的取消机制,而不是boolean标志来请求取消。
示例中进行了两个点的中断检查:
1.调用阻塞的put方法时
2.循环开始出显示地采集中断状态。
中断策略:
决定线程如何应对中断请求————当发现中断请求时,它会做什么,哪些工作单元对应中断来说是原子操作,
以及在多快的时间里响应中断。
线程应该只能被线程的所有者中断;所有者可以把线程的中断策略信息封装到一个合适的取消机制中,比如关闭(shutdown)方法
public class InterruptThread { /** * 素数生产者,向阻塞队列中放入内容 * @author hadoop * */ class BrokenPrimePreducer extends Thread{ private final BlockingQueue<BigInteger> queue; private volatile boolean cancelled = false; BrokenPrimePreducer(BlockingQueue<BigInteger> queue){ this.queue = queue; } public void run(){ try { BigInteger p = BigInteger.ONE; while(!Thread.currentThread().isInterrupted()){//第一出检查中断 //第二次检查中断,put操作中 notFull.await();在等待操作中会检查线程是否中断 queue.put(p = p.nextProbablePrime()); } } catch (InterruptedException e) { System.out.println("进行中断的逻辑处理"); } } public void cancel(){ this.interrupt(); } } void consumePrimes() throws InterruptedException{ //阻塞队列中只能存放5个对象 BlockingQueue<BigInteger> primes = new ArrayBlockingQueue<BigInteger>(5); BrokenPrimePreducer producer = new BrokenPrimePreducer(primes); long start = System.currentTimeMillis(); //开始执行线程,向队列中存放内容 producer.start(); long useTime = 0; try { while(needMorePrimes(useTime)){//是否需要更多的素数对象 Thread.sleep(40); //这里取消消费者,让堵塞队列始终处于调用put的堵塞状态,这样就始终不会再对cancelled取消标志进行判断了,所以通过取消标志无法取消任务 //primes.take(); useTime = System.currentTimeMillis() - start; } }finally{ producer.cancel(); for (BigInteger bigInteger : primes) { System.out.println(bigInteger); } } } /** * 判断是否需要更多的素数 * 方法开始5s后,不再需要素数 * @return */ boolean needMorePrimes(long useTime){ return useTime < 5000 ?true :false; } public static void main(String[] args) throws InterruptedException { InterruptThread it = new InterruptThread(); it.consumePrimes(); } }
三、通过future取消任务
单个任务的取消还能通过future来实现
future.cancel(boolean mayInterruptIfRunning)
参数mayInterruptIfRunning:
@param mayInterruptIfRunning <tt>true</tt> if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed * to complete
如果为ture,就让执行中的任务立刻中断;如果为false,执行中任务会被执行完毕。
package com.thread; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * 通过Future实现取消线程 * @author hadoop * */ public class FutureCanceled { //固定大小的线程池,同时只能接受5个任务 static ExecutorService mExecutor = Executors.newFixedThreadPool(5); public static void timedRun(Runnable r,long timeout,TimeUnit unit) throws Throwable { Future<?> task = mExecutor.submit(r); try { task.get(timeout, unit); } catch (TimeoutException e) { System.out.println("处理超时异常"); } catch (ExecutionException e) { System.out.println("处理执行异常"); throw new Throwable(e.getCause()); } finally{ task.cancel(true); } } }
四、通过停止基于线程的服务取消任务
应用程序通常会创建拥有线程的服务,比如线程池。线程的拥有者就是创建线程的类。
所以线程池拥有它的工作者线程。如果需要中断这些线程,那么应该由线程池来负责。
应用程序拥有服务,服务拥有工作者线程,而应用程序并不拥有工作者线程。因此应用程序不应该视图直接停止工作者线程。而是通过服务来管理线程。
ExecutorService提供了shutdown和shutdownNow方法来关闭线程。
package com.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * ShutDown关闭线程池测试 * @author hadoop * */ public class ShutDownThreadPool { //固定大小的线程池,同时只能接受5个任务 static ExecutorService mExecutor = Executors.newFixedThreadPool(5); /* * 采用线程池开启多个子线程,主线程等待所有的子线程执行完毕 */ public static void moreThread() { try { int threadNum = 0; /** * 尽管线程池只能同时开启5个线程处理任务,但是10个任务已经放入处理队列中,尽管调用了shutdown方法,已放入的10个任务仍然会执行完成。 * 但是调用了shutdown方法后,就不能再向线程池中添加新任务来了,否则会抛RejectedExecutionException异常 */ for (int i = 0; i < 10; i++) { threadNum++; final int currentThreadNum = threadNum; mExecutor.execute(new Runnable() { @Override public void run() { try { System.out.println("子线程[" + currentThreadNum + "]开启"); Thread.sleep(1000*10); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("子线程[" + currentThreadNum + "]结束"); } } }); } System.out.println("已经开启所有的子线程"); mExecutor.shutdown(); System.out.println("shutdown():启动一次顺序关闭,执行以前提交的任务,但不接受新任务。"); int threadNum2 = 0; for (int i = 20; i < 30; i++) { threadNum2++; final int currentThreadNum = threadNum2; mExecutor.execute(new Runnable() { @Override public void run() { try { System.out.println("尝试再次添加任务:子线程[" + currentThreadNum + "]开启"); Thread.sleep(1000*10); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("子线程[" + currentThreadNum + "]结束"); } } }); } /** * 只有执行了shutdown方法,执行isTerminated才有效。否则isTerminated一直为ture */ while(true){ if(mExecutor.isTerminated()){ System.out.println("所有的子线程都结束了!"); break; } Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("主线程结束"); } } public static void main(String[] args) { moreThread(); } }