java并发编程笔记--ScheduledThreadPoolExecutor实现

简介:     ScheduledThreadPoolExecutor作为ScheduledExecutorService接口的实现,提供了延迟执行任务或者周期性执行任务的能力。通过名称可以看出,ScheduledThreadPoolExecutor基于线程池实现,它通过继承ThreadPoolExecutor实现线程池管理能力的复用,同时扩展了自己的定时任务调度能力。

    ScheduledThreadPoolExecutor作为ScheduledExecutorService接口的实现,提供了延迟执行任务或者周期性执行任务的能力。通过名称可以看出,ScheduledThreadPoolExecutor基于线程池实现,它通过继承ThreadPoolExecutor实现线程池管理能力的复用,同时扩展了自己的定时任务调度能力。

image

    首先来看ScheduledExecutorServicej接口,它继承了ExecutorService接口,作为任务执行器的一种扩展类型,提供了如下方法:

  • schedule方法:用于任务的单次执行,允许指定延迟时间,当时间为0或者负数时,表示立即执行任务;
  • scheduleAtFixedRate方法:以固定的时间间隔执行任务,当任务本身的执行时间超过时间间隔时,会等到任务执行完成后,立即执行下一次任务;同一个任务总是串行执行,不会并发执行;
  • scheduleWithFixedDelay方法:以固定的延迟执行任务,当前任务执行时间与上一次任务执行时间相隔固定的延迟;任务每次执行完成后,会在结束时间上加上固定的延迟作为下一次执行时间。任务执行的周期会将任务本身执行耗时考虑在内,因而并非每次执行的时间间隔都相同;

ScheduledThreadPoolExecutor继承ThreadPoolExecutor,主要做了如下改变:

  • 使用ScheduledFutureTask作为任务封装类,代替原先的FutureTask类;
  • 使用DelayedWorkQueue作为阻塞队列,队列为无界队列;ScheduledThreadPoolExecutor的构造器仅需要传入corePoolSize,使用"corePoolSize+无界队列"实现任务调度;
  • 支持run-after-shutdown参数,使得ScheduledThreadPoolExecutor重写shutdown方法,允许移除并且取消不需要在shutdown后执行的任务;
  • 提供了decorateTask方法,用来定制任务操作;

ScheduledThreadPoolExecutor的组成

image

ScheduledThreadPoolExecutor由3部分组成:

  • 任务调度控制:ScheduledThreadPoolExecutor,负责任务调度控制,实现了ScheduledExecutorService接口;
  • 阻塞队列:DelayedWorkQueue,作为ScheduledThreadPoolExecutor的内部类,用于缓存线程任务的阻塞队列,仅能够存放RunnableScheduledFuture对象;该队列实现了延迟调度任务的逻辑,如果当前时间大于等于任务的延迟执行时间,任务才可以被调度。
  • 调度任务:ScheduledFutureTask,作为ScheduledFutureTask的内部类,实现了RunnableScheduledFuture,封装了调度任务的执行逻辑。其中的time字段存放下一次执行时间,DelayedWorkQueue会据此判断任务是否可以被执行。period字段存放执行周期,对于周期性执行任务,每次会根据period计算time。

ScheduledThreadPoolExecutor初始化

ScheduledThreadPoolExecutor的构造器最多指定3个参数:

  • corePoolSize:线程池核心工作线程数量;
  • threadFactory:定制工作线程创建方式;
  • handler:驳回任务处理策略;

    ScheduledThreadPoolExecutor构造器会调用父类构造器进行线程池初始化,使用DelayedWorkQueue作为阻塞队列,该队列为无界队列,因而maximumPoolSize属性配置无效。又因为都是核心工作线程,没有非核心线程需要回收,因而keepAliveTime配置为0。代码如下:

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

