本文的小例子主要是说明如何安全中断阻塞队列中的任务,避免使用interrupt()中断线程,造成堵塞队列中没有被消费的任务都被忽略。
package com.thread; import java.io.PrintWriter; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * 单个线程的安全取消 * * 这里模拟的场景是单个线程,处理阻塞队列中要处理的日志信息 * * 分析:由于使用了阻塞队列,而阻塞队列的take操作响应中断;如果日志线程被修改为捕获InterruptedException就退出,那么 * 中断日志就能够停止了。 * 但是,这样使用Interrupt来中断线程,突然的关闭会忽略等待中需要被记录的日志。 * * 解决方案:添加“已请求关闭”标志,和队列中要处理的日志的数量。 * 当发起Interrupt来中断线程时,并不会直接中断线程,只是改变“已请求关闭”标志为ture, * 只有当标志为ture并且队列中已请求进来的要处理的日志都处理完了才会中断线程。 * * @author hadoop * */ public class SafeCancellThread { class LogService{ private final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(50); private final LoggerThread loggerThread = new LoggerThread(); private PrintWriter writer ; private boolean isShutdown; private int reservations; public LogService(PrintWriter writer) { this.writer = writer; } public void start(){ loggerThread.start(); } public void stop(){ synchronized (this) { isShutdown = true; } loggerThread.interrupt(); } public void log(String msg) throws InterruptedException{ synchronized (this) { if(isShutdown){ throw new IllegalStateException(); } ++reservations; } System.out.println("向队列中添加:"+msg); queue.put(msg);//向阻塞队列中添加任务 } class LoggerThread extends Thread{ public void run(){ try { while(true){ try { synchronized(LogService.this){ if(isShutdown && reservations == 0){ System.out.println("队列中的任务处理完成"); break; } } //消费阻塞队列 String msg = queue.take(); synchronized(LogService.this){ --reservations; } Thread.currentThread().sleep(100); System.out.println("处理"+msg); writer.println(msg); } catch (InterruptedException e) { System.out.println("接到中断请求,重试知道队列中的任务消费完"); } } } finally{ writer.close(); } } } } public static void main(String[] args) throws Exception { SafeCancellThread sct = new SafeCancellThread(); PrintWriter writer = new PrintWriter("C:\\Users\\Administrator\\Desktop\\input\\temp.txt"); writer.write("开始"); final LogService logService = sct.new LogService(writer); new Thread(new Runnable() { @Override public void run() { int incre = 1; while(!logService.isShutdown){//如果服务停止,就不继续生产日志任务了。默认为false try { Thread.sleep(2*100);//每200ms产生一条日志写入任务 logService.log("日志"+incre);//添加日志时会出现堵塞 incre++; } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }).start(); Thread.sleep(6*1000); System.out.println("开始处理任务"); logService.start();//开始处理 //等待4s后执行取消任务的请求 Thread.sleep(4*1000); System.out.println("发起取消请求"); logService.stop(); } }
执行结果:
可以发现,调用了中断请求,停止了向阻塞队列中继续加入任务,并且没有立刻停止线程,而是继续将已添加到队列中的任务执行完,才停止线程。避免了加入阻塞队列中的任务由于线程中断而被忽略。
还可以使用基于服务的消息中断,管理消息的生命周期
package com.thread; import java.io.FileNotFoundException; import java.io.PrintWriter; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 使用Executor的日志服务 * @author hadoop * */ public class LogService { private final ExecutorService exec = Executors.newSingleThreadExecutor(); private static final int TIMEOUT = 5; private static final TimeUnit UNIT = TimeUnit.SECONDS; private final PrintWriter writer ; public void start(){ } public LogService(PrintWriter writer) { this.writer = writer; } public void stop() throws InterruptedException{ try { System.out.println("停止"); exec.shutdown();//会等待已提交到线程池中的任务执行完毕后,在中断线程 exec.awaitTermination(TIMEOUT, UNIT);// }finally{ writer.close(); } } class WriteTask implements Runnable{ private String msg; public WriteTask(String msg) { this.msg = msg; } @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("处理"+msg); writer.write(msg); } } public void log(String msg){ try { exec.execute(new WriteTask(msg)); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws FileNotFoundException { PrintWriter writer = new PrintWriter("C:\\Users\\Administrator\\Desktop\\input\\temp.txt"); LogService logService = new LogService(writer); for (int i = 0; i < 100; i++) { System.out.println("日志" + i); try { Thread.sleep(300); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } logService.log("日志"+i); } try { logService.stop(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); System.out.println("线程任务被中断"); } } }