Marble原理之线程中断

简介: 本文是开源Java JOB调度框架 Marble的原理介绍系列之一。需要了解Marble的请访问 https://github.com/jeff-dong/marble

本章节依赖于【Marble使用】,阅读本章节前请保证已经充分了解Marble。
中断特性从Marble-Agent 2.0.5开始支持。

线程中断使用

  1. 引入marble-agent jar包
<dependency>
            <groupId>com.github.jeff-dong</groupId>
            <artifactId>marble-agent</artifactId>
            <version>最新版</version>
</dependency>
  1. JOB执行代码适当位置添加中断标志, 下面给出示例代码
@Component("job1")
public class Job1 extends MarbleJob {
    private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(Job1.class);

    @Override
    public void execute(String param) throws Exception {
        logger.info("JOB1开始执行 ...");
        int i = 0;
        while (true) {
            i++;
            //1、用中断状态码进行判断
            if (Thread.interrupted()) {
                logger.info("JOB1-[{}]-[{}]被打断啦", param, Thread.currentThread().getName());
                return;
            }
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                //2、捕获终端异常后return结束
                return;
            }
            logger.info("JOB1-[{}]-[{}]-{}-------", param, Thread.currentThread().getName(), i);
        }
    }
}
  1. Marble OFFLINE进行线程中断

3.1 手动调度线程中断

4678905_b2f772e124219867

3.2 选择要中断的服务器进行终端尝试
4678905_b192bb1edf275c81

3.3 查看中断日志(同步JOB)
4678905_5d008fef53039e9b

中断实现及原理

Java的线程中断

Java的线程中断机制是一种协作机制,线程中断并不能立即停掉线程执行,相反,可能线程永远都不会响应。
java的线程中断模型只是通过修改线程的中断标志(interrupt)进行中断通知,不会有其它额外操作,因此线程是否最终中断取决于线程的执行逻辑。因此,如果想让线程按照自己的想法中断,要代码中事先进行中断的“埋点”处理。

有人可能会想到Thread的stop方法进行中断,由于此方法可能造成不可预知的结果,已经被抛弃

Marble进行线程中断实现

需求收集
  1. 以JOB为维度进行线程中断;
  2. 尽量做到实时响应;
  3. 存在集群中多台机器,要支持指定某台机器中的线程中断;
  4. 允许多次中断尝试;
  5. 中断请求不能依赖于JOB当前状态。可能已经停止调度的JOB也要手动中断执行中的线程;
  6. 透明和扩展不同JOB的中断(提供用户中断的"后处理"扩展);
需求分析及实现

【以JOB为维度进行线程中断】

Marble的JOB标志为 schedulerName-appId-jobName组成,目前Marble每个JOB调度时间和频率都是个性化,目前调度完成就销毁。但要做到任何时间进行执行中的线程中断就要:

1.1 存储JOB的运行线程,随时准备中断;
1.2 在缓存的JOB数量/时间和性能间做权衡,不能过多也不能过少;
1.3 制定缓存已满时的抛弃策略,避免缓存被占满新的线程永远无法中断;
1.4 要同步JOB和异步JOB透明处理(感觉不出差异);

实现:
Marble的线程池中定义支持并发的MAP进行JOB维度的线程缓存,此外指定每个JOB下缓存的线程数量。如下:

public class ThreadPool {
    ...
    private Multimap<String, Object> threadMultimap = Multimaps.synchronizedMultimap(HashMultimap.<String, Object>create());
    //multimap的单个key的最大容量
    private static final int THREADMULTIMAP_SIZE = 50;
    ...
}

Marble-Agent在同步/异步JOB生成新的线程对象时进行放入MAP缓存,如果缓存(50个)已满采用如下策略进行处理:

  1. 尝试清理当前map中的非活跃线程;
  2. 尝试清理当前map中已经完成的线程(同步线程有效);
  3. 如果还未清理出空间,移除最久的线程;
public ThreadPool multimapPut(String key, Object value) {
        if (StringUtils.isNotBlank(key)) {
            Collection collection = threadMultimap.get(key);
            if (collection != null && collection.size() >= THREADMULTIMAP_SIZE) {
                //替换最久的
                Iterator<Object> it = collection.iterator();
                //首先进行 非活跃线程清理
                while (it.hasNext()) {
                    Object tempObj = it.next();
                    if(tempObj instanceof MarbleThread){
                        MarbleThread mt = (MarbleThread)tempObj;
                        //不活跃删除
                        if(!mt.isThreadAlive()){
                            it.remove();
                        }
                    }else if(tempObj instanceof MarbleThreadFeature){
                        MarbleThreadFeature mf = (MarbleThreadFeature) tempObj;
                        //完成的线程删除
                        if(mf.isDone()){
                            it.remove();
                        }
                    }
                }
                //仍然>最大值,删除最久未使用
                if(collection.size() >= THREADMULTIMAP_SIZE){
                    while (it.hasNext()) {
                        it.next();
                        it.remove();
                        break;
                    }
                }
                threadMultimap.put(key, value);
                return this;
            }
        }
        threadMultimap.put(key, value);
        return this;
    }

此外,为了能在JVM关闭时进行线程中断,添加JVM hook进行中断调用处理(包括线程池的销毁)。
除此之外,还有个小问题,由于线程池使用的是有界的阻塞队列,此种情况下,线程中断时可能有的线程存在于阻塞队列中,单纯的中断无效,对于此类情况,要首先判断阻塞队列中是否存在要中断的线程,存在的话进行队列的移除操作。

【尽量做到实时响应】
只能通过用户在具体的线程逻辑中进行埋点处理,Marble在框架层面除了及时把用户的中断请求送达之外,没有其它措施。