ScheduledThreadPoolExecutor初始化时并不会预先创建工作线程,而是在提交任务的时候,通过父类java.util.concurrent.ThreadPoolExecutor#ensurePrestart方法判断线程数是否达到corePoolSize,如果未达到,则新增线程;实现逻辑如下:

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    // 当前线程数 < corePoolSize,则添加核心工作线程;
    if (wc < corePoolSize)
        addWorker(null, true);
    // 如果0 == wc >= corePoolSize,则表示corePoolSize配置为0,则初始化一个非核心工作线程;
    else if (wc == 0)
        addWorker(null, false);
}

任务执行

ScheduledThreadPoolExecutor的任务执行分为单次执行周期性执行

  • 单次执行:通过schedule方法执行的任务属于单次执行任务。Executor的execute方法、ExecutorService的submit方法都是通过调用schedule方法执行,故也是单次执行的任务。除了schedule可以指定延迟时间以外,其余方法的延迟时间均为0,即立刻执行任务。比如:execute方法实现如下:
public void execute(Runnable command) {
    schedule(command, 0, NANOSECONDS);
}
  • 周期性执行:通过scheduleAtFixedRate、scheduleWithFixedDelay方法执行的任务均为周期性执行任务。周期性执行的实现可以理解为每次执行完成后设定下一次执行时间,然后将任务重新放入到阻塞队列等待下一次调度。

任务入口:delayedExecute()

    无论是单次执行还是周期性执行,其执行的入口都是delayedExecute方法。delayedExecute()将任务放入到阻塞队列中,复用ThreadPoolExecutor的逻辑进行任务调度。代码如下:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 如果调度器关闭,则拒绝接收任务
    if (isShutdown())
        // 执行拒绝策略
        reject(task);
    // 如果调度器未关闭
    else {
        // 添加任务到阻塞队列,等待调度执行;
        super.getQueue().add(task);
        // 如果此时调度器关闭,则取消任务;
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        // 如果调度器正常运行,则检查开启线程数是否达到corePoolSize
        // 如果未达到corePoolSize,则初始化1个工作线程;
        // 如果corePoolSize设置为0,则会初始化1个非core工作线程;
        else
            ensurePrestart();
    }
}

    当ThreadPoolExecutor的Worker线程从阻塞队列取出任务执行时,会调用ScheduledFutureTask的run方法。该方法对任务类型进行判断,如果是单次执行任务,则立即执行并设置返回结果。如果是周期性执行任务,则执行任务并设置下一次执行时间,然后将任务放入到阻塞队列中,等待下一次调度。方法代码如下:

public void run() {
    boolean periodic = isPeriodic();
    // 如果当前不可运行任务,则取消任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 如果是单次执行的任务
    else if (!periodic)
        // 直接调用FutureTask.run()方法执行;
        ScheduledFutureTask.super.run();
    // 如果是周期性任务,则运行任务但不设置结果;
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置任务下一次执行时间
        setNextRunTime();
        // 将任务重新加入队列,等待下一次调度
        reExecutePeriodic(outerTask);
    }
}

单次执行:schedule()

schedule的执行主要分为参数封装执行两个步骤。实现如下:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    // 封装任务参数,调用decorateTask装饰方法进行任务定制
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    // 执行任务                                  
    delayedExecute(t);
    return t;
}

参数封装过程会调用decorateTask方法,该方法为protected的空方法,用于定制RunnableScheduledFuture的属性,可以通过重写实现定制。

protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}

周期性执行:scheduleAtFixedRate() / scheduleWithFixedDelay()

    scheduleAtFixedRate()的实现与schedule()方法非常相似,仅是将decorateTask()返回的RunnableScheduledFuture对象设置为原有Future的outerTask属性。在重新知心任务时,会将outerTask添加到阻塞队列,从而保证decorateTask()的定制效果一直有效。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    // 保存定制后的Future对象,便于再次调用;
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

    scheduleAtFixedRate()的实现与schedule()方法非常相似,仅是设置ScheduledFutureTask延迟时,使用负数,标识执行方式为scheduleAtFixedRate。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

    ScheduledFutureTask并没有设置单独的字段用于标识执行类型,而是通过period字段的正负号和是否为0表示执行方式:

  • 正数:fixed-rate执行方式;
  • 负数:fixed-delay执行方式;
  • 0:单次执行任务;

    scheduleAtFixedRate() / scheduleWithFixedDelay()执行的主要区别在于设置下一次执行时间的策略不同,而执行时间通过ScheduledFutureTask的time字段保存,通过ScheduledFutureTask#setNextRunTime()进行设置,代码如下:

