信号量

简介: 信号量

概念

较为官方的说法:信号量是一个整型变量来累计唤醒次数,供以后使用。

信号量主要有两种操作:P/V(down和up)或sleep和wakeup
两种操作的主要作用:

  • P/down/sleep对信号值减1
  • V/up/wakeup对信号值加1

适用场景

单核处理器和多核处理器

主要组成

  • 一个整型变量
  • 一个等待进程列表
  • 两个原子方法(PV)

典型实现

Java CountDownLatch

java中的CountDownLatch支持设置一个整数作为信号量计数,countDown()函数作为V方法,await等待信号量计数为0。

Java中的等待进程队列在哪里?
Java中有线程等待队列,是AbstractQueuedSynchronizer这个类进行了封装,如果未满足自定义的await条件,那么当前线程会被加入到等待线程队列中,并把当前线程设置为不可调度的状态,直到countDown函数被调用会重新唤醒不可调度状态的线程,线程被唤醒后继续检查是否条件是否满足,如果不满足继续设置为不可调度的状态,往复循环直到条件满足。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public final class CountDownLatch {

    private final Sync sync;

    public CountDownLatch(int count) {
        if(count <= 0) {
            throw new IllegalArgumentException("count < =0");
        }

        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, timeUnit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    public void reset() {
        sync.reset();
    }

    @Override
    public String toString() {
        return "CountDownLatchReset{" +
                "sync=" + sync.getCount() +
                '}';
    }

    private static final class Sync extends AbstractQueuedSynchronizer {
        private final int startCount;

        public Sync(int count) {
            this.startCount = count;
        }

        public long getCount() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int arg) {
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for(;;) {
                int c = getState();
                if(c == 0) {
                    return false;
                }

                int nextC = c -1;
                if(compareAndSetState(c, nextC)) {
                    return nextC == 0;
                }
            }
        }

        protected void reset() {
            setState(startCount);
        }
    }


}

典型应用场景

1、生产者-消费者问题

2、栅栏

public class BatchTaskBarrier<T> {
    // 信号量作为同步工具
    private final CountDownLatch barrier;
    // 批量任务执行等待超时时间
    private final long timeoutMillis;
    // 批量任务执行结果
    private final List<T> resultList;
    // 批量任务的Future对象
    private final List<Future<Object>> futureList;
    // 任务执行线程池
    private final ExecutorService executor;


    public BatchTaskBarrier(int count, long timeoutMillis, ExecutorService executor) {
        this.barrier = new CountDownLatch(count);
        this.timeoutMillis = timeoutMillis;
        resultList = new ArrayList<>(count);
        futureList = new ArrayList<>(count);
        this.executor = executor;
    }

    /**
     * 执行批量任务
     * @param callable 业务回调接口
     */
    public void executeTask(Callable<T> callable) {
        BatchTask task = new BatchTask(barrier, callable);
        Future<Object> taskFuture = executor.submit(task);
        futureList.add(taskFuture);
    }

    /**
     * 等待直到超时
     * @return 批量任务结果
     */
    public List<T> await() {
        try {
            barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (Future<Object> taskFuture : futureList) {
            if (!taskFuture.isDone()) {
                taskFuture.cancel(true);
            }

            T result = null;
            try {
                result = (T) taskFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
                resultList.add(result);
            } catch (InterruptedException e) {
                log.error("", e);
            } catch (ExecutionException e) {
                log.error("", e);
            } catch (TimeoutException e) {
                log.error("", e);
            }

        }

        return resultList;
    }
}
目录
相关文章
|
8月前
|
程序员
信号量和管程
信号量和管程
40 0
|
安全
理解信号量
理解信号量
|
存储
信号量(下)
信号量(下)
44 0
|
算法
信号量(上)
信号量(上)
39 0
|
机器学习/深度学习 C语言
信号量
信号量
95 0
信号量的使用
信号量的使用
206 0
Semaphore信号量
Semaphore 可以用来限制或管理数量有限资源的使用情况 - 信号量的作用是用来维护一个“许可证”,的计数,线程可以获取 许可证,那信号量剩余许可证就减一,线程也可以是否一个许可证,那剩余的许可证就加一,当信号量拥有的许可证为0时,那么下一个线程想获得许可证,就要进行等待,直到另外线程释放许可证
287 0
Semaphore信号量
信号量Semaphore的使用
允许多个线程同时访问:信号量(Semaphore) 信号量为多线程协作提供了更为强大的控制方法。广义上说,信号量是对锁的扩展。无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量指定多个线程访问同一个资源。
1338 0
|
调度
详解线程的信号量和互斥锁
  前言:有个问题感觉一直会被问道:进程和线程的区别?也许之前我会回答: 进程:资源分配最小单位 线程:轻量级的进程 是系统调度的最小单位 由进程创建 多个线程共享进程的资源   但是现在我觉得一个比喻回答的更好:程序就像静止的火车,进程是运行的火车,线程是运行火车的每节车厢。
1120 0