Java 编程问题:十一、并发-深入探索3https://developer.aliyun.com/article/1426167
221 原子变量
通过Runnable
计算从 1 到 1000000 的所有数字的简单方法如下所示:
public class Incrementator implements Runnable { public [static] int count = 0; @Override public void run() { count++; } public int getCount() { return count; } }
让我们旋转五个线程,同时递增count
变量:
Incrementator nonAtomicInc = new Incrementator(); ExecutorService executor = Executors.newFixedThreadPool(5); for (int i = 0; i < 1 _000_000; i++) { executor.execute(nonAtomicInc); }
但是,如果我们多次运行此代码,会得到不同的结果,如下所示:
997776, 997122, 997681 ...
所以,为什么我们不能得到预期的结果,1000000?原因是count++
不是原子操作/动作。它由三个原子字节码指令组成:
iload_1 iinc 1, 1 istore_1
在一个线程中,读取count
值并逐个递增,另一个线程读取较旧的值,导致错误的结果。在多线程应用中,调度器可以停止在这些字节码指令之间执行当前线程,并启动一个新线程,该线程在同一个变量上工作。我们可以通过同步来修复问题,或者通过原子变量来更好地解决问题。
原子变量类在java.util.concurrent.atomic
中可用。它们是将争用范围限制为单个变量的包装类;它们比 Java 同步轻量级得多,基于 CAS(简称比较交换):现代 CPU 支持这种技术,它将给定内存位置的内容与给定值进行比较,如果当前值等于预期值,则更新为新值。主要是以类似于volatile
的无锁方式影响单个值的原子复合作用。最常用的原子变量是标量:
AtomicInteger
AtomicLong
AtomicBoolean
AtomicReference
并且,以下是针对数组的:
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
让我们通过AtomicInteger
重写我们的示例:
public class AtomicIncrementator implements Runnable { public static AtomicInteger count = new AtomicInteger(); @Override public void run() { count.incrementAndGet(); } public int getCount() { return count.get(); } }
注意,我们写的不是count++
,而是count.incrementAndGet()
。这只是AtomicInteger
提供的方法之一。此方法以原子方式递增变量并返回新值。这一次,count
将是 1000000。
下表列出了几种常用的AtomicInteger
方法。左栏包含方法,右栏包含非原子含义:
AtomicInteger ai = new AtomicInteger(0); // atomic int i = 0; // non-atomic // and int q = 5; int r; // and int e = 0; boolean b;
原子操作 | 非原子对应物 |
r = ai.get(); |
r = i; |
ai.set(q); |
i = q; |
r = ai.incrementAndGet(); |
r = ++i; |
r = ai.getAndIncrement(); |
r = i++; |
r = ai.decrementAndGet(); |
r = --i; |
r = ai.getAndDecrement(); |
r = i--; |
r = ai.addAndGet(q); |
i = i + q; r = i; |
r = ai.getAndAdd(q); |
r = i; i = i + q; |
r = ai.getAndSet(q); |
r = i; i = q; |
b = ai.compareAndSet(e, q); |
if (i == e) { i = q; return true; } else { return false; } |
让我们通过原子操作解决几个问题:
- 通过
updateAndGet(IntUnaryOperator updateFunction)
更新数组元素:
// [9, 16, 4, 25] AtomicIntegerArray atomicArray = new AtomicIntegerArray(new int[] {3, 4, 2, 5}); for (int i = 0; i < atomicArray.length(); i++) { atomicArray.updateAndGet(i, elem -> elem * elem); }
- 通过
updateAndGet(IntUnaryOperator updateFunction)
更新单个整数:
// 15 AtomicInteger nr = new AtomicInteger(3); int result = nr.updateAndGet(x -> 5 * x);
- 通过
accumulateAndGet(int x, IntBinaryOperator accumulatorFunction)
更新单个整数:
// 15 AtomicInteger nr = new AtomicInteger(3); // x = 3, y = 5 int result = nr.accumulateAndGet(5, (x, y) -> x * y);
- 通过
addAndGet(int delta)
更新单个整数:
// 7 AtomicInteger nr = new AtomicInteger(3); int result = nr.addAndGet(4);
- 通过
compareAndSet(int expectedValue, int newValue)
更新单个整数:
// 5, true AtomicInteger nr = new AtomicInteger(3); boolean wasSet = nr.compareAndSet(3, 5);
从 JDK9 开始,原子变量类被多种方法所丰富,如get
/setPlain()
、get
/setOpaque()
、getAcquire()
以及它们的同伴。要了解这些方法,请看一下 Doug Lea 的《使用 JDK9 内存顺序模式》。
加法器和累加器
在 JavaAPI 文档之后,如果多线程应用更新频繁,但读取频率较低,建议使用LongAdder
、DoubleAdder
、LongAccumulator
、DoubleAccumulator
,而不是AtomicFoo
类。对于这种情况,这些类的设计是为了优化线程的使用。
这意味着,我们不需要使用AtomicInteger
来计算从 1 到 1000000 的整数,而可以使用LongAdder
如下:
public class AtomicAdder implements Runnable { public static LongAdder count = new LongAdder(); @Override public void run() { count.add(1); } public long getCount() { return count.sum(); } }
或者,我们可以使用LongAccumulator
如下:
public class AtomicAccumulator implements Runnable { public static LongAccumulator count = new LongAccumulator(Long::sum, 0); @Override public void run() { count.accumulate(1); } public long getCount() { return count.get(); } }
LongAdder
和DoubleAdder
适用于暗示加法的场景(特定于加法的操作),而LongAccumulator
和DoubleAccumulator
适用于依赖给定函数组合值的场景。
222 重入锁
Lock
接口包含一组锁定操作,可以显式地用于微调锁定过程(它提供比内在锁定更多的控制)。其中,我们有轮询、无条件、定时和可中断的锁获取。基本上,Lock
用附加功能公开了synchronized
关键字的Future
。Lock
接口如下图所示:
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
Lock
的实现之一是ReentrantLock
。可重入锁的作用如下:当线程第一次进入锁时,保持计数设置为 1。在解锁之前,线程可以重新进入锁,从而使每个条目的保持计数增加一。每个解锁请求将保留计数减一,当保留计数为零时,将打开锁定的资源。
与synchronized
关键字具有相同的坐标,ReentrantLock
遵循以下实现习惯用法:
Lock / ReentrantLock lock = new ReentrantLock(); ... lock.lock(); try { ... } finally { lock.unlock(); }
对于非公平锁,未指定线程被授予访问权限的顺序。如果锁应该是公平的(优先于等待时间最长的线程),那么使用ReentrantLock(boolean fair)
构造器。
通过ReentrantLock
将 1 到 1000000 之间的整数求和可以如下完成:
public class CounterWithLock { private static final Lock lock = new ReentrantLock(); private static int count; public void counter() { lock.lock(); try { count++; } finally { lock.unlock(); } } }
让我们通过几个线程来使用它:
CounterWithLock counterWithLock = new CounterWithLock(); Runnable task = () -> { counterWithLock.counter(); }; ExecutorService executor = Executors.newFixedThreadPool(8); for (int i = 0; i < 1 _000_000; i++) { executor.execute(task); }
完成!
另外,下面的代码表示一种基于ReentrantLock.lockInterruptibly()
解决问题的习惯用法。绑定到本书的代码附带了一个使用lockInterruptibly()
的示例:
Lock / ReentrantLock lock = new ReentrantLock(); public void execute() throws InterruptedException { lock.lockInterruptibly(); try { // do something } finally { lock.unlock(); } }
如果持有此锁的线程被中断,则抛出InterruptedException
。用lock()
代替lockInterruptibly()
不接受中断。
另外,下面的代码表示使用ReentrantLock.tryLock(long timeout, TimeUnit unit) throws InterruptedException
的习惯用法。本书附带的代码还有一个示例:
Lock / ReentrantLock lock = new ReentrantLock(); public boolean execute() throws InterruptedException { if (!lock.tryLock(n, TimeUnit.SECONDS)) { return false; } try { // do something } finally { lock.unlock(); } return true; }
注意,tryLock()
尝试获取指定时间的锁。如果这段时间过去了,那么线程将不会获得锁。它不会自动重试。如果线程在获取锁的过程中被中断,则抛出InterruptedException
。
最后,绑定到本书的代码附带了一个使用ReentrantLock.newCondition()
的示例。下一个屏幕截图显示了这个成语:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QcGAKhWJ-1657346000216)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/1bb55dfb-35d8-4092-9924-66c157844a72.png)]
223 重入读写锁
通常,读写连接(例如,读写文件)应基于两个语句完成:
- 只要没有编写器(共享悲观锁),读者就可以同时阅读。
- 单个写入程序一次可以写入(独占/悲观锁定)。
下图显示了左侧的读取器和右侧的写入器:
主要由ReentrantReadWriteLock
实现以下行为:
- 为两个锁(读锁和写锁)提供悲观锁语义。
- 如果某些读取器持有读锁,而某个写入器需要写锁,则在写入器释放写锁之前,不允许更多的读取器获取读锁。
- 写入程序可以获得读锁,但读取器不能获得写锁。
对于非公平锁,未指定线程被授予访问权限的顺序。如果锁应该是公平的(优先于等待时间最长的线程),那么使用ReentrantReadWriteLock(boolean fair)
构造器。
ReentrantReadWriteLock
的用法如下:
ReadWriteLock / ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); ... lock.readLock() / writeLock().lock(); try { ... } finally { lock.readLock() / writeLock().unlock(); }
下面的代码表示一个ReentrantReadWriteLock
用例,它读取和写入一个整数量变量:
public class ReadWriteWithLock { private static final Logger logger = Logger.getLogger(ReadWriteWithLock.class.getName()); private static final Random rnd = new Random(); private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); private static final Reader reader = new Reader(); private static final Writer writer = new Writer(); private static int amount; private static class Reader implements Runnable { @Override public void run() { if (lock.isWriteLocked()) { logger.warning(() -> Thread.currentThread().getName() + " reports that the lock is hold by a writer ..."); } lock.readLock().lock(); try { logger.info(() -> "Read amount: " + amount + " by " + Thread.currentThread().getName()); } finally { lock.readLock().unlock(); } } } private static class Writer implements Runnable { @Override public void run() { lock.writeLock().lock(); try { Thread.sleep(rnd.nextInt(2000)); logger.info(() -> "Increase amount with 10 by " + Thread.currentThread().getName()); amount += 10; } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); } finally { lock.writeLock().unlock(); } } ... }
让我们用两个读取器和四个写入器执行 10 次读卡和 10 次写卡:
ExecutorService readerService = Executors.newFixedThreadPool(2); ExecutorService writerService = Executors.newFixedThreadPool(4); for (int i = 0; i < 10; i++) { readerService.execute(reader); writerService.execute(writer); }
可能的输出如下:
[09:09:25] [INFO] Read amount: 0 by pool-1-thread-1 [09:09:25] [INFO] Read amount: 0 by pool-1-thread-2 [09:09:26] [INFO] Increase amount with 10 by pool-2-thread-1 [09:09:27] [INFO] Increase amount with 10 by pool-2-thread-2 [09:09:28] [INFO] Increase amount with 10 by pool-2-thread-4 [09:09:29] [INFO] Increase amount with 10 by pool-2-thread-3 [09:09:29] [INFO] Read amount: 40 by pool-1-thread-2 [09:09:29] [INFO] Read amount: 40 by pool-1-thread-1 [09:09:31] [INFO] Increase amount with 10 by pool-2-thread-1 ...
在决定依赖ReentrantReadWriteLock
之前,请考虑它可能会挨饿(例如,当作家被优先考虑时,读者可能会挨饿)。此外,我们无法将读锁升级为写锁(可以从写入器降级为读取器),并且不支持乐观读。如果其中任何一个问题对您来说很重要,那么请考虑StampedLock
,我们将在下一个问题中研究它。
224 冲压锁
简言之,StampedLock
的性能优于ReentrantReadWriteLock
,支持乐观读取。它不像可重入的;因此容易死锁。主要地,锁获取返回一个戳记(一个long
值),它在finally
块中用于解锁。每次尝试获取一个锁都会产生一个新的戳,如果没有可用的锁,那么它可能会阻塞,直到可用为止。换句话说,如果当前线程持有锁,并且再次尝试获取锁,则可能导致死锁。
StampedLock
读/写编排过程通过以下几种方法实现:
readLock()
:非独占获取锁,必要时阻塞,直到可用。对于获取读锁的非阻塞尝试,我们必须tryReadLock()
。对于超时阻塞,我们有tryReadLock(long time, TimeUnit unit)
。退回的印章用于unlockRead()
。writeLock()
:独占获取锁,必要时阻塞直到可用。对于获取写锁的非阻塞尝试,我们有tryWriteLock()
。对于超时阻塞,我们有tryWriteLock(long time, TimeUnit unit)
。退回的印章用于unlockWrite()
。tryOptimisticRead()
:这是给StampedLock
增加一个大加号的方法。此方法返回一个应通过validate()
标志方法验证的戳记。如果锁当前未处于写入模式,则返回的戳记仅为非零。
readLock()
和writeLock()
的成语非常简单:
StampedLock lock = new StampedLock(); ... long stamp = lock.readLock() / writeLock(); try { ... } finally { lock.unlockRead(stamp) / unlockWrite(stamp); }
试图给tryOptimisticRead()
一个成语可能会导致以下结果:
StampedLock lock = new StampedLock(); int x; // a writer-thread can modify x ... long stamp = lock.tryOptimisticRead(); int thex = x; if (!lock.validate(stamp)) { stamp = lock.readLock(); try { thex = x; } finally { lock.unlockRead(stamp); } } return thex;
在这个习惯用法中,注意初始值(x
)是在获得乐观读锁之后分配给thex
变量的。然后利用validate()
标志法验证了自给定戳的发射度以来,戳锁没有被独占获取。如果validate()
返回false
(相当于在获得乐观锁之后由线程获取写锁),则通过阻塞readLock()
获取读锁,并再次赋值(x
。请记住,如果有任何写锁,读锁可能会阻塞。获取乐观锁允许我们读取值,然后验证这些值是否有任何更改。只有在存在的情况下,我们才能通过阻塞读锁。
下面的代码表示一个StampedLock
用例,它读取和写入一个整数量变量。基本上,我们通过乐观的方式重申了前一个问题的解决方案:
public class ReadWriteWithStampedLock { private static final Logger logger = Logger.getLogger(ReadWriteWithStampedLock.class.getName()); private static final Random rnd = new Random(); private static final StampedLock lock = new StampedLock(); private static final OptimisticReader optimisticReader = new OptimisticReader(); private static final Writer writer = new Writer(); private static int amount; private static class OptimisticReader implements Runnable { @Override public void run() { long stamp = lock.tryOptimisticRead(); // if the stamp for tryOptimisticRead() is not valid // then the thread attempts to acquire a read lock if (!lock.validate(stamp)) { stamp = lock.readLock(); try { logger.info(() -> "Read amount (read lock): " + amount + " by " + Thread.currentThread().getName()); } finally { lock.unlockRead(stamp); } } else { logger.info(() -> "Read amount (optimistic read): " + amount + " by " + Thread.currentThread().getName()); } } } private static class Writer implements Runnable { @Override public void run() { long stamp = lock.writeLock(); try { Thread.sleep(rnd.nextInt(2000)); logger.info(() -> "Increase amount with 10 by " + Thread.currentThread().getName()); amount += 10; } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); } finally { lock.unlockWrite(stamp); } } } ... }
让我们用两个读取器和四个写入器执行 10 次读卡和 10 次写卡:
ExecutorService readerService = Executors.newFixedThreadPool(2); ExecutorService writerService = Executors.newFixedThreadPool(4); for (int i = 0; i < 10; i++) { readerService.execute(optimisticReader); writerService.execute(writer); }
可能的输出如下:
... [12:12:07] [INFO] Increase amount with 10 by pool-2-thread-4 [12:12:07] [INFO] Read amount (read lock): 90 by pool-1-thread-2 [12:12:07] [INFO] Read amount (optimistic read): 90 by pool-1-thread-2 [12:12:07] [INFO] Increase amount with 10 by pool-2-thread-1 ...
从 JDK10 开始,我们可以使用isWriteLockStamp()
、isReadLockStamp()
、isLockStamp()
和isOptimisticReadStamp()
查询戳记的类型。根据类型,我们可以决定合适的解锁方法,例如:
if (StampedLock.isReadLockStamp(stamp)) lock.unlockRead(stamp); }
在捆绑到本书的代码中,还有一个应用,用于举例说明tryConvertToWriteLock()
方法。此外,您可能对开发使用tryConvertToReadLock()
和tryConvertToOptimisticRead()
的应用感兴趣。
225 死锁(哲学家聚餐)
什么是僵局?网上一个著名的笑话解释如下:
面试官:向我们解释一下,我们会雇佣你的!
我:雇佣我,我会向你解释…
简单的死锁可以解释为一个持有L
锁并试图获取P
锁的A
线程,同时,还有一个持有P
锁并试图获取L
锁的B
线程。这种死锁称为循环等待。Java 没有死锁检测和解决机制(就像数据库一样),因此死锁对于应用来说非常尴尬。死锁可能会完全或部分阻塞应用,会导致严重的性能损失、奇怪的行为等等。通常情况下,死锁很难调试,解决死锁的唯一方法是重新启动应用,并希望取得最好的结果。
哲学家吃饭是一个著名的问题,用来说明僵局。这个问题说五位哲学家围坐在一张桌子旁。他们每个人轮流思考和吃饭。为了吃饭,哲学家需要双手叉子左手叉子右手叉子。困难是因为只有五个叉子。吃完后,哲学家把两个叉子放回桌上,然后由另一个重复同样循环的哲学家拿起。当一个哲学家不吃饭时,他/她在思考。下图说明了这种情况:
主要任务是找到解决这个问题的办法,让哲学家们思考和进食,以避免饿死。
在《法典》中,我们可以把每个哲学家看作一个实例。作为Runnable
实例,我们可以在不同的线程中执行它们。每个哲学家都能拿起两个叉子放在他左右两侧。如果我们将叉表示为String
,则可以使用以下代码:
public class Philosopher implements Runnable { private final String leftFork; private final String rightFork; public Philosopher(String leftFork, String rightFork) { this.leftFork = leftFork; this.rightFork = rightFork; } @Override public void run() { // implemented below } }
所以,哲学家可以拿起leftFork
和rightFork
。但是,由于哲学家们共用这些叉子,哲学家必须在这两个叉子上获得唯一的锁。leftFork
上有专用锁,且rightFork
上有专用锁,等于手中有两个叉子。leftFork
和rightFork
上有专属锁,相当于哲学家的饮食。释放两个专属锁就等于哲学家不吃不思。
通过synchronized
关键字可以实现锁定,如下run()
方法:
@Override public void run() { while (true) { logger.info(() -> Thread.currentThread().getName() + ": thinking"); doIt(); synchronized(leftFork) { logger.info(() -> Thread.currentThread().getName() + ": took the left fork (" + leftFork + ")"); doIt(); synchronized(rightFork) { logger.info(() -> Thread.currentThread().getName() + ": took the right fork (" + rightFork + ") and eating"); doIt(); logger.info(() -> Thread.currentThread().getName() + ": put the right fork ( " + rightFork + ") on the table"); doIt(); } logger.info(() -> Thread.currentThread().getName() + ": put the left fork (" + leftFork + ") on the table and thinking"); doIt(); } } }
哲学家从思考开始。过了一会儿他饿了,所以他试着拿起左叉子和右叉子。如果成功,他会吃一会儿。后来,他把叉子放在桌子上,继续思考,直到他又饿了。同时,另一个哲学家会吃东西。
doIt()
方法通过随机睡眠模拟所涉及的动作(思考、进食、采摘和放叉)。代码中可以看到以下内容:
private static void doIt() { try { Thread.sleep(rnd.nextInt(2000)); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); } }
最后,我们需要福克斯和哲学家,请参见以下代码:
String[] forks = { "Fork-1", "Fork-2", "Fork-3", "Fork-4", "Fork-5" }; Philosopher[] philosophers = { new Philosopher(forks[0], forks[1]), new Philosopher(forks[1], forks[2]), new Philosopher(forks[2], forks[3]), new Philosopher(forks[3], forks[4]), new Philosopher(forks[4], forks[0]) };
每个哲学家都将在一个线程中运行,如下所示:
Thread threadPhilosopher1 = new Thread(philosophers[0], "Philosopher-1"); ... Thread threadPhilosopher5 = new Thread(philosophers[4], "Philosopher-5"); threadPhilosopher1.start(); ... threadPhilosopher5.start();
这个实现似乎还可以,甚至可以正常工作一段时间。但是,此实现迟早会以如下方式阻止输出:
[17:29:21] [INFO] Philosopher-5: took the left fork (Fork-5) ... // nothing happens
这是僵局!每个哲学家都有左手叉(锁在上面),等待右手叉放在桌子上(锁要放了)。显然,这种期望是不能满足的,因为只有五个叉子,每个哲学家手里都有一个叉子。
为了避免这种死锁,有一个非常简单的解决方案。我们只是强迫其中一个哲学家先拿起正确的叉子。在成功地选择了右叉子之后,他可以试着选择左叉子。在代码中,这是对以下行的快速修改:
// the original line new Philosopher(forks[4], forks[0]) // the modified line that eliminates the deadlock new Philosopher(forks[0], forks[4])
这一次我们可以在没有死锁的情况下运行应用。
总结
好吧,就这些!本章讨论了 Fork/Join 框架、CompletableFuture
、ReentrantLock
、ReentrantReadWriteLock
、StampedLock
、原子变量、任务取消、可中断方法、线程局部和死锁等问题
从本章下载应用以查看结果和其他详细信息。**