private void setNextRunTime() {
    long p = period;
    // 如果是fixed-rate执行方式:下一次执行时间 = 上一次执行时间 + period
    if (p > 0)
        time += p;
    // 如果是fixed-delay执行方式:下一次执行时间 = now() + period
    else
        time = triggerTime(-p);
}

延迟功能实现:DelayedWorkQueue

    DelayedWorkQueue是专门存放RunnableScheduledFuture和ScheduledFutureTask对象的优先队列,底层基于最小二叉堆实现,为了能够提升任务的查找和删除效率,ScheduledFutureTask中增加了一个heapIndex的成员变量,用于存放任务在堆数组中的索引位置,当需要查找或者删除某个特定的任务时,直接根据任务的heapIndex访问堆数组中的元素。任务是否到达执行时间的判断逻辑均在DelayedWorkQueue中实现。

主要成员变量

        /**
         * 存放堆的数组,初始化大小为16
         */
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
            
        /**
         * 队列中的任务个数
         */
        private int size = 0;
            
        /**
         * 保证队列操作的锁
         */
        private final ReentrantLock lock = new ReentrantLock();
        
        /**
         * 存放用于等待任务的leader线程引用;
         * 为降低性能消耗,同一时间并不需要所有线程都轮询等待任务到达执行时间;
         * 只需要一个leader线程负责轮询等待即可;
         */
        private Thread leader = null;
        
        /**
         * Condition signalled when a newer task becomes available at the
         * head of the queue or a new thread may need to become leader.
         */
        private final Condition available = lock.newCondition();

    与PriorityQueue的实现不同,DelayedWorkQueue涉及到多线程访问,因而需要保证线程同步测正确性,故使用ReentrantLock来控制操作的原子性,同时使用Condition来协调线程的执行;

设置任务索引

    为了方便在DelayedWorkQueue中查找和删除任务,ScheduledFutureTask有一个heapIndex用于存放任务在堆数组中的索引位置。每当任务在队列中的位置改变时,需要同步更新任务的heapIndex。

        private void setIndex(RunnableScheduledFuture<?> f, int idx) {
            if (f instanceof ScheduledFutureTask)
                ((ScheduledFutureTask)f).heapIndex = idx;
        }

上浮、下沉操作

上浮、下沉操作的实现与PriorityQueue实现相似,只多了更新索引位置的操作,且需要在加锁的环境下调用。

        /**
         * 上浮操作
         */
        private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                // 更新父节点索引位置
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            // 更新当前节点索引位置
            setIndex(key, k);
        }
        
        /**
         * 下沉操作
         */
        private void siftDown(int k, RunnableScheduledFuture<?> key) {
            int half = size >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;
                RunnableScheduledFuture<?> c = queue[child];
                int right = child + 1;
                if (right < size && c.compareTo(queue[right]) > 0)
                    c = queue[child = right];
                if (key.compareTo(c) <= 0)
                    break;
                queue[k] = c;
                // 更新子节点索引
                setIndex(c, k);
                k = child;
            }
            queue[k] = key;
            // 更新当前节点索引
            setIndex(key, k);
        }

入队操作

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
                
            // 操作前先加锁,保证原子性
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                
                // 容量不够,则成倍扩容
                if (i >= queue.length)
                    grow();
                size = i + 1;
                
                // 如果队列为空,则直接放入任务
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    // 如果队列不为空,则执行上浮操作
                    siftUp(i, e);
                }
                
                // queue[0] == e包含两种情况:
                // 1)e为队列中的第一个元素;
                // 2)e为队列中最近要执行的任务;
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

        public void put(Runnable e) {
            offer(e);
        }

        public boolean add(Runnable e) {
            return offer(e);
        }

