JUC并发编程

简介: JUC并发编程


1. 并发基础概念:


并发编程是指多个线程同时执行程序的情况。在并发编程中,由于多个线程可能同时访问共享资源,因此需要考虑线程同步、原子性、可见性等问题。


线程安全:


指在多线程环境下,对共享数据进行访问时,不会出现数据污染或不一致的问题。为了实现线程安全,可以使用锁机制或者其他并发控制手段。

public class Counter {
   private int count;
   public synchronized void increment() {
      count++;
   }
   public int getCount() {
      return count;
   }
}

在这个示例中,我们定义了一个计数器类Counter,并使用synchronized关键字来实现线程安全。在increment()方法中,我们使用synchronized关键字来保证一次只有一个线程可以进入临界区执行操作,从而避免了多个线程同时访问count变量的问题。


原子性

指一个操作要么完全执行成功,要么完全执行失败,不会出现部分执行的情况。为了实现原子性,可以使用原子类或者CAS(Compare-and-Swap)等机制。


public class AtomicIntegerDemo {
   private static AtomicInteger counter = new AtomicInteger(0);
   public static void main(String[] args) throws InterruptedException {
      Thread t1 = new Thread(() -> {
         for (int i = 0; i < 10000; i++) {
            counter.incrementAndGet();
         }
      });
      Thread t2 = new Thread(() -> {
         for (int i = 0; i < 10000; i++) {
            counter.incrementAndGet();
         }
      });
      t1.start();
      t2.start();
      t1.join();
      t2.join();
      System.out.println(counter.get());
   }
}

在这个示例中,我们使用AtomicInteger类来实现原子操作。在线程t1和t2中,我们使用incrementAndGet()方法对计数器进行原子性自增操作,从而避免了多个线程同时访问count变量的问题。


可见性:


指当一个线程修改了共享数据后,其他线程能够立即看到该修改。为了实现可见性,可以使用volatile关键字或者synchronized关键字等机制。

public class VolatileDemo {
   private static volatile boolean flag = false;
   public static void main(String[] args) throws InterruptedException {
      Thread readerThread = new Thread(() -> {
         while (!flag) {
            // do nothing
         }
         System.out.println("Flag is now true");
      });
      Thread writerThread = new Thread(() -> {
         flag = true;
         System.out.println("Flag is now true");
      });
      readerThread.start();
      writerThread.start();
      readerThread.join();
      writerThread.join();
   }
}


在这个示例中,我们使用volatile关键字来实现可见性。在readerThread中,我们不断地循环检查flag变量的值;在writerThread中,我们将flag变量设为true,并打印输出。由于flag变量是volatile类型的,因此一旦writerThread修改了该变量的值,readerThread就能够立即看到修改。


2. 线程池:


线程池是一种用于管理和复用线程的机制,可以提高系统资源利用率和响应速度,避免了频繁创建和销毁线程的开销。Java中提供了ThreadPoolExecutor类和Executors工具类来实现线程池。


ThreadPoolExecutor类:


是Java中线程池的核心实现类,通过参数配置可以自定义线程池的大小、任务队列、拒绝策略等属性。

