Marble原理之线程中断

简介: 本文是开源Java JOB调度框架 Marble的原理介绍系列之一。需要了解Marble的请访问 https://github.com/jeff-dong/marble

本章节依赖于【Marble使用】,阅读本章节前请保证已经充分了解Marble。
中断特性从Marble-Agent 2.0.5开始支持。

线程中断使用

  1. 引入marble-agent jar包
<dependency>
            <groupId>com.github.jeff-dong</groupId>
            <artifactId>marble-agent</artifactId>
            <version>最新版</version>
</dependency>
  1. JOB执行代码适当位置添加中断标志, 下面给出示例代码
@Component("job1")
public class Job1 extends MarbleJob {
    private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(Job1.class);

    @Override
    public void execute(String param) throws Exception {
        logger.info("JOB1开始执行 ...");
        int i = 0;
        while (true) {
            i++;
            //1、用中断状态码进行判断
            if (Thread.interrupted()) {
                logger.info("JOB1-[{}]-[{}]被打断啦", param, Thread.currentThread().getName());
                return;
            }
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                //2、捕获终端异常后return结束
                return;
            }
            logger.info("JOB1-[{}]-[{}]-{}-------", param, Thread.currentThread().getName(), i);
        }
    }
}
  1. Marble OFFLINE进行线程中断

3.1 手动调度线程中断

4678905_b2f772e124219867

3.2 选择要中断的服务器进行终端尝试
4678905_b192bb1edf275c81

3.3 查看中断日志(同步JOB)
4678905_5d008fef53039e9b

中断实现及原理

Java的线程中断

Java的线程中断机制是一种协作机制,线程中断并不能立即停掉线程执行,相反,可能线程永远都不会响应。
java的线程中断模型只是通过修改线程的中断标志(interrupt)进行中断通知,不会有其它额外操作,因此线程是否最终中断取决于线程的执行逻辑。因此,如果想让线程按照自己的想法中断,要代码中事先进行中断的“埋点”处理。

有人可能会想到Thread的stop方法进行中断,由于此方法可能造成不可预知的结果,已经被抛弃

Marble进行线程中断实现

需求收集
  1. 以JOB为维度进行线程中断;
  2. 尽量做到实时响应;
  3. 存在集群中多台机器,要支持指定某台机器中的线程中断;
  4. 允许多次中断尝试;
  5. 中断请求不能依赖于JOB当前状态。可能已经停止调度的JOB也要手动中断执行中的线程;
  6. 透明和扩展不同JOB的中断(提供用户中断的"后处理"扩展);
需求分析及实现

【以JOB为维度进行线程中断】

Marble的JOB标志为 schedulerName-appId-jobName组成,目前Marble每个JOB调度时间和频率都是个性化,目前调度完成就销毁。但要做到任何时间进行执行中的线程中断就要:

1.1 存储JOB的运行线程,随时准备中断;
1.2 在缓存的JOB数量/时间和性能间做权衡,不能过多也不能过少;
1.3 制定缓存已满时的抛弃策略,避免缓存被占满新的线程永远无法中断;
1.4 要同步JOB和异步JOB透明处理(感觉不出差异);

实现:
Marble的线程池中定义支持并发的MAP进行JOB维度的线程缓存,此外指定每个JOB下缓存的线程数量。如下:

public class ThreadPool {
    ...
    private Multimap<String, Object> threadMultimap = Multimaps.synchronizedMultimap(HashMultimap.<String, Object>create());
    //multimap的单个key的最大容量
    private static final int THREADMULTIMAP_SIZE = 50;
    ...
}

Marble-Agent在同步/异步JOB生成新的线程对象时进行放入MAP缓存,如果缓存(50个)已满采用如下策略进行处理:

  1. 尝试清理当前map中的非活跃线程;
  2. 尝试清理当前map中已经完成的线程(同步线程有效);
  3. 如果还未清理出空间,移除最久的线程;
public ThreadPool multimapPut(String key, Object value) {
        if (StringUtils.isNotBlank(key)) {
            Collection collection = threadMultimap.get(key);
            if (collection != null && collection.size() >= THREADMULTIMAP_SIZE) {
                //替换最久的
                Iterator<Object> it = collection.iterator();
                //首先进行 非活跃线程清理
                while (it.hasNext()) {
                    Object tempObj = it.next();
                    if(tempObj instanceof MarbleThread){
                        MarbleThread mt = (MarbleThread)tempObj;
                        //不活跃删除
                        if(!mt.isThreadAlive()){
                            it.remove();
                        }
                    }else if(tempObj instanceof MarbleThreadFeature){
                        MarbleThreadFeature mf = (MarbleThreadFeature) tempObj;
                        //完成的线程删除
                        if(mf.isDone()){
                            it.remove();
                        }
                    }
                }
                //仍然>最大值,删除最久未使用
                if(collection.size() >= THREADMULTIMAP_SIZE){
                    while (it.hasNext()) {
                        it.next();
                        it.remove();
                        break;
                    }
                }
                threadMultimap.put(key, value);
                return this;
            }
        }
        threadMultimap.put(key, value);
        return this;
    }

此外,为了能在JVM关闭时进行线程中断,添加JVM hook进行中断调用处理(包括线程池的销毁)。
除此之外,还有个小问题,由于线程池使用的是有界的阻塞队列,此种情况下,线程中断时可能有的线程存在于阻塞队列中,单纯的中断无效,对于此类情况,要首先判断阻塞队列中是否存在要中断的线程,存在的话进行队列的移除操作。

