1. 并发基础概念:
public class Counter {<!-- --> private int count; public synchronized void increment() {<!-- --> count++; } public int getCount() {<!-- --> return count; } }
/**
 * Two threads each increment a shared {@link AtomicInteger} 10000 times; the final value
 * is printed after both threads have been joined.
 */
public class AtomicIntegerDemo {
    // The reference is never reassigned, so it is declared final.
    private static final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Starts both worker threads, waits for them, and prints the counter value.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        // Both threads do the same work, so they share one Runnable.
        Runnable work = () -> {
            for (int i = 0; i < 10000; i++) {
                counter.incrementAndGet();
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(counter.get());
    }
}
/**
 * A reader thread spins until a writer thread sets a {@code volatile} flag.
 */
public class VolatileDemo {
    // volatile: the writer's store must become visible to the spinning reader.
    private static volatile boolean flag = false;

    /**
     * Starts the reader and writer threads and waits for both to finish.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        Thread readerThread = new Thread(() -> {
            while (!flag) {
                // Busy-wait until the writer flips the flag.
            }
            System.out.println("Flag is now true");
        });
        Thread writerThread = new Thread(() -> {
            flag = true;
            System.out.println("Flag is now true");
        });
        readerThread.start();
        writerThread.start();
        readerThread.join();
        writerThread.join();
    }
}
2. 线程池:
/**
 * Submits 20 sleeping tasks to a hand-configured {@link ThreadPoolExecutor} with a bounded
 * queue and a {@code CallerRunsPolicy} rejection handler.
 */
public class ThreadPoolExecutorDemo {
    /**
     * Builds the pool, submits the tasks and initiates an orderly shutdown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(
                2,                                          // corePoolSize
                5,                                          // maximumPoolSize
                60,                                         // keepAliveTime
                TimeUnit.SECONDS,                           // unit
                new LinkedBlockingQueue<>(10),              // workQueue
                new ThreadPoolExecutor.CallerRunsPolicy()   // handler
        );
        for (int i = 1; i <= 20; i++) {
            final int taskId = i;
            executorService.execute(() -> {
                System.out.println("Task #" + taskId + " is running on thread "
                        + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt status so the executing thread can still see it.
                    Thread.currentThread().interrupt();
                }
            });
        }
        // shutdown() stops accepting new tasks; already submitted tasks still run.
        executorService.shutdown();
    }
}
/**
 * Submits 20 sleeping tasks to a fixed pool of 5 threads created via {@link Executors}.
 */
public class ExecutorsDemo {
    /**
     * Creates the pool, submits the tasks and initiates an orderly shutdown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 1; i <= 20; i++) {
            final int taskId = i;
            executorService.execute(() -> {
                System.out.println("Task #" + taskId + " is running on thread "
                        + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt status so the pool thread can still see it.
                    Thread.currentThread().interrupt();
                }
            });
        }
        // shutdown() stops accepting new tasks; already submitted tasks still run.
        executorService.shutdown();
    }
}
3. Lock接口:
/**
 * Two threads increment a shared {@code int} 10000 times each, with every increment
 * performed while holding a {@link ReentrantLock}.
 */
public class ReentrantLockDemo {
    private static final Lock lock = new ReentrantLock();
    private static int count = 0;

    /**
     * Runs both incrementing threads to completion and prints the total.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        // Both threads do the same work, so they share one Runnable.
        Runnable work = () -> {
            for (int i = 0; i < 10000; i++) {
                increment();
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Count: " + count);
    }

    /** Increments {@code count} under the lock; the lock is released in {@code finally}. */
    private static void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
}
/**
 * One thread increments a shared counter under the write lock while another thread reads
 * it under the read lock of the same {@link ReentrantReadWriteLock}.
 */
public class ReentrantReadWriteLockDemo {
    private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private static final Lock readLock = lock.readLock();
    private static final Lock writeLock = lock.writeLock();
    private static int count = 0;