出队操作

        /**
         * 任务出队后,通过下沉操作使得堆有序;
         */
        private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
            int s = --size;
            RunnableScheduledFuture<?> x = queue[s];
            queue[s] = null;
            if (s != 0)
                siftDown(0, x);
                
            // 出队任务heapIndex设置为-1
            setIndex(f, -1);
            return f;
        }

        /**
         * 出队,非阻塞
         */
        public RunnableScheduledFuture<?> poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                RunnableScheduledFuture<?> first = queue[0];
                // 如果队列为空或者没有任务到执行时间,则返回null
                if (first == null || first.getDelay(NANOSECONDS) > 0)
                    return null;
                else
                    // 执行下沉操作,返回队首任务
                    return finishPoll(first);
            } finally {
                lock.unlock();
            }
        }

        /**
         * 出队,阻塞线程,直到有任务返回
         */
        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            // 加锁,允许响应中断
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    // 如果队列为空,则阻塞等待
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        
                        // 如果第一个任务已经到达时间点,则立刻返回任务
                        if (delay <= 0)
                            return finishPoll(first);
                        
                        // 如果未到达时间点
                        first = null; // don't retain ref while waiting
                        
                        // 如果已经有leader线程等待任务,则阻塞当前线程
                        if (leader != null)
                            available.await();
                        
                        // 如果没有leader线程,则设置当前线程为leader线程,轮询等待任务到达执行时间点
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // 等待队首任务到达执行时间
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                // 唤醒一个线程,确保至少有一个线程未被阻塞
                if (leader == null && queue[0] != null)
                    available.signal();
                    
                lock.unlock();
            }
        }

        /**
         * 出队,阻塞线程,直到有任务返回或者超时
         */
        public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            
            // 加锁,允许响应中断
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    // 如果队列为空
                    if (first == null) {
                    
                        // 如果到达超时时间,则返回null
                        if (nanos <= 0)
                            return null;
                        // 未到达超时时间,则等待超时
                        else
                            nanos = available.awaitNanos(nanos);
                            
                    } else {
                    
                        long delay = first.getDelay(NANOSECONDS);
                        
                        // 如果队首任务到达执行时间,则立即返回任务;
                        if (delay <= 0)
                            return finishPoll(first);
                            
                        // 如果未到达执行时间,且超时,则返回null;
                        if (nanos <= 0)
                            return null;
                        
                        // 如果未到达执行时间,且没有超时
                        first = null; // don't retain ref while waiting
                        
                        // 如果已经有leader线程,或者超时时间小于第一个任务执行时间,
                        // 则阻塞当前线程直至超时
                        if (nanos < delay || leader != null)
                            nanos = available.awaitNanos(nanos);
                        
                        // 如果没有leader线程,或者没有超时且没有任务到达时间点;
                        // 则阻塞等待任务到达执行时间点    
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                long timeLeft = available.awaitNanos(delay);
                                nanos -= delay - timeLeft;
                            } finally {
                            
                                // 如果当前线程是leader线程,则取消其leader属性
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                // 唤醒一个线程,确保至少有一个线程未被阻塞
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

根据heapIndex查找和删除任务

        /**
         * 查找一个任务的index,如果未找到,则返回-1
         */
        private int indexOf(Object x) {
            if (x != null) {
                // 如果是ScheduledFutureTask类型任务,则直接返回heapIndex,效率O(1)
                if (x instanceof ScheduledFutureTask) {
                    int i = ((ScheduledFutureTask) x).heapIndex;
                    
                    // 检查ScheduledFutureTask是否属于当前pool
                    if (i >= 0 && i < size && queue[i] == x)
                        return i;
                } 
                // 如果是RunnableScheduledFuture类型任务,则遍历查找,效率O(1)
                else {
                    for (int i = 0; i < size; i++)
                        if (x.equals(queue[i]))
                            return i;
                }
            }
            return -1;
        }

        /**
         * 查找任务
         */
        public boolean contains(Object x) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return indexOf(x) != -1;
            } finally {
                lock.unlock();
            }
        }

        /**
         * 从队列中删除任务,用于取消任务的场景
         */
        public boolean remove(Object x) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                // 查找任务索引
                int i = indexOf(x);
                
                // 未找到任务,则不执行删除操作,返回false
                if (i < 0)
                    return false;
                
                // 清空任务索引信息,减小任务队列size
                // 使用队尾任务替换现有任务索引位置,然后通过下沉、上浮操作找到合适位置
                setIndex(queue[i], -1);
                int s = --size;
                RunnableScheduledFuture<?> replacement = queue[s];
                queue[s] = null;
                if (s != i) {
                    siftDown(i, replacement);
                    // 如果任务未下沉,则执行上浮操作
                    if (queue[i] == replacement)
                        siftUp(i, replacement);
                }
                return true;
            } finally {
                lock.unlock();
            }
        }

    通过上面代码我们总结DelayedWorkQueue的实现原理:
