【并发编程】同步容器与并发容器2

简介: 【并发编程】同步容器与并发容器

(2)ConcurrentQueue


与ConcurrentHashMap相同,ConcurrentQueue也是通过同样的方式来提高并发性能的。

同步容器中提到过火车票问题:

有N张火车票,每张车票都有一个编号,同时有10个窗口对外售票。

使用ConcurrentQueue进一步提高并发性:

public class Demo4 {
    private static Queue<String> queues = new ConcurrentLinkedDeque<>();
    static {
        for (int i = 0; i < 10000; i++) {
            queues.add("票编号:"+i);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                while(true){
                    String s = queues.poll();
                    if (s ==null) break;
                    else System.out.println("销售了---"+s);
                }
            }).start();
        }
        long end = System.currentTimeMillis();
        Thread.sleep(3000L);
        System.out.println("总耗时:"+(end-start)+"ms");
    }
}

d8e519514a844e7e9987ea7ee5bebcb8.jpg


常用的API

Queue<String> strings = new ConcurrentLinkedQueue<String>();
strings.offer(元素) //相当于add,放进队列
strings.size() //获取当前队列的元素个数
strings.poll() //取出并移除
strings.peek() //取出不会移除,相当于get();

(3)CopyOnWriteArrayList

写时复制容器,即copy-on-write,多线程环境下,写时效率低,读时效率高,适合写少读多的环境。

public class Demo5 implements Runnable{
    private static List<String> lists = new ArrayList<>();
    //private static List<String> lists = new Vector<>();
    //private static List<String> lists = new CopyOnWriteArrayList<>();
    private Random random = new Random();
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            lists.add(random.nextInt()+"");
        }
    }
    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        System.out.println("线程开始操作");
        for (int i = 0; i < 10; i++) {
            new Thread(new Demo5()).start();
        }
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            new Thread(()->{
                for (int j = 0; j < lists.size(); j++) {
                    lists.get(finalI);
                }
            }).start();
        }
        long end = System.currentTimeMillis();
        Thread.sleep(6000L);
        System.out.println("耗时:"+(end-start)+"ms");
    }
}

运行结果:

ArrayList:报错:Exception in thread "Thread-1" java.lang.ArrayIndexOutOfBoundsException: 244
Vector:117ms
CopyOnWriteArrayList:222ms

从JDK5开始Java并发包里面提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayList和CopyOnWriteArraySet。

当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后向新的容器添加元素,添加完成元素后,再将原来的容器的引用指向新的容器。这样做的好处就是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为在当前读的容器不会添加任何元素。所以CopyOnWrite容器是一种读写分离的思想,读和写写对应不同的容器。

(4)BlockingQueue

在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

阻塞对列,顾名思义,首先他是一个队列,一个队列在数据结构当中起到的作用大致如下:

fada38996a404a26abaadc5296d6b707.jpg

队列可以使得数据由队列的一端输入,从另一端输出。

先进先出(FIFO):先插入的队列的元素也是最先出队列,这种队列体现了一种公平性。

后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。

f32f713659d44ebbbcd566716d3b9d37.jpg

(5)LinkedBlockingQueue

这中并发容器,会自动实现阻塞式的生产者/消费者模式。使用队列解耦合,在实现异步事物的时候很有用。

案例

