1.共享模型之内存
1.1 指令级并行原理
1.1.1 基础概念
Clock Cycle Time:
主频的概念大家接触的比较多,而 CPU 的 Clock Cycle Time(时钟周期时间),等于主频的倒数,意思是 CPU 能够识别的最小时间单位,比如说 4G 主频的 CPU 的 Clock Cycle Time 就是 0.25 ns,作为对比,我们墙上挂钟的Cycle Time 是 1s。例如,运行一条加法指令一般需要一个时钟周期时间。
CPI:
表示一条指令平均执行时间。
IPC:
IPC(Instruction Per Clock Cycle) 即 CPI 的倒数,表示每个时钟周期能够运行的指令数
CPU执行时间:
程序的 CPU 执行时间,即我们前面提到的 user + system 时间,可以用下面的公式来表示:
程序 CPU 执行时间 = 指令数 * CPI * Clock Cycle Time
1.1.2 指令重排序优化
现代处理器会设计为一个时钟周期完成一条执行时间最长的 CPU 指令。为什么这么做呢?可以想到指令还可以再划分成一个个更小的阶段,例如,每条指令都可以分为: 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据 写回 这 5 个阶段。
编辑
在不改变程序结果的前提下,这些指令的各个阶段可以通过重排序和组合来实现指令级并行,这一技术在 80's 中叶到 90's 中叶占据了计算架构的重要地位。指令重排的前提是,重排指令不能影响结果。
1.1.3 支持流水线的处理器
现代 CPU 支持多级指令流水线,例如支持同时执行 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回的处理器,就可以称之为五级指令流水线。这时 CPU 可以在一个时钟周期内,同时运行五条指令的不同阶段(相当于一条执行时间最长的复杂指令),IPC = 1,本质上,流水线技术并不能缩短单条指令的执行时间,但它变相地提高了指令地吞吐率。
编辑
1.1.4 SuperScalar 处理器
大多数处理器包含多个执行单元,并不是所有计算功能都集中在一起,可以再细分为整数运算单元、浮点数运算单元等,这样可以把多条指令也可以做到并行获取、译码等,CPU 可以在一个时钟周期内,执行多于一条指令,IPC > 1。
编辑
1.2 CPU缓存结构原理
1.2.1 CPU缓存结构
编辑
cpu 拿到的内存地址格式是这样的:[高位组标记][低位索引][偏移量]
编辑
1.2.2 CPU缓存读
首先根据低位,计算在缓存中的索引,之后判断缓存是否有效。低位如果为0,去内存读取最新数据更新缓存行,为1则再对比高位组标记是否一致,如果一致,根据偏移量返回缓存数据,不一致,则去内存读取新数据并更新缓存行。
1.2.3 CPU缓存一致性
MESI 是多核 CPU 缓存一致性协议,用来保证多个核心看到的同一份内存数据是一致的。它给每个缓存行(Cache Line)标记 4 种状态:
- M (Modified, 修改):数据只在我这,且被改过,和内存不一样
- E (Exclusive, 独占):数据只在我这,和内存一致,没人跟我抢
- S (Shared, 共享):数据在多个核心里都有,和内存一致
- I (Invalid, 无效):我的这份数据已经过期了,不能用
读请求(CPU 要读数据)
- M/E/S 状态:直接读自己缓存里的数据就行 ✅
- I 状态:自己缓存无效,必须去主存读新数据(规则 7)
写请求(CPU 要改数据)
- E 状态:直接改,状态变成 M,不用立刻写回内存(规则 2)
- 因为现在只有我有这份数据,改了也没人冲突
- S 状态:不能直接改!要先让其他所有核心的这份数据变成 I(无效),自己再改成 M(规则 5)
- M 状态:如果别的核心要读这份数据:
- 先把我改过的数据写回主存
- 让所有核心的这份数据变成 S(共享)(规则 4)
监听(Bus Snooping,核心之间 “偷听” 总线)
- E 状态:听到别的核心要读我这份数据 → 自己变成 S(共享)(规则 3)
- S 状态:听到别的核心要 invalidate(失效)我这份数据 → 自己变成 I(无效)(规则 6)
- M 状态:听到别的核心要读我这份数据 → 先写回内存,再大家都变成 S(规则 4)
编辑
编辑
- 初始:核心 1 的
000行是 E 状态(有效位 1,组标记 1,数据 6000),其他核心都是 I - 核心 2 要读
000:
- 核心 1 监听到读请求,把自己从 E → S
- 核心 2 从内存读到数据,状态变成 S
- 现在核心 1 和 2 的
000都是 S 状态,数据一致
- 核心 1 要写
000:
- 触发规则 5:先让核心 2 的
000变成 I(无效) - 核心 1 把数据改成 8000,状态变成 M
- 核心 3 要读
000:
- 触发规则 4:核心 1 先把 8000 写回主存
- 核心 1 从 M → S,核心 3 从内存读,状态变成 S
- 现在核心 1 和 3 的
000都是 S 状态,数据一致
1.2.4 内存屏障
可见性:一个线程改了变量,别的线程能不能立刻看到
有序性:CPU会不会为了优化,把指令代码的执行顺序打乱
1. 写屏障 (Store Barrier /sfence)
作用:把 “墙” 前面的所有写操作,都强制刷新到主存,并且不让指令越过这堵墙。
- 可见性:在写屏障之前对共享变量的修改,会被立刻同步到主存,保证之后其他线程能读到最新值。
- 有序性:CPU 不能把写屏障之前的写操作,重排到写屏障之后去执行。
- 就像:
写A → 写屏障 → 写B,CPU 绝不能变成写屏障 → 写A → 写B或写B → 写屏障 → 写A。
2. 读屏障 (Load Barrier /lfence)
作用:让 “墙” 后面的所有读操作,都从主存重新加载最新数据,也不让指令越过这堵墙。
- 可见性:在读屏障之后的读操作,会强制从主存加载最新数据,而不是用自己缓存里的旧数据。
- 有序性:CPU 不能把读屏障之后的读操作,重排到读屏障之前去执行。
- 就像:
读A → 读屏障 → 读B,CPU 绝不能变成读B → 读屏障 → 读A或读屏障 → 读A → 读B。
有两个线程 t1 和 t2,变量 k 是 volatile static(Java 里 volatile 就自带读写屏障):
- t1 线程写数据:
t1先写static i、static j- 然后写
volatile static k,这个写操作自带写屏障 - 写屏障生效:把
i、j、k的修改都刷新到主存 - 有序性保证:
写i → 写j → 写k(带屏障)的顺序不会被打乱
- t2 线程读数据:
t2先读volatile static k,这个读操作自带读屏障- 读屏障生效:强制让
t2从主存重新加载i、j、k - 所以
t2接下来读i、j时,拿到的就是t1刚写完的最新值 ✅ - 有序性保证:
读k(带屏障) → 读i → 读j的顺序不会被打乱
MESI 协议 是硬件层面保证缓存一致,但:
- MESI 只保证 “缓存里的数据最终会一致”,不保证实时性
- CPU 为了性能,会乱序执行指令,可能导致 “先写的变量,后被读到”
- 编译器也可能优化代码顺序,让你写的代码和实际执行顺序不一样
读写屏障实现:CPU提供的屏障指令
x86 架构对内存模型的约束比较 “宽松”(强内存模型),大部分场景下不需要显式的读屏障,核心靠写屏障和缓存一致性(MESI)配合:
| 屏障类型 | 硬件指令 | 核心作用(对应读写屏障) |
| 写屏障 | sfence/mfence |
1. 禁止屏障前的写操作重排到屏障后;2. 强制把缓存中 “脏数据”(MESI 的 M 状态)刷到主存;3. 触发 MESI 协议的 “写回”,让其他核心的缓存失效。 |
| 读屏障 | 几乎不需要 | x86 天然保证 “读操作不会重排到读操作前”,且 MESI 协议会自动保证读的可见性;仅极端场景用 lfence。 |
1.3 Java内存模型
JMM(Java Memory Model)是JVM规范中定义的内存模型。描述了多线程下各种变量的访问规则以及将变量存储到内存和从内存中读取变量的底层细节,Java内存模型是对共享数据可见性,原子性,有序性的规则和保障。
JMM将内存分为两部分:主内存和线程工作内存。所有线程共享的变量存储到主内存,每个线程变量都有自己的工作内存,每个线程的工作内存存储该线程对共享变量的副本,线程对共享变量的读写操作都是在工作内存中完成的。不同线程之间不能直接访问对方工作内存中的变量,线程间变量的值的传递需要通过主存完成。局部变量是线程私有的,不存在竞争问题。
Java内存模型中定义了8种操作来完成主内存和线程工作内存之间的交互协议,线程的工作内存会复制主内存中的共享变量副本,在工作内存种完成对副本变量的操作,然后再将操作后的结果同步到主内存相对应的共享变量中。lock与unlock就是这8种操作中专门用于控制共享变量独占访问和同步规则的两个操作。 如果对一个共享变量执行lock操作,则标记这个共享变量是线程独占状态,其他线程对该变量执行lock操作时会被阻塞,直到当前线程执行unlock,执行lock会清空工作内存中此变量的值,使其强制从主存中读取最新值。对一个变量执行unlock操作之前,必须先把此变量同步到主内存中,保证最新值同步到主存中。
编辑
1.4 Volatile
一旦一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,那么就具备了两层语义:保证线程间的可见性与禁止进行指令重排序。
volatile底层实现原理是内存屏障,对volatile变量的写指令后会加入写屏障,对volatile变量的读指令前会加入读屏障。
1.4.1 保证线程间的可见性
一个线程修改了某个变量的值,这个新值对于其他线程来说是立即可见的,volatile关键字会强制将修改的值写入主存。写屏障保证在该屏障之前对共享变量的所有改动都同步到主存中,读屏障保证在该屏障之后对共享变量的读取,加载的都是主存中的最新数据。
1.4.2 保证有序性
指令重排序:编译器为了优化性能,会重排代码的执行顺序。
volatile关键字修饰共享变量可以禁止指令重排序,保证代码执行的有序性。用volatile修饰共享变量会在读写共享变量时加入不同的屏障,阻止其他读写操作越过屏障,从而达到阻止重排序的效果。写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后,读屏障会确保指令重排序时,不会将读屏障之后的代码排到读屏障之前。通过插入内存屏障禁止在内存屏障前后的指令执行重排序优化。
2.共享模型之无锁/不可变
2.1 CAS
CAS的全称是: Compare And Swap(比较再交换),它体现的一种乐观锁的思想,在无锁情况下保证线程操作共享数据的原子性。
在JUC( java.util.concurrent )包下实现的很多类都用到了CAS操作
- AbstractQueuedSynchronizer(AQS框架)
- AtomicXXX类
实现方式:
在JMM内存模型中,每个线程的工作内存在操作共享变量时会在自己的工作内存中复制一份副本,主存中的共享变量存储当前的内存值,每个线程的工作内存中存储复制时的变量旧内存值和即将要更新出来的新值,当且仅当副本的旧内存值和主存中共享变量的当前内存值相等时,副本变量才会修改数据并将数据同步到主存中。如果当前副本的旧内存值和主存中的共享变量当前值不相等的话,就开始自旋操作,该线程变量重新复制主存中的最新数据值,然后重新进行比较和更新操作,这样一直循环进入自旋操作,直到成功更新内容。因为没有加锁,所以线程不会陷入阻塞,效率较高,如果竞争频繁,重试频繁发生,效率会受影响。
底层实现:
CAS底层依赖于一个Unsafe类来直接调用OS底层的CAS指令
编辑
都是native修饰的方法,由系统提供的接口执行,并非java代码实现,一般的思路也都是自旋锁实现。在java中比较常见使用有很多,比如ReentrantLock和Atomic开头的线程安全类,都调用了Unsafe中的方法。
- ReentrantLock中的一段CAS代码
编辑
乐观锁和悲观锁:
乐观锁认为线程安全问题不一定会发生,认为多个事务可以顺利并发执行,只有在提交事务时才检查是否发生冲突,一般会基于版本号来解决,在更新数据时,会检查当前版本号和读取之前的版本号是否一致,如果一致则正常,如果不一致则表面的数据被其他事务修改过。
悲观锁:假设线程安全问题很有可能会发生,在每次数据访问之前先对数据进行锁定,防止其他事务对该数据进行修改。
2.2 日期转化问题
2.2.1 问题
下面的代码在运行时,由于 SimpleDateFormat 不是线程安全的,有很大几率出现 java.lang.NumberFormatException 或者出现不正确的日期解析结果。
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); for (int i = 0; i < 10; i++) { new Thread(() -> { try { log.debug("{}", sdf.parse("1951-04-21")); } catch (Exception e) { log.error("{}", e); } }).start(); }
2.2.2 解决方案
使用同步锁:
这样虽能解决问题,但带来的是性能上的损失,并不算很好:
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); for (int i = 0; i < 50; i++) { new Thread(() -> { synchronized (sdf) { try { log.debug("{}", sdf.parse("1951-04-21")); } catch (Exception e) { log.error("{}", e); } } }).start(); }
不可变:
如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改啊!这样的对象在Java 中有很多,例如在 Java 8 后,提供了一个新的日期格式化类:
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd"); for (int i = 0; i < 10; i++) { new Thread(() -> { LocalDate date = dtf.parse("2018-10-01", LocalDate::from); log.debug("{}", date); }).start(); }
不可变设计:
另一个大家更为熟悉的 String 类也是不可变的,以它为例,说明一下不可变设计的要素
public final class String implements java.io.Serializable, Comparable<String>, CharSequence { /** The value is used for character storage. */ private final char value[]; /** Cache the hash code for the string */ private int hash; // Default to 0 // ... }
发现该类、类中所有属性都是 final 的,属性用 final 修饰保证了该属性是只读的,不能修改类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性。
0: aload_0 1: invokespecial #1 // Method java/lang/Object."<init>":()V 4: aload_0 5: bipush 20 7: putfield #2 // Field a:I <-- 写屏障 10: return
发现 final 变量的赋值也会通过 putfield 指令来完成,同样在这条指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为 0 的情况。
保护性拷贝:
使用字符串时,也有一些跟修改相关的方法啊,比如 substring 等,那么下面就看一看这些方法是
如何实现的,就以 substring 为例:
发现其内部是调用 String 的构造方法创建了一个新字符串,再进入这个构造看看,是否对 final char[] value 做出了修改,结果发现也没有,构造新字符串对象时,会生成新的 char[] value,对内容进行复制 。这种通过创建副本对象来避免共享的手段称之为【保护性拷贝(defensive copy)】
3.共享模型之工具
3.1 线程池
线程池是Java并发编程中管理线程资源的重要工具,它通过复用已创建的线程,减少线程创建和销毁的开销,提高系统性能和稳定性。
3.1.1 线程池核心参数
线程池核心参数主要参考ThreadPoolExecutor这个类的7个参数的构造函数
编辑
- corePoolSize 核心线程数目
- maximumPoolSize 最大线程数目 = (核心线程+救急线程的最大数目)
- keepAliveTime 生存时间 - 救急线程的生存时间,生存时间内没有新任务,此线程资源会释放
- unit 时间单位 - 救急线程的生存时间单位,如秒、毫秒等
- workQueue - 当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务
- threadFactory 线程工厂 - 可以定制线程对象的创建,例如设置线程名字、是否是守护线程等
- handler 拒绝策略 - 当所有线程都在繁忙,workQueue 也放满时,会触发拒绝策略
3.1.2 工作流程
1,任务在提交的时候,首先判断核心线程数是否已满,如果没有满则直接添加到工作线程执行
2,如果核心线程数满了,则判断阻塞队列是否已满,如果没有满,当前任务存入阻塞队列
3,如果阻塞队列也满了,则判断线程数是否小于最大线程数,如果满足条件,则使用临时线程执行任务
如果核心或临时线程执行完成任务后会检查阻塞队列中是否有需要执行的线程,如果有,则使用非核心线程执行任务
4,如果所有线程都在忙着(核心线程+临时线程),则走拒绝策略
拒绝策略:
1.AbortPolicy:直接抛出异常,默认策略;
2.CallerRunsPolicy:用调用者所在的线程来执行任务;
3.DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4.DiscardPolicy:直接丢弃任务;
参考代码:
public class TestThreadPoolExecutor { static class MyTask implements Runnable { private final String name; private final long duration; public MyTask(String name) { this(name, 0); } public MyTask(String name, long duration) { this.name = name; this.duration = duration; } @Override public void run() { try { LoggerUtils.get("myThread").debug("running..." + this); Thread.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString() { return "MyTask(" + name + ")"; } } public static void main(String[] args) throws InterruptedException { AtomicInteger c = new AtomicInteger(1); ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(2); ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 3, 0, TimeUnit.MILLISECONDS, queue, r -> new Thread(r, "myThread" + c.getAndIncrement()), new ThreadPoolExecutor.AbortPolicy()); showState(queue, threadPool); threadPool.submit(new MyTask("1", 3600000)); showState(queue, threadPool); threadPool.submit(new MyTask("2", 3600000)); showState(queue, threadPool); threadPool.submit(new MyTask("3")); showState(queue, threadPool); threadPool.submit(new MyTask("4")); showState(queue, threadPool); threadPool.submit(new MyTask("5",3600000)); showState(queue, threadPool); threadPool.submit(new MyTask("6")); showState(queue, threadPool); } private static void showState(ArrayBlockingQueue<Runnable> queue, ThreadPoolExecutor threadPool) { try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } List<Object> tasks = new ArrayList<>(); for (Runnable runnable : queue) { try { Field callable = FutureTask.class.getDeclaredField("callable"); callable.setAccessible(true); Object adapter = callable.get(runnable); Class<?> clazz = Class.forName("java.util.concurrent.Executors$RunnableAdapter"); Field task = clazz.getDeclaredField("task"); task.setAccessible(true); Object o = task.get(adapter); tasks.add(o); } catch (Exception e) { e.printStackTrace(); } } LoggerUtils.main.debug("pool size: {}, queue: {}", threadPool.getPoolSize(), tasks); } }
3.1.3 线程池常见的阻塞队列
workQueue - 当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务
比较常见的有4个,用的最多是ArrayBlockingQueue和LinkedBlockingQueue
1.ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
2.LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。
3.DelayedWorkQueue :是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的
4.SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。
ArrayBlockingQueue的LinkedBlockingQueue区别
| LinkedBlockingQueue | ArrayBlockingQueue |
| 默认无界,支持有界 | 强制有界 |
| 底层是链表 | 底层是数组 |
| 是懒惰的,创建节点的时候添加数据 | 提前初始化 Node 数组 |
| 入队会生成新 Node | Node需要是提前创建好的 |
| 两把锁(头尾) | 一把锁 |
左边是LinkedBlockingQueue加锁的方式,右边是ArrayBlockingQueue加锁的方式
- LinkedBlockingQueue读和写各有一把锁,性能相对较好
- ArrayBlockingQueue只有一把锁,读和写公用,性能相对于LinkedBlockingQueue差一些
编辑
总结:
Jdk中提供了很多阻塞队列,开发中常见的有两个:ArrayBlockingQueue和LinkedBlockingQueue
ArrayBlockingQueue和LinkedBlockingQueue是Java中两种常见的阻塞队列,它们在实现和使用上有一些关键的区别。
首先,ArrayBlockingQueue是一个有界队列,它在创建时必须指定容量,并且这个容量不能改变。而LinkedBlockingQueue默认是无界的,但也可以在创建时指定最大容量,使其变为有界队列。
其次,它们在内部数据结构上也有所不同。ArrayBlockingQueue是基于数组实现的,而LinkedBlockingQueue则是基于链表实现的。这意味着ArrayBlockingQueue在访问元素时可能会更快,因为它可以直接通过索引访问数组中的元素。而LinkedBlockingQueue则在添加和删除元素时可能更快,因为它不需要移动其他元素来填充空间。
另外,它们在加锁机制上也有所不同。ArrayBlockingQueue使用一把锁来控制对队列的访问,这意味着读写操作都是互斥的。而LinkedBlockingQueue则使用两把锁,一把用于控制读操作,另一把用于控制写操作,这样可以提高并发性能。
3.1.4 确定核心线程数
在设置核心线程数之前,需要先熟悉一些执行线程池执行任务的类型
- IO密集型任务
一般来说:文件读写、DB读写、网络请求等
推荐:核心线程数大小设置为2N+1 (N为计算机的CPU核数)
- CPU密集型任务
一般来说:计算型代码、Bitmap转换、Gson转换等
推荐:核心线程数大小设置为N+1 (N为计算机的CPU核数)
参考回答
① 高并发、任务执行时间短 -->( CPU核数+1 ),减少线程上下文的切换
② 并发不高、任务执行时间长
- IO密集型的任务 --> (CPU核数 * 2 + 1)
- 计算密集型任务 --> ( CPU核数+1 )
③ 并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,增加服务器是第二步,至于线程池的设置,设置参考(2)
3.1.5 线程池种类
在java.util.concurrent.Executors类中提供了大量创建连接池的静态方法,常见就有四种
创建使用固定线程数的线程池:
- 核心线程数与最大线程数一样,没有救急线程
- 阻塞队列是LinkedBlockingQueue,最大容量为Integer.MAX_VALUE
- 适用场景:适用于任务量已知,相对耗时的任务
- 案例:
public class FixedThreadPoolCase { static class FixedThreadDemo implements Runnable{ @Override public void run() { String name = Thread.currentThread().getName(); for (int i = 0; i < 2; i++) { System.out.println(name + ":" + i); } } } public static void main(String[] args) throws InterruptedException { //创建一个固定大小的线程池,核心线程数和最大线程数都是3 ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 5; i++) { executorService.submit(new FixedThreadDemo()); Thread.sleep(10); } executorService.shutdown(); } }
单线程化的线程池,它只会用唯一的工作线程来执行任 务,保证所有任务按照指定顺序(FIFO)执行
编辑
- 核心线程数和最大线程数都是1
- 阻塞队列是LinkedBlockingQueue,最大容量为Integer.MAX_VALUE
- 适用场景:适用于按照顺序执行的任务
- 案例:
public class NewSingleThreadCase { static int count = 0; static class Demo implements Runnable { @Override public void run() { count++; System.out.println(Thread.currentThread().getName() + ":" + count); } } public static void main(String[] args) throws InterruptedException { //单个线程池,核心线程数和最大线程数都是1 ExecutorService exec = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { exec.execute(new Demo()); Thread.sleep(5); } exec.shutdown(); } }
可缓存线程池:
编辑
- 核心线程数为0
- 最大线程数是Integer.MAX_VALUE
- 阻塞队列为SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。
- 适用场景:适合任务数比较密集,但每个任务执行时间较短的情况
- 案例:
public class CachedThreadPoolCase { static class Demo implements Runnable { @Override public void run() { String name = Thread.currentThread().getName(); try { //修改睡眠时间,模拟线程执行需要花费的时间 Thread.sleep(100); System.out.println(name + "执行完了"); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { //创建一个缓存的线程,没有核心线程数,最大线程数为Integer.MAX_VALUE ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { exec.execute(new Demo()); Thread.sleep(1); } exec.shutdown(); } }
提供了“延迟”和“周期执行”功能的ThreadPoolExecutor:
public class ScheduledThreadPoolCase { static class Task implements Runnable { @Override public void run() { try { String name = Thread.currentThread().getName(); System.out.println(name + ", 开始:" + new Date()); Thread.sleep(1000); System.out.println(name + ", 结束:" + new Date()); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { //按照周期执行的线程池,核心线程数为2,最大线程数为Integer.MAX_VALUE ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); System.out.println("程序开始:" + new Date()); /** * schedule 提交任务到线程池中 * 第一个参数:提交的任务 * 第二个参数:任务执行的延迟时间 * 第三个参数:时间单位 */ scheduledThreadPool.schedule(new Task(), 0, TimeUnit.SECONDS); scheduledThreadPool.schedule(new Task(), 1, TimeUnit.SECONDS); scheduledThreadPool.schedule(new Task(), 5, TimeUnit.SECONDS); Thread.sleep(5000); // 关闭线程池 scheduledThreadPool.shutdown(); } }
参考回答
在jdk中默认提供了4中方式创建线程池
第一个是:newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回 收空闲线程,若无可回收,则新建线程。
第二个是:newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列 中等待。
第三个是:newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
第四个是:newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
3.1.6 为什么不建议使用Executors创建线程池
参考阿里开发手册《Java开发手册-嵩山版》
主要原因是如果使用Executors创建线程池的话,它允许的请求队列默认长度是Integer.MAX_VALUE,这样的话,有可能导致堆积大量的请求,从而导致OOM(内存溢出)。
所以,我们一般推荐使用ThreadPoolExecutor来创建线程池,这样可以明确规定线程池的参数,避免资源的耗尽。
3.1.7 线程池应用场景
CountDownLatch
CountDownLatch(闭锁/倒计时锁)用来进行线程同步协作,等待所有线程完成倒计时(一个或者多个线程,等待其他多个线程完成某件事情之后才能执行)
- 其中构造参数用来初始化等待计数值
- await() 用来等待计数归零
- countDown() 用来让计数减一
编辑
public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { //初始化了一个倒计时锁 参数为 3 CountDownLatch latch = new CountDownLatch(3); new Thread(() -> { System.out.println(Thread.currentThread().getName()+"-begin..."); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } //count-- latch.countDown(); System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount()); }).start(); new Thread(() -> { System.out.println(Thread.currentThread().getName()+"-begin..."); try { Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } //count-- latch.countDown(); System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount()); }).start(); new Thread(() -> { System.out.println(Thread.currentThread().getName()+"-begin..."); try { Thread.sleep(1500); } catch (InterruptedException e) { throw new RuntimeException(e); } //count-- latch.countDown(); System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount()); }).start(); String name = Thread.currentThread().getName(); System.out.println(name + "-waiting..."); //等待其他线程完成 latch.await(); System.out.println(name + "-wait end..."); } }
ES数据导入:
在我们项目上线之前,我们需要把数据库中的数据一次性的同步到es索引库中,但是当时的数据好像是1000万左右,一次性读取数据肯定不行(oom异常),当时我就想到可以使用线程池的方式导入,利用CountDownLatch来控制,就能避免一次性加载过多,防止内存溢出
整体流程就是通过CountDownLatch+线程池配合去执行
编辑
详细实现流程:
编辑
数据汇总:
在一个电商网站中,用户下单之后,需要查询数据,数据包含了三部分:订单信息、包含的商品、物流信息;这三块信息都在不同的微服务中进行实现的,我们如何完成这个业务呢?
编辑
- 在实际开发的过程中,难免需要调用多个接口来汇总数据,如果所有接口(或部分接口)的没有依赖关系,就可以使用线程池+future来提升性能
异步调用:
在进行搜索的时候,需要保存用户的搜索记录,而搜索记录不能影响用户的正常搜索,我们通常会开启一个线程去执行历史记录的保存,在新开启的线程在执行的过程中,可以利用线程提交任务
3.1.8 如何控制某个方法允许并发访问线程的数量?
Semaphore [ˈsɛməˌfɔr] 信号量,是JUC包下的一个工具类,我们可以通过其限制执行的线程数量,达到限流的效果
当一个线程执行时先通过其方法进行获取许可操作,获取到许可的线程继续执行业务逻辑,当线程执行完成后进行释放许可操作,未获取达到许可的线程进行等待或者直接结束。
Semaphore两个重要的方法
lsemaphore.acquire(): 请求一个信号量,这时候的信号量个数-1(一旦没有可使用的信号量,也即信号量个数变为负数时,再次请求的时候就会阻塞,直到其他线程释放了信号量)
lsemaphore.release():释放一个信号量,此时信号量个数+1
线程任务类:
public class SemaphoreCase { public static void main(String[] args) { // 1. 创建 semaphore 对象 Semaphore semaphore = new Semaphore(3); // 2. 10个线程同时运行 for (int i = 0; i < 10; i++) { new Thread(() -> { try { // 3. 获取许可 semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.println("running..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("end..."); } finally { // 4. 释放许可 semaphore.release(); } }).start(); } } }
3.2 ThreadLocal
3.2.1 概述
ThreadLocal是多线程中对于解决线程安全的一个操作类,它会为每个线程都分配一个独立的线程副本从而解决了变量并发访问冲突的问题。ThreadLocal 同时实现了线程内的资源共享。
ThreadLocal会为每个线程分配一个独立的线程空间,用于解决变量并发访问冲突的问题。ThreadLocal会实现线程内的资源共享。ThreadLocal本质来说就是一个线程内部存储类,让多个线程只操作自己内部的值,从而实现线程数据隔离。
案例:使用JDBC操作数据库时,会将每一个线程的Connection放入各自的ThreadLocal中,从而保证每个线程都在各自的 Connection 上进行数据库的操作,避免A线程关闭了B线程的连接。
3.2.2 基本使用
三个主要方法:
- set(value) 设置值
- get() 获取值
- remove() 清除值
public class ThreadLocalTest { static ThreadLocal<String> threadLocal = new ThreadLocal<>(); public static void main(String[] args) { new Thread(() -> { String name = Thread.currentThread().getName(); threadLocal.set("itcast"); print(name); System.out.println(name + "-after remove : " + threadLocal.get()); }, "t1").start(); new Thread(() -> { String name = Thread.currentThread().getName(); threadLocal.set("itheima"); print(name); System.out.println(name + "-after remove : " + threadLocal.get()); }, "t2").start(); } static void print(String str) { //打印当前线程中本地内存中本地变量的值 System.out.println(str + " :" + threadLocal.get()); //清除本地内存中的本地变量 threadLocal.remove(); } }
3.2.3 底层原理
ThreadLocal本质来说就是一个线程内部存储类,从而让多个线程只操作自己内部的值,从而实现线程数据隔离
编辑
在ThreadLocal中有一个内部类叫做ThreadLocalMap,类似于HashMap
ThreadLocalMap中有一个属性table数组,这个是真正存储数据的位置
set方法
编辑
get方法/remove方法
编辑
在ThreadLocal中有一个ThreadLocalMap内部类(ThreadLocalMap属于Thread,而不是属于ThreadLocal),每个线程持有一个ThreadLocalMap对象,在这个内部类中有一个属性为table的数组,这个是真正存数据的地方。
Spring框架集成ThreadLocal:
Spring框架中,每个用户的请求会被分配一个独立的线程处理,每一个线程会先获取到自己的ThreadLocalMap对象,然后以当前ThreadLocal对象为Key,以用户信息为Value存储到自己的ThreadLocalMap中,获取信息同理,先获取当前线程的ThreadLocalMap,然后再获取对应的用户信息。
3.2.4 内存泄漏问题
ava对象中存在四种引用:强引用,软引用,弱引用,虚引用
强引用:表示一个对象处于有用且必须的状态,如果一个对象具有强引用,则GC不会回收它。即便堆中内存不足了,宁可出现OOM,也不会对其进行回收。
弱引用:表示一个对象处于可能有用且非必须的状态。则GC线程扫描内存区域时,一旦发现弱引用,就会回收到弱引用相关联的对象,对于弱引用的回收,无关内存区域是否够,一旦发现就会被回收。
每个Thread维护一个ThreadLocalMap对象,ThreadLocalMap中的Key被设计为了弱引用,会被GC调用释放Key,但是Value是强引用,不会被GC回收。使用ThreadLocal时我们往往将其作为静态变量(作为一个强引用)使用,无法被动依赖GC回收,我们通常主动remove释放key,这样就能避免内存溢出问题。
3.3 ConcurrentHashMap
ConcurrentHashMap 是一种线程安全的高效Map集合
底层数据结构:
- JDK1.7底层采用分段的数组+链表实现
- JDK1.8 采用的数据结构跟HashMap1.8的结构一样,数组+链表/红黑二叉树。
(1) JDK1.7中concurrentHashMap
数据结构:
编辑
- 提供了一个segment数组,在初始化ConcurrentHashMap 的时候可以指定数组的长度,默认是16,一旦初始化之后中间不可扩容
- 在每个segment中都可以挂一个HashEntry数组,数组里面可以存储具体的元素,HashEntry数组是可以扩容的
- 在HashEntry存储的数组中存储的元素,如果发生冲突,则可以挂单向链表
存储流程:
编辑
- 先去计算key的hash值,然后确定segment数组下标
- 再通过hash值确定hashEntry数组中的下标存储数据
- 在进行操作数据的之前,会先判断当前segment对应下标位置是否有线程进行操作,为了线程安全使用的是ReentrantLock进行加锁,如果获取锁是被会使用cas自旋锁进行尝试
(2) JDK1.8中concurrentHashMap
在JDK1.8中,放弃了Segment臃肿的设计,数据结构跟HashMap的数据结构是一样的:数组+红黑树+链表
采用 CAS + Synchronized来保证并发安全进行实现
- CAS控制数组节点的添加
- synchronized只锁定当前链表或红黑二叉树的首节点,只要hash不冲突,就不会产生并发的问题 , 效率得到提升
编辑
如果某个数组为空,此时有两个线程同时添加到同一个位置,那么通过CAS来确保添加的安全性,一个成功,另一个失败的话就会自旋尝试,然后添加到元素的后面。如果添加的时候首节点是有的,那么会对首节点上syn锁,确保并发安全
参考回答
ConcurrentHashMap 是一种线程安全的高效Map集合,jdk1.7和1.8也做了很多调整。
- JDK1.7的底层采用是分段的数组+链表 实现
- JDK1.8 采用的数据结构跟HashMap1.8的结构一样,数组+链表/红黑二叉树。
在jdk1.7中 ConcurrentHashMap 里包含一个 Segment 数组。Segment 的结构和HashMap类似,是一 种数组和链表结构,一个 Segment 包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构 的元素,每个 Segment 守护着一个HashEntry数组里的元素,当对 HashEntry 数组的数据进行修 改时,必须首先获得对应的 Segment的锁。
Segment 是一种可重入的锁 ReentrantLock,每个 Segment 守护一个HashEntry 数组里得元 素,当对 HashEntry 数组的数据进行修改时,必须首先获得对应的 Segment 锁
在jdk1.8中的ConcurrentHashMap 做了较大的优化,性能提升了不少。首先是它的数据结构与jdk1.8的hashMap数据结构完全一致。其次是放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保 证并发安全进行实现,synchronized只锁定当前链表或红黑二叉树的首节点,这样只要hash不冲 突,就不会产生并发 , 效率得到提升。
3.4 导致并发程序出现问题的根本原因
Java并发编程三大特性
- 原子性
- 可见性
- 有序性
1)原子性
一个线程在CPU中操作不可暂停,也不可中断,要不执行完成,要不不执行
比如,如下代码能保证原子性吗?
编辑
以上代码会出现超卖或者是一张票卖给同一个人,执行并不是原子性的
解决方案:
1.synchronized:同步加锁
2.JUC里面的lock:加锁
编辑
2)内存可见性
内存可见性:让一个线程对共享变量的修改对另一个线程可见
比如,以下代码不能保证内存可见性
编辑
解决方案:
- synchronized
- volatile(推荐)
- LOCK
3)有序性
指令重排:处理器为了提高程序运行效率,可能会对输入代码进行优化,它不保证程序中各个语句的执行先后顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行的结果是一致的。
还是之前的例子,如下代码:
编辑
解决方案:
- volatile
参考回答
Java并发编程有三大核心特性,分别是原子性、可见性和有序性。
首先,原子性指的是一个线程在CPU中的操作是不可暂停也不可中断的,要么执行完成,要么不执行。比如,一些简单的操作如赋值可能是原子的,但复合操作如自增就不是原子的。为了保证原子性,我们可以使用synchronized关键字或JUC里面的Lock来进行加锁。
其次,可见性是指让一个线程对共享变量的修改对另一个线程可见。由于线程可能在自己的工作内存中缓存共享变量的副本,因此一个线程对共享变量的修改可能不会立即反映在其他线程的工作内存中。为了解决这个问题,我们可以使用synchronized关键字、volatile关键字或Lock来确保可见性。
最后,有序性是指处理器为了提高程序运行效率,可能会对输入代码进行优化,导致程序中各个语句的执行先后顺序与代码中的顺序不一致。虽然处理器会保证程序最终执行结果与代码顺序执行的结果一致,但在某些情况下我们可能需要确保特定的执行顺序。为了解决这个问题,我们可以使用volatile关键字来禁止指令重排。