同步&异步
1. 同步等待
1.1 什么是保护性暂停
In concurrent programming, guarded suspension is a software design pattern for managing operations that require both a lock to be acquired and a precondition to be satisfied before the operation can be executed. The guarded suspension pattern is typically applied to method calls in object-oriented programs, and involves suspending the method call, and the calling thread, until the precondition (acting as a guard) is satisfied.—— wikipedia
保护性暂停模式是让一个线程等待另一个线程的结果。
举个例子说明:现在有两个线程,Thread1负责写入结果,Thread2负责读取结果。
import java.util.concurrent.atomic.AtomicReference;
public class Main {
public static void main(String[] args) throws Exception {
//公共变量
AtomicReference<Integer> response = new AtomicReference<>(null);
Thread thread1 = new Thread(()->{
System.out.println("thread1 before set .....");
response.set(1);
System.out.println("thread1 after set .....");
});
Thread thread2 = new Thread(()->{
System.out.println("thread2 before get .....");
System.out.println(response.get());
System.out.println("thread2 after get .....");
});
thread1.start();
thread2.start();
}
}
按照上面的代码,两个线程独立执行,thread2能否获取response的值全靠缘分。
既然要控制先后顺序,自然就要使用wait和notify进行同步(先忽略join、future、CountdownLatch等同步工具)
import java.util.concurrent.atomic.AtomicReference;
public class Main {
public static void main(String[] args) throws Exception {
//公共变量
AtomicReference<Integer> response = new AtomicReference<>(null);
Thread thread1 = new Thread(()->{
System.out.println("thread1 before set .....");
synchronized(response) {
response.set(1);
response.notify();
}
System.out.println("thread1 after set .....");
});
Thread thread2 = new Thread(()->{
System.out.println("thread2 before get .....");
synchronized(response) {
//如果已有response直接获取,否则阻塞
if (response.get() == null){
try {
response.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(response.get());
}
System.out.println("thread2 after get .....");
});
thread1.start();
thread2.start();
}
}
为了代码的简洁和易用,我们将同步逻辑封装到一个对象中,就实现了一个简易的保护性暂停对象:
import java.util.concurrent.atomic.AtomicReference;
public class Main {
public static void main(String[] args) throws Exception {
GuardObject guardObject = new GuardObject();
Thread thread1 = new Thread(()->{
System.out.println("thread1 before set .....");
guardObject.setObj(1);
System.out.println("thread1 after set .....");
});
Thread thread2 = new Thread(()->{
System.out.println("thread2 before get .....");
try {
System.out.println(guardObject.getObj());
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread2 after get .....");
});
thread1.start();
thread2.start();
}
static class GuardObject{
private Integer response;
public synchronized int getObj() throws InterruptedException {
if(response == null){
wait();
}
return response;
}
public synchronized void setObj(Integer value){
response = value;
notify();
}
}
}
JDK中的thread.join、FutureTask都是用保护性暂停的设计模式来实现的。
1.2 FutureTask
![]()
- 入口:
excutor.submit(callable)
- 生成
RunnableFuture
, 加入Worker。线程池会把Callable
对象包装进RunnableFuture
。RunnableFuture
既是Future
又是Runnable
。然后执行该
// java.util.concurrent.AbstractExecutorService
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
- 工作线程调用
FuturerTask
的run方法。
// java.util.concurrent.ThreadPoolExecutor#addWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- Future调用get, 如果没有返回值(
outcome
变量),会将当前线程封装进WaitNode,并调用UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q)
存入waiters(头插法), 然后调用LockSupport.unpark
阻塞当前线程。
//java.util.concurrent.FutureTask
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* Returns result or throws exception for completed task.
*
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
FuturerTask
的run方法执行完毕,通过set(result)
将结果赋值给outcome
, 并激活waiters
中所有的阻塞
//java.util.concurrent.FutureTask
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
2. 异步处理
FutureTask是一种进程等待另一个进程执行结束(同步), 但有时候我们需要的是异步操作,又该如何设计呢?通过注册+回调机制来实现。
import java.util.ArrayList;
import java.util.List;
public class Main {
public static void main(String[] args) throws Exception {
FutureAsync<Integer> future = new FutureAsync<>();
Thread thread1 = new Thread(()->{
System.out.println("thread1 before set .....");
try {
Thread.sleep(1000);
future.fireSuccess(1);
} catch (InterruptedException e) {
future.fireFailure();
}
System.out.println("thread1 after set .....");
});
Thread thread2 = new Thread(()->{
System.out.println("thread2 before get .....");
future.addListener(new FutureCallback() {
@Override
public void onSuccess(Object value) {
System.out.println(String.valueOf(value));
}
@Override
public void onFailure() {
System.out.println("error");
}
});
System.out.println("thread2 after get .....");
});
thread1.start();
thread2.start();
}
static class FutureAsync<T>{
List<FutureCallback> callbacks = new ArrayList<>();
void addListener(FutureCallback callback){
callbacks.add(callback);
}
void fireSuccess(T value){
callbacks.stream().forEach((callback)->{callback.onSuccess(value);});
}
void fireFailure(){
callbacks.stream().forEach((callback)->{callback.onFailure();});
}
}
interface FutureCallback<T>{
void onSuccess(T value);
void onFailure();
}
}