阻塞队列中的安全中断

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 阻塞队列中的安全中断

本文的小例子主要是说明如何安全中断阻塞队列中的任务,避免使用interrupt()中断线程,造成堵塞队列中没有被消费的任务都被忽略。

package com.thread;
import java.io.PrintWriter;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * 单个线程的安全取消
 * 
 * 这里模拟的场景是单个线程,处理阻塞队列中要处理的日志信息
 * 
 * 分析:由于使用了阻塞队列,而阻塞队列的take操作响应中断;如果日志线程被修改为捕获InterruptedException就退出,那么
 * 中断日志就能够停止了。
 * 但是,这样使用Interrupt来中断线程,突然的关闭会忽略等待中需要被记录的日志。
 * 
 * 解决方案:添加“已请求关闭”标志,和队列中要处理的日志的数量。
 * 当发起Interrupt来中断线程时,并不会直接中断线程,只是改变“已请求关闭”标志为ture,
 * 只有当标志为ture并且队列中已请求进来的要处理的日志都处理完了才会中断线程。
 * 
 * @author hadoop
 *
 */
public class SafeCancellThread {
       class  LogService{
        private final BlockingQueue<String>  queue = new ArrayBlockingQueue<String>(50);
        private final LoggerThread loggerThread = new LoggerThread();
        private  PrintWriter writer ; 
        private boolean isShutdown;
        private int reservations;
     public LogService(PrintWriter writer) {
       this.writer = writer;
     }
  public void start(){
         loggerThread.start();
        }
  public void stop(){
     synchronized (this) {
     isShutdown = true;
    }
     loggerThread.interrupt();
     }
        public void log(String msg) throws InterruptedException{
         synchronized (this) {
      if(isShutdown){
        throw new IllegalStateException();
      }
      ++reservations;
      }
         System.out.println("向队列中添加:"+msg);
         queue.put(msg);//向阻塞队列中添加任务
        }
        class LoggerThread extends Thread{
         public void run(){
        try {
          while(true){
           try {
            synchronized(LogService.this){
            if(isShutdown && reservations == 0){
              System.out.println("队列中的任务处理完成");
              break;
            }
            }
            //消费阻塞队列
            String msg = queue.take();
            synchronized(LogService.this){
             --reservations;
            }
            Thread.currentThread().sleep(100);
            System.out.println("处理"+msg);
            writer.println(msg);
          } catch (InterruptedException e) {
            System.out.println("接到中断请求,重试知道队列中的任务消费完");
          }
          }
        } finally{
          writer.close();
        }
         }
        } 
       }
  public static void main(String[] args) throws Exception {
   SafeCancellThread sct = new SafeCancellThread();
   PrintWriter writer = new PrintWriter("C:\\Users\\Administrator\\Desktop\\input\\temp.txt");
   writer.write("开始");
   final LogService logService = sct.new LogService(writer);
   new Thread(new Runnable() {
    @Override
    public void run() {
           int incre = 1;
      while(!logService.isShutdown){//如果服务停止,就不继续生产日志任务了。默认为false
        try {
        Thread.sleep(2*100);//每200ms产生一条日志写入任务
        logService.log("日志"+incre);//添加日志时会出现堵塞
        incre++;
        } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        }
      }
    }
  }).start();
   Thread.sleep(6*1000);
   System.out.println("开始处理任务");
   logService.start();//开始处理
   //等待4s后执行取消任务的请求
   Thread.sleep(4*1000);
   System.out.println("发起取消请求");
   logService.stop();
  }
}


执行结果:


可以发现,调用了中断请求,停止了向阻塞队列中继续加入任务,并且没有立刻停止线程,而是继续将已添加到队列中的任务执行完,才停止线程。避免了加入阻塞队列中的任务由于线程中断而被忽略。

1.png


还可以使用基于服务的消息中断,管理消息的生命周期

package com.thread;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * 使用Executor的日志服务
 * @author hadoop
 *
 */
public class LogService {
    private final ExecutorService exec = Executors.newSingleThreadExecutor();
    private static final int TIMEOUT = 5;
    private static final TimeUnit UNIT = TimeUnit.SECONDS;
    private final PrintWriter writer ;  
    public void start(){
    }
    public LogService(PrintWriter writer) {
  this.writer = writer;
  }
  public void stop() throws InterruptedException{
      try {
      System.out.println("停止");
      exec.shutdown();//会等待已提交到线程池中的任务执行完毕后,在中断线程
          exec.awaitTermination(TIMEOUT, UNIT);//
  }finally{
    writer.close();
  }
    }
  class WriteTask implements Runnable{
     private String msg;
  public WriteTask(String msg) {
    this.msg = msg;
  }
  @Override
  public void run() {
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    System.out.println("处理"+msg);
    writer.write(msg);
  }
  }
    public void log(String msg){
      try {
       exec.execute(new WriteTask(msg));
  } catch (Exception e) {
    e.printStackTrace();
  }
    }
    public static void main(String[] args) throws FileNotFoundException {
      PrintWriter writer = new PrintWriter("C:\\Users\\Administrator\\Desktop\\input\\temp.txt");
      LogService logService = new LogService(writer);
       for (int i = 0; i < 100; i++) {
        System.out.println("日志" + i);
        try {
    Thread.sleep(300);
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
        logService.log("日志"+i);
   }
       try {
    logService.stop();
  } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    System.out.println("线程任务被中断");
  }
  }
}
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
8月前
|
Java API 调度
线程的中断(interrupt)机制
线程的中断(interrupt)机制
116 1
|
Java 程序员
JUC之线程中断机制解读(interrupt)
JUC之线程中断机制解读(interrupt)
线程发生阻塞,怎么唤醒线程?
线程发生阻塞,怎么唤醒线程?
266 0
|
算法 Java 调度
线程的挂起和唤醒
线程的挂起和唤醒
休眠与唤醒
休眠与唤醒
112 0
|
存储 网络协议 API
4-FreeRTOS队列、互斥、信号量
4-FreeRTOS队列、互斥、信号量
|
监控 Java
一文了解JAVA线程的中断(Interrupt)机制
一文了解JAVA线程的中断(Interrupt)机制
1111 0
一文了解JAVA线程的中断(Interrupt)机制
线程的阻塞
相信像我一样的初学者遇到线程阻塞时会感到无助,那我就把自己对于线程阻塞的理解记录下来吧!
181 0
线程的中断
线程的中断主要涉及三个api interrupt(); isInterrupted(); Thread.interrupted();
我们该如何正确的中断一个正在执行的线程??
当我们在调用Java对象的wait()方法或者线程的sleep()方法时,需要捕获并处理InterruptedException异常。如果我们对InterruptedException异常处理不当,则会发生我们意想不到的后果!今天,我们就以一个案例的形式,来为大家详细介绍下为何中断执行的线程不起作用。
171 0