不安全的容器
final List<Integer> l1 = new ArrayList<Integer>(); new Thread(){ public void run() { for (int i = 0; i < 1000; i++) { l1.add(i); } }; }.start(); for (int i = 0; i < 1000; i++) { l1.add(i); } TimeUnit.SECONDS.sleep(2); System.out.println(l1.size());
打印如下:
Exception in thread "Thread-0" java.lang.ArrayIndexOutOfBoundsException: 16 at java.util.ArrayList.add(ArrayList.java:352) at com.price.concurrent.TestConcurrentClass$6.run(TestConcurrentClass.java:127) 1001
因为并发带来了内部错误。
同步的容器
而对于并发容器来讲比如Vector和Collections.synchronizedList()就不会出现这个问题。
Vector是用方法锁来实现的, 而后者是用同步块来实现的,因此后者的效率较高。
同步容器的单个方法都是安全的,比如上面的那个例子改为使用同步容器:
final List<Integer> l2 = Collections.synchronizedList(new ArrayList<Integer>()); new Thread(){ public void run() { for (int i = 0; i < 1000; i++) { l2.add(i); } }; }.start(); v1.iterator(); for (int i = 0; i < 1000; i++) { l2.add(i); } TimeUnit.SECONDS.sleep(2); System.out.println(l2.size());
会打印2000.不会出现异常
加锁复合操作
但是通常对于容器的操作还会有很多复合操作,比如迭代、缺少才加入等操作,还是会出现问题,这时候需要加入额外的锁。
复合操作:
final List<Integer> l2 = Collections.synchronizedList(new ArrayList<Integer>()); for (int i = 0; i < 1000; i++) { l2.add(i); } new Thread(){ public void run() { for (int i = 0; i < 1000; i++) { // synchronized (l2) { l2.add(i); // } } }; }.start(); // synchronized (l2) { Iterator<Integer> i1 = l2.iterator(); while(i1.hasNext()){ Integer i = i1.next(); } // } TimeUnit.SECONDS.sleep(2);该代码会抛出:
Exception in thread "main" java.util.ConcurrentModificationException
因为遍历的时候会实时检查集合的数量是否发生变化,如果有另外一个线程修改了集合数量则会抛出这个异常。
如果放开代码中的同步块,则不会再抛出异常了。
为了解决这个问题,除了使用加锁的方式外,还可以在遍历之前进行拷贝。
对于这些复合操作,JDK提供了许多类库,提供了比客户端加锁更好的并发性和可伸缩性。
分离锁集合
ConcurrentHashMap
提供了putIfAbsent等方法提供了一些常用复合操作的并发安全方法。
其实现的机制使用了分离锁, 先hash key到每一个桶上,然后对单独的桶加锁,这样就能够把锁的消耗分解得很小。
CopyOnWriteArrayList
同样提供了很多常用复合操作的并发安全方法。
其实现的机制可以看看如下两个方法:
public E set(int index, E element) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); Object oldValue = elements[index]; if (oldValue != element) { int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len); newElements[index] = element; setArray(newElements); } else { // Not quite a no-op; ensures volatile write semantics setArray(elements); } return (E)oldValue; } finally { lock.unlock(); } }
final void setArray(Object[] a) { array = a; }
public Iterator<E> iterator() { return new COWIterator<E>(getArray(), 0); } final Object[] getArray() { return array; }
这样所有的可能修改集合的方法都是加了锁的,在修改的时候创建了新的集合,永远不会修改老的集合。
而不会修改集合的地方,比如遍历集合是直接返回了一个当前的数组引用,这个引用不会被修改,因为修改行为会创建新的数组来给引用赋值。
这样很适用于写少读多的情况。
阻塞队列
前面说过使用wait和notifyAll来实现生产者消费者模式。 这里我们有更好的集合可以使用 BlockingQueuepackage com.price.concurrent; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class TestBlockingQueue { public static void main(String[] args) { BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10); final Producer p = new Producer(queue); new Thread(){ public void run() { int i = 0; while(true){ try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } p.product("product" + i++); } }; }.start(); Customer c = new Customer(queue); while(true){ try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } c.custome(); } } } class Producer{ private BlockingQueue<String> queue; public Producer(BlockingQueue<String> queue) { this.queue = queue; } public void product(String product){ try { queue.put(product); } catch (InterruptedException e) { e.printStackTrace(); } } } class Customer{ private BlockingQueue<String> queue; public Customer(BlockingQueue<String> queue) { this.queue = queue; } public void custome(){ try { System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }这里如果使用非安全的队列,出了无法实现阻塞效果外,还会造成,多线程写丢失,内部状态不一致,甚至抛出边界异常等等
对于队列,还提供了如下的方法:
add offer put 添加一个元素, 如果满了抛出异常,返回false, 阻塞(仅BockingQueue支持)
remove poll take 取第一个元素并删除 , 如果集合为空,抛出异常,返回null ,阻塞(仅BockingQueue支持)
element peek 返回头元素, 如果为空 抛出异常, 返回null.
出了上面这种外JDK还根据需求提供了别的,比如PriorityQueue 根据compare方法来决定取出顺序的队列
Deque和BlockingDeque双向队列,每个消费者有自己的双端队列,自己的队列完成之后会尝试去消费其他的队列。