概念
较为官方的说法:信号量是一个整型变量来累计唤醒次数,供以后使用。
信号量主要有两种操作:P/V(down和up)或sleep和wakeup
两种操作的主要作用:
- P/down/sleep对信号值减1
- V/up/wakeup对信号值加1
适用场景
单核处理器和多核处理器
主要组成
- 一个整型变量
- 一个等待进程列表
- 两个原子方法(PV)
典型实现
Java CountDownLatch
java中的CountDownLatch支持设置一个整数作为信号量计数,countDown()函数作为V方法,await等待信号量计数为0。
Java中的等待进程队列在哪里?
Java中有线程等待队列,是AbstractQueuedSynchronizer这个类进行了封装,如果未满足自定义的await条件,那么当前线程会被加入到等待线程队列中,并把当前线程设置为不可调度的状态,直到countDown函数被调用会重新唤醒不可调度状态的线程,线程被唤醒后继续检查是否条件是否满足,如果不满足继续设置为不可调度的状态,往复循环直到条件满足。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public final class CountDownLatch {
private final Sync sync;
public CountDownLatch(int count) {
if(count <= 0) {
throw new IllegalArgumentException("count < =0");
}
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, timeUnit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
public void reset() {
sync.reset();
}
@Override
public String toString() {
return "CountDownLatchReset{" +
"sync=" + sync.getCount() +
'}';
}
private static final class Sync extends AbstractQueuedSynchronizer {
private final int startCount;
public Sync(int count) {
this.startCount = count;
}
public long getCount() {
return getState();
}
@Override
protected int tryAcquireShared(int arg) {
return getState() == 0 ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
for(;;) {
int c = getState();
if(c == 0) {
return false;
}
int nextC = c -1;
if(compareAndSetState(c, nextC)) {
return nextC == 0;
}
}
}
protected void reset() {
setState(startCount);
}
}
}
典型应用场景
1、生产者-消费者问题
2、栅栏
public class BatchTaskBarrier<T> {
// 信号量作为同步工具
private final CountDownLatch barrier;
// 批量任务执行等待超时时间
private final long timeoutMillis;
// 批量任务执行结果
private final List<T> resultList;
// 批量任务的Future对象
private final List<Future<Object>> futureList;
// 任务执行线程池
private final ExecutorService executor;
public BatchTaskBarrier(int count, long timeoutMillis, ExecutorService executor) {
this.barrier = new CountDownLatch(count);
this.timeoutMillis = timeoutMillis;
resultList = new ArrayList<>(count);
futureList = new ArrayList<>(count);
this.executor = executor;
}
/**
* 执行批量任务
* @param callable 业务回调接口
*/
public void executeTask(Callable<T> callable) {
BatchTask task = new BatchTask(barrier, callable);
Future<Object> taskFuture = executor.submit(task);
futureList.add(taskFuture);
}
/**
* 等待直到超时
* @return 批量任务结果
*/
public List<T> await() {
try {
barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (Future<Object> taskFuture : futureList) {
if (!taskFuture.isDone()) {
taskFuture.cancel(true);
}
T result = null;
try {
result = (T) taskFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
resultList.add(result);
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
log.error("", e);
} catch (TimeoutException e) {
log.error("", e);
}
}
return resultList;
}
}