保护性暂停

简介: 如何实现线程同步和异步

同步&异步

1. 同步等待

1.1 什么是保护性暂停

In concurrent programming, guarded suspension is a software design pattern for managing operations that require both a lock to be acquired and a precondition to be satisfied before the operation can be executed. The guarded suspension pattern is typically applied to method calls in object-oriented programs, and involves suspending the method call, and the calling thread, until the precondition (acting as a guard) is satisfied.

—— wikipedia

保护性暂停模式是让一个线程等待另一个线程的结果。

举个例子说明:现在有两个线程,Thread1负责写入结果,Thread2负责读取结果。

同步

import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) throws Exception {
        //公共变量
        AtomicReference<Integer> response = new AtomicReference<>(null);


        Thread thread1 = new Thread(()->{
            System.out.println("thread1 before set .....");
            response.set(1);
            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
            System.out.println("thread2 before get .....");
            System.out.println(response.get());
            System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }


}

按照上面的代码,两个线程独立执行,thread2能否获取response的值全靠缘分。

既然要控制先后顺序,自然就要使用wait和notify进行同步(先忽略join、future、CountdownLatch等同步工具)

import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) throws Exception {
        //公共变量
        AtomicReference<Integer> response = new AtomicReference<>(null);

        Thread thread1 = new Thread(()->{
                System.out.println("thread1 before set .....");
                synchronized(response) {
                    response.set(1);
                    response.notify();
                }

            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
                System.out.println("thread2 before get .....");
                synchronized(response) {
                    //如果已有response直接获取,否则阻塞
                    if (response.get() == null){
                        try {
                            response.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    System.out.println(response.get());

                }

                System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }


}

为了代码的简洁和易用,我们将同步逻辑封装到一个对象中,就实现了一个简易的保护性暂停对象:

保护性暂停

import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) throws Exception {

        GuardObject guardObject = new GuardObject();

        Thread thread1 = new Thread(()->{
            System.out.println("thread1 before set .....");
            guardObject.setObj(1);
            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
            System.out.println("thread2 before get .....");
            try {
                System.out.println(guardObject.getObj());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }

   static class GuardObject{
        private Integer response;

        public synchronized int getObj() throws InterruptedException {
            if(response == null){
                wait();
            }
            return response;
        }

        public synchronized void setObj(Integer value){
            response = value;
            notify();
        }
    }
}

JDK中的thread.join、FutureTask都是用保护性暂停的设计模式来实现的。

1.2 FutureTask

![]()

  1. 入口: excutor.submit(callable)

excutor

  1. 生成RunnableFuture, 加入Worker。线程池会把Callable对象包装进 RunnableFutureRunnableFuture既是Future又是Runnable。然后执行该
// java.util.concurrent.AbstractExecutorService

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException       {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
} 


protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
  1. 工作线程调用 FuturerTask的run方法。
// java.util.concurrent.ThreadPoolExecutor#addWorker   
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
  1. Future调用get, 如果没有返回值(outcome变量),会将当前线程封装进WaitNode,并调用UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q)存入waiters(头插法), 然后调用LockSupport.unpark阻塞当前线程。
//java.util.concurrent.FutureTask
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }


    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }


    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
  1. FuturerTask的run方法执行完毕,通过set(result)将结果赋值给outcome, 并激活waiters中所有的阻塞
//java.util.concurrent.FutureTask   
public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }


 protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
 }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

2. 异步处理

FutureTask是一种进程等待另一个进程执行结束(同步), 但有时候我们需要的是异步操作,又该如何设计呢?通过注册+回调机制来实现。

FutureAsync

import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) throws Exception {

        FutureAsync<Integer> future = new FutureAsync<>();

        Thread thread1 = new Thread(()->{
            System.out.println("thread1 before set .....");
            try {
                Thread.sleep(1000);
                future.fireSuccess(1);
            } catch (InterruptedException e) {
               future.fireFailure();
            }

            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
            System.out.println("thread2 before get .....");
            future.addListener(new FutureCallback() {
                   @Override
                   public void onSuccess(Object value) {
                       System.out.println(String.valueOf(value));
                   }

                   @Override
                   public void onFailure() {
                       System.out.println("error");
                   }
            });

            System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }

    static class FutureAsync<T>{
       List<FutureCallback> callbacks = new ArrayList<>();

       void addListener(FutureCallback callback){
           callbacks.add(callback);
       }

       void fireSuccess(T value){
           callbacks.stream().forEach((callback)->{callback.onSuccess(value);});
       }
        void fireFailure(){
            callbacks.stream().forEach((callback)->{callback.onFailure();});
        }
    }

    interface FutureCallback<T>{
        void  onSuccess(T value);
        void onFailure();
    }
}
相关文章
|
6月前
|
监控 网络协议 iOS开发
程序退到后台的时候,所有线程被挂起,系统回收所有的socket资源问题及解决方案
程序退到后台的时候,所有线程被挂起,系统回收所有的socket资源问题及解决方案
221 0
|
6月前
LabVIEW中使用队列,通知器,信号量或集合点时的潜在竞争情况
LabVIEW中使用队列,通知器,信号量或集合点时的潜在竞争情况
65 0
|
6月前
|
消息中间件 Java
保护性暂停模式
保护性暂停模式
64 0
|
消息中间件 Java
同步模式之保护性暂停
同步模式之保护性暂停
|
6月前
在程序运行过程中,线程的状态是什么?进来看看就通透了
在程序运行过程中,线程的状态是什么?进来看看就通透了
52 0
|
消息中间件 Cloud Native Java
线程同步模式之保护性暂停
保护性暂停是一种同步模式,用于保护共享资源的完整性。在多线程或多进程环境中,如果多个线程或进程同时访问共享资源,可能会导致数据不一致或者竞态条件等问题
167 0
MOTOROLA TMCP700 在任何时候暂停或中止数据阶段
MOTOROLA TMCP700 在任何时候暂停或中止数据阶段
103 0
MOTOROLA TMCP700 在任何时候暂停或中止数据阶段
多线程编程之如何暂停与恢复线程
前面的文章多线程编程之停止线程的几种方法介绍了停止线程运行的三种方法:
688 0
多线程编程之如何暂停与恢复线程
|
消息中间件 缓存 监控
Java线程生命周期与状态切换
最近有点懒散,没什么比较有深度的产出。刚好想重新研读一下JUC线程池的源码实现,在此之前先深入了解一下Java中的线程实现,包括线程的生命周期、状态切换以及线程的上下文切换等等。编写本文的时候,使用的JDK版本是11。
149 0
Java线程生命周期与状态切换