1.Unsafe类
通常我们最好也不要使用Unsafe类,除非有明确的目的,并且也要对它有深入的了解才行。要想使用Unsafe类需要用一些比较tricky的办法。Unsafe类使用了单例模式,需要通过一个静态方法getUnsafe()来获取。但Unsafe类做了限制,如果是普通的调用的话,它会抛出一个SecurityException异常;只有由主类加载器加载的类才能调用这个方法。其源码如下:
public static Unsafe getUnsafe() { Class var0 = Reflection.getCallerClass(); if(!VM.isSystemDomainLoader(var0.getClassLoader())) { throw new SecurityException("Unsafe"); } else { return theUnsafe; } }
获取到Unsafe实例之后,我们就可以为所欲为了。Unsafe类提供了以下这些功能:
1.1、内存管理。包括分配内存、释放内存等。
该部分包括了allocateMemory(分配内存)、reallocateMemory(重新分配内存)、copyMemory(拷贝内存)、freeMemory(释放内存 )、getAddress(获取内存地址)、addressSize、pageSize、getInt(获取内存地址指向的整数)、getIntVolatile(获取内存地址指向的整数,并支持volatile语义)、putInt(将整数写入指定内存地址)、putIntVolatile(将整数写入指定内存地址,并支持volatile语义)、putOrderedInt(将整数写入指定内存地址、有序或者有延迟的方法)等方法。getXXX和putXXX包含了各种基本类型的操作。
利用copyMemory方法,我们可以实现一个通用的对象拷贝方法,无需再对每一个对象都实现clone方法,当然这通用的方法只能做到对象浅拷贝。
1.2、数组操作。
这部分包括了arrayBaseOffset(获取数组第一个元素的偏移地址)、arrayIndexScale(获取数组中元素的增量地址)等方法。arrayBaseOffset与arrayIndexScale配合起来使用,就可以定位数组中每个元素在内存中的位置。
由于Java的数组最大值为Integer.MAX_VALUE,使用Unsafe类的内存分配方法可以实现超大数组。实际上这样的数据就可以认为是C数组,因此需要注意在合适的时间释放内存。
1.3、多线程同步。包括锁机制、CAS操作等。
这部分包括了monitorEnter、tryMonitorEnter、monitorExit、compareAndSwapInt、compareAndSwap等方法。
其中monitorEnter、tryMonitorEnter、monitorExit已经被标记为deprecated,不建议使用。
Unsafe类的CAS操作可能是用的最多的,它为Java的锁机制提供了一种新的解决办法,比如AtomicInteger等类都是通过该方法来实现的。compareAndSwap方法是原子的,可以避免繁重的锁机制,提高代码效率。这是一种乐观锁,通常认为在大部分情况下不出现竞态条件,如果操作失败,会不断重试直到成功。
1.4、挂起与恢复。
这部分包括了park、unpark等方法。
将一个线程进行挂起是通过park方法实现的,调用 park后,线程将一直阻塞直到超时或者中断等条件出现。unpark可以终止一个挂起的线程,使其恢复正常。整个并发框架中对线程的挂起操作被封装在 LockSupport类中,LockSupport类中有各种版本pack方法,但最终都调用了Unsafe.park()方法。
1.5、内存屏障。
这部分包括了loadFence、storeFence、fullFence等方法。这是在Java 8新引入的,用于定义内存屏障,避免代码重排序。
loadFence() 表示该方法之前的所有load操作在内存屏障之前完成。同理storeFence()表示该方法之前的所有store操作在内存屏障之前完成。fullFence()表示该方法之前的所有load、store操作在内存屏障之前完成。
2.Exchanger类
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时, * 这两个线程就可以交换数据,将本线程生产出来的数据传递给对方 * @author lzhcode * */ public class TestExchanger { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Exchanger exchanger = new Exchanger(); service.execute(new Runnable(){ public void run() { try { String data1 = "zxx"; System.out.println("线程" + Thread.currentThread().getName() + "正在把数据" + data1 +"换出去"); Thread.sleep((long)(Math.random()*10000)); String data2 = (String)exchanger.exchange(data1); System.out.println("线程" + Thread.currentThread().getName() + "换回的数据为" + data2); }catch(Exception e){ } } }); service.execute(new Runnable(){ public void run() { try { String data1 = "lhm"; System.out.println("线程" + Thread.currentThread().getName() + "正在把数据" + data1 +"换出去"); Thread.sleep((long)(Math.random()*10000)); String data2 = (String)exchanger.exchange(data1); System.out.println("线程" + Thread.currentThread().getName() + "换回的数据为" + data2); }catch(Exception e){ } } }); } }
3.ForkJoin 框架
在日常的业务需求中,经常出现的批量查询,批量写入等接口的提供,一般来说,最简单最low的方式就是写一个for循环来一次执行,但是当业务方对接口的性能要求较高时,就比较尴尬了
通常可以想到的方式是采用并发操作,首先想到可以实现的方式就是利用线程池来做
通常实现方式如下
// 1. 创建线程池 ExecutorService executorService = new ThreadPoolExecutor(3, 5, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("biz-exec"), new ThreadPoolExecutor.CallerRunsPolicy()); // 2. 创建执行任务 List<Future<Object>> futureList = new ArrayList<>(); for(Object arg : list) { futureList.add(executorService.submit(new Callable<Object>() { @Override public Object call() throws Exception { // xxx } })); } // 3. 结果获取 for(Future f: futureList) { Object obj = f.get(); }
用上面的这种方式并没有什么问题,我们接下来考虑的是如何使用ForkJoin框架来实现类似的功能
任务分割
ForkJoinTask
: 基本任务,使用forkjoin框架必须创建的对象,提供fork,join操作,常用的两个子类
RecursiveAction
: 无结果返回的任务RecursiveTask
: 有返回结果的任务
说明:
fork
: 让task异步执行join
: 让task同步执行,可以获取返回值- ForkJoinTask 在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行
结果合并
ForkJoinPool
执行 ForkJoinTask
,
- 任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。
- 当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务
三中提交方式:
execute
异步,无返回结果submit
异步,有返回结果 (返回Future<T>
)invoke
同步,有返回结果 (会阻塞)
public class CountTask extends RecursiveTask<Integer> { private int start; private int end; private static final int THRED_HOLD = 30; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THRED_HOLD; if (canCompute) { // 不需要拆分 for (int i = start; i <= end; i++) { sum += i; } System.out.println("thread: " + Thread.currentThread() + " start: " + start + " end: " + end); } else { int mid = (end + start) / 2; CountTask left = new CountTask(start, mid); CountTask right = new CountTask(mid + 1, end); left.fork(); right.fork(); sum = left.join() + right.join(); } return sum; } }
@Test public void testFork() throws ExecutionException, InterruptedException { int start = 0; int end = 200; CountTask task = new CountTask(start, end); ForkJoinPool pool = ForkJoinPool.commonPool(); Future<Integer> ans = pool.submit(task); int sum = ans.get(); System.out.println(sum); }
4.LockSupport
特点:
1、基于UnSafe原语
实现
LockSupport类在jdk源码中基本定义就是创建锁和其他同步类的 基本线程阻塞,直接与UnSafe原语
类打交道
2、重入性
LockSupport是非重入锁,如果一个线程连续2次调用 LockSupport .park(),那么该线程一定会一直阻塞下去。
3、面向线程锁
面向线程锁是LockSupport很重要的一个特征,这样也就没有公平锁和非公平的区别了的,同时面向线程锁的特征在一定程度上降低代码的耦合度。
LockSupport比Object的wait/notify有两大优势:
①LockSupport不需要在同步代码块里 。所以线程间也不需要维护一个共享的同步对象了,实现了线程间的解耦。
②unpark函数可以先于park调用,所以不需要担心线程间的执行的先后顺序。
LockSupport在Java的工具类用应用很广泛,咱们这里找几个例子感受感受。以Java里最常用的类ThreadPoolExecutor为例。先看如下代
public class TestObjWait { public static void main(String[] args)throws Exception { ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1000); ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1000, TimeUnit.SECONDS,queue); Future<String> future = poolExecutor.submit(new Callable<String>() { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(5); return "hello"; } }); String result = future.get(); System.out.println(result); } }
代码中我们向线程池中扔了一个任务,然后调用Future的get方法,同步阻塞等待线程池的执行结果。
这里就要问了:get方法是如何组塞住当前线程?线程池执行完任务后又是如何唤醒线程的呢?
咱们跟着源码一步步分析,先看线程池的submit方法的实现:
在submit方法里,线程池将我们提交的基于Callable实现的任务,封装为基于RunnableFuture实现的任务,然后将任务提交到线程池执行,并向当前线程返回RunnableFutrue。
进入newTaskFor方法,就一句话:return new FutureTask<T>(callable);
所以,咱们主线程调用future的get方法就是FutureTask的get方法,线程池执行的任务对象也是FutureTask的实例。
接下来看看FutureTask的get方法的实现:
比较简单,就是判断下当前任务是否执行完毕,如果执行完毕直接返回任务结果,否则进入awaitDone方法阻塞等待。
awaitDone方法里,首先会用到上节讲到的cas操作,将线程封装为WaitNode,保持下来,以供后续唤醒线程时用。再就是调用了LockSupport的park/parkNanos组塞住当前线程。
上边已经说完了阻塞等待任务结果的逻辑,接下来再看看线程池执行完任务,唤醒等待线程的逻辑实现。
前边说了,咱们提交的基于Callable实现的任务,已经被封装为FutureTask任务提交给了线程池执行,任务的执行就是FutureTask的run方法执行。如下是FutureTask的run方法:
c.call()就是执行我们提交的任务,任务执行完后调用了set方法,进入set方法发现set方法调用了finishCompletion方法,想必唤醒线程的工作就在这里边了,看看代码实现吧:
总结:
1.Executor就是Runnable和Callable的调度容器,Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作。get方法会阻塞,直到任务返回结果
2.FutureTask是Future也是Runnable,又是包装了的Callable( 如果是Runnable最终也会被转换为Callable )
3.线程池里的submit返回FutureTask,而FutureTask的get方法里调用了LockSupport的park和unpark
参考文章
Java线程——Callable与Runnable的区别_sinat_39634657的博客-CSDN博客_callable和runnable的区别
5.自定义线程池
@Bean(destroyMethod="shutdown") public ThreadPoolTaskExecutor defaultThreadPool() { // CPU可用核心数 int cpuNum = Runtime.getRuntime().availableProcessors(); //1.IO密集型 估算线程池大小 //线程数 = CPU可用核心数/(1-阻塞系数) //计算密集型任务的阻塞系数为0,而IO密集型任务的阻塞系数则接近于1。一个完全阻塞的任务是注 //定要挂掉的,所以我们无须担心阻塞系数会达到1。 //阻塞系数可以采用一些性能分析工具或java.lang.managenment API来确定线程话在系统I/O操作 //上的时间与CPU密集任务所消耗的时间比值。 //2.计算密集型估算线程池大小threadNum = cpuNum +1 或则 计算密集型估算线程池大小threadNum = cpuNum * 2; int threadNum = cpuNum * 2; ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //核心线程数目 executor.setCorePoolSize(threadNum-1); //指定最大线程数 executor.setMaxPoolSize(threadNum); //队列中最大的数目 executor.setQueueCapacity(300); executor.setThreadFactory(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); //如果为守护线程,恢复为常规 if(t.isDaemon()) { t.setDaemon(false); } //如果有设置线程优先级,恢复为常规 if(Thread.NORM_PRIORITY != t.getPriority()) { t.setPriority(Thread.NORM_PRIORITY); } return t; } }); //线程名称前缀 executor.setThreadNamePrefix("defaultThreadPool_"); //rejection-policy:当pool已经达到max size的时候,如何处理新任务 //CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行 //对拒绝task的处理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //线程空闲后的最大存活时间 executor.setKeepAliveSeconds(60); //加载 executor.initialize(); return executor; }