public class Demo6 {
    //实例化时指定容器容量
    private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1000);
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    try {
                        //向对列中添加元素,如果对列满了 就等待1s在进行添加
                        boolean b = linkedBlockingQueue.offer(finalI + "", 1, TimeUnit.SECONDS);
                        if (b) {
                            System.out.println(finalI + "队列添加成功");
                        } else {
                            System.out.println(finalI + "队列添加失败,进入等待");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    try {
                        //消费队列,如果为空就等待消费
                        String take = linkedBlockingQueue.take();
                        System.out.println("消费队列元素:" + take);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

3c6fcc6b51df4290b7ecdf9bb61ee0db.jpg

常用API

//实例化时指定容器容量
private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1000);
linkedBlockingQueue.add(元素) //如果队列满了,再次添加就会抛出异常:java.lang.IllegalStateException: Queue full
linkedBlockingQueue.offer(元素,时间,时间单位) //队列满了,等待时间后,再次添加,失败返回false
linkedBlockingQueue.offer(元素) //队列满了,添加失败返回false,成功返回true
linkedBlockingQueue.put(元素) //加入队列,如果满了就等待阻塞
linkedBlockingQueue.take() //取出队列中的元素,如果空了,就会等待阻塞

(6)ArrayBlockingQueue


ArrayBlockingQueue和LinkedBlockingQueue对象的方法都是一样的,用法是一样的。

二者的区别:

LinkedBlockingQueue是一个单向链表实现的阻塞队列,在链表一头加入元素,如果队列满了,就会阻塞,另一头取出元素,如果队列为空,就会阻塞。

LinkedBlockingQueue内部使用ReetrantLock实现插入锁(putLock)和取出锁(takeLock)。

ArrayBlockingQueue基于数组实现,成为有界队列,LinkedBlockingQueue认为是无界队列。当然LinkedBlockingQueue也可以指定队列容量。

2684a13e4a374368b207af25a35d65e7.jpg


(7)DelayQueue

DelayQueue也是一个BlockingQueue,用于放置实现了Delayed接口的对象,只能是实现了Delayed接口的对象,其中对象只能在其到期时才能从队列中取走。

Delayed扩展了Comparable接口,比较的基准为延时的时间,Delayed接口实现类getDelay()返回值为固定值(final),DelayedQueue内部是使用PriorityQueue实现的;即 (DelayQueue = BlockingQueue + PriorityQueue + Delayed)

可以说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准是时间。是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时能从队列中取走。这种队列是有序的,及队头对象的延迟到期时间最长。但是要注意不能将null元素放置到队列中。

Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。此接口的实现类必须重写一个compareTo()方法,该方法提供于此接口的getDelay()方法一致的排序。

DelayQueue存储的对象是实现了Delayed接口的对象,在这个对象中,需要重写compareTo()和getDelay()方法。


自定义MyTask类实现Delayed


public class MyTask implements Delayed {
    private long time;
    private String name;
    private long start = System.currentTimeMillis();
    public MyTask(String name,long time) {
        this.time = time;
        this.name = name;
    }
    @Override
    public long getDelay(TimeUnit unit) {
        return (start+time) - System.currentTimeMillis();
    }
    @Override
    public int compareTo(Delayed o) {
        return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
    @Override
    public String toString() {
        return "MyTask{" +
                "time=" + time +
                ", name='" + name + '\'' +
                '}';
    }
}

测试main

public class Main {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue <MyTask> myTasks = new DelayQueue<>();
        new Thread(()->{
            myTasks.offer(new MyTask("task1",10000));
            myTasks.offer(new MyTask("task2",4000));
            myTasks.offer(new MyTask("task3",4200));
            myTasks.offer(new MyTask("task4",6200));
            myTasks.offer(new MyTask("task5",9800));
        }).start();
        long start = System.currentTimeMillis();
        Thread.sleep(2000);
        System.out.println("队列中存放数据:");
        for (MyTask myTask : myTasks) {
            System.out.println(myTask);
        }
        System.out.println();
        System.out.println("队列中取出数据:");
        while(true){
            MyTask myTask = myTasks.take();
            System.out.println(myTask+":取出耗时:"+(System.currentTimeMillis()-start)+"ms");
        }
    }
}

85b6343ba46443dab5f99e889899bcad.jpg


DelayQueue能做什么

淘宝订单业务:下单之后如果30分钟之内没有付款就自动取消订单。

饿了么定餐通知:下单成功后60s后给用户发短信。

关闭空闲连接:服务器中,很多客户端的连接,空闲一段时间之后需要关闭。

缓存:缓存中的对象,超过了空闲时间,需要从缓存中移出。

任务超时处理:在网络协议滑动窗口请求应答交互时,处理超时未响应的请求等。

(8)LinkedTransferQueue

TransferQueue是一个继承了BlockingQueue的接口,并且增加了若干新方法。

LinkedTransferQueue是TransferQueue接口的实现类,其定义一个无界的队列,具有先进先出(FIFO)的特性。

TransferQueue接口含有下面几个重要方法:

transfer(E e)

若当前存在一个正在等待获取的消费者线程,即立刻移交之,否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。

tryTransfer(E e)

若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移\传输对象元素e;如不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。

tryTransfer(E e,long timeout,TimeUnit nuit)

若当前存在一个正在等待的消费者线程,会立即传输给它,否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉,若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。

hasWaitingConsumer()

判断是否由消费者线程。

getWaitingConsumerCount()

获取所有等待获取元素的消费者线程数量。

size()

因为队列的异步特性,检测当前队列的元素个数需要逐一迭代,无法保证原子性,可能会得到一个不太准确的结果,尤其是在遍历时有可能队列发生更改。

消费者生产者案例

Producer

public class Producer implements Runnable {
    private final TransferQueue<String> queue;
    //构造传入LinkedTransferQueue队列
    public Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            //生产者循环
            while (true){
                //判断当前队列是否还有消费者,有的话就生产产品交由消费者线程
                if(queue.hasWaitingConsumer()) queue.transfer(produce());
                //休眠1s
                TimeUnit.SECONDS.sleep(1);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    //生产产品方法
    private String produce(){
        return "Your lucky number:"+(new Random().nextInt(100));
    }
}

Consumer

public class Consumer implements Runnable{
    private final TransferQueue<String> queue;
    public Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try{
            //消费者线程取出队列元素
            System.out.println("Consumer--"+Thread.currentThread().getName()+"--"+queue.take());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

main测试

public class Main {
    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        Thread producer = new Thread(new Producer(queue));
        producer.setDaemon(true);
        producer.start();
        for (int i = 0; i < 20; i++) {
            Thread consumer = new Thread(new Consumer(queue));
            consumer.setDaemon(true);
            consumer.start();
            try{
                Thread.sleep(1000L);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

163ec55874a341cd8708cb3505f64f18.jpg

(9)SynchronousQueue


SynchronousQueue也是一种BlockingQueue,是一种无缓冲的等待队列。所以在某次添加元素后必须等待其他线程取走后才能继续添加,可以认为SynchronousQueue是一个缓存值为0的阻塞队列(也可以是1),它的isEmpty()方法永远返回时true,remainingCapacity()方法永远返回时0。

remove和removeAll方法返回永远是false,iterator()方法永远返回空,peek()方法永远返回null。

使用put()方法时,会一直阻塞在这里,等待被消费。

案例代码

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> strings = new SynchronousQueue<>();
        for (int i = 0; i < 2; i++) {
            new Thread(()->{
                try{
                    System.out.println("取出数据:"+strings.take());
                }catch (Exception e){
                    e.printStackTrace();
                }
            }).start();
        }
        strings.put("aaa");
        strings.put("bbb");
    }
}

eec47f359a8a4b11a340515cf2652dda.jpg

0d724e8c474340abac9cb1662b39193f.jpg

相关文章
|
4月前
|
分布式计算 DataWorks MaxCompute
DataWorks中odps到容器部署starrocks的单表同步遇到写入问题
【1月更文挑战第6天】【1月更文挑战第29篇】DataWorks中odps到容器部署starrocks的单表同步遇到写入问题
46 3
|
4月前
|
安全 Java 编译器
Java并发编程学习6-同步容器类和并发容器
【1月更文挑战第6天】本篇介绍同步容器类和并发容器的相关内容(Vector、ConcurrentHashMap、CopyOnWriteArrayList)
36 3
Java并发编程学习6-同步容器类和并发容器
|
5月前
|
分布式计算 DataWorks MaxCompute
DataWorks中odps到容器部署starrocks的单表同步遇到写入问题
DataWorks中odps到容器部署starrocks的单表同步遇到写入问题
38 1
|
10月前
|
算法 安全 Java
同步容器和并发容器
同步容器和并发容器
|
12月前
|
Linux Docker 容器
Docker Review - 使用docker volume数据卷实现容器内的数据与宿主机同步
Docker Review - 使用docker volume数据卷实现容器内的数据与宿主机同步
202 0
|
12月前
|
Java 调度 容器
并发编程-15并发容器(J.U.C)核心 AbstractQueuedSynchronizer 抽象队列同步器AQS介绍
并发编程-15并发容器(J.U.C)核心 AbstractQueuedSynchronizer 抽象队列同步器AQS介绍
88 0
|
3天前
|
监控 Kubernetes Docker
【Docker 专栏】Docker 容器内应用的健康检查与自动恢复
【5月更文挑战第9天】本文探讨了Docker容器中应用的健康检查与自动恢复,强调其对应用稳定性和系统性能的重要性。健康检查包括进程、端口和应用特定检查,而自动恢复则涉及重启容器和重新部署。Docker原生及第三方工具(如Kubernetes)提供了相关功能。配置检查需考虑检查频率、应用特性和监控告警。案例分析展示了实际操作,未来发展趋势将趋向更智能和高效的检查恢复机制。
【Docker 专栏】Docker 容器内应用的健康检查与自动恢复
|
2天前
|
NoSQL Redis Docker
Mac上轻松几步搞定Docker与Redis安装:从下载安装到容器运行实测全程指南
Mac上轻松几步搞定Docker与Redis安装:从下载安装到容器运行实测全程指南
12 0
|
3天前
|
存储 安全 数据库
【Docker 专栏】Docker 容器内应用的状态持久化
【5月更文挑战第9天】本文探讨了Docker容器中应用状态持久化的重要性,包括数据保护、应用可用性和历史记录保存。主要持久化方法有数据卷、绑定挂载和外部存储服务。数据卷是推荐手段,可通过`docker volume create`命令创建并挂载。绑定挂载需注意权限和路径一致性。利用外部存储如数据库和云服务可应对复杂需求。最佳实践包括规划存储策略、定期备份和测试验证。随着技术发展,未来将有更智能的持久化解决方案。
【Docker 专栏】Docker 容器内应用的状态持久化
|
3天前
|
机器学习/深度学习 监控 Kubernetes
【Docker 专栏】Docker 容器内服务的自动扩展与缩容
【5月更文挑战第9天】本文探讨了Docker容器服务的自动扩展与缩容原理及实践,强调其在动态业务环境中的重要性。通过选择监控指标(如CPU使用率)、设定触发条件和制定扩展策略,实现资源的动态调整。方法包括云平台集成和使用Kubernetes等框架。实践中,电商平台和实时数据处理系统受益于此技术。注意点涉及监控数据准确性、扩展速度和资源分配。未来,智能算法将提升扩展缩容的效率和准确性,成为关键技术支持。
【Docker 专栏】Docker 容器内服务的自动扩展与缩容