【尽量做到实时响应】
只能通过用户在具体的线程逻辑中进行埋点处理,Marble在框架层面除了及时把用户的中断请求送达之外,没有其它措施。

【存在集群中多台机器,要支持指定某台机器中的线程中断】
Marble OFFLINE的中断页面支持机器的选择,用户进行选择后,Marble会有针对性的进行机器的中断RPC发送。

【允许多次中断尝试】
OFFLINE未对中断次数进行限制,目前支持多次中断请求发送。

【中断请求不能依赖于JOB当前状态】
考虑到用户对历史线程的中断请求,Marble未把中断操作绑定在JOB状态上,任何JOB都可以进行终端尝试。

【透明扩展不同JOB的中断】
Marble目前支持同步和异步JOB,两类JOB的中断处理并不一致,比如同步job的中断是通过FeatureTask的cancel实现,异步JOB是通过Thread的interrupt实现,此外线程被中断后Marble希望能更进一步提供一个统一的“后处理”操作给用户自己实现,比如用户可能需要在线程被中断后进行一些后续的log记录等。

为了代码层面一致透明,且友好的实现“后处理”的封装,Marble使用了代理模式,在Thread和FeatureTask上添加了一层“代理类”,由代理进行具体的中断操作。
同步JOB代理类:


/**
 * @author <a href="dongjianxing@aliyun.com">jeff</a>
 * @version 2017/4/19 16:31
 */
public class MarbleThreadFeature<V> implements RunnableFuture<V> {

    private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThreadFeature.class);
    private MarbleJob marbleJob;
    private String param;
    private FutureTask<Result> futureTask;


    public MarbleThreadFeature(final MarbleJob marbleJob, final String param) {
        super();
        this.marbleJob = marbleJob;
        this.param = param;
        futureTask = new FutureTask<>(new Callable<Result>() {
            @Override
            public Result call() throws Exception {
                return marbleJob.executeSync(param);
            }
        });
    }


    @Override
    public void run() {
        futureTask.run();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return futureTask.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return futureTask.isCancelled();
    }

    @Override
    public boolean isDone() {
        return futureTask.isDone();
    }

    @Override
    public V get() throws InterruptedException, ExecutionException {
        return (V) futureTask.get();
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return (V) futureTask.get(timeout, unit);
    }

    public void stop(String operator) {
        if (futureTask != null && !futureTask.isCancelled()) {
            logger.info("Thread-feature[{}] is interrupted", futureTask.getClass().getName());
            futureTask.cancel(true);
        }else if(marbleJob != null){
            boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(marbleJob);
            logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", marbleJob.getClass().getSimpleName(),removeResult);
        }
        //中断后处理
        if(marbleJob != null){
            marbleJob.afterInterruptTreatment();
        }
    }

}

异步JOB代理类:


/**
 * @author <a href="dongjianxing@aliyun.com">jeff</a>
 * @version 2017/4/19 16:31
 */
public class MarbleThread implements Runnable {

    private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThread.class);
    private MarbleJob marbleJob;
    private String param;
    private Thread runThread;


    public MarbleThread(MarbleJob marbleJob, String param) {
        super();
        this.marbleJob = marbleJob;
        this.param = param;
    }

    @Override
    public void run() {
        runThread = Thread.currentThread();
        try {
            marbleJob.execute(param);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public boolean isThreadAlive() {
        return (runThread != null && runThread.isAlive());
    }

    public String getThreadName() {
        return runThread != null ? runThread.getName() : "";
    }

    public void stop() {
        //首先尝试在阻塞队列中删除
        boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(this);
        logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", this.getClass().getSimpleName(), removeResult);
        if (runThread != null && !runThread.isInterrupted()) {
            logger.info("Thread[{}] is interrupted", runThread.getName());
            runThread.interrupt();
        }
        //中断后处理
        if (marbleJob != null) {
            marbleJob.afterInterruptTreatment();
        }
    }
}
目录
相关文章
|
3月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
3月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
2月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
140 29
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
1月前
|
Java 编译器 程序员
【多线程】synchronized原理
【多线程】synchronized原理
59 0
|
1月前
|
Java 应用服务中间件 API
nginx线程池原理
nginx线程池原理
34 0
|
2月前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
3月前
|
存储 NoSQL Java
线程池的原理与C语言实现
【8月更文挑战第22天】线程池是一种多线程处理框架,通过复用预创建的线程来高效地处理大量短暂或临时任务,提升程序性能。它主要包括三部分:线程管理器、工作队列和线程。线程管理器负责创建与管理线程;工作队列存储待处理任务;线程则执行任务。当提交新任务时,线程管理器将其加入队列,并由空闲线程处理。使用线程池能减少线程创建与销毁的开销,提高响应速度,并能有效控制并发线程数量,避免资源竞争。这里还提供了一个简单的 C 语言实现示例。
|
3月前
|
存储 Java
线程池的底层工作原理是什么?
【8月更文挑战第8天】线程池的底层工作原理是什么?
117 8
|
2月前
|
安全 Java API
Java线程池原理与锁机制分析
综上所述,Java线程池和锁机制是并发编程中极其重要的两个部分。线程池主要用于管理线程的生命周期和执行并发任务,而锁机制则用于保障线程安全和防止数据的并发错误。它们深入地结合在一起,成为Java高效并发编程实践中的关键要素。
32 0
|
3月前
|
存储 Java 调度
深入浅出Java线程池原理
本文深入分析了Java线程池的原理和实现,帮助读者更好地理解Java并发编程中线程池的创建、工作流程和性能优化。