HashSet、TreeSet
package com.mmall.concurrency.example.commonUnsafe; import com.mmall.concurrency.annoations.NotThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j @NotThreadSafe public class HashSetExample { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Set<Integer> set = new HashSet<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } private static void update(int i) { set.add(i); } } // 输出 size:4955
解决方案一(同步容器:synchronizedSet)
package com.mmall.concurrency.example.syncContainer; import com.google.common.collect.Sets; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Collections; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j @ThreadSafe public class CollectionsExample2 { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet()); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } private static void update(int i) { set.add(i); } } // 输出 size:5000
解决方案二(并发容器:CopyOnWriteArraySet)(对应 HashSet)
package com.mmall.concurrency.example.concurrent; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j @ThreadSafe public class CopyOnWriteArraySetExample { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Set<Integer> set = new CopyOnWriteArraySet<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } private static void update(int i) { set.add(i); } } // 输出 size:5000
分析
CopyOnWriteArraySet 采用写时复制(Copy-On-Write):当有新元素添加到集合中时,先从原有数组拷贝出一份副本,在副本(新数组)上执行写操作,写完后再将集合内部的数组引用指向新数组。整个 add 操作都是在锁的保护下进行的,防止并发写入时复制出多份副本。读操作直接在原数组上进行,不需要加锁。
缺点
写操作时复制消耗内存
不能用于实时读的场景
由于复制和add操作等需要时间,故读取时可能读到旧值。
能做到最终一致性,但无法满足实时性的要求,更适合读多写少的场景。
如果无法知道数组有多大,或者add、set操作有多少,慎用此类。
迭代器不支持 remove 等修改操作(迭代器遍历的是其创建时的数组快照,调用 remove 会抛出 UnsupportedOperationException)。
设计思想
读写分离
最终一致性
使用时另外开辟空间,防止并发冲突
解决方案三(并发容器:ConcurrentSkipListSet)(对应 TreeSet)
package com.mmall.concurrency.example.concurrent; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j @ThreadSafe public class ConcurrentSkipListSetExample { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Set<Integer> set = new ConcurrentSkipListSet<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } private static void update(int i) { set.add(i); } } // 输出 size:5000
分析
支持自然排序
基于 Map 集合(ConcurrentSkipListMap)进一步封装实现
contains、add、remove 等单个元素操作是线程安全的,但 addAll、containsAll、removeAll 等批量操作不保证原子性(因为这些批量操作内部是逐个调用线程安全的 contains、add、remove 方法实现的,每一步单独是安全的,但整个批量操作可能与其他线程的操作交错执行);需要原子性时必须自行加锁同步。
不能存空值