【存在集群中多台机器,要支持指定某台机器中的线程中断】
Marble OFFLINE的中断页面支持机器的选择,用户进行选择后,Marble会有针对性的进行机器的中断RPC发送。

【允许多次中断尝试】
OFFLINE未对中断次数进行限制,目前支持多次中断请求发送。

【中断请求不能依赖于JOB当前状态】
考虑到用户对历史线程的中断请求,Marble未把中断操作绑定在JOB状态上,任何JOB都可以进行终端尝试。

【透明扩展不同JOB的中断】
Marble目前支持同步和异步JOB,两类JOB的中断处理并不一致,比如同步job的中断是通过FeatureTask的cancel实现,异步JOB是通过Thread的interrupt实现,此外线程被中断后Marble希望能更进一步提供一个统一的“后处理”操作给用户自己实现,比如用户可能需要在线程被中断后进行一些后续的log记录等。

为了代码层面一致透明,且友好的实现“后处理”的封装,Marble使用了代理模式,在Thread和FeatureTask上添加了一层“代理类”,由代理进行具体的中断操作。
同步JOB代理类:


/**
 * @author <a href="dongjianxing@aliyun.com">jeff</a>
 * @version 2017/4/19 16:31
 */
public class MarbleThreadFeature<V> implements RunnableFuture<V> {

    private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThreadFeature.class);
    private MarbleJob marbleJob;
    private String param;
    private FutureTask<Result> futureTask;


    public MarbleThreadFeature(final MarbleJob marbleJob, final String param) {
        super();
        this.marbleJob = marbleJob;
        this.param = param;
        futureTask = new FutureTask<>(new Callable<Result>() {
            @Override
            public Result call() throws Exception {
                return marbleJob.executeSync(param);
            }
        });
    }


    @Override
    public void run() {
        futureTask.run();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return futureTask.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return futureTask.isCancelled();
    }

    @Override
    public boolean isDone() {
        return futureTask.isDone();
    }

    @Override
    public V get() throws InterruptedException, ExecutionException {
        return (V) futureTask.get();
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return (V) futureTask.get(timeout, unit);
    }

    public void stop(String operator) {
        if (futureTask != null && !futureTask.isCancelled()) {
            logger.info("Thread-feature[{}] is interrupted", futureTask.getClass().getName());
            futureTask.cancel(true);
        }else if(marbleJob != null){
            boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(marbleJob);
            logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", marbleJob.getClass().getSimpleName(),removeResult);
        }
        //中断后处理
        if(marbleJob != null){
            marbleJob.afterInterruptTreatment();
        }
    }

}

异步JOB代理类:


/**
 * @author <a href="dongjianxing@aliyun.com">jeff</a>
 * @version 2017/4/19 16:31
 */
public class MarbleThread implements Runnable {

    private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThread.class);
    private MarbleJob marbleJob;
    private String param;
    private Thread runThread;


    public MarbleThread(MarbleJob marbleJob, String param) {
        super();
        this.marbleJob = marbleJob;
        this.param = param;
    }

    @Override
    public void run() {
        runThread = Thread.currentThread();
        try {
            marbleJob.execute(param);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public boolean isThreadAlive() {
        return (runThread != null && runThread.isAlive());
    }

    public String getThreadName() {
        return runThread != null ? runThread.getName() : "";
    }

    public void stop() {
        //首先尝试在阻塞队列中删除
        boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(this);
        logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", this.getClass().getSimpleName(), removeResult);
        if (runThread != null && !runThread.isInterrupted()) {
            logger.info("Thread[{}] is interrupted", runThread.getName());
            runThread.interrupt();
        }
        //中断后处理
        if (marbleJob != null) {
            marbleJob.afterInterruptTreatment();
        }
    }
}
目录
相关文章
|
2月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
2月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
19天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
9天前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
2月前
|
存储 NoSQL Java
线程池的原理与C语言实现
【8月更文挑战第22天】线程池是一种多线程处理框架,通过复用预创建的线程来高效地处理大量短暂或临时任务,提升程序性能。它主要包括三部分:线程管理器、工作队列和线程。线程管理器负责创建与管理线程;工作队列存储待处理任务;线程则执行任务。当提交新任务时,线程管理器将其加入队列,并由空闲线程处理。使用线程池能减少线程创建与销毁的开销,提高响应速度,并能有效控制并发线程数量,避免资源竞争。这里还提供了一个简单的 C 语言实现示例。
|
2月前
|
存储 Java
线程池的底层工作原理是什么?
【8月更文挑战第8天】线程池的底层工作原理是什么?
86 8
|
1月前
|
安全 Java API
Java线程池原理与锁机制分析
综上所述,Java线程池和锁机制是并发编程中极其重要的两个部分。线程池主要用于管理线程的生命周期和执行并发任务,而锁机制则用于保障线程安全和防止数据的并发错误。它们深入地结合在一起,成为Java高效并发编程实践中的关键要素。
15 0
|
3月前
|
存储 SQL Java
(七)全面剖析Java并发编程之线程变量副本ThreadLocal原理分析
在之前的文章:彻底理解Java并发编程之Synchronized关键字实现原理剖析中我们曾初次谈到线程安全问题引发的"三要素":多线程、共享资源/临界资源、非原子性操作,简而言之:在同一时刻,多条线程同时对临界资源进行非原子性操作则有可能产生线程安全问题。
|
2月前
|
存储 Java 调度
深入浅出Java线程池原理
本文深入分析了Java线程池的原理和实现,帮助读者更好地理解Java并发编程中线程池的创建、工作流程和性能优化。
|
3月前
|
缓存 监控 Java
(十)深入理解Java并发编程之线程池、工作原理、复用原理及源码分析
深入理解Java并发编程之线程池、工作原理、复用原理及源码分析