保护性暂停

简介: 如何实现线程同步和异步

同步&异步

1. 同步等待

1.1 什么是保护性暂停

In concurrent programming, guarded suspension is a software design pattern for managing operations that require both a lock to be acquired and a precondition to be satisfied before the operation can be executed. The guarded suspension pattern is typically applied to method calls in object-oriented programs, and involves suspending the method call, and the calling thread, until the precondition (acting as a guard) is satisfied.

—— wikipedia

保护性暂停模式是让一个线程等待另一个线程的结果。

举个例子说明:现在有两个线程,Thread1负责写入结果,Thread2负责读取结果。

同步

import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) throws Exception {
        //公共变量
        AtomicReference<Integer> response = new AtomicReference<>(null);


        Thread thread1 = new Thread(()->{
            System.out.println("thread1 before set .....");
            response.set(1);
            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
            System.out.println("thread2 before get .....");
            System.out.println(response.get());
            System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }


}

按照上面的代码,两个线程独立执行,thread2能否获取response的值全靠缘分。

既然要控制先后顺序,自然就要使用wait和notify进行同步(先忽略join、future、CountdownLatch等同步工具)

import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) throws Exception {
        //公共变量
        AtomicReference<Integer> response = new AtomicReference<>(null);

        Thread thread1 = new Thread(()->{
                System.out.println("thread1 before set .....");
                synchronized(response) {
                    response.set(1);
                    response.notify();
                }

            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
                System.out.println("thread2 before get .....");
                synchronized(response) {
                    //如果已有response直接获取,否则阻塞
                    if (response.get() == null){
                        try {
                            response.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    System.out.println(response.get());

                }

                System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }


}

为了代码的简洁和易用,我们将同步逻辑封装到一个对象中,就实现了一个简易的保护性暂停对象:

保护性暂停

import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) throws Exception {

        GuardObject guardObject = new GuardObject();

        Thread thread1 = new Thread(()->{
            System.out.println("thread1 before set .....");
            guardObject.setObj(1);
            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
            System.out.println("thread2 before get .....");
            try {
                System.out.println(guardObject.getObj());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }

   static class GuardObject{
        private Integer response;

        public synchronized int getObj() throws InterruptedException {
            if(response == null){
                wait();
            }
            return response;
        }

        public synchronized void setObj(Integer value){
            response = value;
            notify();
        }
    }
}

JDK中的thread.join、FutureTask都是用保护性暂停的设计模式来实现的。

1.2 FutureTask

![]()

  1. 入口: excutor.submit(callable)

excutor

  1. 生成RunnableFuture, 加入Worker。线程池会把Callable对象包装进 RunnableFutureRunnableFuture既是Future又是Runnable。然后执行该
// java.util.concurrent.AbstractExecutorService

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException       {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
} 


protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
  1. 工作线程调用 FuturerTask的run方法。
// java.util.concurrent.ThreadPoolExecutor#addWorker   
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
  1. Future调用get, 如果没有返回值(outcome变量),会将当前线程封装进WaitNode,并调用UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q)存入waiters(头插法), 然后调用LockSupport.unpark阻塞当前线程。
//java.util.concurrent.FutureTask
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }


    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }


    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
  1. FuturerTask的run方法执行完毕,通过set(result)将结果赋值给outcome, 并激活waiters中所有的阻塞
//java.util.concurrent.FutureTask   
public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }


 protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
 }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

2. 异步处理

FutureTask是一种进程等待另一个进程执行结束(同步), 但有时候我们需要的是异步操作,又该如何设计呢?通过注册+回调机制来实现。

FutureAsync

import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) throws Exception {

        FutureAsync<Integer> future = new FutureAsync<>();

        Thread thread1 = new Thread(()->{
            System.out.println("thread1 before set .....");
            try {
                Thread.sleep(1000);
                future.fireSuccess(1);
            } catch (InterruptedException e) {
               future.fireFailure();
            }

            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
            System.out.println("thread2 before get .....");
            future.addListener(new FutureCallback() {
                   @Override
                   public void onSuccess(Object value) {
                       System.out.println(String.valueOf(value));
                   }

                   @Override
                   public void onFailure() {
                       System.out.println("error");
                   }
            });

            System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }

    static class FutureAsync<T>{
       List<FutureCallback> callbacks = new ArrayList<>();

       void addListener(FutureCallback callback){
           callbacks.add(callback);
       }

       void fireSuccess(T value){
           callbacks.stream().forEach((callback)->{callback.onSuccess(value);});
       }
        void fireFailure(){
            callbacks.stream().forEach((callback)->{callback.onFailure();});
        }
    }

    interface FutureCallback<T>{
        void  onSuccess(T value);
        void onFailure();
    }
}
相关文章
|
4月前
如何识别IT项目失败的预警信号并及时挽救或果断终止
如何识别IT项目失败的预警信号并及时挽救或果断终止
|
7月前
|
搜索推荐 Windows
让你的电脑准时“打个盹”:Win10定时休眠
木头左教你设置Windows 10任务计划程序,让电脑定时休眠,节约能源又呵护健康。首先确保休眠功能开启,然后在任务计划程序创建新任务,命名如“定时休眠”,设置触发时间和操作(cmd.exe /c shutdown -h)。可高级定制,如条件触发或异常处理。跟着步骤实践,解决常见问题,打造个性化自动休眠计划。记得谨慎操作哦!
|
8月前
|
消息中间件 Java
保护性暂停模式
保护性暂停模式
72 0
|
消息中间件 Java
同步模式之保护性暂停
同步模式之保护性暂停
|
消息中间件 Cloud Native Java
线程同步模式之保护性暂停
保护性暂停是一种同步模式,用于保护共享资源的完整性。在多线程或多进程环境中,如果多个线程或进程同时访问共享资源,可能会导致数据不一致或者竞态条件等问题
178 0
MOTOROLA TMCP700 在任何时候暂停或中止数据阶段
MOTOROLA TMCP700 在任何时候暂停或中止数据阶段
107 0
MOTOROLA TMCP700 在任何时候暂停或中止数据阶段
|
Java
Java循环运行时暂停一段时间
Java循环运行时暂停一段时间
125 2
多线程编程之如何暂停与恢复线程
前面的文章多线程编程之停止线程的几种方法介绍了停止线程运行的三种方法:
704 0
多线程编程之如何暂停与恢复线程
|
监控 Java 数据库
一个线程罢工的诡异事件
线上某个应用里业务逻辑没有执行,导致的结果是数据库里的某些数据没有更新。