(2)ConcurrentQueue
与ConcurrentHashMap相同,ConcurrentQueue也是通过同样的方式来提高并发性能的。
同步容器中提到过火车票问题:
有N张火车票,每张车票都有一个编号,同时有10个窗口对外售票。
使用ConcurrentQueue进一步提高并发性:
public class Demo4 { private static Queue<String> queues = new ConcurrentLinkedDeque<>(); static { for (int i = 0; i < 10000; i++) { queues.add("票编号:"+i); } } public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { new Thread(()->{ while(true){ String s = queues.poll(); if (s ==null) break; else System.out.println("销售了---"+s); } }).start(); } long end = System.currentTimeMillis(); Thread.sleep(3000L); System.out.println("总耗时:"+(end-start)+"ms"); } }
常用的API
Queue<String> strings = new ConcurrentLinkedQueue<String>(); strings.offer(元素) //相当于add,放进队列 strings.size() //获取当前队列的元素个数 strings.poll() //取出并移除 strings.peek() //取出不会移除,相当于get();
(3)CopyOnWriteArrayList
写时复制容器,即copy-on-write,多线程环境下,写时效率低,读时效率高,适合写少读多的环境。
public class Demo5 implements Runnable{ private static List<String> lists = new ArrayList<>(); //private static List<String> lists = new Vector<>(); //private static List<String> lists = new CopyOnWriteArrayList<>(); private Random random = new Random(); @Override public void run() { for (int i = 0; i < 10000; i++) { lists.add(random.nextInt()+""); } } public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); System.out.println("线程开始操作"); for (int i = 0; i < 10; i++) { new Thread(new Demo5()).start(); } for (int i = 0; i < 100; i++) { int finalI = i; new Thread(()->{ for (int j = 0; j < lists.size(); j++) { lists.get(finalI); } }).start(); } long end = System.currentTimeMillis(); Thread.sleep(6000L); System.out.println("耗时:"+(end-start)+"ms"); } }
运行结果:
ArrayList:报错:Exception in thread "Thread-1" java.lang.ArrayIndexOutOfBoundsException: 244 Vector:117ms CopyOnWriteArrayList:222ms
从JDK5开始Java并发包里面提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayList和CopyOnWriteArraySet。
当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后向新的容器添加元素,添加完成元素后,再将原来的容器的引用指向新的容器。这样做的好处就是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为在当前读的容器不会添加任何元素。所以CopyOnWrite容器是一种读写分离的思想,读和写写对应不同的容器。
(4)BlockingQueue
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
阻塞对列,顾名思义,首先他是一个队列,一个队列在数据结构当中起到的作用大致如下:
队列可以使得数据由队列的一端输入,从另一端输出。
先进先出(FIFO):先插入的队列的元素也是最先出队列,这种队列体现了一种公平性。
后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。
(5)LinkedBlockingQueue
这中并发容器,会自动实现阻塞式的生产者/消费者模式。使用队列解耦合,在实现异步事物的时候很有用。
案例
public class Demo6 { //实例化时指定容器容量 private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1000); public static void main(String[] args) { for (int i = 0; i < 100; i++) { int finalI = i; new Thread(() -> { for (int j = 0; j < 1000; j++) { try { //向对列中添加元素,如果对列满了 就等待1s在进行添加 boolean b = linkedBlockingQueue.offer(finalI + "", 1, TimeUnit.SECONDS); if (b) { System.out.println(finalI + "队列添加成功"); } else { System.out.println(finalI + "队列添加失败,进入等待"); } } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } for (int i = 0; i < 10; i++) { int finalI = i; new Thread(() -> { for (int j = 0; j < 1000; j++) { try { //消费队列,如果为空就等待消费 String take = linkedBlockingQueue.take(); System.out.println("消费队列元素:" + take); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } }
常用API
//实例化时指定容器容量 private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1000); linkedBlockingQueue.add(元素) //如果队列满了,再次添加就会抛出异常:java.lang.IllegalStateException: Queue full linkedBlockingQueue.offer(元素,时间,时间单位) //队列满了,等待时间后,再次添加,失败返回false linkedBlockingQueue.offer(元素) //队列满了,添加失败返回false,成功返回true linkedBlockingQueue.put(元素) //加入队列,如果满了就等待阻塞 linkedBlockingQueue.take() //取出队列中的元素,如果空了,就会等待阻塞
(6)ArrayBlockingQueue
ArrayBlockingQueue和LinkedBlockingQueue对象的方法都是一样的,用法是一样的。
二者的区别:
LinkedBlockingQueue是一个单向链表实现的阻塞队列,在链表一头加入元素,如果队列满了,就会阻塞,另一头取出元素,如果队列为空,就会阻塞。
LinkedBlockingQueue内部使用ReetrantLock实现插入锁(putLock)和取出锁(takeLock)。
ArrayBlockingQueue基于数组实现,成为有界队列,LinkedBlockingQueue认为是无界队列。当然LinkedBlockingQueue也可以指定队列容量。
(7)DelayQueue
DelayQueue也是一个BlockingQueue,用于放置实现了Delayed接口的对象,只能是实现了Delayed接口的对象,其中对象只能在其到期时才能从队列中取走。
Delayed扩展了Comparable接口,比较的基准为延时的时间,Delayed接口实现类getDelay()返回值为固定值(final),DelayedQueue内部是使用PriorityQueue实现的;即 (DelayQueue = BlockingQueue + PriorityQueue + Delayed)
可以说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准是时间。是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时能从队列中取走。这种队列是有序的,及队头对象的延迟到期时间最长。但是要注意不能将null元素放置到队列中。
Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。此接口的实现类必须重写一个compareTo()方法,该方法提供于此接口的getDelay()方法一致的排序。
DelayQueue存储的对象是实现了Delayed接口的对象,在这个对象中,需要重写compareTo()和getDelay()方法。
自定义MyTask类实现Delayed
public class MyTask implements Delayed { private long time; private String name; private long start = System.currentTimeMillis(); public MyTask(String name,long time) { this.time = time; this.name = name; } @Override public long getDelay(TimeUnit unit) { return (start+time) - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { return "MyTask{" + "time=" + time + ", name='" + name + '\'' + '}'; } }
测试main
public class Main { public static void main(String[] args) throws InterruptedException { DelayQueue <MyTask> myTasks = new DelayQueue<>(); new Thread(()->{ myTasks.offer(new MyTask("task1",10000)); myTasks.offer(new MyTask("task2",4000)); myTasks.offer(new MyTask("task3",4200)); myTasks.offer(new MyTask("task4",6200)); myTasks.offer(new MyTask("task5",9800)); }).start(); long start = System.currentTimeMillis(); Thread.sleep(2000); System.out.println("队列中存放数据:"); for (MyTask myTask : myTasks) { System.out.println(myTask); } System.out.println(); System.out.println("队列中取出数据:"); while(true){ MyTask myTask = myTasks.take(); System.out.println(myTask+":取出耗时:"+(System.currentTimeMillis()-start)+"ms"); } } }
DelayQueue能做什么
淘宝订单业务:下单之后如果30分钟之内没有付款就自动取消订单。
饿了么定餐通知:下单成功后60s后给用户发短信。
关闭空闲连接:服务器中,很多客户端的连接,空闲一段时间之后需要关闭。
缓存:缓存中的对象,超过了空闲时间,需要从缓存中移出。
任务超时处理:在网络协议滑动窗口请求应答交互时,处理超时未响应的请求等。
(8)LinkedTransferQueue
TransferQueue是一个继承了BlockingQueue的接口,并且增加了若干新方法。
LinkedTransferQueue是TransferQueue接口的实现类,其定义一个无界的队列,具有先进先出(FIFO)的特性。
TransferQueue接口含有下面几个重要方法:
transfer(E e)
若当前存在一个正在等待获取的消费者线程,即立刻移交之,否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。
tryTransfer(E e)
若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移\传输对象元素e;如不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。
tryTransfer(E e,long timeout,TimeUnit nuit)
若当前存在一个正在等待的消费者线程,会立即传输给它,否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉,若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。
hasWaitingConsumer()
判断是否由消费者线程。
getWaitingConsumerCount()
获取所有等待获取元素的消费者线程数量。
size()
因为队列的异步特性,检测当前队列的元素个数需要逐一迭代,无法保证原子性,可能会得到一个不太准确的结果,尤其是在遍历时有可能队列发生更改。
消费者生产者案例
Producer
public class Producer implements Runnable { private final TransferQueue<String> queue; //构造传入LinkedTransferQueue队列 public Producer(TransferQueue<String> queue) { this.queue = queue; } @Override public void run() { try { //生产者循环 while (true){ //判断当前队列是否还有消费者,有的话就生产产品交由消费者线程 if(queue.hasWaitingConsumer()) queue.transfer(produce()); //休眠1s TimeUnit.SECONDS.sleep(1); } }catch (Exception e){ e.printStackTrace(); } } //生产产品方法 private String produce(){ return "Your lucky number:"+(new Random().nextInt(100)); } }
Consumer
public class Consumer implements Runnable{ private final TransferQueue<String> queue; public Consumer(TransferQueue<String> queue) { this.queue = queue; } @Override public void run() { try{ //消费者线程取出队列元素 System.out.println("Consumer--"+Thread.currentThread().getName()+"--"+queue.take()); }catch (Exception e){ e.printStackTrace(); } } }
main测试
public class Main { public static void main(String[] args) { TransferQueue<String> queue = new LinkedTransferQueue<>(); Thread producer = new Thread(new Producer(queue)); producer.setDaemon(true); producer.start(); for (int i = 0; i < 20; i++) { Thread consumer = new Thread(new Consumer(queue)); consumer.setDaemon(true); consumer.start(); try{ Thread.sleep(1000L); }catch (Exception e){ e.printStackTrace(); } } } }
(9)SynchronousQueue
SynchronousQueue也是一种BlockingQueue,是一种无缓冲的等待队列。所以在某次添加元素后必须等待其他线程取走后才能继续添加,可以认为SynchronousQueue是一个缓存值为0的阻塞队列(也可以是1),它的isEmpty()方法永远返回时true,remainingCapacity()方法永远返回时0。
remove和removeAll方法返回永远是false,iterator()方法永远返回空,peek()方法永远返回null。
使用put()方法时,会一直阻塞在这里,等待被消费。
案例代码
public class SynchronousQueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> strings = new SynchronousQueue<>(); for (int i = 0; i < 2; i++) { new Thread(()->{ try{ System.out.println("取出数据:"+strings.take()); }catch (Exception e){ e.printStackTrace(); } }).start(); } strings.put("aaa"); strings.put("bbb"); } }