1)基于最小二叉堆实现的优先队列,根据ScheduledFutureTask.compareTo方法比较任务执行时间,使得最近要执行的任务位于队首;
2)任务出队时,通过轮询判断任务是否到达执行时间点,ScheduledFutureTask实现了Delayed接口,通过getDelay方法能够获取到任务还有多长时间执行;
3)当队列中所有任务都没有到达执行时间时,队列中会维持一个leader线程,用于轮询等待队首任务,其余线程均await()。
4)ScheduledFutureTask增加heapIndex属性,用于标记任务在堆数组中的索引,从而便于任务的快速查找(是否存在)与取消(删除);

任务取消

    任务的取消通过ScheduledFutureTask.cancel()方法实现,该方法调用ThreadPoolExecutor.cancel(),在取消任务后,判断是否需要从阻塞队列中移除任务。其中removeOnCancel参数通过setRemoveOnCancelPolicy()设置。之所以要在取消任务后移除阻塞队列中任务,是为了防止队列中积压大量已被取消的任务。

public boolean cancel(boolean mayInterruptIfRunning) {
    // 调用ThreadPoolExecutor.cancel方法取消任务
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    // 从阻塞队列中移除任务
    if (cancelled && removeOnCancel && heapIndex >= 0)
        remove(this);
    return cancelled;
}

关闭调度器

ScheduledThreadPoolExecutor的shutdown() / shutdownNow()方法均调用ThreadPoolExecutor的相应方法实现。同时,ScheduledThreadPoolExecutor实现了ThreadPoolExecutor的onShutdown()用于在shutdown()执行过程中取消任务执行。

此处涉及2个参数:

  • executeExistingDelayedTasksAfterShutdown:当执行shutdown()后,是否继续执行队列中的单次执行任务;默认为true,即执行;
  • continueExistingPeriodicTasksAfterShutdown:当执行shutdown()后,是否继续执行队列中的周期性任务;默认为false,即不执行;
@Override void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
    boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
    // 如果设置不执行队列中的任务,则取消队列中所有任务,清空队列;
    if (!keepDelayed && !keepPeriodic) {
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        q.clear();
    }
    // 如果设置执行队列中的任务:周期性的或者单次的任务
    else {
        // 遍历队列中的任务,逐个取消并删除不需要执行的任务
        // 先拷贝到数组再遍历,防止遍历时队列元素更新,导致异常;
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) {
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    tryTerminate();
}
目录
相关文章
|
1月前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
32 0
|
1月前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
15天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
19天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
54 12
|
15天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
98 2
|
2月前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
2月前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
1月前
|
安全 Java 编译器
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
|
1月前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
51 3