JUC:java.util.concurrent
集合
1、BlockingQueue
什么是阻塞队列?
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列提供了四种处理方法:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
- 异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
- 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
- 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
- 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
阻塞队列接口:
BlockingQueue的核心方法:
public interface BlockingQueue<E> extends Queue<E> {
//插入元素e到队列中,成功返回true, 否则抛出异常。如果向限定了容量的队列中插入值,推荐使用offer()方法。
boolean add(E e);
//插入元素e到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
boolean offer(E e);
//插入元素e到队列中,,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
void put(E e) throws InterruptedException;
//在给定的时间插入元素e到队列中,如果设置成功返回true, 否则返回false.
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//检索并从队列的头部删除元素,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
E take() throws InterruptedException;
//在给定的时间范围内,检索并从队列的头部删除元素,从队列中获取值,如果没有取到会抛出异常。
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//获取队列中剩余的空间。
int remainingCapacity();
//从队列中移除指定的值。
boolean remove(Object o);
//判断队列中是否包含该值。
public boolean contains(Object o);
//将队列中值,全部移除,并追加到给定的集合中。
int drainTo(Collection<? super E> c);
//指定最多数量限制将队列中值,全部移除,并追加到给定的集合中。
int drainTo(Collection<? super E> c, int maxElements);
}
继承关系
- 子接口:
BlockingDeque
TransferQueue
TransferQueue继承了BlockingQueue,并扩展了一些新方法。
BlockingQueue是指这样的一个队列:当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素但队列为空时,消费者会被阻塞。
TransferQueue则更进一步,生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事)。新添加的transfer方法用来实现这种约束。顾名思义,阻塞就是发生在元素从一个线程transfer到另一个线程的过程中,它有效地实现了元素在线程之间的传递(以建立Java内存模型中的happens-before关系的方式)。
TransferQueue还包括了其他的一些方法:两个tryTransfer方法,一个是非阻塞的,另一个带有timeout参数设置超时时间的。还有两个辅助方法hasWaitingConsumer()和getWaitingConsumerCount()。
- 实现类
ArrayBlockingQueue
DelayQueue
LinkedBlockingDeque
LinkedBlockingQueue
LinkedTransferQueue
PriorityBlockingQueue
SynchronousQueue
2、 ArrayBlockingQueue
ArrayBlockingQueue
是一个线程安全的、基于数组、有界的、阻塞的、FIFO 队列。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
此类基于 java.util.concurrent.locks.ReentrantLock
来实现线程安全,所以提供了 ReentrantLock
所能支持的公平性选择。
使用:
package kaikeba.com;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(3,true);
Producer producer = new Producer(blockingQueue);
Consumer consumer = new Consumer(blockingQueue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
class Producer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
private static int element = 0;
public Producer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
try {
while(element < 20) {
System.out.println("生产元素:"+element);
blockingQueue.put(element++);
}
} catch (Exception e) {
System.out.println("生产者在等待空闲空间的时候发生异常!");
e.printStackTrace();
}
System.out.println("生产者终止了生产过程!");
}
}
class Consumer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
public Consumer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
try {
while(true) {
System.out.println("消费元素:"+blockingQueue.take());
}
} catch (Exception e) {
System.out.println("消费者在等待新产品的时候发生异常!");
e.printStackTrace();
}
System.out.println("消费者终止了消费过程!");
}
}
3、PriorityBlockingQueue
PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现。
使用:
package kaikeba.com;
import java.util.Random;
import java.util.concurrent.PriorityBlockingQueue;
public class PriorityBlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<PriorityElement> queue = new PriorityBlockingQueue<>();
for (int i = 0; i < 5; i++) {
Random random=new Random();
PriorityElement ele = new PriorityElement(random.nextInt(10));
queue.put(ele);
}
while(!queue.isEmpty()){
System.out.println(queue.take());
}
}
}
class PriorityElement implements Comparable<PriorityElement> {
private int priority;//定义优先级
PriorityElement(int priority) {
//初始化优先级
this.priority = priority;
}
@Override
public int compareTo(PriorityElement o) {
//按照优先级大小进行排序
return priority >= o.getPriority() ? 1 : -1;
}
public int getPriority() {
return priority;
}
public void setPriority(int priority) {
this.priority = priority;
}
@Override
public String toString() {
return "PriorityElement [priority=" + priority + "]";
}
}
4、 DelayQueue
DelayQueue队列中每个元素都有个过期时间,并且队列是个优先级队列,当从队列获取元素时候,只有过期元素才会出队列。
使用
package kaikeba.com;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueTest {
public static void main(String[] args) throws InterruptedException {
Item item1 = new Item("item1", 5, TimeUnit.SECONDS);
Item item2 = new Item("item2",10, TimeUnit.SECONDS);
Item item3 = new Item("item3",15, TimeUnit.SECONDS);
DelayQueue<Item> queue = new DelayQueue<>();
queue.put(item1);
queue.put(item2);
queue.put(item3);
System.out.println("begin time:" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
for (int i = 0; i < 3; i++) {
Item take = queue.take();
System.out.format("name:{%s}, time:{%s}\n",take.name, LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
}
}
}
class Item implements Delayed {
/* 触发时间*/
private long time;
String name;
public Item(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0? unit.toMillis(time): 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Item item = (Item) o;
long diff = this.time - item.time;
if (diff <= 0) {
return -1;
}else {
return 1;
}
}
@Override
public String toString() {
return "Item{" +
"time=" + time +
", name='" + name + '\'' +
'}';
}
}
5、 LinkedBlockingQueue
LinkedBlockingQueue是一个基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。访问与移除操作是在队头进行,添加操作是在队尾进行,并分别使用不同的锁进行保护,只有在可能涉及多个节点的操作才同时对两个锁进行加锁。
队列是否为空、是否已满仍然是通过元素数量的计数器(count)进行判断的,由于可以同时在队头、队尾并发地进行访问、添加操作,所以这个计数器必须是线程安全的,这里使用了一个原子类 AtomicInteger
,这就决定了它的容量范围是: 1 – Integer.MAX_VALUE。
由于同时使用了两把锁,在需要同时使用两把锁时,加锁顺序与释放顺序是非常重要的:必须以固定的顺序进行加锁,再以与加锁顺序的相反的顺序释放锁。
头结点和尾结点一开始总是指向一个哨兵的结点,它不持有实际数据,当队列中有数据时,头结点仍然指向这个哨兵,尾结点指向有效数据的最后一个结点。这样做的好处在于,与计数器 count 结合后,对队头、队尾的访问可以独立进行,而不需要判断头结点与尾结点的关系。
使用:
package kaikeba.com;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
Producer producer = new Producer(blockingQueue);
Consumer consumer = new Consumer(blockingQueue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
class Producer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
private static int element = 0;
public Producer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
try {
while(element < 20) {
System.out.println("生产元素:"+element);
blockingQueue.put(element++);
}
} catch (Exception e) {
System.out.println("生产者在等待空闲空间的时候发生异常!");
e.printStackTrace();
}
System.out.println("生产者终止了生产过程!");
}
}
class Consumer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
public Consumer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
try {
while(true) {
System.out.println("消费元素:"+blockingQueue.take());
}
} catch (Exception e) {
System.out.println("消费者在等待新产品的时候发生异常!");
e.printStackTrace();
}
System.out.println("消费者终止了消费过程!");
}
}
6、 LinkedBlockingDeque
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法add等同于addLast,移除方法remove等效于removeFirst。在初始化LinkedBlockingDeque时可以初始化队列的容量,用来防止其再扩容时过渡膨胀。
package kaikeba.com;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class LinkedBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<Integer>();
Producer producer = new Producer(blockingQueue);
Consumer consumer = new Consumer(blockingQueue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
class Producer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
private static int element = 0;
public Producer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
try {
while(element < 20) {
System.out.println("生产元素:"+element);
blockingQueue.put(element++);
}
} catch (Exception e) {
System.out.println("生产者在等待空闲空间的时候发生异常!");
e.printStackTrace();
}
System.out.println("生产者终止了生产过程!");
}
}
class Consumer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
public Consumer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
try {
while(true) {
System.out.println("消费元素:"+blockingQueue.take());
}
} catch (Exception e) {
System.out.println("消费者在等待新产品的时候发生异常!");
e.printStackTrace();
}
System.out.println("消费者终止了消费过程!");
}
}
7 、SynchronousQueue
SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。
SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。
数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲到队列中。
SynchronousQueue支持公平访问队列,默认情况下,线程采用非公平策略,如果使用公平策略,等待的线程采用先进先出的顺序访问队列。
SynchronousQueue适合传递性场景,一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
使用
package kaikeba.com;
import java.util.concurrent.*;
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new SynchronousQueue<>();
Producer producer = new Producer(blockingQueue);
Consumer consumer = new Consumer(blockingQueue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
class Producer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
private static int element = 0;
public Producer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
try {
while(element < 20) {
System.out.println("生产元素:"+element);
blockingQueue.put(element++);
}
} catch (Exception e) {
System.out.println("生产者在等待空闲空间的时候发生异常!");
e.printStackTrace();
}
System.out.println("生产者终止了生产过程!");
}
}
class Consumer implements Runnable {
private BlockingQueue<Integer> blockingQueue;
public Consumer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
try {
while(true) {
Thread.sleep(1000l);
System.out.println("消费元素:"+blockingQueue.take());
}
} catch (Exception e) {
System.out.println("消费者在等待新产品的时候发生异常!");
e.printStackTrace();
}
System.out.println("消费者终止了消费过程!");
}
}
8、 LinkedTransferQueue
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列LinkedTransferQueue多了tryTransfer和transfer方法。
transfer方法。如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。transfer方法的关键代码如下:
第一行代码是试图把存放当前元素的s节点作为tail节点。第二行代码是让CPU自旋等待消费者消费元素。因为自旋会消耗CPU,所以自旋一定的次数后使用Thread.yield()方法来暂停当前正在执行的线程,并执行其他线程。
tryTransfer方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。
对于带有时间限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。
使用:
package kaikeba.com;
import java.util.concurrent.*;
public class LinkedTransferQueueTest {
public static void main(String[] args) {
LinkedTransferQueue<Integer> blockingQueue = new LinkedTransferQueue<Integer>();
Producer producer = new Producer(blockingQueue);
Consumer consumer = new Consumer(blockingQueue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
class Producer implements Runnable {
private LinkedTransferQueue<Integer> linkedTransferQueue;
private static int element = 0;
public Producer(LinkedTransferQueue<Integer> linkedTransferQueue) {
this.linkedTransferQueue = linkedTransferQueue;
}
public void run() {
try {
while(element < 20) {
System.out.println("生产元素:"+element);
linkedTransferQueue.put(element++);
}
} catch (Exception e) {
System.out.println("生产者在等待空闲空间的时候发生异常!");
e.printStackTrace();
}
System.out.println("生产者终止了生产过程!");
}
}
class Consumer implements Runnable {
private LinkedTransferQueue<Integer> linkedTransferQueue;
public Consumer(LinkedTransferQueue<Integer> linkedTransferQueue) {
this.linkedTransferQueue = linkedTransferQueue;
}
public void run() {
try {
while(true) {
Thread.sleep(1000l);
System.out.println("消费元素:"+linkedTransferQueue.take());
}
} catch (Exception e) {
System.out.println("消费者在等待新产品的时候发生异常!");
e.printStackTrace();
}
System.out.println("消费者终止了消费过程!");
}
9、ConcurrentHashMap
HashMap容量
/**
* Constructs an empty <tt>HashMap</tt> with the specified initial
* capacity and load factor.
*
* @param initialCapacity the initial capacity
* @param loadFactor the load factor
* @throws IllegalArgumentException if the initial capacity is negative
* or the load factor is nonpositive
*/
public HashMap(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}
给定的默认容量为 16,负载因子为 0.75。Map 在使用过程中不断的往里面存放数据,当数量达到了 16 * 0.75 = 12
就需要将当前 16 的容量进行扩容,而扩容这个过程涉及到 rehash、复制数据等操作,所以非常消耗性能。
因此通常建议能提前预估 HashMap 的大小最好,尽量的减少扩容带来的性能损耗。
线程不安全的 HashMap
因为多线程环境下,使用 HashMap 进行 put 操作会引起死循环,导致 CPU 利用率接近 100%,所以在并发情况下不能使用 HashMap,如以下代码
final HashMap<String, String> map = new HashMap<String, String>(2);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), "");
}
}, "kaikeba" + i).start();
}
}
}, "kaikeba");
t.start();
t.join();
效率低下的 HashTable 容器
HashTable 容器使用 syncronized来保证线程安全,但在线程竞争激烈的情况下 HashTable 的效率非常低下。因为当一个线程访问 HashTable 的同步方法时,其他线程访问 HashTable 的同步方法时,可能会进入阻塞或轮询状态。如线程 1 使用 put 进行添加元素,线程 2 不但不能使用 put 方法添加元素,并且也不能使用 get 方法来获取元素,所以竞争越激烈效率越低。
栗子:
package kaikeba.com;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapTest {
public static void main(String[] args) {
Map<String, String> map = new ConcurrentHashMap<String, String>();
map.put("key1", "1");
map.put("key2", "2");
map.put("key3", "3");
map.put("key4", "4");
Iterator<String> it = map.keySet().iterator();
while (it.hasNext()) {
String key = it.next();
System.out.println(key + ","+ map.get(key));
}
}
}
10、 ConcurrentSkipListMap
JDK1.6时,为了对高并发环境下的有序Map提供更好的支持,J.U.C新增了一个ConcurrentNavigableMap接口,ConcurrentNavigableMap很简单,它同时实现了NavigableMap和ConcurrentMap接口。
ConcurrentNavigableMap接口提供的功能也和NavigableMap几乎完全一致,很多方法仅仅是返回的类型不同。
NavigableMap接口,进一步扩展了SortedMap的功能,提供了根据指定Key返回最接近项、按升序/降序返回所有键的视图等功能。
J.U.C提供了基于ConcurrentNavigableMap接口的一个实现——ConcurrentSkipListMap
。ConcurrentSkipListMap可以看成是并发版本的TreeMap,但是和TreeMap不同是,ConcurrentSkipListMap并不是基于红黑树实现的,其底层是一种类似跳表(Skip List)的结构。
栗子:
package kaikeba.com;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class ConcurrentSkipListMapTest {
public static void main(String[] args) {
ConcurrentSkipListMap<String, Contact> map = new ConcurrentSkipListMap<>();
Thread threads[]=new Thread[25];
int counter=0;
//创建和启动25个任务,对于每个任务指定一个大写字母作为ID
for (char i='A'; i<'Z'; i++) {
Task0 task=new Task0(map, String.valueOf(i));
threads[counter]=new Thread(task);
threads[counter].start();
counter++;
}
//使用join()方法等待线程的结束
for (int i=0; i<25; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf("Size of the map: %d\n",map.size());
Map.Entry<String, Contact> element;
Contact contact;
// 使用firstEntry()方法获取map的第一个实体,并输出。
element=map.firstEntry();
contact=element.getValue();
System.out.printf("First Entry: %s: %s\n",contact.
getName(),contact.getPhone());
//使用lastEntry()方法获取map的最后一个实体,并输出。
element=map.lastEntry();
contact=element.getValue();
System.out.printf("Last Entry: %s: %s\n",contact.
getName(),contact.getPhone());
//使用subMap()方法获取map的子map,并输出。
System.out.printf("Submap from A1996 to B1002: \n");
ConcurrentNavigableMap<String, Contact> submap=map.
subMap("A1996", "B1001");
do {
element=submap.pollFirstEntry();
if (element!=null) {
contact=element.getValue();
System.out.printf("%s: %s\n",contact.getName(),contact.
getPhone());
}
} while (element!=null);
}
}
class Contact {
private String name;
private String phone;
public Contact(String name, String phone) {
this.name = name;
this.phone = phone;
}
public String getName() {
return name;
}
public String getPhone() {
return phone;
}
}
class Task0 implements Runnable {
private ConcurrentSkipListMap<String, Contact> map;
private String id;
public Task0(ConcurrentSkipListMap<String, Contact> map, String id) {
this.id = id;
this.map = map;
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
Contact contact = new Contact(id, String.valueOf(i + 1000));
map.put(id + contact.getPhone(), contact);
}
}
}
11、 ConcurrentSkipListSet
ConcurrentSkipListSet,是JDK1.6时J.U.C新增的一个集合工具类,它是一种有序的SET类型。
ConcurrentSkipListSet实现了NavigableSet接口,ConcurrentSkipListMap实现了NavigableMap接口,以提供和排序相关的功能,维持元素的有序性,所以ConcurrentSkipListSet就是一种为并发环境设计的有序SET工具类。
ConcurrentSkipListSet底层实现引用了ConcurrentSkipListMap。
栗子:
package kaikeba.com;
import java.util.concurrent.ConcurrentSkipListSet;
public class ConcurrentSkipListSetTest {
public static void main(String[] args) {
ConcurrentSkipListSet<Contact1> set = new ConcurrentSkipListSet<>();
Thread threads[]=new Thread[25];
int counter=0;
//创建和启动25个任务,对于每个任务指定一个大写字母作为ID
for (char i='A'; i<'Z'; i++) {
Task1 task=new Task1(set, String.valueOf(i));
threads[counter]=new Thread(task);
threads[counter].start();
counter++;
}
//使用join()方法等待线程的结束
for (int i=0; i<25; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf("Size of the set: %d\n",set.size());
Contact1 contact;
// 使用first方法获取set的第一个实体,并输出。
contact=set.first();
System.out.printf("First Entry: %s: %s\n",contact.
getName(),contact.getPhone());
//使用last方法获取set的最后一个实体,并输出。
contact=set.last();
System.out.printf("Last Entry: %s: %s\n",contact.
getName(),contact.getPhone());
}
}
class Contact1 implements Comparable<Contact1> {
private String name;
private String phone;
public Contact1(String name, String phone) {
this.name = name;
this.phone = phone;
}
public String getName() {
return name;
}
public String getPhone() {
return phone;
}
@Override
public int compareTo(Contact1 o) {
return name.compareTo(o.name);
}
}
class Task1 implements Runnable {
private ConcurrentSkipListSet<Contact1> set;
private String id;
public Task1(ConcurrentSkipListSet<Contact1> set, String id) {
this.id = id;
this.set = set;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
Contact1 contact = new Contact1(id, String.valueOf(i + 100));
set.add(contact);
}
}
}
12 、CopyOnWriteArrayList
Copy-On-Write简称COW,是一种用于程序设计中的优化策略。其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容Copy出去形成一个新的内容然后再改,这是一种延时懒惰策略。从JDK1.5开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayList和CopyOnWriteArraySet。CopyOnWrite容器非常有用,可以在非常多的并发场景中使用到。
什么是CopyOnWrite容器
CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。
内存占用问题。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。
针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。
数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。
使用:
package kaikeba.com;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class ReadThread implements Runnable {
private List<Integer> list;
public ReadThread(List<Integer> list) {
this.list = list;
}
@Override
public void run() {
System.out.print("size:="+list.size()+",::");
for (Integer ele : list) {
System.out.print(ele + ",");
}
System.out.println();
}
}
class WriteThread implements Runnable {
private List<Integer> list;
public WriteThread(List<Integer> list) {
this.list = list;
}
@Override
public void run() {
this.list.add(9);
}
}
public class TestCopyOnWriteArrayListTest {
private void test() {
//1、初始化CopyOnWriteArrayList
List<Integer> tempList = Arrays.asList(new Integer [] {1,2});
CopyOnWriteArrayList<Integer> copyList = new CopyOnWriteArrayList<>(tempList);
//2、模拟多线程对list进行读和写
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new ReadThread(copyList));
executorService.execute(new WriteThread(copyList));
executorService.execute(new WriteThread(copyList));
executorService.execute(new WriteThread(copyList));
executorService.execute(new ReadThread(copyList));
executorService.execute(new WriteThread(copyList));
executorService.execute(new ReadThread(copyList));
executorService.execute(new WriteThread(copyList));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("copyList size:"+copyList.size());
}
public static void main(String[] args) {
new TestCopyOnWriteArrayList().test();
}
}
13 、CopyOnWriteArraySet
CopyOnWriteArraySet相对CopyOnWriteArrayList用来存储不重复的对象,是线程安全的。虽然继承了AbstractSet类,但CopyOnWriteArraySet与HashMap 完全不同,内部是用CopyOnWriteArrayList实现的,实现不重复的特性也是直接调用CopyOnWriteArrayList的方法实现的,感觉加的最有用的函数就是eq函数判断对象是否相同