1. 并发基础概念:
并发编程是指多个线程同时执行程序的情况。在并发编程中,由于多个线程可能同时访问共享资源,因此需要考虑线程同步、原子性、可见性等问题。
线程安全:
指在多线程环境下,对共享数据进行访问时,不会出现数据污染或不一致的问题。为了实现线程安全,可以使用锁机制或者其他并发控制手段。
public class Counter { private int count; public synchronized void increment() { count++; } public int getCount() { return count; } }
在这个示例中,我们定义了一个计数器类Counter,并使用synchronized关键字来实现线程安全。在increment()方法中,我们使用synchronized关键字来保证一次只有一个线程可以进入临界区执行操作,从而避免了多个线程同时访问count变量的问题。
原子性:
指一个操作要么完全执行成功,要么完全执行失败,不会出现部分执行的情况。为了实现原子性,可以使用原子类或者CAS(Compare-and-Swap)等机制。
public class AtomicIntegerDemo { private static AtomicInteger counter = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0; i < 10000; i++) { counter.incrementAndGet(); } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 10000; i++) { counter.incrementAndGet(); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }
在这个示例中,我们使用AtomicInteger类来实现原子操作。在线程t1和t2中,我们使用incrementAndGet()方法对计数器进行原子性自增操作,从而避免了多个线程同时访问count变量的问题。
可见性:
指当一个线程修改了共享数据后,其他线程能够立即看到该修改。为了实现可见性,可以使用volatile关键字或者synchronized关键字等机制。
public class VolatileDemo { private static volatile boolean flag = false; public static void main(String[] args) throws InterruptedException { Thread readerThread = new Thread(() -> { while (!flag) { // do nothing } System.out.println("Flag is now true"); }); Thread writerThread = new Thread(() -> { flag = true; System.out.println("Flag is now true"); }); readerThread.start(); writerThread.start(); readerThread.join(); writerThread.join(); } }
在这个示例中,我们使用volatile关键字来实现可见性。在readerThread中,我们不断地循环检查flag变量的值;在writerThread中,我们将flag变量设为true,并打印输出。由于flag变量是volatile类型的,因此一旦writerThread修改了该变量的值,readerThread就能够立即看到修改。
2. 线程池:
线程池是一种用于管理和复用线程的机制,可以提高系统资源利用率和响应速度,避免了频繁创建和销毁线程的开销。Java中提供了ThreadPoolExecutor类和Executors工具类来实现线程池。
ThreadPoolExecutor类:
是Java中线程池的核心实现类,通过参数配置可以自定义线程池的大小、任务队列、拒绝策略等属性。
public class ThreadPoolExecutorDemo { public static void main(String[] args) { ExecutorService executorService = new ThreadPoolExecutor( 2, // corePoolSize 5, // maximumPoolSize 60, // keepAliveTime TimeUnit.SECONDS, // unit new LinkedBlockingQueue<Runnable>(10), // workQueue new ThreadPoolExecutor.CallerRunsPolicy() // handler ); for (int i = 1; i <= 20; i++) { final int taskId = i; executorService.execute(() -> { System.out.println("Task #" + taskId + " is running on thread " + Thread.currentThread().getName()); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); } }
在这个示例中,我们使用ThreadPoolExecutor类来实现线程池,并设置corePoolSize为2、maximumPoolSize为5、keepAliveTime为60秒、workQueue为LinkedBlockingQueue(容量为10)、handler为CallerRunsPolicy。然后我们使用execute()方法向线程池中提交20个任务,每个任务都会打印输出当前的线程名并休眠2秒钟。最后调用shutdown()方法关闭线程池。
Executors工具类:
是Java中线程池的辅助类,提供了一些静态方法来创建常用类型的线程池。
public class ExecutorsDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 1; i <= 20; i++) { final int taskId = i; executorService.execute(() -> { System.out.println("Task #" + taskId + " is running on thread " + Thread.currentThread().getName()); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); } }
在这个示例中,我们使用Executors工具类中的newFixedThreadPool()方法来创建固定大小为5的线程池,并使用execute()方法向线程池中提交20个任务,每个任务都会打印输出当前的线程名并休眠2秒钟。最后调用shutdown()方法关闭线程池。
总之,线程池是一种非常重要的并发编程机制,可以提高系统资源利用率和响应速度。通过ThreadPoolExecutor类和Executors工具类,我们可以方便地创建、配置和管理线程池,以及执行任务。
3. Lock接口:
Lock接口是Java中提供的一种显式锁机制,可以实现更细粒度的控制和管理。Lock接口提供了加锁和释放锁的方法,具有可重入性、公平性等特点,常用的实现类包括ReentrantLock、ReentrantReadWriteLock、StampedLock等。
ReentrantLock:
是Lock接口的一种实现方式,具有可重入性、公平性、可中断性等特点。
public class ReentrantLockDemo { private static final Lock lock = new ReentrantLock(); private static int count = 0; public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0; i < 10000; i++) { increment(); } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 10000; i++) { increment(); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Count: " + count); } private static void increment() { lock.lock(); try { count++; } finally { lock.unlock(); } } }
在这个示例中,我们使用ReentrantLock类来实现锁机制,并定义了一个计数器变量count。在increment()方法中,我们首先调用lock()方法获取锁;然后对计数器变量进行自增操作;最后调用unlock()方法释放锁。在main()方法中,我们创建两个线程t1和t2来分别执行increment()方法10000次,最终打印输出计数器变量count的值。
ReentrantReadWriteLock:
是Lock接口的另一种实现方式,具有读写分离、公平性等特点。适用于读操作远多于写操作的场景。
public class ReentrantReadWriteLockDemo { private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private static final Lock readLock = lock.readLock(); private static final Lock writeLock = lock.writeLock(); private static int count = 0; public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0; i < 10000; i++) { increment(); } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 10000; i++) { get(); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Count: " + count); } private static void increment() { writeLock.lock(); try { count++; } finally { writeLock.unlock(); } } private static int get() { readLock.lock(); try { return count; } finally { readLock.unlock(); } } }
在这个示例中,我们使用ReentrantReadWriteLock类来实现读写锁机制,并定义了一个计数器变量count。在increment()方法中,我们首先调用writeLock()方法获取写锁;然后对计数器变量进行自增操作;最后调用writeUnlock()方法释放写锁。在get()方法中,我们首先调用readLock()方法获取读锁;然后返回计数器变量的值;最后调用readUnlock()方法释放读锁。在main()方法中,我们创建两个线程t1和t2来分别执行increment()和get()方法10000次,最终打印输出计数器变量count的值。
StampedLock:
是Lock接口的另一种实现方式,具有乐观锁和悲观锁等特点。适用于读操作频繁而写操作较少的场景。
public class StampedLockDemo { private static final StampedLock lock = new StampedLock(); private static int count = 0; public static void main(String[] args){ Thread t1 = new Thread(() -> { for (int i = 0; i < 10000; i++) { increment(); } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 10000; i++) { get(); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Count: " + count); } private static void increment() { long stamp = lock.writeLock(); try { count++; } finally { lock.unlockWrite(stamp); } } private static int get() { long stamp = lock.tryOptimisticRead(); int c = count; if (!lock.validate(stamp)) { stamp = lock.readLock(); try { c = count; } finally { lock.unlockRead(stamp); } } return c; } }
在这个示例中,我们使用StampedLock类来实现乐观锁机制,并定义了一个计数器变量count。在increment()方法中,我们首先调用writeLock()方法获取写锁;然后对计数器变量进行自增操作;最后调用unlockWrite()方法释放写锁。在get()方法中,我们首先调用tryOptimisticRead()方法获取乐观读锁,并记录当前的版本号stamp和计数器变量的值c;然后判断版本号是否有效,如果无效则调用readLock()方法获取悲观读锁,并重新读取计数器变量的值;最后调用unlockRead()方法释放悲观读锁。在main()方法中,我们创建两个线程t1和t2来分别执行increment()和get()方法10000次,最终打印输出计数器变量count的值。
4. Condition接口:
Condition接口是与Lock接口配合使用的一种线程协作机制。它可以实现更细粒度的线程等待和通知,并且可以支持多个条件变量,比如对于生产者-消费者模型中的缓冲区,可以分别使用一个notFull和notEmpty条件变量来进行生产者和消费者之间的协作。
public class ConditionDemo { private static final Lock lock = new ReentrantLock(); private static final Condition notEmpty = lock.newCondition(); private static final Condition notFull = lock.newCondition(); private static final int CAPACITY = 10; private static final Queue<Integer> queue = new LinkedList<>(); public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0; i < 20; i++) { try { produce(i); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 20; i++) { try { consume(); } catch (InterruptedException e) { e.printStackTrace(); } } }); t1.start(); t2.start(); t1.join(); t2.join(); } private static void produce(int value) throws InterruptedException { lock.lock(); try { while (queue.size() == CAPACITY) { notFull.await(); } queue.offer(value); System.out.println("Produced: " + value); notEmpty.signalAll(); } finally { lock.unlock(); } } private static void consume() throws InterruptedException { lock.lock(); try { while (queue.isEmpty()) { notEmpty.await(); } int value = queue.poll(); System.out.println("Consumed: " + value); notFull.signalAll(); } finally { lock.unlock(); } } }
在这个示例中,我们使用Lock接口的实现类ReentrantLock来提供锁机制,并使用newCondition()方法创建了两个条件变量notEmpty和notFull。然后定义了一个容量为10的队列queue,并编写了produce()方法和consume()方法来分别实现生产者和消费者的功能。在produce()方法中,首先调用lock()方法获取锁;然后判断队列是否已满,如果是则调用notFull.await()方法等待notFull条件变量的信号;否则将数据加入到队列中,打印输出生产的数据,并通过notEmpty.signalAll()方法通知等待notEmpty条件变量的其他线程。在consume()方法中,首先调用lock()方法获取锁;然后判断队列是否为空,如果是则调用notEmpty.await()方法等待notEmpty条件变量的信号;否则从队列中取出数据,打印输出消费的数据,并通过notFull.signalAll()方法通知等待notFull条件变量的其他线程。在main()方法中,我们创建两个线程t1和t2来分别执行生产者和消费者的功能,最终演示缓冲区中的数据生产和消费过程。
5. CAS(Compare-And-Swap)操作:
CAS操作是一种基于硬件指令级别的原子操作,可以实现非阻塞算法。它可以在多线程并发执行时保证数据的一致性和正确性。
CAS操作涉及到三个参数:需要更新的内存位置V、期望值A和新值B。当且仅当预期值A与内存位置V中的当前值相同时,才会将内存位置V中的值更新为新值B;否则,不进行任何操作。通过不断重试直至成功,从而保证了并发情况下的数据原子性和一致性。
public class CASDemo { private static final AtomicReference<String> atomicStr = new AtomicReference<>("Hello, World!"); public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { boolean swapped = false; while (!swapped) { String prev = atomicStr.get(); String next = prev.replace("World", "John"); swapped = atomicStr.compareAndSet(prev, next); } }); Thread t2 = new Thread(() -> { boolean swapped = false; while (!swapped) { String prev = atomicStr.get(); String next = prev.replace("World", "Mary"); swapped = atomicStr.compareAndSet(prev, next); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(atomicStr.get()); } }
在这个示例中,我们使用AtomicReference类来实现CAS操作,并定义了一个初始字符串"Hello, World!“。在t1线程中,我们不断尝试将字符串中的"World"替换成"John”;在t2线程中,我们不断尝试将字符串中的"World"替换成"Mary"。通过compareAndSet()方法比较并更新字符串的值,直至成功,最终输出更新后的字符串。
6. 原子类:
原子类是Java中提供的一种线程安全机制,它封装了常见的原子操作,并保证了这些操作的原子性和可见性。原子类包括AtomicBoolean、AtomicInteger、AtomicLong等,可以用于实现非阻塞算法、锁机制等多种并发编程场景。
public class AtomicDemo { private static final AtomicInteger atomicInt = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0; i < 10000; i++) { atomicInt.incrementAndGet(); } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 10000; i++) { atomicInt.addAndGet(2); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Count: " + atomicInt.get()); } }
在这个示例中,我们使用AtomicInteger类来实现原子操作,并定义了一个初始值为0的计数器atomicInt。在t1线程中,我们调用incrementAndGet()方法对计数器进行自增;在t2线程中,我们调用addAndGet()方法对计数器进行自增2。最终输出计数器的值。
除了AtomicInteger,Java还提供了其他原子类如AtomicBoolean、AtomicLong等,并且可以通过自定义的方式实现自己的原子类。这些原子类可以用于实现非阻塞算法、锁机制等多种并发编程场景,是Java中常见的一种线程安全机制。
7. 并发容器:
并发容器是Java中提供的一种线程安全机制,它封装了常见的容器类,并保证了这些容器类的线程安全性。并发容器包括ConcurrentHashMap、CopyOnWriteArrayList等,可以用于实现多线程并发访问数据的场景。
public class ConcurrentHashMapDemo { public static void main(String[] args) throws InterruptedException { Map<String, Integer> map = new ConcurrentHashMap<>(); Thread t1 = new Thread(() -> { for (int i = 0; i < 10000; i++) { map.put("Key" + i, i); } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 10000; i++) { map.remove("Key" + i); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Size: " + map.size()); } }
在这个示例中,我们使用ConcurrentHashMap类来实现并发容器,并定义了一个空的map对象。在t1线程中,我们利用put()方法向map中添加10000个键值对;在t2线程中,我们利用remove()方法从map中移除10000个键值对。最终输出map的大小。
除了ConcurrentHashMap,Java还提供了其他并发容器如CopyOnWriteArrayList、ConcurrentLinkedQueue等,并且可以通过自定义的方式实现自己的并发容器。这些并发容器可以用于实现多线程并发访问数据的场景,是Java中常见的一种线程安全机制。
8. CountDownLatch:
CountDownLatch是Java中提供的一种线程同步机制,用来控制线程的执行顺序和同步。它通过计数器来实现,当计数器的值减为0时,所有等待线程会被释放,继续执行后续操作。
public class CountDownLatchDemo { private static final int THREAD_COUNT = 5; private static final CountDownLatch startLatch = new CountDownLatch(1); private static final CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < THREAD_COUNT; i++) { Thread t = new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " waiting to start..."); startLatch.await(); System.out.println(Thread.currentThread().getName() + " started"); Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + " finished"); } catch (InterruptedException e) { e.printStackTrace(); } finally { endLatch.countDown(); } }, "Thread-" + i); t.start(); } Thread.sleep(3000); System.out.println("All threads ready, start now!"); startLatch.countDown(); endLatch.await(); System.out.println("All threads finished!"); } }
在这个示例中,我们使用CountDownLatch类来实现线程同步,并定义了两个计数器startLatch和endLatch。在main方法中,我们创建了5个线程,每个线程都会等待startLatch的计数器值减为0后才开始执行;执行完毕后,将endLatch的计数器值减1。在主线程中,我们等待3秒钟后,将startLatch的计数器值减为0,从而使5个等待的线程开始执行;然后等待所有线程的执行结束,即当endLatch的计数器值减为0时输出"All threads finished!"。
9. CyclicBarrier:
CyclicBarrier是Java中提供的一种线程同步机制,它也用来控制线程的执行顺序和同步。与CountDownLatch不同的是,CyclicBarrier可以重复使用,即在计数器值减为0后可以自动重置计数器,从而继续等待下一轮任务的到来。
public class CyclicBarrierDemo { private static final int THREAD_COUNT = 5; private static final CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> { System.out.println("All threads arrived at barrier!"); }); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < THREAD_COUNT; i++) { Thread t = new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " working..."); Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + " arrived at barrier!"); barrier.await(); System.out.println(Thread.currentThread().getName() + " continue working..."); Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + " finished"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }, "Thread-" + i); t.start(); } } }
在这个示例中,我们使用CyclicBarrier类来实现线程同步,并定义了一个计数器barrier。在main方法中,我们创建了5个线程,每个线程都会先工作一段时间,然后等待其他线程到达barrier;当所有线程都到达barrier时,会执行barrier的回调函数,并将计数器值重置为初始值。之后,每个线程继续工作一段时间,最终输出"finished"。
10. Semaphore:
Semaphore是Java中提供的一种线程同步机制,用来控制资源的访问数量。它通过内部维护的计数器来实现,当计数器的值大于0时,允许访问资源;否则,需要等待其他线程释放资源后才能访问。
public class SemaphoreDemo { private static final int THREAD_COUNT = 10; private static final Semaphore semaphore = new Semaphore(5); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < THREAD_COUNT; i++) { Thread t = new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " acquiring resource..."); Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + " releasing resource..."); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }, "Thread-" + i); t.start(); } } }
在这个示例中,我们使用Semaphore类来实现线程同步,并定义了一个计数器semaphore。在main方法中,我们创建了10个线程,每个线程都会尝试获取semaphore的许可证;当许可证已被占用时,线程会被阻塞,直到有其他线程释放许可证。然后,每个线程会工作一段时间,最终释放许可证。
11. Future接口和CompletableFuture类:
Future接口是Java中提供的一种异步编程模型,它可以对异步任务进行处理,并在任务完成后获取结果。Future接口提供了一系列方法用于查询任务是否完成、等待任务完成以及获取任务执行结果。
CompletableFuture类是Java 8中新增的一个类,它继承自Future接口,并提供了更加简洁易用的异步编程方式。CompletableFuture类支持链式调用、组合多个子任务、异常处理等特性,可以方便地实现复杂的异步编程逻辑。
public class CompletableFutureDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Task started."); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task finished."); return "Hello, World!"; }); System.out.println("Waiting for the result..."); String result = future.get(); System.out.println("Result: " + result); } }
在这个示例中,我们使用CompletableFuture类来实现异步编程,并定义了一个supplyAsync()方法,它会在另一个线程中执行任务并返回计算结果。在main方法中,我们创建了一个CompletableFuture对象future,然后调用get()方法等待任务完成,并获取计算结果。最终输出结果"Hello, World!"。
除了supplyAsync()方法,CompletableFuture类还提供了一系列方法用于处理异步任务,例如thenApply()、thenAccept()、thenRun()、thenCompose()等。这些方法都支持链式调用,可以方便地实现异步编程逻辑。