Java 并发基础
标签 : Java基础
线程简述
线程是进程的执行部分,用来完成一定的任务; 线程拥有自己的堆栈,程序计数器和自己的局部变量,但不拥有系统资源, 他与其他线程共享父进程的共享资源及部分运行时环境,因此编程时需要小心,确保线程不会妨碍同一进程中的其他线程;
- 多线程优势
- 进程之间不能共享内存,但线程之间共享内存/文件描述符/进程状态非常容易.
- 系统创建进程时需要为该其分配很多系统资源(如进程控制块),但创建线程的开销要小得多,因此线程实现多任务并发比进程效率高.
- Java语言内置多线程支持,而不是单纯采用底层操作系统API调用, 从而可以简化Java的多线程编程.
线程创建与启动
Java使用java.lang.Thread
代表线程(所有的线程对象必须是Thread
类的实例).使用java.lang.Runnable
java.util.concurrent.Callable
和java.util.concurrent.Future
来代表一段线程执行体(一段顺序执行的代码).一个线程的作用是完成一段程序流的执行,同时子线程的执行还可以跟父线程并行, 两段线程的执行流程没有关系, 父线程还可以继续执行其他的事情.
继承Thread
继承Thread
类,并重写run()
方法(代表线程执行体),然后调用start()
方法来启动线程.
/**
* @author jifang
* @since 16/1/20下午2:32.
*/
public class ThreadStart {
public static void main(String[] args) {
new ConcreteThread().start();
new ConcreteThread("second").start();
for (int i = 0; i < 10; ++i) {
System.out.println(Thread.currentThread().getName() + ": i");
}
}
private static class ConcreteThread extends Thread {
public ConcreteThread() {
}
public ConcreteThread(String name) {
super(name);
}
@Override
public void run() {
for (int i = 0; i < 10; ++i) {
System.out.println(getName() + ": " + i);
}
}
}
}
继承Thread类来创建线程类时,多个线程之间无法共享线程类的实例变量.
实现Runnable
实现Runnable
接口,重写run()
方法(同样代表线程执行体),并将该类实例作为Thread
的target
提交给线程执行.
/**
* @author jifang
* @since 16/1/20下午2:47.
*/
public class RunnableStart {
public static void main(String[] args) {
Runnable runnable = new ConcreteRunnable();
new Thread(runnable, "first").start();
new Thread(runnable).start();
for (int i = 0; i < 10; ++i) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
}
private static class ConcreteRunnable implements Runnable {
private int i = 0;
@Override
public void run() {
for (; i < 10; ++i) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
}
}
}
运行上例可以看到i
值重复的现象,这是因为有多个线程都在修改同一个i
值, 对于并发修改共享资源的情况,需要添加同步机制保护,详见下面.
Runnable
对象仅作为Thread对象的target
,其包含的run()
方法仅作为线程执行体.实际的线程对象依然是Thread
实例, 只是该Thread
线程执行的是target
的run()
方法.
Callable与Future
Callable
接口提供一个call()
方法作为线程执行体,相比于run()
,call()
可以有返回值,还可以声明抛出异常.但它并不是Runnable
接口的子接口, 所以不能直接作为target
执行.因此Java又提供了Future
接口来代表Callable
中call()
方法的返回值,并提供java.util.concurrent.FutureTask
类实现Callable
与Runnable
接口(其实现了RunnableFuture
接口,该接口同时继承了Runnable
Future
),以作为Thread
的target
.
Future
提供如下方法控制与其关联的Callable
:
方法 | 释义 |
---|---|
boolean cancel(boolean mayInterruptIfRunning) |
Attempts to cancel execution of this task. |
V get() |
Waits if necessary for the computation to complete, and then retrieves its result. |
V get(long timeout, TimeUnit unit) |
Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available. |
boolean isCancelled() |
Returns true if this task was cancelled before it completed normally. |
boolean isDone() |
Returns true if this task completed. |
Callable
创建并启动线程的步骤如下:
- 实现
Callable
接口并重写call()
方法; - 使用
FutureTask
类包装Callable
对象; - 将
FutureTask
实例提交给Thread
并启动新线程; - 使用
FutureTask
的get()
获取子线程执行结束后的返回值.
/**
* @author jifang
* @since 16/1/20下午3:00.
*/
public class CallableStart {
public static void main(String[] args) throws ExecutionException, InterruptedException {
RunnableFuture<Integer> task = new FutureTask<>(new ConcreteCallable());
new Thread(task).start();
while (true) {
System.out.println("主线程在干其他事情...");
if (task.isDone()) {
System.out.println("子线程返回值: " + task.get());
break;
}
Thread.sleep(5);
}
}
private static class ConcreteCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < 100; ++i) {
Thread.sleep(10);
total += i;
}
return total;
}
}
}
由于实现
Runnable
和Callable
的方式可以让多个线程共享同一个target
,因此适用于多个线程处理同一份资源的情况,从而将CPU/代码/数据分开.
线程生命周期
当线程被new
出并start
后,他既不是马上就进入执行状态, 也不会一直处于执行状态, 一个线程会经过新建NEW
-> 就绪RUNNABLE
-> 运行RUNNING
-> 阻塞BLOCKED
-> 死亡DEAD
五种状态切换.
1. 新建New
当new
出一个Thread
后,该线程处于新建状态,此时他和其他Java对象一样,仅由JVM为其分配内存.并没有表现出任何线程的动态特征.
2. 就绪Runnable
当线程对象调用start()
后,该线程处于就绪状态,JVM会为其创建方法调用栈(Stack Trace)/线程控制块/程序计数器(PC),处于这个状态的线程表示是可以运行的.但何时运行,取决于JVM里线程调度器的调度.
3. 运行Running
如果处于就绪状态的线程一旦获得了CPU,就开始执行run()
方法中的线程执行体,则线程进入运行状态.
4. 阻塞Blocked
当发生如下情况时,线程会进入阻塞状态
- 线程调用
sleep()
主动放弃处理器; - 线程调用阻塞IO, 其IO资源未到;
- 线程试图获得同步监视器, 但同步监视器被其他线程持有;
- 线程等待某个通知
wait()
; - 调用了线程的
suspend()
方法(该方法将导致线程挂起,但这样容易导致死锁,不建议使用[详细见线程同步]).
当前线程被阻塞之后, 其他线程就可以获得执行的机会.
当发生如下情况, 线程可以解除阻塞, 重新进入就绪:
- 线程
sleep()
到达指定时间; - 阻塞IO返回;
- 成功获得同步监视器;
- 线程收到了其他线程发出的通知
notify()
; - 被
suspend()
的线程被调用了resume()
恢复方法;
被阻塞的线程会在合适的时候重新进入
就绪状态
.
5. 线程死亡
run()
/call()
方法执行完成, 线程正常结束;- 线程抛出未捕获的
Exception
或Error
; - 直接调用线程的
stop()
方法结束该线程(该方法容易导致死锁,不建议使用).
一旦子线程启动起来后,就拥有和父线程相同的地位,不会受父线程的任何影响(因此当主线程结束时,其他线程不会同主线程一起结束).
为了测试某个线程是否生存, 可以调用Thread
实例的isAlive()
方法(就绪/运行/阻塞返回true, 新建/死亡返回false).
不要试图对已经死亡的线程调用
start()
方法, 死亡线程将不可再次作为线程执行.否则会抛出java.lang.IllegalThreadStateException
.
线程池
线程池会在系统启动时即创建大量空闲线程,然后将一个Runnable/Callable
对象提交给线程池,池就会分配/创建一个线程来执行他们的run()
/call()
,任务执行结束,该线程并不会死亡,而是再次返回池中变为空闲状态,等待执行下一个任务;
线程池不仅可以避免每当有新任务就启动一个新线程带来的系统开销,而且可以有效控制系统中并发线程的数量,一旦系统中的线程超过一定数量,将导致系统性能剧烈下降,甚至JVM崩溃,而线程池可以设置最大线程数以防止线程数超标.
Java提供java.util.concurrent.Executors
工厂类来生产线程池, 该工厂类提供如下静态方法:
方法 | 释义 |
---|---|
static ExecutorService newCachedThreadPool() |
Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. |
static ExecutorService newFixedThreadPool(int nThreads) |
Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. |
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) |
Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically. |
static ExecutorService newSingleThreadExecutor() |
Creates an Executor that uses a single worker thread operating off an unbounded queue. |
static ScheduledExecutorService newSingleThreadScheduledExecutor() |
Creates a single-threaded executor that can schedule commands to run after a given delay, or to execute periodically. |
上面这些方法还有都有一个重载方法,需要使用java.util.concurrent.ThreadFactory
参数,ThreadFactory
是一个接口,用于自定义线程的创建策略.
1.java.util.concurrent.ExecutorService
代表尽快执行任务的线程池,当有任务执行时,只需将RunnableCallable
实例submit()
给线程池就好(只池中有空闲线程,就立即执行任务),ExecutorService
提供如下方法来提交任务:
方法 | 描述 |
---|---|
<T> Future<T> submit(Callable<T> task) |
Submits a value-returning task for execution and returns a Future representing the pending results of the task. |
Future<?> submit(Runnable task) |
Submits a Runnable task for execution and returns a Future representing that task. |
<T> Future<T> submit(Runnable task, T result) |
Submits a Runnable task for execution and returns a Future representing that task. |
Java为ExecutorService
提供了一个java.util.concurrent.ThreadPoolExecutor
实现类,该类有如下构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// ...
}
因此, 如果默认的线程池策略(如最[小/大]线程数/线程等待时间)不能满足我们的需求,我们可以自定义线程池策略.
2.ScheduledExecutorService
线程池是ExecutorService
的子接口,代表可以在指定延迟后或周期性执行线程任务.它提供了如下方法来提交任务:
方法 |
---|
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) |
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) |
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) |
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) |
其释义可以参考JDK文档;
/**
* @author jifang
* @since 16/1/20下午9:47.
*/
public class ThreadPool {
public static void main(String[] args) {
ExecutorService pool = getThreadPool();
pool.submit(new ConcreteRunnable());
pool.submit(new ConcreteRunnable());
pool.shutdown();
}
private static ExecutorService getThreadPool() {
return Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
});
// return Executors.newCachedThreadPool();
// return Executors.newFixedThreadPool(2);
// return Executors.newSingleThreadExecutor();
}
private static class ConcreteRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; ++i) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " " + i);
}
}
}
}
- 使用自定义策略的线程池,提交
Callable
任务
public class ThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = getThreadPool();
Future<Integer> task1 = pool.submit(new ConcreteCallable());
Future<Integer> task2 = pool.submit(new ConcreteCallable());
System.out.println(task1.isDone());
System.out.println(task2.isDone());
pool.shutdown();
System.out.println("task1 " + task1.get());
System.out.println("task2 " + task2.get());
}
private static ExecutorService getThreadPool() {
return new ThreadPoolExecutor(5, 20, 20L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
private static class ConcreteCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i < 100; ++i) {
Thread.sleep(10);
sum += i;
}
return sum;
}
}
}
用完一个线程池后, 应该调用该线程池的
shutdown()
方法,该方法将启动线程池关闭序列,不再接收新任务,但会将以前所有已提交的任务尽快执行完成.所有任务都执行完,池中所有线程都会死亡.
线程控制
Java提供了一些工具方法,通过这些方法可以控制线程的执行.
Join
join()
方法可以一个让线程等待另一个线程执行完成: 调用线程被阻塞,知道被join()
的线程执行完成.
该方法通常由主线程调用,将大问题划分成小问题,每个小问题分配一个线程执行,当所有的小问题处理完成,再由主线程来做最后处理.如多线程排序,将一个大的排序任务分割为几个小块,分配给几个线程,当所有子线程执行完成后,再由主线程进行归并:
/**
* @author jifang
* @since 16/1/21上午11:18.
*/
public class MultiThreadSort {
private static final int THREAD_COUNT = 12; /*12个线程分段排序*/
private static final int NUMBER_COUNT = 201600;
private static final int PER_COUNT = NUMBER_COUNT / THREAD_COUNT;
private static final int RANDOM_LIMIT = 10000000;
public static void main(String[] args) throws InterruptedException {
// 为数组分配随机值, 为了方便查看, 为其分配10000000以内的值
Random random = new Random();
int[] array = new int[NUMBER_COUNT];
for (int i = 0; i < NUMBER_COUNT; ++i) {
array[i] = random.nextInt(RANDOM_LIMIT);
}
List<Thread> threadList = new LinkedList<>();
for (int index = 0; index < THREAD_COUNT; ++index) {
Thread t = new Thread(new SortRunnable(array, index * PER_COUNT, (index + 1) * PER_COUNT));
t.start();
threadList.add(t);
}
// 等待线程排序完成
join(threadList);
// 分段合并
int[] result = merge(array, PER_COUNT, THREAD_COUNT);
if (check(result)) {
System.out.println("correct");
}
}
private static boolean check(int[] array) {
for (int i = 0; i < array.length - 1; ++i) {
if (array[i] > array[i + 1]) {
System.out.println("error");
return false;
}
}
return true;
}
private static void join(List<Thread> threads) throws InterruptedException {
for (Thread thread : threads) {
thread.join();
}
}
/**
* 分段合并
*
* @param array 已经分段排好序的数组
* @param size 每段的长度
* @param count 一共的段数
* @return
*/
private static int[] merge(int[] array, int size, int count) {
// indexes保存array每段的起始位置
int[] indexes = new int[count];
for (int i = 0; i < count; ++i) {
indexes[i] = i * size;
}
int[] result = new int[array.length];
// i保存result下标
for (int i = 0; i < result.length; ++i) {
int minNumber = Integer.MAX_VALUE;
int minIndex = 0;
// 内层for循环的作用: 找出这count段中最小的那个值
for (int index = 0; index < indexes.length; ++index) {
// indexes[index]: 当前段的起始位置
if ((indexes[index] < (index + 1) * size) && (array[indexes[index]] < minNumber)) {
minNumber = array[indexes[index]];
minIndex = index;
}
}
result[i] = minNumber;
indexes[minIndex]++;
}
return result;
}
private static class SortRunnable implements Runnable {
private int[] array;
private int start;
private int end;
public SortRunnable(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
public void run() {
// 分段排序
Arrays.sort(array, start, end);
}
}
}
join()
还其他重载形式,可以设定主调线程的最长等待时间.
后台线程
后台线程的任务是为其他线程提供服务,又被成为”守护线程”, JVM的垃圾回收线程就是典型的后台守护线程.
调用Thread
对象的setDaemon(true)
方法可以将指定线程设置成后台线程(在start()
之前),isDaemon()
可以判断是否为后台线程(主线程默认是非后台线程, 非后台线程创建的默认是非后台线程, 反之亦然).
后台线程的特征: 所有前台线程死亡, 后台线程会自动死亡.
Sleep
前面多次看到在线程的执行过程中调用sleep()
让线程睡眠(进入阻塞状态),以模拟耗时的操作. 其方法签名如下:
static void sleep(long millis) throws InterruptedException;
由于sleep()
会抛出CheckedException
,因此可以将其包装一下:
/**
* @author jifang
* @since 16/1/23 下午9:17.
*/
public class SleepUtil {
public static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
sleep()
还有重载形式, 但不常用.
Yield
yield()
方法让当前正在执行的线程暂停,但不是阻塞线程,而是让该线程转入就绪状态,重新等待调度.
实际上,当某个线程调用yield()
方法让出处理器资源后,只有优先级与当前线程相同,或优先级比当前线程更高的处于就绪的线程才会获得执行机会, 因此完全有可能线程转入就绪后,调度器又将其调度出来重新执行.
注意:
yield()
方法可移植性并不是很好, 而且很有可能导致死锁.所以并不推荐使用(详细见线程同步).
线程优先级
每个线程都具有一定的优先级,优先级高的线程获得更多的执行机会;默认情况下,每个子线程的优先级与子父线程相同(默认main线程具有普通优先级).
Thread类提供了setPriority(int newPriority)
/getPriority()
方法来设置/获取线程优先级.newPriority
的范围为1~10,但由于这些级别需要操作系统的支持,但不同操作系统的优先级策略并不相同,因此推荐使用Thread类提供了三个静态常量进行设置:
/**
* The minimum priority that a thread can have.
*/
public final static int MIN_PRIORITY = 1;
/**
* The default priority that is assigned to a thread.
*/
public final static int NORM_PRIORITY = 5;
/**
* The maximum priority that a thread can have.
*/
public final static int MAX_PRIORITY = 10;
/**
* @author jifang
* @since 16/1/21上午11:12.
*/
public class ThreadPriority {
public static void main(String[] args) {
Thread low = new Thread(new PriorityRunnable(), "low");
low.setPriority(Thread.MIN_PRIORITY);
Thread mid = new Thread(new PriorityRunnable(), "mid");
mid.setPriority(Thread.NORM_PRIORITY);
Thread high = new Thread(new PriorityRunnable(), "high");
high.setPriority(Thread.MAX_PRIORITY);
start(low, mid, high);
}
private static void start(Thread... threads) {
for (Thread thread : threads) {
thread.start();
}
}
private static class PriorityRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; ++i) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
}
}
}
线程同步
模拟银行取钱的场景,无线程同步:
- 账户
/**
* 银行账户
*
* @author jifang
* @since 16/1/21下午2:05.
*/
public class Account {
private double balance;
public Account() {
}
public Account(double balance) {
this.balance = balance;
}
public double getBalance() {
return balance;
}
public void reduceBalance(double count) {
this.balance -= count;
}
}
- 甲/乙线程取钱
/**
* @author jifang
* @since 16/1/21下午2:09.
*/
public class DrawMoney {
public static void main(String[] args) {
Runnable r = new DrawRunnable(new Account(800), 300);
new Thread(r, "甲").start();
new Thread(r, "乙").start();
}
private static class DrawRunnable implements Runnable {
private final Account account;
private double money;
public DrawRunnable(Account account, double money) {
this.account = account;
this.money = money;
}
@Override
public void run() {
while (true) {
if (account.getBalance() > money) {
System.out.println(Thread.currentThread().getName() + "取钱" + money + "成功");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
account.reduceBalance(money);
System.out.println("\t" + Thread.currentThread().getName() + "成功后的余额: " + account.getBalance());
} else {
System.out.println(Thread.currentThread().getName() + "取钱失败");
System.out.println("\t" + Thread.currentThread().getName() + "失败后的余额: " + account.getBalance());
break;
}
}
}
}
}
运行如上程序, 很有可能会产生余额为负的情况(但现实中这是不可能存在的,出现这样的结果就说明是我们的程序出错了).
synchronized
之所以会出现上面的情况, 是因为run()
方法不具有线程安全性(当账户余额为500时, 甲乙两个线程的account.getBalance() > money
都返回true(为了增大这类事件产生的概率,线程会在判断完之后会sleep
1毫秒以等待另一个线程),这样两个线程都会去取款300,因此会导致余额出现-100的情况).
为了解决该问题, Java多线程引入了同步监视器synchronized
关键字;被synchronized
保护的代码称之为同步代码块,线程开始执行同步代码块之前, 必须先获得对同步监视器的锁定.
任何时刻只能有一个线程获得对同步监视器的锁定,当同步代码执行完后,该线程会自动释放对同步监视器的锁定.
@Override
public void run() {
while (true) {
synchronized (account) {
if (account.getBalance() > money) {
System.out.println(Thread.currentThread().getName() + "取钱" + money + "成功");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
account.reduceBalance(money);
System.out.println("\t" + Thread.currentThread().getName() + "成功后的余额: " + account.getBalance());
} else {
System.out.println(Thread.currentThread().getName() + "取钱失败");
System.out.println("\t" + Thread.currentThread().getName() + "失败后的余额: " + account.getBalance());
break;
}
}
}
}
推荐使用可能被并发访问的共享资源作为同步监视器.
synchronized
关键词还可以用于修饰方法,该方法称之为同步方法,同步方法锁定的同步监视器是this
,也就是调用方法的对象.我们可将取钱的操作移动到Account
中,并将该类改造成线程安全类:
/**
* @author jifang
* @since 16/1/21下午2:05.
*/
public class Account {
// ...
public synchronized boolean draw(double money) {
if (getBalance() > money) {
System.out.println(Thread.currentThread().getName() + "取钱" + money + "成功");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
balance -= money;
System.out.println("\t" + Thread.currentThread().getName() + "成功后的余额: " + getBalance());
return true;
} else {
System.out.println(Thread.currentThread().getName() + "取钱失败");
System.out.println("\t" + Thread.currentThread().getName() + "失败后的余额: " + getBalance());
return false;
}
}
}
这样Thread的run()
方法则变得非常简单:
@Override
public void run() {
while (account.draw(money)) {
}
}
synchronized
可以修饰方法,也可以修改代码块,但不能修饰构造器,成员变量等.
同步监视器释放
释放同步监视器锁定:
- 当前线程的同步方法/同步代码块执行结束, 释放同步监视器;
- 当前线程在同步代码块/同步方法中遇到
break/return
终止该代码块/方法的执行, 释放同步监视器. - 当前线程在同步代码块/同步方法中出现了未处理的
Error
/Exception
, 导致异常结束, 释放同步监视器. - 当前线程调用了同步对象的
wait()
方法,当前线程暂停,并释放同步监视器.
不释放同步监视器:
- 程序调用
Thread.sleep()
/Thread.yield()
方法暂停当前线程执行. - 其他线程调用当前线程的
suspend()
方法将线程挂起.
wait/notify
现在系统中有两个线程,分别执行存钱/取钱,考虑这样一种特殊的需求:”要求存钱者和取钱着不断地重复存钱/取钱动作,同时规定不允许连续两次存钱, 也不允许两次连续取钱”.
可以借助Object
类提供的wait()
/notify()
/notifyAll()
三个方法来完成这一需求,但这三个方法必须由同步监视器对象来调用,因此可以分为以下两种情况:
- 使用
synchronized
修饰的同步方法, 由于this
就是同步监视器,所以可以在同步方法里面直接调用这三个方法. - 使用
synchronized
修饰的同步代码块, 由于同步监视器是synchronized
括号中的对象, 所以必须使用该对象调用这三个方法.
方法 | 释义 |
---|---|
void wait() |
Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object. (注意: 调用wait() 方法的当前线程会释放对同步监视器的锁定) |
void notify() |
Wakes up a single thread that is waiting on this object’s monitor. |
void notifyAll() |
Wakes up all threads that are waiting on this object’s monitor. |
/**
* @author jifang
* @since 16/1/24 下午4:35.
*/
public class Account {
private double balance = 0.0;
/*haveBalance标识当前账户是否还有余额*/
private boolean haveBalance = false;
public double getBalance() {
return balance;
}
/**
* 取钱
*
* @param amount
*/
public synchronized void draw(double amount) throws InterruptedException {
// 如果没有存款, 则释放锁定, 持续等待
while (!haveBalance) {
wait();
}
System.out.printf("%s执行取钱操作", Thread.currentThread().getName());
balance -= amount;
System.out.printf(", 当前余额%f%n", balance);
haveBalance = false;
// 唤醒其他线程
notifyAll();
}
/**
* 存钱
*
* @param amount
*/
public synchronized void deposit(double amount) throws InterruptedException {
// 如果有存款, 则释放锁定, 持续等待
while (haveBalance) {
wait();
}
System.out.printf("%s执行存钱操作", Thread.currentThread().getName());
balance += amount;
System.out.printf(", 当前余额%f%n", balance);
haveBalance = true;
// 唤醒其他线程
notifyAll();
}
}
public class Depositor {
public static void main(String[] args) {
Account account = new Account();
new Thread(new DrawMethod(account, 100), "- 取钱者").start();
new Thread(new DepositMethod(account, 100), "+ 存钱者").start();
}
private static class DrawMethod implements Runnable {
private Account account;
private double amount;
public DrawMethod(Account account, double amount) {
this.account = account;
this.amount = amount;
}
@Override
public void run() {
while (true) {
try {
account.draw(amount);
SleepUtil.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
private static class DepositMethod implements Runnable {
private Account account;
private double amount;
public DepositMethod(Account account, double amount) {
this.account = account;
this.amount = amount;
}
@Override
public void run() {
while (true) {
try {
SleepUtil.sleep(500);
account.deposit(amount);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
Lock
从1.5开始,Java提供了另外一种线程同步机制Lock
,Lock
提供比synchronized
更广泛的锁定操作,并且支持多个相关的Condition
. java.util.concurrent.locks.Lock
和java.util.concurrent.locks.ReadWriteLock
是Java提供的两类锁的根接口,并且为Lock
提供了ReentrantLock
/ReentrantReadWriteLock.ReadLock
/ReentrantReadWriteLock.WriteLock
实现, 为ReadWriteLock
提供ReentrantReadWriteLock
实现.
Lock
很容易实现对共享资源的互斥访问:每次只能有一个线程对Lock
加锁,线程在访问共享资源之前应先获得Lock
对象并lock()
, 在访问结束之后要unlock()
.
ReentrantLock
表示可重入锁,也就是说一个线程可以对已被加锁的ReentrantLock
锁再次加锁,ReentrantLock
对象会维持一个计数器来追踪lock()
方法的嵌套调用,所以一段被锁保护的代码可以调用另一个被相同锁保护的方法.
- 使用
Lock
实现生产者/消费者模式
/**
* 注意: add/reduce一定要确保使用的是同一把锁
*
* @author jifang
* @since 16/1/21下午4:52.
*/
public class Repository {
/*使用mutex保护count*/
private final Lock mutex = new ReentrantLock();
private int count;
private int limit;
public Repository(int count, int limit) {
this.count = count;
this.limit = limit;
}
private boolean canAdd(int count) {
return this.count + count <= limit;
}
private boolean canReduce(int count) {
return this.count - count >= 0;
}
public boolean add(int count) {
try {
// + 加锁
mutex.lock();
if (canAdd(count)) {
SleepUtil.sleep(80);
this.count += count;
return true;
}
} finally {
// + 解锁
mutex.unlock();
}
return false;
}
public boolean reduce(int count) {
try {
// - 加锁
mutex.lock();
if (canReduce(count)) {
SleepUtil.sleep(80);
this.count -= count;
return true;
}
} finally {
// - 解锁
mutex.unlock();
}
return false;
}
public int getCount() {
return count;
}
}
/**
* @author jifang
* @since 16/1/21下午5:02.
*/
public class ProducerConsumer {
public static void main(String[] args) {
ExecutorService pool = getExecutor();
Repository repository = new Repository(0, 100);
pool.submit(new Producer(repository, 30));
pool.submit(new Producer(repository, 30));
pool.submit(new Consumer(repository, 1));
pool.submit(new Consumer(repository, 2));
}
private static ExecutorService getExecutor() {
return new ThreadPoolExecutor(10, 20, 20, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
}
private static class Producer implements Runnable {
private Repository repository;
private int produceCount;
public Producer(Repository repository, int produceCount) {
this.repository = repository;
this.produceCount = produceCount;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(700);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (repository.add(produceCount)) {
System.out.println("+ 生产者" + Thread.currentThread().getName() + " 生产<" + produceCount + ">个产品 -- 成功");
} else {
System.out.println("+ 生产者" + Thread.currentThread().getName() + " 生产<" + produceCount + ">个产品 -- 失败");
}
System.out.println("当前仓库共有" + repository.getCount() + "个产品");
}
}
}
private static class Consumer implements Runnable {
private Repository repository;
private int consumeCount;
public Consumer(Repository repository, int consumeCount) {
this.repository = repository;
this.consumeCount = consumeCount;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (repository.reduce(consumeCount)) {
System.out.println("- 消费者" + Thread.currentThread().getName() + " 消费<" + consumeCount + ">个产品 -- 成功");
} else {
System.out.println("- 消费者" + Thread.currentThread().getName() + " 消费<" + consumeCount + ">个产品 -- 失败");
}
System.out.println("当前仓库共有" + repository.getCount() + "个产品");
}
}
}
}
使用
Lock
对象进行线程同步, 当加锁/释放锁出现在不同的作用范围时, 通常建议使用finally
块来确保一定会释放锁.
相对于Lock
, synchronized
则更加方便,可以避免很多锁的常见编程错误,但Lock
却提供了synchronized
所没有功能,比如用于tryLock()
尝试加锁, 获取可中断锁的lockInterruptibly()
, 还有在等待时间内获取锁的tryLock(long time, TimeUnit unit)
方法.
Condition
使用Lock
来保证互斥时,Java提供了一个java.util.concurrent.locks.Condition
来保持协调,使用Condition
可以让那些已经得到Lock
的线程无法继续执行的而释放Lock
,也可以唤醒其他等待在该Condition
上的线程. Condition
实例需要绑定在一个Lock
对象上,使用Lock
来保护Condition
,因此调用Lock
的newCondition()
方法才能获得Condition
实例. 他提供了如下方法:
方法 | 释义 |
---|---|
void await() |
Causes the current thread to wait until it is signalled or interrupted(当前线程会释放对Lock 的锁定). |
void signal() |
Wakes up one waiting thread. |
void signalAll() |
Wakes up all waiting threads. |
下面使用Condition
来模拟生产者/消费者模型(当仓库中有产品时才能消费, 当仓库未满时才能生产):
public class Repository {
/*使用mutex保护condition*/
private final Lock mutex = new ReentrantLock();
private final Condition addCondition = mutex.newCondition();
private final Condition reduceCondition = mutex.newCondition();
private int count;
private int limit;
public Repository(int count, int limit) {
this.count = count;
this.limit = limit;
}
private boolean canAdd(int count) {
return this.count + count <= limit;
}
private boolean canReduce(int count) {
return this.count - count >= 0;
}
public void add(int count) {
try {
// + 加锁
mutex.lock();
while (!canAdd(count)) {
System.out.printf("+... 生产者 %s is waiting...%n", Thread.currentThread().getName());
addCondition.await();
}
this.count += count;
System.out.printf("+ %s生产成功, 当前产品数%d%n", Thread.currentThread().getName(), getCount());
// 唤醒消费线程
reduceCondition.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// + 解锁
mutex.unlock();
}
SleepUtil.sleep(80);
}
public void reduce(int count) {
try {
// - 加锁
mutex.lock();
while (!canReduce(count)) {
System.out.printf("-... 消费者 %s is waiting...%n", Thread.currentThread().getName());
reduceCondition.await();
}
this.count -= count;
System.out.printf("- %s消费成功, 当前产品数%d%n", Thread.currentThread().getName(), getCount());
// 唤醒生产线程
addCondition.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// - 解锁
mutex.unlock();
}
SleepUtil.sleep(80);
}
private int getCount() {
return count;
}
}
public class ProducerConsumer {
public static void main(String[] args) {
ExecutorService pool = getExecutor();
Repository repository = new Repository(0, 100);
pool.submit(new Producer(repository, 30));
pool.submit(new Producer(repository, 30));
pool.submit(new Consumer(repository, 1));
pool.submit(new Consumer(repository, 2));
}
private static ExecutorService getExecutor() {
return new ThreadPoolExecutor(10, 20, 20, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
}
private static class Producer implements Runnable {
private Repository repository;
private int produceCount;
public Producer(Repository repository, int produceCount) {
this.repository = repository;
this.produceCount = produceCount;
}
@Override
public void run() {
while (true) {
repository.add(produceCount);
}
}
}
private static class Consumer implements Runnable {
private Repository repository;
private int consumeCount;
public Consumer(Repository repository, int consumeCount) {
this.repository = repository;
this.consumeCount = consumeCount;
}
@Override
public void run() {
while (true) {
repository.reduce(consumeCount);
}
}
}
}
ThreadLocal
java.lang.ThreadLocal
和其他所有的同步机制一样,都是为了解决多个线程中对同一共享变量(临界资源)的访问冲突问题,不过其他同步机制(synchronized
/Lock
)都是通过加锁来实现多个线程对同一变量的安全访问. 而ThreadLocal
则从另外一个角度来解决这个问题– 将临界资源进行复制(使其不再是临界资源),每个线程都拥有一份,从而也就没有必要再对其同步. ThreadLocal
代表一个线程局部变量,他为每一个使用该变量的线程都提供一个副本,使得每一个线程都可以独立的改变自己的副本,而不会和其他的线程冲突(好像每个线程都完全拥有该变量).
ThreadLocal
提供如下方法:
方法 | 释义 |
---|---|
T get() |
Returns the value in the current thread’s copy of this thread-local variable. |
void remove() |
Removes the current thread’s value for this thread-local variable. |
void set(T value) |
Sets the current thread’s copy of this thread-local variable to the specified value. |
/**
* @author jifang
* @since 16/1/23 下午6:14.
*/
public class Account {
private ThreadLocal<Long> balance = new ThreadLocal<>();
public Account(Long balance) {
this.balance.set(balance);
}
public Long getBalance() {
return balance.get();
}
public void setBalance(Long balance) {
this.balance.set(balance);
}
}
/**
* @author jifang
* @since 16/1/23 下午6:10.
*/
public class ThreadLocalClient {
public static void main(String[] args) {
// 同一个Account
Account account = new Account(888L);
new Thread(new Writer(account)).start();
new Thread(new Reader(account)).start();
// 主线程
for (int i = 0; i < 10; ++i) {
sleep(10);
System.out.println(Thread.currentThread().getName() + "-balance: " + account.getBalance());
}
}
private static class Writer implements Runnable {
private Account account;
public Writer(Account account) {
this.account = account;
}
@Override
public void run() {
for (int i = 0; i < 10; ++i) {
account.setBalance((long) i);
sleep(10);
System.out.println(Thread.currentThread().getName() + "-balance: " + account.getBalance());
}
}
}
private static class Reader implements Runnable {
private Account account;
public Reader(Account account) {
this.account = account;
}
@Override
public void run() {
for (int i = 0; i < 10; ++i) {
sleep(10);
System.out.println(Thread.currentThread().getName() + "-balance: " + account.getBalance());
}
}
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
可以看到, 虽然在主线程里面对Account
设置的初始值, 但是到子线程里面还是默认值.
建议: 如果多个线程之间需要共享资源, 以达到线程之间通信的目的, 就使用同步机制;如果仅仅需要隔离多个线程之间的共享冲突, 则可以选择使用
ThreadLocal
.
线程通信
BlockingQueue
Java 1.5 提供了java.util.concurrent.BlockingQueue
接口, 虽然他也是Queue
的子接口, 但他的主要用途不是作为容器, 而是用作线程同步的工具 –多个线程通过交替向BlockingQueue
放入/取出数据,从而完成线程间通信的目的.
BlockingQueue
提供如下方法用于线程间通信:
Java为BlockingQueue
提供了如下实现:
ArrayBlockingQueue
: 基于数组实现的BlockingQueue
.LinkedBlockingQueue
: 基于链表实现的BlockingQueue
.SynchronousQueue
: 同步队列, 对队列的存/取操作必须交替进行.PriorityBlockingQueue
: 并不是标准的队列,其take/poll/remove
方法返回的是队列中最小元素(其大小关系可以通过实现Comparator
接口进行定制).DelayQueue
: 一个特殊的BlockingQueue
,底层基于PriorityBlockingQueue
实现.DelayQueue
要求集合元素必须实现Delayed
接口(该接口里面只有一个long getDelay(TimeUnit unit)
方法),DelayQueue
根据集合元素的getDelay()
方法的返回值进行排序.
使用BlockingQueue
实现生产者/消费者模型
/**
* @author jifang
* @since 16/1/21下午9:11.
*/
public class ConsumerProducer {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 20;
private static final int KEEP_ALIVE_TIME = 20;
public static void main(String[] args) {
BlockingQueue<String> queue = getQueue();
Producer producer = new Producer(queue, 1000);
Consumer consumer = new Consumer(queue);
ExecutorService pool = getThreadPool();
pool.submit(producer);
pool.submit(consumer);
pool.submit(consumer);
}
private static BlockingQueue<String> getQueue() {
return new ArrayBlockingQueue<>(10);
}
private static ExecutorService getThreadPool() {
return new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
}
private static class Producer implements Runnable {
private BlockingQueue<String> queue;
private int count;
public Producer(BlockingQueue<String> queue, int count) {
this.queue = queue;
this.count = count;
}
@Override
public void run() {
Random random = new Random();
while (count-- != 0) {
try {
Thread.sleep(100);
queue.put("product " + random.nextInt(100));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
private static class Consumer implements Runnable {
private BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
String product = queue.take();
System.out.println(Thread.currentThread().getName() + " " + product);
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
Disruptor
最近师兄技术分享提到了并发框架Disruptor,原因是最近在做数据迁移,需要多个线程之间进行数据通信,将原来老库中的数据读出,中间处理一道之后写入新库.第一次听到它不免惊讶一番(竟然还有这么牛逼的东西(⊙﹏⊙)),于是就在网上找了几篇文章,了解了一下他的应用场景/使用方法和基本原理,现炒现卖整理如下,详细可参考我列在后面的参考文档.
Disruptor入门示例
- 事件
事件(Event)就是通过 Disruptor 进行交换的数据类型
/**
* @author jifang
* @since 16/1/22 下午8:49.
*/
public class IntegerEvent {
private int value;
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
}
- 事件工厂
事件工厂(EventFactory)定义了如何初始化Disruptor的环形缓冲区RingBuffer
/**
* 用来产生Event事件(填充默认的buffer)
*
* @author jifang
* @since 16/1/22 下午8:50.
*/
public class IntegerEventFactory implements EventFactory<IntegerEvent> {
@Override
public IntegerEvent newInstance() {
return new IntegerEvent();
}
}
- 事件消费者
定义了消费者线程该如何处理事件Event
public class IntegerConsumer implements EventHandler<IntegerEvent> {
@Override
public void onEvent(IntegerEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("sequence: " + sequence);
System.out.println("endOfBatch: " + endOfBatch);
System.out.println("event-value: " + event.getValue());
System.out.println("\t" + Thread.currentThread().getName());
}
}
- 事件生产者
定义了生产者向RingBuffer
push哪些数据,Disruptor的事件发布过程分为三个阶段:
- 从
RingBuffer
中获取下一个可以写入的序列号sequence
; - 获取
sequence
对应的事件对象Event
,将数据写入Event
; - 将填充好的
Event
提交到RingBuffer
.
- 从
public class IntegerProducer {
private RingBuffer<IntegerEvent> buffer;
public IntegerProducer(RingBuffer<IntegerEvent> buffer) {
this.buffer = buffer;
}
public void pushData(int value) {
long sequence = buffer.next();
try {
//该event从buffer中拿出(当环形buffer填满时, 你能看到event里面的值)
IntegerEvent event = buffer.get(sequence);
event.setValue(value);
} finally {
buffer.publish(sequence);
}
}
}
注意:
buffer.publish(sequence)
需放在finally
中以确保一定会得到调用.如果某个请求的sequence
未被提交,将会阻塞当前/其他Producer发布事件.
附: Disruptor还提供另一种发布事件的形式,简化以上操作并确保publish()
总是得到调用:
public class IntegerProducer {
private RingBuffer<IntegerEvent> buffer;
public IntegerProducer(RingBuffer<IntegerEvent> buffer) {
this.buffer = buffer;
}
private EventTranslatorOneArg<IntegerEvent, Integer> translator = new EventTranslatorOneArg<IntegerEvent, Integer>() {
@Override
public void translateTo(IntegerEvent event, long sequence, Integer value) {
event.setValue(value);
}
};
public void pushData(int value) {
buffer.publishEvent(translator, value);
}
}
Disruptor要求
publish()
必须得到调用(即使发生异常也要调用),那么就需要调用者在事件处理时判断事件携带的数据是否是正确/完整的,而不是由Disruptor来保证.
- 启动Disruptor
public class Client {
private IntegerEventFactory factory;
private int ringBufferSize;
private ExecutorService executorService;
@Before
public void setUp() throws Exception {
factory = new IntegerEventFactory();
ringBufferSize = 8;
executorService = new ThreadPoolExecutor(5, 20, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20));
}
@Test
public void client() {
Disruptor<IntegerEvent> disruptor = new Disruptor<>(factory, ringBufferSize, executorService);
// 关联消费者
disruptor.handleEventsWith(new IntegerConsumer());
// 子线程从buffer消费数据
disruptor.start();
// 主线程向buffer生产数据
IntegerProducer producer = new IntegerProducer(disruptor.getRingBuffer());
for (int i = 0; i < 100; ++i) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
producer.pushData(i);
}
disruptor.shutdown();
executorService.shutdown();
}
}
Disruptor选项
Disruptor可以根据不同的软件/硬件来调整选项以获得更高的性能. 其基本的选项有两个:单生产者模式设定和可选的等待策略.
单生产者模式
在并发系统中提高性能的方式之一就是
单一写者原则
:如果你的代码中仅有一个事件生产者,那么可以在生成Disruptor时将其设置为单一生产者模式来提高系统的性能.可选的等待策略
Disruptor定义了com.lmax.disruptor.WaitStrategy
接口用于抽象Consumer
如何等待新事件,其默认策略是BlockingWaitStrategy
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize)
{
return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
}
这个策略的内部使用一个锁Lock
和条件变量Condition
来控制线程的执行和等待.因此是最慢的等待策略(但也是CPU使用率最低和最稳定的策略).但Disruptor提供了根据不同的部署环境调整等待的方法(与单生产者模式一样, 也是需要在构建Disruptor
实例时指定), Disruptor提供了如下几种常用策略.
策略 | 描述 |
---|---|
SleepingWaitStrategy |
SleepingWaitStrategy 方式是循环等待并且在循环中间调用LockSupport.parkNanos() 来睡眠.它的优点在于生产线程只需要计数,而不执行任何指令;并且没有条件变量的消耗,因此和BlockingWaitStrategy 一样,SleepingWaitStrategy 的CPU使用率也比较低.但是,事件对象从生产者到消费者传递的延迟变大了.因此最好用在不需要低延迟,而事件发布对于生产者的影响比较小的情况下,如异步日志. |
YieldingWaitStrategy |
YieldingWaitStrategy 是可以被用在低延迟系统中的两个策略之一,这种策略在减低系统延迟的同时也会增加CPU运算量.该策略会循环等待sequence 增加到合适值(循环中调用Thread.yield() 以允许其他准备好的线程执行);如果需要高性能而且事件消费者线程比逻辑内核少,推荐使用YieldingWaitStrategy 策略,如:在开启超线程时. |
BusySpinWaitStrategy |
BusySpinWaitStrategy 是性能最高的等待策略,同时也是对部署环境要求最高的策略(CPU空转等待).这个策略最好用在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,如在禁用超线程技术时. |
我们指定单生产者模式和BusySpinWaitStrategy
来启动上例:
public class Client {
// ...
@Test
public void client() {
// 指定单生产者模式,与BusySpinWaitStrategy策略
Disruptor<IntegerEvent> disruptor = new Disruptor<>(factory, ringBufferSize, executorService, ProducerType.SINGLE, new BusySpinWaitStrategy());
//...
}
}
当程序跑起来之后,可以查看机器的top输出,直观的感受一下Disruptor是多么消耗CPU…
线程安全集合
包装不安全集合
平常使用的ArrayList
LikedList
HashMap
HashSet
LinkedHashMap
LinkedHashSet
TreeMap
TreeSet
等java.util
包下的集合(除HashTable
Vactor
除外)都是线程不安全的, 当多个线程并发向这些集合中存/取数据时, 就可能会破获这些集合的数据完整性.
因此java.util.Collections
类提供了一些工具方法来将这些线程不安全的集合包装成线程安全的集合.
方法 | 释义 |
---|---|
static <T> Collection<T> synchronizedCollection(Collection<T> c) |
Returns a synchronized (thread-safe) collection backed by the specified collection. |
static <T> List<T> synchronizedList(List<T> list) |
Returns a synchronized (thread-safe) list backed by the specified list. |
static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) |
Returns a synchronized (thread-safe) map backed by the specified map. |
static <T> Set<T> synchronizedSet(Set<T> s) |
Returns a synchronized (thread-safe) set backed by the specified set. |
static <K,V> SortedMap<K,V> synchronizedSortedMap(SortedMap<K,V> m) |
Returns a synchronized (thread-safe) sorted map backed by the specified sorted map. |
static <T> SortedSet<T> synchronizedSortedSet(SortedSet<T> s) |
Returns a synchronized (thread-safe) sorted set backed by the specified sorted set. |
/**
* @author jifang
* @since 16/1/23 下午9:07.
*/
public class ListClient {
private static List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>());
private static Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; ++i) {
SleepUtil.sleep(10);
list.add(i);
}
}
};
public static void main(String[] args) {
new Thread(runnable).start();
new Thread(runnable).start();
}
}
注意: 如果需要将某个集合包装成线程安全集合, 需要在创建之后立即包装.
线程安全集合
从1.5开始, Java在java.util.concurrent
包下提供了大量支持高效并发访问的接口和实现.
从UML类图可以看出,线程安全的集合类可分为以Concurrent开头和以CopyOnWrite开头的两类.
以Concurrent开头的集合代表了支持并发访问的集合, 他们可以支持多个线程并发读/写, 这些写入操作保证是线程安全的,而读取操作不必锁定, 因此在并发写入时有较好的性能.
ConcurrentLinkedQueue
实现了多线程的高效访问, 访问时无需等待, 因此当多个线程共享访问一个公共集合时,ConcurrentLinkedQueue
是一个较好的选择(不允许使用null
元素).ConcurrentHashMap
默认支持16个线程并发写入(可以通过concurrencyLevel
构造参数修改), 当线程数超过concurrencyLevel
时, 可能有一些线程需要等待.
由于
ConcurrentLinkedQueue
ConcurrentHashMap
支持并发访问, 所以当使用迭代器来遍历集合时, 该迭代器可能不能反映出创建迭代器之后所做的修改, 但程序不会抛出异常.
以CopyOnWrite开头的集合采用了写时复制技术.
CopyOnWriteArrayList
采用底层复制数组的方式来实现写入操作: 当线程执行读操作时, 线程会直接读取集合本身, 无须加锁和阻塞;但当线程对CopyOnWriteArrayList
集合执行写入操作(add
/remove
/set
)时, 该集合会在底层复制一份新数组, 然后对新数组执行写入操作.由于对CopyOnWriteArrayList
的写入是针对副本执行, 因此它是线程安全的.
注意: 由于
CopyOnWriteArrayList
的写入操作需要频繁的复制数组,因此写入性能较差;但由于读操作不用加锁(不是同一个数组),因此读操作非常快. 因此,CopyOnWriteArrayList
适合在读操作远远大于写操作的场景中, 如缓存.
- 由于
CopyOnWriteArraySet
底层封装的是CopyOnWriteArrayList
, 因此他的实现机制完全类似于CopyOnWriteArrayList
.
未处理异常
从1.5开始, Java增强了线程的异常处理功能,如果线程执行过程中抛出了一个未处理异常,JVM会在结束该线程之前查找是否有对应的java.lang.Thread.UncaughtExceptionHandler
对象,如果找到了Handler
对象, 则会调用该对象的void uncaughtException(Thread thread, Throwable exception)
方法来处理该异常. Thread
提供了如下两个方法来设置异常处理器:
方法 | 释义 |
---|---|
static void setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) |
Set the default handler invoked when a thread abruptly terminates due to an uncaught exception, and no other handler has been defined for that thread. |
void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) |
Set the handler invoked when this thread abruptly terminates due to an uncaught exception. |
/**
* @author jifang
* @since 16/1/23 下午5:50.
*/
public class Main {
private static Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println(t.getName());
e.printStackTrace();
}
};
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(handler);
int a = 5 / 0;
System.out.println("线程正常结束, a=" + a);
}
}
- 参考:
- Disruptor 极速体验
- Disruptor入门
- 并发框架Disruptor译文
- Linux多线程实践(9) –简单线程池的设计与实现
- Linux多线程实践(8) –Posix条件变量解决生产者消费者问题
- Linux多线程实践(7) –多线程排序对比
- Linux多线程实践(4) –线程特定数据
- Linux多线程实践(1) –线程理论
- Linux IPC实践(7) –Posix消息队列
- 进程同步的基本概念:临界资源、同步和互斥
- 疯狂Java讲义
- Effective java(第2版)