JUC并发包下的容器类
本部分回答以下几个问题,如果能回答正确,则证明本部分掌握好了。
- 普通集合类为什么线程不安全?
- ConcurrentHashMap的实现原理和常用方法
- CopyOnWrite类的实现原理和使用方式
接下来我们看这部分的内容。
线程不安全的集合类
如果使用线程不安全的集合极容易出现问题,例如两个线程同时往一个list里添加元素,他们同时判断一个索引上没有值,同时添加,那么实际上只添加了一次,我们举个例子看看:
public class ThreadTest { public static void main(String[] args) throws InterruptedException { List<String> list=new ArrayList<>(); for (int i=0;i<500;i++) { Thread thread=new Thread(()->{ try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } list.add(Thread.currentThread().getName()+"线程添加的一个元素"); }); thread.start(); } Thread.sleep(2000); System.out.println("tml说在线程不安全条件下,500个线程并发后list只增加了"+list.size()+"个元素"); } }
打印结果为:
tml说在线程不安全条件下,500个线程并发后list只增加了493个元素
如果我们换成线程安全的Vector:
public class ThreadTest { public static void main(String[] args) throws InterruptedException { Vector<String> vector=new Vector<>(); for (int i=0;i<500;i++) { Thread thread=new Thread(()->{ try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } vector.add(Thread.currentThread().getName()+"线程添加的一个元素"); }); thread.start(); } Thread.sleep(2000); System.out.println("tml说在线程安全条件下,500个线程并发后vector也增加了"+vector.size()+"个元素"); } }
返回结果为:
tml说在线程安全条件下,500个线程并发后vector也增加了500个元素
其实不光写会造成问题,在同一时间多个线程无法对同一个List进行读取和增删,否则就会抛出并发异常,因为在读的时候被别人改了
Exception in thread "Thread-403" java.lang.ArrayIndexOutOfBoundsException: 366 at java.util.ArrayList.add(ArrayList.java:463) at com.company.ThreadTest.lambda$main$0(ThreadTest.java:18) at java.lang.Thread.run(Thread.java:748) tml说在线程不安全条件下,500个线程并发后list只增加了491个元素
综合以上考虑,线程安全的实现类有vector,stack,hashtable 为了方便,我们将前面介绍集合类统称为java集合包。java集合包大多是非线程安全的,虽然可以通过Collections工具类中的方法获取java集合包对应的同步类,但是这些同步类的并发效率并不是很高。为了更好的支持高并发任务,Java在JUC包中添加了java集合包中单线程类的对应的支持高并发的类。例如,ArrayList对应的高并发类是CopyOnWriteArrayList,HashMap对应的高并发类是ConcurrentHashMap,等等。
ConcurrentHashMap
ConcurrentHashMap是线程安全且高效的HashMap。一起了解下该容器是如何在保证线程安全的同时又能保证高效的操作。
- 线程不安全的HashMap,在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap,因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry
- 效率低下的HashTable,HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问
ConcurrentHashMap结构
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。
- Segment,一个ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种数组和链表结构,【JDK1.7】Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色
- HashEntry,则用于存储键值对数据。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁
整体结构如下,数据结构为:Segment+HashEntry数组+链表
JDK 1.8中ConcurrentHashMap的实现已经摒弃了Segment的概念,而是直接使用Node数组+链表+红黑树(与HashMap的底层实现相同)的数据结构实现,并发控制使用了synchronized和CAS操作。整体就像是优化过且线程安全的HashMap,虽然在JDK 1.8中还能看到Segment的数据结构,但已经简化了其属性,只是为了兼容旧版本
ConcurrentHashMap初始化
ConcurrentHashMap初始化方法是通过initialCapacity、loadFactor和concurrencyLevel等几个参数来初始化segment数组、段偏移量segmentShift、段掩码segmentMask和每个segment里的HashEntry数组来实现的
- initialCapacity,ConcurrentHashMap的初始容量,初始默认为16
- concurrencyLevel/ssize,segments数组的大小,默认为16,最大为65536,concurrencyLevel 表示并发度,默认16。并发度可以理解为程序运行时能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段锁个数,即segments数组的长度
- loadFactor, 扩容因子,默认0.75,当一个Segment存储的元素数量大于
threshold
时,该Segment会进行一次扩容 - cap,segment里HashEntry数组的长度,为initialCapacity除以ssize的倍数,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1就是2的N次方
- threshold,单个segment的容量,值为
threshold = (int)cap*loadFactor
。
那么我们计算的时候可以依据初始值来进行一系列计算。例如initialCapacity为16个元素,负载因子设置为0.75,ssize为16,则c=16/16等于1,则cap为1,也就是每个segment数组长度为1,threshold 容量为(int)1*0.75=0
初始化segments数组
下面为初始化segments数组的源码
if(concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; segmentMask = ssize -1; this.segments = Segment.newArray(ssize);
由上面代码可知,ssize用位运算来计算(ssize <<= 1),所以segments数组的大小取值为2的N次方,即为大于或等于concurrencyLevel的最低的N次方值来作为segment数组的长度。假如concurrencyLevel等于14、15或16,ssize都会等于16,即容器里锁的个数也是16
当然concurrencyLevel最大只能用16位的二进制来表示,即65535,这意味着segments数组的长度最大为65536,对应的二进制为16位
初始化segmentShift和segmentMask
这两个全局变量需要在定位segment的时的散列算法里使用,由初始化segments数组的代码中可知,
- sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,则1需要向左移位移动4次,所以sshift等于4
- 段偏移量segmentShift用于定位参与散列预算的位数,segmentShift = 32 - sshift,所以默认为28.
- segmentMask是散列运算的掩码,segmentMask = ssize -1,即默认为15,掩码的二进制各个位的值都是1。
因为ssize的最大长度为65536,所以segmentShift最大值为16,segmentMask最大值为65535,对应的二进制为16位,每个位都是1。
初始化每个segment
输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子,在构造方法中需要通过这两个参数来初始化数组中的每个segment
if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFacotr);
上述代码中的cap是segment里HashEntry数组的长度,它等于initialCapacity除以ssize的倍数,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1就是2的N次方。segment的容量threshold = (int)cap*loadFactor
,默认情况下initialCapacity
等于16,也就是容纳16个元素,loadFactor(负载因子)等于0.75,通过计算cap=1,threshold=0
定位Segment
ConcurrentHashMap使用分段所Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过散列算法定位到某一个Segment。ConcurrentHashMap首先选用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列。
private static int hash(int h) { h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); }
使用再散列算法,目的为了减少散列冲突,使元素能够均有地分步在不同的Segment上,从而提高容器的存取效率。假如散列的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段所也会失去意义。在JDK 1.7中ConcurrentHashMap通过以下散列算法定位segment
final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; }
Put/Get/Size操作
由于JDK1.7和1.8的底层实现和方法有所不同,所以我们这里分别介绍下:
Put操作
JDK1.7中的put操作如下
static class Segment<K,V> extends ReentrantLock implements Serializable
从上述Segment的继承体系中可以看出,Segment实现了ReentrantLock,也就带有锁的功能。由于put方法需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量是,必须加锁。
- 对segment加锁,在加锁时,会通过继承ReentrantLock的tryLock()方法尝试获取锁,若获取成功,就直接在相应的位置插入;若已经有线程获取了该Segment的锁,那当前线程会以自旋的方式继续调用tryLock()方法获取锁,超过指定次数就挂起,等待唤醒
- 判断是否需要对Segment里的HashEntry数组进行扩容
- 定位添加元素的位置,然后将其放在HashEntry数组中
其中如果需要扩容该如何扩容呢?
- 在插入元素前会先判断Segment里的HashEntry数组是否超过容量threshold,如果超过了阈值,则对数组进行扩容。
- 建立一个容量是原来两倍的数组,然后将原数组中的元素再散列后插入到新数组中。为了高效,
ConcurrentHashMap
不会对整个容器进行扩容,而是只对某个segment进行扩容
这里的扩容方式与HashMap的扩容方式稍有不同,HashMap是在插入元素之后判断元素是否已经达到容量,如果达到了就进行扩容,但是有可能扩容之后就没有新元素插入,则HashMap就进行了一次无效的扩容。
JDK1.8中的put操作如下
对当前table进行无条件自循环put成功,可以分成以下步骤
- 如果没有初始化就调用initTable()方法来进行初始化过程,若通过散列得到的位置中没有节点,则不加锁直接将节点通过CAS操作插入
- 如果发现该桶中有一个节点,需要扩容则进行扩容
- 如果存在hash冲突,就加锁来保证线程安全,这里存在两种情况:一种是链表形式就直接遍历到尾部插入;另一种是红黑树形式,就按红黑树结构插入。判断依据是如果链表的数量大于阈值,则先转换为红黑树结构,再一次进入循环
如果添加成功,则调用addCount()统计size,并且检查是否需要扩容。
Get操作
JDK1.7中的get操作如下
public V get(Object key) { int hash = hash(key.hashCode()); //1,先经过一次再散列 return segmentFor(hash).get(key, hash); //2,使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素 }
JDK1.8中的get操作如下
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; , int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 1,计算hash值,定位到该table索引位置,如果是首节点符合,则返回 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 2. 如果遇到扩容的时候,会调用标志正在扩容节点Forwarding Node的find方法,查找该节点,若匹配则返回 // 若查找到就返回 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; // 3,若既不是首节点也不是forwarding node,则向下遍历 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
get操作的高效体现在整个get过程不需要加锁,除非读到的值是空才会加锁重读,因为它的get方法里将使用的共享变量都定义成volatile类型。例如用于统计当前Segment大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值(由Java内存模型的happen before原则保证),但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值)。在get操作期间,只需要读取共享变量count和value值,所以不需要加锁
Size操作
计算ConcurrentHashMap的元素大小,就必须要统计Segment里的元素的大小后求和。上面说过Segment的全局变量count是一个volatile变量,在并发的场景下,可能会导致计算出来的size值和实际的size值有偏差。因为在计算count值的时候,可能有新数据的插入,导致结果的不准确。那么,最安全的做法就是在统计size的时候把所有Segment的put、remove和clean方法全部锁住,但这种做法显然是非常低效的。
JDK1.7中的size操作如下
JDK 1.7中是如下处理的,先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则在采用加锁的方式来统计所有Segment的大小。使用modCount变量判断容器是否发生了变化,在put、remove和clean方法里操作元素前都会将变量modCount进行加1操作,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化
try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; // 进行2次统计 last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } }
JDK1.8中的size操作如下
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; // 变化的数量 long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
CopyOnWriteArray写时复制
JUC集合包中的List和Set实现类按照写时复制的实现原理包括: CopyOnWriteArrayList和 CopyOnWriteArraySet。
写时复制原理解决了并发修改异常,每当有写入的时候,这个时候CopyOnWriteArrayList 底层实现添加的原理是先copy出一个容器(可以简称副本),再往新的容器里添加这个新的数据,最后把新的容器的引用地址赋值给了之前那个旧的的容器地址,但是在添加这个数据的期间,其他线程如果要去读取数据,仍然是读取到旧的容器里的数据。
- CopyOnWriteArrayList相当于线程安全的ArrayList,它实现了List接口。CopyOnWriteArrayList是支持高并发的。
- CopyOnWriteArraySet相当于线程安全的HashSet,它继承于AbstractSet类。 内部包含一个
CopyOnWriteArrayList
对象(聚合关系),它是通过CopyOnWriteArrayList实现的
这种方式采用了写时加锁复制,读时读旧容器不需任何处理,这种方式有两个显著的缺点:
- 内存占用问题,很明显,两个数组同时驻扎在内存中,如果实际应用中,数据比较多,而且比较大的情况下,占用内存会比较大,针对这个其实可以用ConcurrentHashMap来代替。
- 数据一致性问题,CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器
基于此我们一般不会大量使用。
JUC并发包下的线程池
本部分回答以下几个问题,如果能回答正确,则证明本部分掌握好了。
- 线程池的参数列表,分别什么含义
- 线程池的常用方法和方法比较
- 线程池的执行流程,任务拒绝策略有哪些
- Java提供了哪些预置线程池
接下来我们看这部分的内容。
线程池定义
线程池的整体定义如下,包含下列的参数列表:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {}
可以看到,其需要如下几个参数:
- corePoolSize(必需):核心线程数。默认情况下,核心线程会一直存活,但是当将
allowCoreThreadTimeout
设置为true时,核心线程也会超时回收。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()
或者prestartCoreThread()
方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中 - maximumPoolSize(必需):线程池所能容纳的最大线程数。当活跃线程数达到该数值后,后续的新任务将会阻塞
- keepAliveTime(必需):表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
- unit(必需):指定keepAliveTime参数的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
- workQueue(必需):任务队列。通过线程池的execute()方法提交的Runnable对象(等待执行的任务)将存储在该参数中。其采用
阻塞队列
实现。ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关,关于队列和并发集合在本系列的其它Blog进行介绍 - threadFactory(可选):线程工厂。用于指定为线程池创建新线程的方式。线程工厂指定创建线程的方式,需要实现ThreadFactory接口,并实现newThread(Runnable r)方法。
- handler(可选):拒绝策略。当达到最大线程数时需要执行的饱和策略。
了解了具体参数后我们再看看其中几个参数的可选项。