《Java并发库系列三》一newSingleThreadScheduledExecutor

简介: newSingleThreadScheduledExecutor:产生一个ScheduledExecutorService对象,这个对象的线程池大小为1,如果任务多于一个,任务将按先后顺序执行。

newSingleThreadScheduledExecutor:产生一个ScheduledExecutorService对象,这个对象的线程池大小为1,如果任务多于一个,任务将按先后顺序执行。




1、继承结构


构造函数

包含一个定时的service

 

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
static class DelegatedScheduledExecutorService
        extends DelegatedExecutorService
        implements ScheduledExecutorService {
    private final ScheduledExecutorService e;
    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
        super(executor);
        e = executor;
    }
复制代码


2、怎么保证只有一个线程


定时执行的时候调用这个方法,调用过程如下,注意看其中的注释,由上往下的调用顺序


public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    //  延迟执行
    delayedExecute(t);
    return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        // 加入任务队列
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 确保执行
            ensurePrestart();
    }
}
// 如果worker数量小于corePoolSize,创建新的线程,其他情况不处理
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}
复制代码


3、怎么保证时间可以定时执行


public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}
复制代码


在每次执行的时候会把下一次执行的时间放进任务中


private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
 * Returns the trigger time of a delayed action.
 */
long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
复制代码

 

FutureTask 定时是通过LockSupport.parkNanos(this, nanos);LockSupport.park(this);


private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            //注意这里
            LockSupport.parkNanos(this, nanos);
        }
        else //注意这里
            LockSupport.park(this);
    }
}
复制代码


 

总结:Executor是通过将任务放在队列中,生成的futureTask。然后将生成的任务在队列中排序,将时间最近的需要出发的任务做检查。如果时间不到,就阻塞线程到下次出发时间。


注意:newSingleThreadScheduledExecutor只会有一个线程,不管你提交多少任务,这些任务会顺序执行,如果发生异常会取消下面的任务,线程池也不会关闭,注意捕捉异常


4、使用


ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor();
Runnable runnable1 = () -> {
    try {
        Thread.sleep(4000);
        System.out.println("11111111111111");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
Runnable runnable2 = () -> {
    try {
        Thread.sleep(4000);
        System.out.println("222");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
single.scheduleWithFixedDelay(runnable1,0,1, TimeUnit.SECONDS);
single.scheduleWithFixedDelay(runnable2,0,2, TimeUnit.SECONDS);
复制代码


11111111111111
222
11111111111111
222
11111111111111
复制代码


在项目中要注意关闭线程池


actionService = Executors.newSingleThreadScheduledExecutor();
        actionService.scheduleWithFixedDelay(() -> {
            try {
                Thread.currentThread().setName("robotActionService");
                Integer robotId = robotQueue.poll();
                if (robotId == null) {
                    //    关闭线程池
                    actionService.shutdown();
                } else {
                    int aiLv = robots.get(robotId);
                    if (actionQueueMap.containsKey(aiLv)) {
                        ActionQueue actionQueue = actionQueueMap.get(aiLv);
                        actionQueue.doAction(robotId);
                    }
                }
            } catch (Exception e) {
                //    捕捉异常
                LOG.error("",e);
            }
        }, 1, 1, TimeUnit.SECONDS);
复制代码


目录
相关文章
|
5月前
|
JavaScript 前端开发 Java
通义灵码 Rules 库合集来了,覆盖Java、TypeScript、Python、Go、JavaScript 等
通义灵码新上的外挂 Project Rules 获得了开发者的一致好评:最小成本适配我的开发风格、相当把团队经验沉淀下来,是个很好功能……
1135 103
|
2月前
|
Java API 调度
从阻塞到畅通:Java虚拟线程开启并发新纪元
从阻塞到畅通:Java虚拟线程开启并发新纪元
293 83
|
2月前
|
存储 Java 调度
Java虚拟线程:轻量级并发的革命性突破
Java虚拟线程:轻量级并发的革命性突破
240 83
|
5月前
|
消息中间件 算法 安全
JUC并发—1.Java集合包底层源码剖析
本文主要对JDK中的集合包源码进行了剖析。
|
4月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
187 0
|
3月前
|
Java 物联网 数据处理
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
Java Solon v3.2.0 是一款性能卓越的后端开发框架,新版本并发性能提升700%,内存占用节省50%。本文将从核心特性(如事件驱动模型与内存优化)、技术方案示例(Web应用搭建与数据库集成)到实际应用案例(电商平台与物联网平台)全面解析其优势与使用方法。通过简单代码示例和真实场景展示,帮助开发者快速掌握并应用于项目中,大幅提升系统性能与资源利用率。
115 6
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
|
2月前
|
SQL 缓存 安全
深度理解 Java 内存模型:从并发基石到实践应用
本文深入解析 Java 内存模型(JMM),涵盖其在并发编程中的核心作用与实践应用。内容包括 JMM 解决的可见性、原子性和有序性问题,线程与内存的交互机制,volatile、synchronized 和 happens-before 等关键机制的使用,以及在单例模式、线程通信等场景中的实战案例。同时,还介绍了常见并发 Bug 的排查与解决方案,帮助开发者写出高效、线程安全的 Java 程序。
150 0
|
4月前
|
缓存 安全 Java
【高薪程序员必看】万字长文拆解Java并发编程!(3-1):并发共享问题的解决与分析
活锁:多个线程相互影响对方退出同步代码块的条件而导致线程一直运行的情况。例如,线程1的退出条件是count=5,而线程2和线程3在其代码块中不断地是count进行自增自减的操作,导致线程1永远运行。内存一致性问题:由于JIT即时编译器对缓存的优化和指令重排等造成的内存可见性和有序性问题,可以通过synchronized,volatile,并发集合类等机制来解决。这里的线程安全是指,多个线程调用它们同一个实例的方法时,是线程安全的,但仅仅能保证当前调用的方法是线程安全的,不同方法之间是线程不安全的。
88 0
|
4月前
|
Java 程序员
【高薪程序员必看】万字长文拆解Java并发编程!(3-2):并发共享问题的解决与分析
wait方法和notify方法都是Object类的方法:让当前获取锁的线程进入waiting状态,并进入waitlist队列:让当前获取锁的线程进入waiting状态,并进入waitlist队列,等待n秒后自动唤醒:在waitlist队列中挑一个线程唤醒:唤醒所有在waitlist队列中的线程它们都是之间协作的手段,只有拥有对象锁的线程才能调用这些方法,否则会出现IllegalMonitorStateException异常park方法和unpark方法是LockSupport类中的方法。
89 0