public class ThreadPoolExecutorDemo {
   public static void main(String[] args) {
      ExecutorService executorService = new ThreadPoolExecutor(
            2, // corePoolSize
            5, // maximumPoolSize
            60, // keepAliveTime
            TimeUnit.SECONDS, // unit
            new LinkedBlockingQueue<Runnable>(10), // workQueue
            new ThreadPoolExecutor.CallerRunsPolicy() // handler
      );
      for (int i = 1; i <= 20; i++) {
         final int taskId = i;
         executorService.execute(() -> {
            System.out.println("Task #" + taskId + " is running on thread " + Thread.currentThread().getName());
            try {
               Thread.sleep(2000);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         });
      }
      executorService.shutdown();
   }
}


在这个示例中,我们使用ThreadPoolExecutor类来实现线程池,并设置corePoolSize为2、maximumPoolSize为5、keepAliveTime为60秒、workQueue为LinkedBlockingQueue(容量为10)、handler为CallerRunsPolicy。然后我们使用execute()方法向线程池中提交20个任务,每个任务都会打印输出当前的线程名并休眠2秒钟。最后调用shutdown()方法关闭线程池。


Executors工具类:


是Java中线程池的辅助类,提供了一些静态方法来创建常用类型的线程池。

public class ExecutorsDemo {
   public static void main(String[] args) {
      ExecutorService executorService = Executors.newFixedThreadPool(5);
      for (int i = 1; i <= 20; i++) {
         final int taskId = i;
         executorService.execute(() -> {
            System.out.println("Task #" + taskId + " is running on thread " + Thread.currentThread().getName());
            try {
               Thread.sleep(2000);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         });
      }
      executorService.shutdown();
   }
}


在这个示例中,我们使用Executors工具类中的newFixedThreadPool()方法来创建固定大小为5的线程池,并使用execute()方法向线程池中提交20个任务,每个任务都会打印输出当前的线程名并休眠2秒钟。最后调用shutdown()方法关闭线程池。


总之,线程池是一种非常重要的并发编程机制,可以提高系统资源利用率和响应速度。通过ThreadPoolExecutor类和Executors工具类,我们可以方便地创建、配置和管理线程池,以及执行任务。


3. Lock接口:


Lock接口是Java中提供的一种显式锁机制,可以实现更细粒度的控制和管理。Lock接口提供了加锁和释放锁的方法,具有可重入性、公平性等特点,常用的实现类包括ReentrantLock、ReentrantReadWriteLock、StampedLock等。


ReentrantLock:


是Lock接口的一种实现方式,具有可重入性、公平性、可中断性等特点。

public class ReentrantLockDemo {
   private static final Lock lock = new ReentrantLock();
   private static int count = 0;
   public static void main(String[] args) throws InterruptedException {
      Thread t1 = new Thread(() -> {
         for (int i = 0; i < 10000; i++) {
            increment();
         }
      });
      Thread t2 = new Thread(() -> {
         for (int i = 0; i < 10000; i++) {
            increment();
         }
      });
      t1.start();
      t2.start();
      t1.join();
      t2.join();
      System.out.println("Count: " + count);
   }
   private static void increment() {
      lock.lock();
      try {
         count++;
      } finally {
         lock.unlock();
      }
   }
}


在这个示例中,我们使用ReentrantLock类来实现锁机制,并定义了一个计数器变量count。在increment()方法中,我们首先调用lock()方法获取锁;然后对计数器变量进行自增操作;最后调用unlock()方法释放锁。在main()方法中,我们创建两个线程t1和t2来分别执行increment()方法10000次,最终打印输出计数器变量count的值。


ReentrantReadWriteLock:


是Lock接口的另一种实现方式,具有读写分离、公平性等特点。适用于读操作远多于写操作的场景。

public class ReentrantReadWriteLockDemo {
   private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private static final Lock readLock = lock.readLock();
   private static final Lock writeLock = lock.writeLock();
   private static int count = 0;
   public static void main(String[] args) throws InterruptedException {
      Thread t1 = new Thread(() -> {
         for (int i = 0; i < 10000; i++) {
            increment();
         }
      });
      Thread t2 = new Thread(() -> {
         for (int i = 0; i < 10000; i++) {
            get();
         }
      });
      t1.start();
      t2.start();
      t1.join();
      t2.join();
      System.out.println("Count: " + count);
   }
   private static void increment() {
      writeLock.lock();
      try {
         count++;
      } finally {
         writeLock.unlock();
      }
   }
   private static int get() {
      readLock.lock();
      try {
         return count;
      } finally {
         readLock.unlock();
      }
   }
}


在这个示例中,我们使用ReentrantReadWriteLock类来实现读写锁机制,并定义了一个计数器变量count。在increment()方法中,我们首先调用writeLock()方法获取写锁;然后对计数器变量进行自增操作;最后调用writeUnlock()方法释放写锁。在get()方法中,我们首先调用readLock()方法获取读锁;然后返回计数器变量的值;最后调用readUnlock()方法释放读锁。在main()方法中,我们创建两个线程t1和t2来分别执行increment()和get()方法10000次,最终打印输出计数器变量count的值。


StampedLock:


是Lock接口的另一种实现方式,具有乐观锁和悲观锁等特点。适用于读操作频繁而写操作较少的场景。

public class StampedLockDemo {
   private static final StampedLock lock = new StampedLock();
   private static int count = 0;
   public static void main(String[] args){
      Thread t1 = new Thread(() -> {
         for (int i = 0; i < 10000; i++) {
            increment();
         }
      });
      Thread t2 = new Thread(() -> {
         for (int i = 0; i < 10000; i++) {
            get();
         }
      });
      t1.start();
      t2.start();
      t1.join();
      t2.join();
      System.out.println("Count: " + count);
   }
   private static void increment() {
      long stamp = lock.writeLock();
      try {
         count++;
      } finally {
         lock.unlockWrite(stamp);
      }
   }
   private static int get() {
      long stamp = lock.tryOptimisticRead();
      int c = count;
      if (!lock.validate(stamp)) {
         stamp = lock.readLock();
         try {
            c = count;
         } finally {
            lock.unlockRead(stamp);
         }
      }
      return c;
   }
}


在这个示例中,我们使用StampedLock类来实现乐观锁机制,并定义了一个计数器变量count。在increment()方法中,我们首先调用writeLock()方法获取写锁;然后对计数器变量进行自增操作;最后调用unlockWrite()方法释放写锁。在get()方法中,我们首先调用tryOptimisticRead()方法获取乐观读锁,并记录当前的版本号stamp和计数器变量的值c;然后判断版本号是否有效,如果无效则调用readLock()方法获取悲观读锁,并重新读取计数器变量的值;最后调用unlockRead()方法释放悲观读锁。在main()方法中,我们创建两个线程t1和t2来分别执行increment()和get()方法10000次,最终打印输出计数器变量count的值。


4. Condition接口:


Condition接口是与Lock接口配合使用的一种线程协作机制。它可以实现更细粒度的线程等待和通知,并且可以支持多个条件变量,比如对于生产者-消费者模型中的缓冲区,可以分别使用一个notFull和notEmpty条件变量来进行生产者和消费者之间的协作。

public class ConditionDemo {
   private static final Lock lock = new ReentrantLock();
   private static final Condition notEmpty = lock.newCondition();
   private static final Condition notFull = lock.newCondition();
   private static final int CAPACITY = 10;
   private static final Queue<Integer> queue = new LinkedList<>();
   public static void main(String[] args) throws InterruptedException {
      Thread t1 = new Thread(() -> {
         for (int i = 0; i < 20; i++) {
            try {
               produce(i);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
      });
      Thread t2 = new Thread(() -> {
         for (int i = 0; i < 20; i++) {
            try {
               consume();
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
      });
      t1.start();
      t2.start();
      t1.join();
      t2.join();
   }
   private static void produce(int value) throws InterruptedException {
      lock.lock();
      try {
         while (queue.size() == CAPACITY) {
            notFull.await();
         }
         queue.offer(value);
         System.out.println("Produced: " + value);
         notEmpty.signalAll();
      } finally {
         lock.unlock();
      }
   }
   private static void consume() throws InterruptedException {
      lock.lock();
      try {
         while (queue.isEmpty()) {
            notEmpty.await();
         }
         int value = queue.poll();
         System.out.println("Consumed: " + value);
         notFull.signalAll();
      } finally {
         lock.unlock();
      }
   }
}


在这个示例中,我们使用Lock接口的实现类ReentrantLock来提供锁机制,并使用newCondition()方法创建了两个条件变量notEmpty和notFull。然后定义了一个容量为10的队列queue,并编写了produce()方法和consume()方法来分别实现生产者和消费者的功能。在produce()方法中,首先调用lock()方法获取锁;然后判断队列是否已满,如果是则调用notFull.await()方法等待notFull条件变量的信号;否则将数据加入到队列中,打印输出生产的数据,并通过notEmpty.signalAll()方法通知等待notEmpty条件变量的其他线程。在consume()方法中,首先调用lock()方法获取锁;然后判断队列是否为空,如果是则调用notEmpty.await()方法等待notEmpty条件变量的信号;否则从队列中取出数据,打印输出消费的数据,并通过notFull.signalAll()方法通知等待notFull条件变量的其他线程。在main()方法中,我们创建两个线程t1和t2来分别执行生产者和消费者的功能,最终演示缓冲区中的数据生产和消费过程。


5. CAS(Compare-And-Swap)操作:


CAS操作是一种基于硬件指令级别的原子操作,可以实现非阻塞算法。它可以在多线程并发执行时保证数据的一致性和正确性。


CAS操作涉及到三个参数:需要更新的内存位置V、期望值A和新值B。当且仅当预期值A与内存位置V中的当前值相同时,才会将内存位置V中的值更新为新值B;否则,不进行任何操作。通过不断重试直至成功,从而保证了并发情况下的数据原子性和一致性。

public class CASDemo {
   private static final AtomicReference<String> atomicStr = new AtomicReference<>("Hello, World!");
   public static void main(String[] args) throws InterruptedException {
      Thread t1 = new Thread(() -> {
         boolean swapped = false;
         while (!swapped) {
            String prev = atomicStr.get();
            String next = prev.replace("World", "John");
            swapped = atomicStr.compareAndSet(prev, next);
         }
      });
      Thread t2 = new Thread(() -> {
         boolean swapped = false;
         while (!swapped) {
            String prev = atomicStr.get();
            String next = prev.replace("World", "Mary");
            swapped = atomicStr.compareAndSet(prev, next);
         }
      });
      t1.start();
      t2.start();
      t1.join();
      t2.join();
      System.out.println(atomicStr.get());
   }
}


在这个示例中,我们使用AtomicReference类来实现CAS操作,并定义了一个初始字符串"Hello, World!“。在t1线程中,我们不断尝试将字符串中的"World"替换成"John”;在t2线程中,我们不断尝试将字符串中的"World"替换成"Mary"。通过compareAndSet()方法比较并更新字符串的值,直至成功,最终输出更新后的字符串。


6. 原子类:


原子类是Java中提供的一种线程安全机制,它封装了常见的原子操作,并保证了这些操作的原子性和可见性。原子类包括AtomicBoolean、AtomicInteger、AtomicLong等,可以用于实现非阻塞算法、锁机制等多种并发编程场景。

public class AtomicDemo {
   private static final AtomicInteger atomicInt = new AtomicInteger(0);
   public static void main(String[] args) throws InterruptedException {
      Thread t1 = new Thread(() -> {
         for (int i = 0; i < 10000; i++) {
            atomicInt.incrementAndGet();
         }
      });
      Thread t2 = new Thread(() -> {
         for (int i = 0; i < 10000; i++) {
            atomicInt.addAndGet(2);
         }
      });
      t1.start();
      t2.start();
      t1.join();
      t2.join();
      System.out.println("Count: " + atomicInt.get());
   }
}


在这个示例中,我们使用AtomicInteger类来实现原子操作,并定义了一个初始值为0的计数器atomicInt。在t1线程中,我们调用incrementAndGet()方法对计数器进行自增;在t2线程中,我们调用addAndGet()方法对计数器进行自增2。最终输出计数器的值。


除了AtomicInteger,Java还提供了其他原子类如AtomicBoolean、AtomicLong等,并且可以通过自定义的方式实现自己的原子类。这些原子类可以用于实现非阻塞算法、锁机制等多种并发编程场景,是Java中常见的一种线程安全机制。


7. 并发容器:


并发容器是Java中提供的一种线程安全机制,它封装了常见的容器类,并保证了这些容器类的线程安全性。并发容器包括ConcurrentHashMap、CopyOnWriteArrayList等,可以用于实现多线程并发访问数据的场景。

public class ConcurrentHashMapDemo {
   public static void main(String[] args) throws InterruptedException {
      Map<String, Integer> map = new ConcurrentHashMap<>();
      Thread t1 = new Thread(() -> {
         for (int i = 0; i < 10000; i++) {
            map.put("Key" + i, i);
         }
      });
      Thread t2 = new Thread(() -> {
         for (int i = 0; i < 10000; i++) {
            map.remove("Key" + i);
         }
      });
      t1.start();
      t2.start();
      t1.join();
      t2.join();
      System.out.println("Size: " + map.size());
   }
}


在这个示例中,我们使用ConcurrentHashMap类来实现并发容器,并定义了一个空的map对象。在t1线程中,我们利用put()方法向map中添加10000个键值对;在t2线程中,我们利用remove()方法从map中移除10000个键值对。最终输出map的大小。


除了ConcurrentHashMap,Java还提供了其他并发容器如CopyOnWriteArrayList、ConcurrentLinkedQueue等,并且可以通过自定义的方式实现自己的并发容器。这些并发容器可以用于实现多线程并发访问数据的场景,是Java中常见的一种线程安全机制。


8. CountDownLatch:


CountDownLatch是Java中提供的一种线程同步机制,用来控制线程的执行顺序和同步。它通过计数器来实现,当计数器的值减为0时,所有等待线程会被释放,继续执行后续操作。

public class CountDownLatchDemo {
   private static final int THREAD_COUNT = 5;
   private static final CountDownLatch startLatch = new CountDownLatch(1);
   private static final CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);
   public static void main(String[] args) throws InterruptedException {
      for (int i = 0; i < THREAD_COUNT; i++) {
         Thread t = new Thread(() -> {
            try {
               System.out.println(Thread.currentThread().getName() + " waiting to start...");
               startLatch.await();
               System.out.println(Thread.currentThread().getName() + " started");
               Thread.sleep((long) (Math.random() * 10000));
               System.out.println(Thread.currentThread().getName() + " finished");
            } catch (InterruptedException e) {
               e.printStackTrace();
            } finally {
               endLatch.countDown();
            }
         }, "Thread-" + i);
         t.start();
      }
      Thread.sleep(3000);
      System.out.println("All threads ready, start now!");
      startLatch.countDown();
      endLatch.await();
      System.out.println("All threads finished!");
   }
}


在这个示例中,我们使用CountDownLatch类来实现线程同步,并定义了两个计数器startLatch和endLatch。在main方法中,我们创建了5个线程,每个线程都会等待startLatch的计数器值减为0后才开始执行;执行完毕后,将endLatch的计数器值减1。在主线程中,我们等待3秒钟后,将startLatch的计数器值减为0,从而使5个等待的线程开始执行;然后等待所有线程的执行结束,即当endLatch的计数器值减为0时输出"All threads finished!"。


9. CyclicBarrier:


CyclicBarrier是Java中提供的一种线程同步机制,它也用来控制线程的执行顺序和同步。与CountDownLatch不同的是,CyclicBarrier可以重复使用,即在计数器值减为0后可以自动重置计数器,从而继续等待下一轮任务的到来。

public class CyclicBarrierDemo {
   private static final int THREAD_COUNT = 5;
   private static final CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
      System.out.println("All threads arrived at barrier!");
   });
   public static void main(String[] args) throws InterruptedException {
      for (int i = 0; i < THREAD_COUNT; i++) {
         Thread t = new Thread(() -> {
            try {
               System.out.println(Thread.currentThread().getName() + " working...");
               Thread.sleep((long) (Math.random() * 10000));
               System.out.println(Thread.currentThread().getName() + " arrived at barrier!");
               barrier.await();
               System.out.println(Thread.currentThread().getName() + " continue working...");
               Thread.sleep((long) (Math.random() * 10000));
               System.out.println(Thread.currentThread().getName() + " finished");
            } catch (InterruptedException | BrokenBarrierException e) {
               e.printStackTrace();
            }
         }, "Thread-" + i);
         t.start();
      }
   }
}


在这个示例中,我们使用CyclicBarrier类来实现线程同步,并定义了一个计数器barrier。在main方法中,我们创建了5个线程,每个线程都会先工作一段时间,然后等待其他线程到达barrier;当所有线程都到达barrier时,会执行barrier的回调函数,并将计数器值重置为初始值。之后,每个线程继续工作一段时间,最终输出"finished"。


10. Semaphore:


Semaphore是Java中提供的一种线程同步机制,用来控制资源的访问数量。它通过内部维护的计数器来实现,当计数器的值大于0时,允许访问资源;否则,需要等待其他线程释放资源后才能访问。

public class SemaphoreDemo {
   private static final int THREAD_COUNT = 10;
   private static final Semaphore semaphore = new Semaphore(5);
   public static void main(String[] args) throws InterruptedException {
      for (int i = 0; i < THREAD_COUNT; i++) {
         Thread t = new Thread(() -> {
            try {
               semaphore.acquire();
               System.out.println(Thread.currentThread().getName() + " acquiring resource...");
               Thread.sleep((long) (Math.random() * 10000));
               System.out.println(Thread.currentThread().getName() + " releasing resource...");
               semaphore.release();
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }, "Thread-" + i);
         t.start();
      }
   }
}


在这个示例中,我们使用Semaphore类来实现线程同步,并定义了一个计数器semaphore。在main方法中,我们创建了10个线程,每个线程都会尝试获取semaphore的许可证;当许可证已被占用时,线程会被阻塞,直到有其他线程释放许可证。然后,每个线程会工作一段时间,最终释放许可证。


11. Future接口和CompletableFuture类:


Future接口是Java中提供的一种异步编程模型,它可以对异步任务进行处理,并在任务完成后获取结果。Future接口提供了一系列方法用于查询任务是否完成、等待任务完成以及获取任务执行结果。


CompletableFuture类是Java 8中新增的一个类,它继承自Future接口,并提供了更加简洁易用的异步编程方式。CompletableFuture类支持链式调用、组合多个子任务、异常处理等特性,可以方便地实现复杂的异步编程逻辑。

public class CompletableFutureDemo {
   public static void main(String[] args) throws InterruptedException, ExecutionException {
      CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
         System.out.println("Task started.");
         try {
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         System.out.println("Task finished.");
         return "Hello, World!";
      });
      System.out.println("Waiting for the result...");
      String result = future.get();
      System.out.println("Result: " + result);
   }
}


在这个示例中,我们使用CompletableFuture类来实现异步编程,并定义了一个supplyAsync()方法,它会在另一个线程中执行任务并返回计算结果。在main方法中,我们创建了一个CompletableFuture对象future,然后调用get()方法等待任务完成,并获取计算结果。最终输出结果"Hello, World!"。


除了supplyAsync()方法,CompletableFuture类还提供了一系列方法用于处理异步任务,例如thenApply()、thenAccept()、thenRun()、thenCompose()等。这些方法都支持链式调用,可以方便地实现异步编程逻辑。

相关文章
|
6月前
|
安全 Java 编译器
高并发编程之什么是 JUC
高并发编程之什么是 JUC
52 1
|
6月前
|
缓存 Java 编译器
JUC 并发编程之JMM
Java内存模型是Java虚拟机(JVM)规范中定义的一组规则,用于屏蔽各种硬件和操作系统的内存访问差异,保证多线程情况下程序的正确执行。Java内存模型规定了线程之间如何交互以及线程和内存之间的关系。它主要解决的问题是可见性、原子性和有序性。
|
5月前
|
安全 算法 Java
|
6月前
|
安全 Java
JUC并发编程之原子类
并发编程是现代计算机应用中不可或缺的一部分,而在并发编程中,处理共享资源的并发访问是一个重要的问题。为了避免多线程访问共享资源时出现竞态条件(Race Condition)等问题,Java提供了一组原子类(Atomic Classes)来支持线程安全的操作。
|
安全 Java 调度
JUC并发编程(上)
JUC并发编程(上)
68 0
|
并行计算 Java 应用服务中间件
JUC并发编程超详细详解篇(一)
JUC并发编程超详细详解篇
1631 1
JUC并发编程超详细详解篇(一)
|
存储 缓存 监控
JUC并发编程(下)
JUC并发编程(下)
39 0
|
Web App开发 安全 Java
JUC高并发编程(一)——JUC基础知识
JUC高并发编程(一)——JUC基础知识
138 0
|
存储 SQL 缓存
JUC 并发编程学习笔记(总)
JUC 并发编程学习笔记(总)
99 0
JUC 并发编程学习笔记(总)
|
安全 Java 编译器
JUC 并发编程
JUC 并发编程