    /**
     * Runs the writer and reader threads to completion and prints the final count.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                increment();
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                get();
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Count: " + count);
    }

    /** Increments {@code count} while holding the write lock. */
    private static void increment() {
        writeLock.lock();
        try {
            count++;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Reads {@code count} while holding the read lock.
     *
     * @return the value of {@code count} observed under the read lock
     */
    private static int get() {
        readLock.lock();
        try {
            return count;
        } finally {
            readLock.unlock();
        }
    }
}
/**
 * One thread increments a shared counter under a {@link StampedLock} write lock while
 * another reads it, first optimistically and then under a read lock if validation fails.
 */
public class StampedLockDemo {
    private static final StampedLock lock = new StampedLock();
    private static int count = 0;

    /**
     * Runs the writer and reader threads to completion and prints the final count.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining;
     *         the declaration is required because {@code Thread.join()} throws it
     */
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                increment();
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                get();
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Count: " + count);
    }

    /** Increments {@code count} while holding the write lock. */
    private static void increment() {
        long stamp = lock.writeLock();
        try {
            count++;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    /**
     * Reads {@code count} with an optimistic read, retrying under a read lock when the
     * optimistic stamp fails validation.
     *
     * @return the value of {@code count} that was read
     */
    private static int get() {
        long stamp = lock.tryOptimisticRead();
        int c = count;
        if (!lock.validate(stamp)) {
            // Validation failed: re-read under a real read lock.
            stamp = lock.readLock();
            try {
                c = count;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return c;
    }
}
4. Condition接口:
/**
 * Bounded-buffer producer/consumer built on one {@link Lock} with two {@link Condition}s:
 * {@code notEmpty} is awaited by the consumer and {@code notFull} by the producer.
 */
public class ConditionDemo {
    private static final Lock lock = new ReentrantLock();
    private static final Condition notEmpty = lock.newCondition();
    private static final Condition notFull = lock.newCondition();
    private static final int CAPACITY = 10;
    private static final Queue<Integer> queue = new LinkedList<>();

    /**
     * Starts one producer and one consumer, each handling 20 items, and waits for both.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    produce(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt status and stop instead of looping on with it cleared.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    consume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt status and stop instead of looping on with it cleared.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }

    /**
     * Adds {@code value} to the queue, waiting on {@code notFull} while the queue holds
     * {@code CAPACITY} elements, then signals {@code notEmpty}.
     *
     * @param value the element to enqueue
     * @throws InterruptedException if interrupted while waiting for space
     */
    private static void produce(int value) throws InterruptedException {
        lock.lock();
        try {
            // Loop rather than "if": the condition is re-checked after every wake-up.
            while (queue.size() == CAPACITY) {
                notFull.await();
            }
            queue.offer(value);
            System.out.println("Produced: " + value);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and prints the head of the queue, waiting on {@code notEmpty} while the
     * queue is empty, then signals {@code notFull}.
     *
     * @throws InterruptedException if interrupted while waiting for an element
     */
    private static void consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            int value = queue.poll();
            System.out.println("Consumed: " + value);
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
5. CAS(Compare-And-Swap)操作:
/**
 * Two threads race to rewrite a shared string through
 * {@link AtomicReference#compareAndSet(Object, Object)} retry loops.
 */
public class CASDemo {
    private static final AtomicReference<String> atomicStr = new AtomicReference<>("Hello, World!");

    /**
     * Runs both updater threads to completion and prints the resulting string.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(replaceWorldWith("John"));
        Thread t2 = new Thread(replaceWorldWith("Mary"));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(atomicStr.get());
    }

    /**
     * Builds a task that retries a read-modify-CAS cycle until one compareAndSet succeeds.
     *
     * @param replacement the text substituted for "World" in the current value
     * @return the retry-loop task
     */
    private static Runnable replaceWorldWith(String replacement) {
        return () -> {
            boolean swapped = false;
            while (!swapped) {
                String prev = atomicStr.get();
                String next = prev.replace("World", replacement);
                // Succeeds only if no other thread changed the reference since the read above.
                swapped = atomicStr.compareAndSet(prev, next);
            }
        };
    }
}
在这个示例中,我们使用AtomicReference类来实现CAS操作,并定义了一个初始字符串"Hello, World!"。在t1线程中,我们不断尝试将字符串中的"World"替换成"John";在t2线程中,我们不断尝试将字符串中的"World"替换成"Mary"。通过compareAndSet()方法比较并更新字符串的值,直至成功,最终输出更新后的字符串。由于只有先更新成功的线程才能真正替换掉"World",最终输出为"Hello, John!"或"Hello, Mary!"之一。
6. 原子类:
/**
 * One thread adds 1 and another adds 2 to a shared {@link AtomicInteger}, 10000 times
 * each; the total is printed after both threads have been joined.
 */
public class AtomicDemo {
    private static final AtomicInteger atomicInt = new AtomicInteger(0);

    /**
     * Runs both threads to completion and prints the accumulated value.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                atomicInt.incrementAndGet();
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                atomicInt.addAndGet(2);
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Count: " + atomicInt.get());
    }
}
7. 并发容器:
/**
 * One thread puts keys "Key0".."Key9999" into a {@link ConcurrentHashMap} while another
 * concurrently removes the same keys; the final size is printed.
 *
 * <p>The printed size depends on how the two threads interleave: a remove that runs
 * before the matching put has no effect, so the result can be anywhere from 0 to 10000.
 */
public class ConcurrentHashMapDemo {
    /**
     * Runs the putting and removing threads to completion and prints the map size.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        Map<String, Integer> map = new ConcurrentHashMap<>();
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                map.put("Key" + i, i);
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                map.remove("Key" + i);
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Size: " + map.size());
    }
}
8. CountDownLatch:
/**
 * Uses two {@link CountDownLatch}es: {@code startLatch} releases all workers at once and
 * {@code endLatch} lets the main thread wait until every worker has finished.
 */
public class CountDownLatchDemo {
    private static final int THREAD_COUNT = 5;
    private static final CountDownLatch startLatch = new CountDownLatch(1);
    private static final CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);

    /**
     * Starts the workers, releases them after 3 seconds, then waits for all to finish.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping or waiting
     */
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < THREAD_COUNT; i++) {
            Thread t = new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " waiting to start...");
                    startLatch.await();
                    System.out.println(Thread.currentThread().getName() + " started");
                    Thread.sleep((long) (Math.random() * 10000));
                    System.out.println(Thread.currentThread().getName() + " finished");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt status that the exception cleared.
                    Thread.currentThread().interrupt();
                } finally {
                    // Count down in finally so the main thread is released even on failure.
                    endLatch.countDown();
                }
            }, "Thread-" + i);
            t.start();
        }
        Thread.sleep(3000);
        System.out.println("All threads ready, start now!");
        startLatch.countDown();
        endLatch.await();
        System.out.println("All threads finished!");
    }
}
在这个示例中,我们使用CountDownLatch类来实现线程同步,并定义了两个计数器startLatch和endLatch。在main方法中,我们创建了5个线程,每个线程都会等待startLatch的计数器值减为0后才开始执行;执行完毕后,将endLatch的计数器值减1。在主线程中,我们等待3秒钟后,将startLatch的计数器值减为0,从而使5个等待的线程开始执行;然后等待所有线程的执行结束,即当endLatch的计数器值减为0时输出"All threads finished!"。
9. CyclicBarrier:
/**
 * Five threads work for a random time, meet at a {@link CyclicBarrier}, and continue once
 * all have arrived; the barrier action prints a message when the barrier trips.
 */
public class CyclicBarrierDemo {
    private static final int THREAD_COUNT = 5;
    private static final CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
        System.out.println("All threads arrived at barrier!");
    });

    /**
     * Starts the worker threads; it does not wait for them to finish.
     *
     * @param args unused
     * @throws InterruptedException declared for interface compatibility; nothing in the body throws it
     */
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < THREAD_COUNT; i++) {
            Thread t = new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " working...");
                    Thread.sleep((long) (Math.random() * 10000));
                    System.out.println(Thread.currentThread().getName() + " arrived at barrier!");
                    barrier.await();
                    System.out.println(Thread.currentThread().getName() + " continue working...");
                    Thread.sleep((long) (Math.random() * 10000));
                    System.out.println(Thread.currentThread().getName() + " finished");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt status that the exception cleared.
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, "Thread-" + i);
            t.start();
        }
    }
}
10. Semaphore:
/**
 * Ten threads compete for five permits of a {@link Semaphore}; each holds its permit for
 * a random time before releasing it.
 */
public class SemaphoreDemo {
    private static final int THREAD_COUNT = 10;
    private static final Semaphore semaphore = new Semaphore(5);

