对应的单元测试:
import org.junit.jupiter.api.Test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; class AtomicReferenceTest { @Test void testGetSetPlain() throws InterruptedException { AtomicReference<String> ref = new AtomicReference<>("initial"); int numberOfThreads = 10; ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { executor.submit(() -> ref.setPlain("new")); } executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); assertEquals("new", ref.getPlain()); } @Test void testGetSetAcquireRelease() throws InterruptedException { AtomicReference<String> ref = new AtomicReference<>("initial"); int numberOfThreads = 10; ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { executor.submit(() -> ref.setRelease("new")); } executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); assertEquals("new", ref.getAcquire()); } @Test void testGetSetOpaque() throws InterruptedException { AtomicReference<String> ref = new AtomicReference<>("initial"); int numberOfThreads = 10; ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { executor.submit(() -> ref.setOpaque("new")); } executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); assertEquals("new", ref.getOpaque()); } @Test void testCompareAndExchange() throws InterruptedException { AtomicReference<String> ref = new AtomicReference<>("initial"); int numberOfThreads = 10; ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { executor.submit(() -> ref.compareAndExchange("initial", "changed")); } executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); assertEquals("changed", ref.get()); } @Test void testCompareAndExchangeAcquireRelease() throws InterruptedException { AtomicReference<String> ref = new AtomicReference<>("initial"); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> assertEquals("initial", ref.compareAndExchangeAcquire("initial", "changed"))); executor.submit(() -> assertEquals("changed", ref.get())); executor.submit(() -> assertEquals("changed", ref.compareAndExchangeRelease("changed", "new"))); executor.submit(() -> assertEquals("new", ref.get())); executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); } @Test void testWeakCompareAndSetVolatile() throws InterruptedException { AtomicReference<String> ref = new AtomicReference<>("initial"); int numberOfThreads = 10; ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { executor.submit(() -> assertTrue(ref.weakCompareAndSetVolatile("initial", "changed"))); } executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); assertEquals("changed", ref.get()); } @Test void testWeakCompareAndSetAcquireRelease() throws InterruptedException { AtomicReference<String> ref = new AtomicReference<>("initial"); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> assertTrue(ref.weakCompareAndSetAcquire("initial", "changed"))); executor.submit(() -> assertEquals("changed", ref.get())); executor.submit(() -> assertTrue(ref.weakCompareAndSetRelease("changed", "new"))); executor.submit(() -> assertEquals("new", ref.get())); executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); } @Test void testGetAndSet() throws InterruptedException { AtomicReference<String> ref = new AtomicReference<>("initial"); int numberOfThreads = 10; ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { executor.submit(() -> assertEquals("initial", ref.getAndSet("new"))); } executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); assertEquals("new", ref.get()); } @Test void testGetAndUpdate() throws InterruptedException { AtomicReference<Integer> ref = new AtomicReference<>(0); int numberOfThreads = 10; ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { executor.submit(() -> ref.getAndUpdate(x -> x + 1)); } executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); assertEquals(numberOfThreads, ref.get()); } @Test void testUpdateAndGet() throws InterruptedException { AtomicReference<Integer> ref = new AtomicReference<>(0); int numberOfThreads = 10; ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { executor.submit(() -> ref.updateAndGet(x -> x + 1)); } executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); assertEquals(numberOfThreads, ref.get()); } @Test void testGetAndAccumulate() throws InterruptedException { AtomicReference<Integer> ref = new AtomicReference<>(0); int numberOfThreads = 10; ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { executor.submit(() -> ref.getAndAccumulate(1, Integer::sum)); } executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); assertEquals(numberOfThreads, ref.get()); } @Test void testAccumulateAndGet() throws InterruptedException { AtomicReference<Integer> ref = new AtomicReference<>(0); int numberOfThreads = 10; ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { executor.submit(() -> ref.accumulateAndGet(1, Integer::sum)); } executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); assertEquals(numberOfThreads, ref.get()); } @Test void compareSetAndGetPlainWithVolatile() throws InterruptedException { AtomicReference<String> ref = new AtomicReference<>("initial"); // set/get (volatile access) ExecutorService executorVolatile = Executors.newFixedThreadPool(2); executorVolatile.execute(() -> ref.set("volatileUpdate")); executorVolatile.execute(() -> assertEquals("volatileUpdate", ref.get())); executorVolatile.shutdown(); // setPlain/getPlain (non-volatile access) AtomicReference<String> refPlain = new AtomicReference<>("initial"); ExecutorService executorPlain = Executors.newFixedThreadPool(2); executorPlain.execute(() -> refPlain.setPlain("plainUpdate")); executorPlain.execute(() -> assertEquals("plainUpdate", refPlain.getPlain())); executorPlain.shutdown(); assertTrue(executorVolatile.awaitTermination(1, TimeUnit.SECONDS)); assertTrue(executorPlain.awaitTermination(1, TimeUnit.SECONDS)); } @Test void compareCompareAndSetWithWeakCompareAndSetVolatile() throws InterruptedException { AtomicReference<Integer> ref = new AtomicReference<>(0); // compareAndSet ExecutorService executorCAS = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { executorCAS.submit(() -> { while (!ref.compareAndSet(ref.get(), ref.get() + 1)) { // retry } }); } // weakCompareAndSetVolatile AtomicReference<Integer> refWeak = new AtomicReference<>(0); ExecutorService executorWeakCAS = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { executorWeakCAS.submit(() -> { while (!refWeak.weakCompareAndSetVolatile(refWeak.get(), refWeak.get() + 1)) { // retry } }); } executorCAS.shutdown(); executorWeakCAS.shutdown(); assertTrue(executorCAS.awaitTermination(1, TimeUnit.SECONDS)); assertTrue(executorWeakCAS.awaitTermination(1, TimeUnit.SECONDS)); assertEquals(10, ref.get()); assertEquals(10, refWeak.get()); } @Test void compareGetAndSetWithCompareAndExchange() throws InterruptedException { AtomicReference<String> ref = new AtomicReference<>("initial"); // getAndSet ExecutorService executorGetAndSet = Executors.newSingleThreadExecutor(); executorGetAndSet.submit(() -> assertEquals("initial", ref.getAndSet("new"))); executorGetAndSet.shutdown(); // compareAndExchange AtomicReference<String> refExchange = new AtomicReference<>("initial"); ExecutorService executorExchange = Executors.newSingleThreadExecutor(); executorExchange.submit(() -> assertEquals("initial", refExchange.compareAndExchange("initial", "new"))); executorExchange.shutdown(); assertTrue(executorGetAndSet.awaitTermination(1, TimeUnit.SECONDS)); assertTrue(executorExchange.awaitTermination(1, TimeUnit.SECONDS)); assertEquals("new", ref.get()); assertEquals("new", refExchange.get()); } }