并发编程(九)Queue

简介: 并发编程(九)Queue

阻塞队列流程剖析

写时复制List CopyOnWriteArrayList

public class CopyOnWriteArrayListTest {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> list = new CopyOnWriteArrayList<>();
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                list.add(i);
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 100000; i < 200000; i++) {
                list.add(i);
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        int size = list.size();
        for (int i = 0; i < size; i++) {
            System.out.println("第" + i + "个元素是:" + list.get(i));
        }
    }
}

延时队列 DelayQueue

public class DelayQueueTest {
    public static void main(String[] args) throws InterruptedException {
//        List<Ticket> list = new ArrayList<>();
//        list.add(new Ticket("票据1", 10000));
//        list.add(new Ticket("票据2", 15000));
//        list.add(new Ticket("票据3", 5000));
//        for(int i =0;i<list.size();){
//            System.out.println(list.get(i).name);
//            list.remove(i);
//        }
        DelayQueue<Ticket> delayQueue = new DelayQueue<>();
        delayQueue.add(new Ticket("票据1", 10000));
        delayQueue.add(new Ticket("票据2", 15000));
        delayQueue.add(new Ticket("票据3", 5000));
        while (delayQueue.size() > 0) {
            System.out.println(delayQueue.take().name);
        }
    }
}
class Ticket implements Delayed {
    public String name;
    /**
     * 过期时间(毫秒)
     */
    public long expireTime;
    public Ticket() {
    }
    public Ticket(String name, long delayTime) {
        this.name = name;
        this.expireTime = System.currentTimeMillis() + delayTime;
    }
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
}

阻塞队列

public class ArrayBlockingQueueTest {
    public static void main(String[] args) {
        ArrayBlockingQueue<Boll> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
        Thread t1 = new Thread(() -> {
            int i = 0;
            for (; ; i++) {
                try {
                    System.out.println("准备放入编号为" + i + "的球");
                    arrayBlockingQueue.put(new Boll(i));
                    System.out.println("已放入编号为" + i + "的球");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        Thread t2 = new Thread(() -> {
            for (; ; ) {
                try {
                    System.out.println("准备拿一个球");
                    Boll boll = arrayBlockingQueue.take();
                    System.out.println("拿到了编号为" + boll.number + "的球");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t2.start();
    }
}
class Boll {
    int number;
    public Boll(int number) {
        this.number = number;
    }
}

自定义阻塞队列

public class MyBlockingQueue<T> {
    int count;
    //拿元素指针
    int head;
    //放元素指针
    int tail;
    Object[] data;
    public MyBlockingQueue(int capacity){
        data = new Object[capacity];
    }
    /**********锁机制***********/
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();
    public void put(T t) throws InterruptedException {
        try {
            //可中断锁
            lock.lockInterruptibly();
            if(count == data.length){
                //1.将当前结点加入条件队列
                //2.释放锁唤醒同步队列的结点
                //3.陷入等待
                //4.被唤醒后重新获取锁              
                notFull.await();
            }
            data[tail] = t;
            if(++tail == data.length){
                tail = 0;
            }
            count++;
            //将notEmpty中条件队列的结点转移到lock的同步队列
            notEmpty.signal();
        }finally {
            //唤醒下一个同步队列中的结点
            lock.unlock();
        }
    }
    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        try {
            lock.lockInterruptibly();
            if(count == 0){
                //1.将当前结点加入条件队列
                //2.释放锁唤醒同步队列的结点
                //3.陷入等待
                //4.被唤醒后重新获取锁              
                notEmpty.await();
            }
            T t = (T) data[head];
            data[head] = null;
            if(++head == data.length){
                head = 0;
            }
            count--;
            //将notFull中条件队列的结点转移到lock的同步队列
            notFull.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }
}
目录
相关文章
|
1月前
|
存储 缓存 安全
高并发编程之阻塞队列
高并发编程之阻塞队列
33 1
|
1月前
并发编程之BlockingQueue(阻塞队列)的详细解析
并发编程之BlockingQueue(阻塞队列)的详细解析
14 0
|
1月前
|
安全 Java API
Java并发 - J.U.C并发容器类 list、set、queue
Queue API 阻塞是通过 condition 来实现的,可参考 Java 并发 - Lock 接口 ArrayBlockingQueue 阻塞 LinkedBlockingQueue 阻塞 ArrayQueue 非阻塞 LinkedQueue 非阻塞
|
6月前
|
算法 索引 Python
数据结构与算法-(8)---队列(Queue)
数据结构与算法-(8)---队列(Queue)
41 1
|
Java
高并发编程-Wait Set 多线程的“休息室”
高并发编程-Wait Set 多线程的“休息室”
50 0
|
算法 Java API
Juc并发编程16——Semaphore,Exchanger,Fork/Join框架
Juc并发编程16——Semaphore,Exchanger,Fork/Join框架
|
算法 API 调度
p-queue 源码解读
前言在钉钉表格的开放场景中,需要对用户在沙箱中执行的操作进行合理的调度,我们选择了 p-queue 这个简洁但功能强大的工具库。最近需要对不同类型的任务进行更复杂的并发调度,希望通过学习源码的方式,掌握其中的核心逻辑,确保业务功能的顺利实现。什么是 p-queuep-queue 是一个异步队列管理库。对于需要控制执行速度或数量的异步操作很有用,比如:与 REST API 交互或执行 CPU / 内
380 0
|
数据采集 Python
Python多线程爬虫编程中queue.Queue和queue.SimpleQueue的区别和应用
在Python中,queue模块提供了多种队列类,用于在多线程编程中安全地交换信息。其中,queue.Queue 和queue.SimpleQueue 是两个常用的先进先出(FIFO)的队列类,它们有以下区别和优缺点: queue.Queue 是一个更复杂的队列类实现涉及到多个锁和条件变量,因此可能会影响性能和内存效率。 SimpleQueue 是一个更简单的队列类它只提供了put()和get()两个方法,并且不支持maxsize参数
189 0
Python多线程爬虫编程中queue.Queue和queue.SimpleQueue的区别和应用
|
存储 缓存 Java
JUC并发编程学习(十)-阻塞队列、同步队列
JUC并发编程学习(十)-阻塞队列、同步队列
JUC并发编程学习(十)-阻塞队列、同步队列

热门文章

最新文章