    /**
     * Starts the worker threads; it does not wait for them to finish.
     *
     * @param args unused
     * @throws InterruptedException declared for interface compatibility; nothing in the body throws it
     */
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < THREAD_COUNT; i++) {
            Thread t = new Thread(() -> {
                try {
                    // Acquire outside the inner try: if acquire() is interrupted, no permit
                    // is held and none must be released.
                    semaphore.acquire();
                    try {
                        System.out.println(Thread.currentThread().getName() + " acquiring resource...");
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println(Thread.currentThread().getName() + " releasing resource...");
                    } finally {
                        // Release in finally so an interrupted sleep cannot leak the permit.
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt status that the exception cleared.
                    Thread.currentThread().interrupt();
                }
            }, "Thread-" + i);
            t.start();
        }
    }
}
11. Future接口和CompletableFuture类:
CompletableFuture类是Java 8中新增的一个类,它实现了Future接口和CompletionStage接口,并提供了更加简洁易用的异步编程方式。CompletableFuture类支持链式调用、组合多个子任务、异常处理等特性,可以方便地实现复杂的异步编程逻辑。
/**
 * Runs a slow task asynchronously with {@link CompletableFuture#supplyAsync} and blocks
 * on {@code get()} for its result.
 */
public class CompletableFutureDemo {
    /**
     * Starts the asynchronous task, waits for its result and prints it.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting in {@code get()}
     * @throws ExecutionException if the asynchronous task completed exceptionally
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task started.");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt status that the exception cleared.
                Thread.currentThread().interrupt();
            }
            System.out.println("Task finished.");
            return "Hello, World!";
        });
        System.out.println("Waiting for the result...");
        String result = future.get();
        System.out.println("Result: " + result);
    }
}
在这个示例中,我们使用CompletableFuture类来实现异步编程,并调用了supplyAsync()方法,它会在另一个线程中执行任务并返回计算结果。在main方法中,我们创建了一个CompletableFuture对象future,然后调用get()方法等待任务完成,并获取计算结果。最终输出"Result: Hello, World!"。