保护性暂停

简介: 如何实现线程同步和异步

同步&异步

1. 同步等待

1.1 什么是保护性暂停

In concurrent programming, guarded suspension is a software design pattern for managing operations that require both a lock to be acquired and a precondition to be satisfied before the operation can be executed. The guarded suspension pattern is typically applied to method calls in object-oriented programs, and involves suspending the method call, and the calling thread, until the precondition (acting as a guard) is satisfied.

—— wikipedia

保护性暂停模式是让一个线程等待另一个线程的结果。

举个例子说明:现在有两个线程,Thread1负责写入结果,Thread2负责读取结果。

同步

import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) throws Exception {
        //公共变量
        AtomicReference<Integer> response = new AtomicReference<>(null);


        Thread thread1 = new Thread(()->{
            System.out.println("thread1 before set .....");
            response.set(1);
            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
            System.out.println("thread2 before get .....");
            System.out.println(response.get());
            System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }


}

按照上面的代码,两个线程独立执行,thread2能否获取response的值全靠缘分。

既然要控制先后顺序,自然就要使用wait和notify进行同步(先忽略join、future、CountdownLatch等同步工具)

import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) throws Exception {
        //公共变量
        AtomicReference<Integer> response = new AtomicReference<>(null);

        Thread thread1 = new Thread(()->{
                System.out.println("thread1 before set .....");
                synchronized(response) {
                    response.set(1);
                    response.notify();
                }

            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
                System.out.println("thread2 before get .....");
                synchronized(response) {
                    //如果已有response直接获取,否则阻塞
                    if (response.get() == null){
                        try {
                            response.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    System.out.println(response.get());

                }

                System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }


}

为了代码的简洁和易用,我们将同步逻辑封装到一个对象中,就实现了一个简易的保护性暂停对象:

保护性暂停

import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) throws Exception {

        GuardObject guardObject = new GuardObject();

        Thread thread1 = new Thread(()->{
            System.out.println("thread1 before set .....");
            guardObject.setObj(1);
            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
            System.out.println("thread2 before get .....");
            try {
                System.out.println(guardObject.getObj());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }

   static class GuardObject{
        private Integer response;

        public synchronized int getObj() throws InterruptedException {
            if(response == null){
                wait();
            }
            return response;
        }

        public synchronized void setObj(Integer value){
            response = value;
            notify();
        }
    }
}

JDK中的thread.join、FutureTask都是用保护性暂停的设计模式来实现的。

1.2 FutureTask

![]()

  1. 入口: excutor.submit(callable)

excutor

  1. 生成RunnableFuture, 加入Worker。线程池会把Callable对象包装进 RunnableFutureRunnableFuture既是Future又是Runnable。然后执行该
// java.util.concurrent.AbstractExecutorService

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException       {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
} 


protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
  1. 工作线程调用 FuturerTask的run方法。
// java.util.concurrent.ThreadPoolExecutor#addWorker   
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
  1. Future调用get, 如果没有返回值(outcome变量),会将当前线程封装进WaitNode,并调用UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q)存入waiters(头插法), 然后调用LockSupport.unpark阻塞当前线程。
//java.util.concurrent.FutureTask
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }


    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }


    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
  1. FuturerTask的run方法执行完毕,通过set(result)将结果赋值给outcome, 并激活waiters中所有的阻塞
//java.util.concurrent.FutureTask   
public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }


 protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
 }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

2. 异步处理

FutureTask是一种进程等待另一个进程执行结束(同步), 但有时候我们需要的是异步操作,又该如何设计呢?通过注册+回调机制来实现。

FutureAsync

import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) throws Exception {

        FutureAsync<Integer> future = new FutureAsync<>();

        Thread thread1 = new Thread(()->{
            System.out.println("thread1 before set .....");
            try {
                Thread.sleep(1000);
                future.fireSuccess(1);
            } catch (InterruptedException e) {
               future.fireFailure();
            }

            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
            System.out.println("thread2 before get .....");
            future.addListener(new FutureCallback() {
                   @Override
                   public void onSuccess(Object value) {
                       System.out.println(String.valueOf(value));
                   }

                   @Override
                   public void onFailure() {
                       System.out.println("error");
                   }
            });

            System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }

    static class FutureAsync<T>{
       List<FutureCallback> callbacks = new ArrayList<>();

       void addListener(FutureCallback callback){
           callbacks.add(callback);
       }

       void fireSuccess(T value){
           callbacks.stream().forEach((callback)->{callback.onSuccess(value);});
       }
        void fireFailure(){
            callbacks.stream().forEach((callback)->{callback.onFailure();});
        }
    }

    interface FutureCallback<T>{
        void  onSuccess(T value);
        void onFailure();
    }
}
相关文章
|
2月前
|
监控 网络协议 iOS开发
程序退到后台的时候,所有线程被挂起,系统回收所有的socket资源问题及解决方案
程序退到后台的时候,所有线程被挂起,系统回收所有的socket资源问题及解决方案
37 0
|
2月前
|
消息中间件 Java
保护性暂停模式
保护性暂停模式
25 0
|
12月前
|
消息中间件 Java
同步模式之保护性暂停
同步模式之保护性暂停
|
2月前
在程序运行过程中,线程的状态是什么?进来看看就通透了
在程序运行过程中,线程的状态是什么?进来看看就通透了
38 0
|
消息中间件 Cloud Native Java
线程同步模式之保护性暂停
保护性暂停是一种同步模式,用于保护共享资源的完整性。在多线程或多进程环境中,如果多个线程或进程同时访问共享资源,可能会导致数据不一致或者竞态条件等问题
131 0
|
存储 Linux 调度
Linux线程控制
线程控制的相关操作:线程创建、线程终止、线程等待和线程分离。
Linux线程控制
|
负载均衡 算法 Java
记一次线上频繁FGC的事件和解决方式
1.大量的请求,调用的地方要注意是否会导致内存的大量消耗,尽可能使用池化技术,单例等,减少创建,销毁的系统开销;2.CMS 的几个缺点,可以参考《深入java虚拟机》,对CPU占用会比较高,无法处理浮动垃圾,还有就是CMS使用的是标记-清除算法,会导致大量的空间碎片,碎片过多的话,导致分配大对象很困难,所以不得不进行FGC,也可能是这个原因导致了本文说的一直FGC的问题。
340 0
记一次线上频繁FGC的事件和解决方式
多线程编程之如何暂停与恢复线程
前面的文章多线程编程之停止线程的几种方法介绍了停止线程运行的三种方法:
628 0
多线程编程之如何暂停与恢复线程
|
监控 Java 数据库
一个线程罢工的诡异事件
线上某个应用里业务逻辑没有执行,导致的结果是数据库里的某些数据没有更新。