并发编程(九)Queue

简介: 并发编程(九)Queue

阻塞队列流程剖析

写时复制List CopyOnWriteArrayList

public class CopyOnWriteArrayListTest {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> list = new CopyOnWriteArrayList<>();
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                list.add(i);
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 100000; i < 200000; i++) {
                list.add(i);
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        int size = list.size();
        for (int i = 0; i < size; i++) {
            System.out.println("第" + i + "个元素是:" + list.get(i));
        }
    }
}

延时队列 DelayQueue

public class DelayQueueTest {
    public static void main(String[] args) throws InterruptedException {
//        List<Ticket> list = new ArrayList<>();
//        list.add(new Ticket("票据1", 10000));
//        list.add(new Ticket("票据2", 15000));
//        list.add(new Ticket("票据3", 5000));
//        for(int i =0;i<list.size();){
//            System.out.println(list.get(i).name);
//            list.remove(i);
//        }
        DelayQueue<Ticket> delayQueue = new DelayQueue<>();
        delayQueue.add(new Ticket("票据1", 10000));
        delayQueue.add(new Ticket("票据2", 15000));
        delayQueue.add(new Ticket("票据3", 5000));
        while (delayQueue.size() > 0) {
            System.out.println(delayQueue.take().name);
        }
    }
}
class Ticket implements Delayed {
    public String name;
    /**
     * 过期时间(毫秒)
     */
    public long expireTime;
    public Ticket() {
    }
    public Ticket(String name, long delayTime) {
        this.name = name;
        this.expireTime = System.currentTimeMillis() + delayTime;
    }
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
}

阻塞队列

public class ArrayBlockingQueueTest {
    public static void main(String[] args) {
        ArrayBlockingQueue<Boll> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
        Thread t1 = new Thread(() -> {
            int i = 0;
            for (; ; i++) {
                try {
                    System.out.println("准备放入编号为" + i + "的球");
                    arrayBlockingQueue.put(new Boll(i));
                    System.out.println("已放入编号为" + i + "的球");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        Thread t2 = new Thread(() -> {
            for (; ; ) {
                try {
                    System.out.println("准备拿一个球");
                    Boll boll = arrayBlockingQueue.take();
                    System.out.println("拿到了编号为" + boll.number + "的球");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t2.start();
    }
}
class Boll {
    int number;
    public Boll(int number) {
        this.number = number;
    }
}

自定义阻塞队列

public class MyBlockingQueue<T> {
    int count;
    //拿元素指针
    int head;
    //放元素指针
    int tail;
    Object[] data;
    public MyBlockingQueue(int capacity){
        data = new Object[capacity];
    }
    /**********锁机制***********/
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();
    public void put(T t) throws InterruptedException {
        try {
            //可中断锁
            lock.lockInterruptibly();
            if(count == data.length){
                //1.将当前结点加入条件队列
                //2.释放锁唤醒同步队列的结点
                //3.陷入等待
                //4.被唤醒后重新获取锁              
                notFull.await();
            }
            data[tail] = t;
            if(++tail == data.length){
                tail = 0;
            }
            count++;
            //将notEmpty中条件队列的结点转移到lock的同步队列
            notEmpty.signal();
        }finally {
            //唤醒下一个同步队列中的结点
            lock.unlock();
        }
    }
    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        try {
            lock.lockInterruptibly();
            if(count == 0){
                //1.将当前结点加入条件队列
                //2.释放锁唤醒同步队列的结点
                //3.陷入等待
                //4.被唤醒后重新获取锁              
                notEmpty.await();
            }
            T t = (T) data[head];
            data[head] = null;
            if(++head == data.length){
                head = 0;
            }
            count--;
            //将notFull中条件队列的结点转移到lock的同步队列
            notFull.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }
}
目录
相关文章
|
7月前
|
存储 缓存 安全
高并发编程之阻塞队列
高并发编程之阻塞队列
62 1
|
2月前
|
Java 数据库 数据安全/隐私保护
关于并发编程,你必须需要掌握的Future机制!
关于并发编程,你必须需要掌握的Future机制!
|
2月前
|
存储 安全 API
源码解密协程队列和线程队列的实现原理(二)
源码解密协程队列和线程队列的实现原理(二)
37 1
|
7月前
并发编程之BlockingQueue(阻塞队列)的详细解析
并发编程之BlockingQueue(阻塞队列)的详细解析
29 0
|
7月前
|
消息中间件 安全 Java
多线程案例-阻塞队列
多线程案例-阻塞队列
204 0
|
算法 索引 Python
数据结构与算法-(8)---队列(Queue)
数据结构与算法-(8)---队列(Queue)
71 1
|
数据采集 Python
Python多线程爬虫编程中queue.Queue和queue.SimpleQueue的区别和应用
在Python中,queue模块提供了多种队列类,用于在多线程编程中安全地交换信息。其中,queue.Queue 和queue.SimpleQueue 是两个常用的先进先出(FIFO)的队列类,它们有以下区别和优缺点: queue.Queue 是一个更复杂的队列类实现涉及到多个锁和条件变量,因此可能会影响性能和内存效率。 SimpleQueue 是一个更简单的队列类它只提供了put()和get()两个方法,并且不支持maxsize参数
221 0
Python多线程爬虫编程中queue.Queue和queue.SimpleQueue的区别和应用
|
存储 Java 容器
并发编程-23J.U.C组件拓展之阻塞队列BlockingQueue 和 线程池
并发编程-23J.U.C组件拓展之阻塞队列BlockingQueue 和 线程池
70 0
|
存储 缓存 Java
JUC并发编程学习(十)-阻塞队列、同步队列
JUC并发编程学习(十)-阻塞队列、同步队列
JUC并发编程学习(十)-阻塞队列、同步队列
|
存储 缓存
并发编程之BlockingQueue队列
BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:
233 0