78 同步访问共享的可变数据
当多个线程共享可变数据的时候,每个读或者写数据的线程都必须执行同步。
关键字 synchronized 可以保证在同一时刻,只有一个线程可以执行某一个方法,或者某一个代码块。
同步不仅可以阻止一个线程看到对象处于不一致的状态之中,它还可以保证进入同步方法或者同步代码块的每个线程,都能看到由同一个锁保护的之前所有的修改效果。
Java语言规范保证读或者写一个变量是原子的(atomic),除非这个变量的类型为long或者double。
虽然语言规范保证了线程在读取原子数据的时候,不会看到任意的数值,但是它并不保证一个线程写入的值对于另一个线程将是可⻅的。为了在线程之间进行可靠的通信,也为了互斥访问,同步是必要的。
这归因于Java语言规范中的内存模型(JMM),它规定了一个线程所做的变化何时以及如何变成对其他线程可⻅
如果对共享的可变数据的访问不能同步,其后果将非常可怕,例如下面这段程序:
public class StopThread { private static Boolean stopRequested; public static void main(String[] args) throws InterruptedException { Thread backgroundThread = new Thread(() -> { int i = 0; while (!stopRequested) i++; }); backgroundThread.start(); TimeUnit.SECONDS.sleep(1); stopRequested = true; } }
我们原本设想的是,这段程序运行大约一秒钟左右,之后主线程将stopRequested设置为true,使后台线程的循环终止。
但实际运行结果却是程序永远不会终止!问题在于,由于没有同步,就不能保证后台线程何时看到主线程对stopRequested的值所做的改变。没有同步,虚拟机将以下代码:
while (!stopRequested) i++;
转变成这样:
if (!stopRequested) while (true) i++;
编译器的优化初衷是好的,但这里却帮了倒忙!
修正这个问题的一种方式是同步访问stopRequested字段。这个程序会如预期般在大约一秒之内终止:
public class StopThread { private static Boolean stopRequested; private static synchronized void requestStop() { stopRequested = true; } private static synchronized Boolean stopRequested() { return stopRequested; } public static void main(String[] args) throws InterruptedException { Thread backgroundThread = new Thread(() -> { int i = 0; while (!stopRequested()) i++; }); backgroundThread.start(); TimeUnit.SECONDS.sleep(1); requestStop(); } }
注意读和写操作都要被同步,否则无法保证同步能起作用。
还是有其他更正确的替代方法,它更加简洁,性能也可能更好。虽然volatile修饰符不执行互斥访问,但它可以保证任何一个线程在读取该字段的时候都将看到最近刚刚被写入的值:
public class StopThread { private static volatile Boolean stopRequested; public static void main(String[] args) throws InterruptedException { Thread backgroundThread = new Thread(() -> { int i = 0; while (!stopRequested) i++; }); backgroundThread.start(); TimeUnit.SECONDS.sleep(1); stopRequested = true; } }
在使用volatile的时候务必要小心。以下面的方法为例,假设它要产生序列号:
private static volatile int nextSerialNumber = 0; public static int generateSerialNumber() { return nextSerialNumber++; }
这段代码是有问题的,原因在于:增量操作符(++)不是原子的。它在nextSerialNumber字段中执行两项操作:
读取nextSerialNumber的值
写回一个新值,相当于nextSerialNumber + 1
如果第二个线程在第一个线程第一步和第二步之间读取nextSerialNumber,第二个线程就会与第一个线程一起看到同一个值,并返回相同的nextSerialNumber。
修正 generateSerialNumber 方法:
private static final Atomiclong nextSerialNum = new Atomiclong(); public static long generateSerialNumber() { return nextSerialNum.getAndIncrement(); }
避免数据不一致的问题的最佳办法是不共享可变的数据或者共享不可变的数据。如果采用这一策略,对它建立文档就很重要,以便它可以随着程序的发展而得到维护。
如果一个线程修改了一个数据对象,然后其他线程也可以读取该对象,只要它没有再被修改。—— 这种对象叫做高效不可变(effectively immutable ) 。
将上面对象从一个线程传递到其他的线程被称作安全发布(safe publication) 。
安全发布对象引用有许多种方法:
将它保存在静态字段中,作为类初始化的一部分
将它保存在volatile字段、final字段或者通过正常锁定访问的字段中
将它放到支持并发的集合中
79 避免过度同步
为了避免死锁和数据破坏,千万不要从同步区字段内部调用外来方法。
在一个被同步的方法或者代码块中,永远不要放弃对客户端的控制。换句话说:在一个被同步的区域内部,不要调用设计成要被覆盖的方法,或者是由客户端以函数对象的形式提供的方法。这样的方法是外来的。
以下面的类为例,它实现了一个可以观察到的集合包装(set wrapper)。该类允许客户端在将元素添加到集合中时预订通知。
观察者(Observer)模式
package com.wjw.effectivejava2; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; public class ObservableSet<E> extends ForwardingSet<E> { public ObservableSet(Set<E> set) { super(set); } private final List<SetObserver<E>> observers = new ArrayList<>(); public void addObserver(SetObserver<E> observer) { synchronized (observers) { observers.add(observer); } } public Boolean removeObserver(SetObserver<E> observer) { synchronized (observers) { return observers.remove(observer); } } private void notifyElementAdded(E element) { synchronized (observers) { for (SetObserver<E> observer : observers) observer.added(this, element); } } @Override public boolean add(E element) { Boolean added = super.add(element); if (added) notifyElementAdded(element); return added; } @Override public boolean addAll(Collection<? extends E> c) { Boolean result = false; for (E element : c) result |= add(element); // Calls notifyElementAdded return result; } }
@FunctionalInterface public interface SetObserver<E> { // Invoked when an element is added to the observable set void added(ObservableSet<E> set, E element); }
如果只是粗略地检验一下,ObservableSet 会显得很正常。例如,下面的程序打印出0〜99的数字:
public static void main(String[] args) { ObservableSet<Integer> set = new ObservableSet<>(new HashSet<>()); set.addObserver((s, e) -> System.out.println(e)); for (int i = 0; i < 100; i++) set.add(i); }
尝试一个更复杂的例子,将上面代码里set.addObserver((s, e) -> System.out.println(e));替换为:
set.addObserver(new SetObserver<>() { public void added(ObservableSet<Integer> s, Integer e) { System.out.println(e); if (e == 23) s.removeObserver(this); } });
注意:这个调用以一个匿名类SetObserver 实例代替了前一个调用中使用的lambda。这是因为函数对象需要将自身传给s.removeObserver ,而lambda则无法访问它们自己
80 executor、task和stream优先于线程
这一节主要是讲线程池的使用。
java.util.concurrent包中包含了一个Executor Framework它是一个很灵活的基于接口的任务执行工具。
ExecutorService exec = Executors.newSingleThreadExecutor();
为执行而提交一个runnable的方法:
exec.execute(runnable);
告诉executor如何优雅地终止
exec.shutdown();
你可以利用executor service完成更多的工作:
1. 等待一个任务集合中的任何任务完成
利用 invokeAny 或 invokeAll
2. 等待executor service优雅地完成终止
awaitTermination
3. 在任务完成时逐个地获取这些任务的结果
ExecutorCompletionService
4. 调度在某个时间段定时运行或周期运行的任务
ScheduledThreadPoolExecutor
为特殊的应用程序选择executor service是很有技巧的:
编写小程序,或者轻量负载的服务器,使用Executors.newCachedThreadPool
大负载的产品服务器中,使用Executors.newFixedThreadPool
但是后面人们又发现,使用原生的Executors线程池可能会带来OOM问题,所以又对此作了专门的要求:
在Executor Framework中,工作单元和执行机制是分开的。工作单元称作任务(task)。任务又有两种:Runnable 及 Callable(会返回值,并且能够抛出任意的异常)
在Java 7中,Executor框架被扩展为支持fork-join任务
这些任务是通过一种称作fork-join池的特殊executor服务运行的。
fork-join任务用ForkJoinTask实例表示,可以被分成更小的子任务,包含ForkJoinPool 的线程不仅要处理这些任务,还要从另一个线程中“偷”任务,以确保所有的线程保持忙碌,从而提高CPU使用率、提高吞吐量,并降低延迟。
并行流Parallel streams就是在fork join池上编写的
81 并发工具优于 wait 和notify
先把结论写出来:
直接使用 wait 方法和 notify 方法就像用“汇编语言”编程一样,而java.util.concurrent 则提供了更高级的语言。没有理由再使用 wait 方法和 notify。
如果你在维护使用wait 方法和 notify方法的代码,务必确保始终是利用在 while 循环内部调用 wait 方法。
应该优先使用 notifyAll 方法,而不是使用 notify 方法。如果使用 notify 方法,一定要确保程序的活性。
并发集合
并发集合为标准的集合接口(如List、Queue 和 Map )提供了高性能的并发实现。为了提供高并发性,这些实现在内部自己管理同步。因此,将并发集合锁定没有什么作用,只会使程序的速度变慢。
例如,下面这个方法模拟了 String.intern 的行为:
private static final ConcurrentMap<String, String> map = new ConcurrentHashMap<>(); public static String intern(String s) { String previousValue = map.putIfAbsent(s, s); return previousValue == null ? s : previousValue; }
String.intern:若常量池中不存在相同的字符串,JVM就会在常量池中创建一个字符串,然后返回该字符串的引用
putIfAbsent:如果所指定的 key 已经在 HashMap 中存在,返回和这个 key 值对应的 value, 如果所指定的 key 不在 HashMap 中存在,则返回 null。
ConcurrentHashMap 对get操作进行了优化。所以可以先用get操作进行条件判断:
public static String intern(String s) { String result = map.get(s); if (result == null) { result = map.putIfAbsent(s, s); if (result == null) result = s; } return result; }
并发集合导致同步的集合大多被废弃了。应该优先使用 ConcurrentHashMap ,而不是使用Collections.synchronizedMap。
有些集合接口已经通过阻塞操作(blocking operation)进行了扩展,它们会一直等待到可以成功执行为止。这样就允许将阻塞队列用于生产者一消费者队列(producer-consumer queue)。
大多数 ExecutorService 实现(包括ThreadPoolExecutor)都使用了一个BlockingQueue
同步器
同步器(Synchronizer)是使线程能够等待另一个线程的对象,允许它们协调动作。最常用的同步器是CountDownLatch 和 Semaphore 。较不常用的是 CyclicBarrier 和 Exchanger。功能最强大的同步器是Phaser。
倒计数锁存器(Countdown Latch)是一次性的障碍,允许一个或者多个线程等待一个或者多个其他线程来做某些事情。
要在这个简单的基本类型之上构建一些有用的东西,做起来相当容易。例如,假设想要构建一个简单的框架,用来给一个动作的并发执行定时。
直接在wait和notify之上实现这个逻辑会很棍乱,而在CountDownLatch之上实现则相当简单:
public static long time(Executor executor, int concurrency, Runnable action) throws InterruptedException { CountDownLatch ready = new CountDownLatch(concurrency); CountDownLatch start = new CountDownLatch(1); CountDownLatch done = new CountDownLatch(concurrency); for (int i = 0; i < concurrency; i++) { executor.execute(() -> { ready.countDown(); // Tell timer we're ready try { start.await(); // Wait till peers are ready action.run(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { done.countDown(); // Tell timer we're done } }); } ready.await(); // Wait for all workers to be ready long startNanos = System.nanoTime(); start.countDown(); // And they're off! done.await(); // Wait for all workers to finish return System.nanoTime() - startNanos; }
代码解释:
这个方法使用了三个倒计数锁存器。第一个是ready ,工作线程用它来告诉 timer 线程它们已经准备好了。然后工作线程在第二个锁存器 start 上等待。
当最后一个工作线程调用ready.countDown时,timer 线程记录下起始时间,并调用start.countDown允许所有的工作线程继续进行。
然后 timer 线程在第三个锁存器 done 上等待,直到最后一个工作线程运行完该动作,并调done.countDown 。此时,timer 线程就会苏醒过来,并记录下结束的时间。
对于间歇式的定时,始终应该优先使用System.nanoTime ,而不是使用System.currentTimeMillis。因为 System.nanoTime 更准确,也更精确,它不受系统时钟调整的影响。
前一个例子中的那三个倒计数锁存器其实可以用一个 CyclicBarrier 或者 Phaser 实例代替。这样得到的代码更加简洁,但是理解起来比较困难。
wait 方法被用来使线程等待某个条件。它必须在同步区域内部被调用,这个同步区域将对象锁定在了调用 wait 方法的对象上。
synchronized (obj) { while (<condition does not hold>) obj.wait(); // (Releases lock, and reacquires on wakeup) ... // Perform action appropriate to condition }
应该使用循环模式来调用wait方法;永远不要在循环之外调用wait方法。循环会在等待之前和之后对条件进行测试:
在等待之前测试条件,当条件已经成立时就跳过等待
在等待之前测试条件,如果条件不成立的话继续等待
一旦出现下面的这几种情况,线程就可能会醒过来:
另一个线程可能意外地或恶意地调用了 notify 方法
公有可访问对象的同步方法中包含的wait 方法都会出现这样的问题
即使只有某些等待线程的条件已经被满足,但是通知线程仍然调用 notifyAll 方法
在没有通知的情况下,等待线程也可能会苏醒过来。这被称为“伪唤醒”(spurious wakeup)
82 文档应包含线程安全性
先把结论写出来:
每个类都应该严谨的描述其线程安全属性
有条件的线程安全类必须记录哪些方法调用序列需要外部同步,以及在执行这些序列时需要获取哪些锁
编写无条件线程安全的类,要使用一个private锁对象来代替同步方法,可以防止客户端和子类的不同步干扰
有一种说法是:“通过查看文档里是否有 synchronized 修饰符,可以判断方法是否是线程安全的”,这种说法是错误的,有如下多种原因:
Javadoc的输出中没有包含 synchronized 修饰符,因为方法声明中 synchronized 修饰符的存在是实现细节,而不是其API的一部分。它不能可靠地表明方法是线程安全的。
此外,上面的说法隐含了一个错误的观念:线程安全要么全有要么全无。实际上,要启用安全的并发使用,类必须清楚地记录它支持的线程安全级别。
线程安全级别:
不可变的 —— 这个类的实例是不可变的,不需要外部同步。
String、Long 和 BigInteger
无条件线程安全 —— 该类的实例是可变的,但是该类具有足够的内部同步,因此无需任何外部同步即可并发地使用该类的实例。
AtomicLong 和 ConcurrentHashMap
有条件的线程安全 —— 与无条件线程安全类似,只是有些方法需要外部同步才能安全并发使用。
Collections.synchronized 包装器返回的集合,其迭代器需要外部同步。
非线程安全 —— 该类的实例是可变的。要并发地使用它们,客户端必须使用外部同步。
ArrayList 和 HashMap
线程对立 —— 即使每个方法调用都被外部同步包围,该类对于并发使用也是不安全的。线程对立通常是由于在不同步的情况下修改静态数据而导致的。
在文档中记录一个有条件的线程安全类需要小心。你必须指出哪些调用序列需要外部同步,以及执行这
些序列必须获得哪些锁。
例如Collections.synchronizedMap 的文档提到:
It is imperative that the user manually synchronize on the returned map when iterating over any of its collection views:
(当遍历任何被返回 Map 的集合视图时,用户必须手工对他们进行同步)
Map<K, V> m = Collections.synchronizedMap(new HashMap<>()); Set<K> s = m.keySet(); // Needn't be in synchronized block ... synchronized(m) { // Synchronizing on m, not s! for (K key : s) key.f(); }
不遵循这个建议可能会导致不确定的行为。
当一个类使用public的可访问锁时,户端可以通过⻓时间持有可public的锁来发起拒绝服务攻击。为了防止这种拒绝服务攻击,可以使用一个私有锁对象:
private final Object lock = new Object(); public void foo(){ synchronized(lock){ ...... } }
lock字段必须声明为final,可以防止无意中更改它的内容,导致灾难性的非同步访问。
私有锁对象用法特别适合为继承而设计的类,如果这样一个类要使用它的实例进行锁定,那么子类很容易在无意中干扰基类的操作,反之亦然。
这种现象在Thread类上就出现过
83 谨慎使用延迟初始化
先把结论写出来:
应该正常初始化大多数字段,而不是延迟初始化
如果必须延迟初始化,以实现性能目标或为了破坏循环依赖,则使用合适的技术
对于字段,使用双重检查模式
对于静态字段,则应该使用 the lazy initialization holder class idiom
对于可以接受重复初始化的字段,可以使用单次检查模式
与大多数优化一样,延迟初始化的最佳建议是「除非需要,否则不要这样做」。
在大多数情况下,常规初始化优于延迟初始化。下面是一个使用常规初始化的实例字段的典型声明。注意用了final修饰符:
private final FieldType field = computeFieldValue();
private final FieldType field = computeFieldValue();
如果使用延迟初始化来取代初始化的循环(circularity),就要使用同步访问方法:
private FieldType field; private synchronized FieldType getField() { if (field == null) field = computeFieldValue(); return field; }
如果需要在静态字段上使用延迟初始化来提高性能,使用 lazy initialization holder class 模式
private static class FieldHolder { static final FieldType field = computeFieldValue(); } private static FieldType getField() { return FieldHolder.field; }
第一次调用getField时,它执行FieldHolder.field初始化FieldHolder类。这个习惯用法的优点是
getField方法不是同步的,所以延迟初始化实际上不会增加访问成本。
如果需要使用延迟初始化来提高实例字段的性能,就使用双重检查模式。
private volatile FieldType field; private FieldType getField() { FieldType result = field; if (result == null) { // First check (no locking) synchronized(this) { if (field == null) // Second check (with locking) field = result = computeFieldValue(); } } return result; }
第一次检查时没有锁定,看看这个字段有没有初始化;第二次检查时锁定了。只有当第二次检查字段未初始化时,才初始化字段。
将FieldType field声明为volatile很重要。
有时候可能需要延迟初始化一个实例字段,该字段可以被重复初始化。使用单次检查模式就可以了:
private volatile FieldType field; private FieldType getField() { FieldType result = field; if (result == null) field = result = computeFieldValue(); return result; }
84 不要依赖线程调度器
先把结论写出来:
不要让应用程序的正确性依赖于线程调度器
不要依赖Thread.yield
线程优先级可以少量地用于提高正常程序的服务质量,但绝不应该用于「修复」不能工作的程序
当许多线程可以运行时,线程调度器决定哪些线程可以运行以及运行多⻓时间。任何合理的操作系统都会尝试公平地做出这个决定,但是策略可能会有所不同。所以任何依赖线程调度器来保证正确性或性能的程序都可能是不可移植的。
编写健壮、响应快、可移植程序的最佳方法是确保可运行线程的平均数量不显著大于处理器的数量。
保持可运行线程数量尽可能少的主要方法是:如果线程没有做有用的工作,它们就不应该运行。这意味着要适当调整线程池的大小,保持任务短小。
任务不要太小,否则分派的开销也会损害性能
线程不应该处于忙-等状态(busy-wait),即应该反复检查一个共享对象,等待它的状态发生变化。会极大增加处理器的负载,降低其他进程的效率。例如下面这种CountDownLatch 就是不正确的实现:
public class SlowCountDownLatch { private int count; public SlowCountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException(count + " < 0"); this.count = count; } public void await() { while (true) { synchronized(this) { if (count == 0) return; } } } public synchronized void countDown() { if (count != 0) count--; } }
如果某一个程序不能工作,原因是由于某些线程相对于其他线程没有获得足够的CPU时间,也不要通过调用Thread.yield来修复,这种代码是不可移植的。
在一个JVM实现上提高性能的yield,在第二个JVM实现上可能会使性能变差
更好的做法是重构应用程序,以减少并发运行线程的数量。
一个相关的技术是调整线程优先级,但线程优先级是Java中最不可移植的特性之一。试图通过调整线程优先级来解决严重的活性问题是不合理的。在找到并修复潜在原因之前,问题很可能会再次出现。