
微信搜「 JavaKeeper」程序员成长充电站,互联网技术武道场。无套路领取 500+ 本电子书和 30+ 视频教学和源码。
能力说明:
精通JVM运行机制,包括类生命、内存模型、垃圾回收及JVM常见参数;能够熟练使用Runnable接口创建线程和使用ExecutorService并发执行任务、识别潜在的死锁线程问题;能够使用Synchronized关键字和atomic包控制线程的执行顺序,使用并行Fork/Join框架;能过开发使用原始版本函数式接口的代码。
暂时未有相关云产品技术能力~
阿里云技能认证
详细说明手撕面试题:多个线程顺序执行问题 大家在换工作面试中,除了一些常规算法题,还会遇到各种需要手写的题目,所以打算总结出来,给大家个参考。 第一篇打算总结下阿里最喜欢问的多个线程顺序打印问题,我遇到的是机试,直接写出运行。同类型的题目有很多,比如 三个线程分别打印 A,B,C,要求这三个线程一起运行,打印 n 次,输出形如“ABCABCABC....”的字符串 两个线程交替打印 0~100 的奇偶数 通过 N 个线程顺序循环打印从 0 至 100 多线程按顺序调用,A->B->C,AA 打印 5 次,BB 打印10 次,CC 打印 15 次,重复 10 次 用两个线程,一个输出字母,一个输出数字,交替输出 1A2B3C4D...26Z 其实这类题目考察的都是线程间的通信问题,基于这类题目,做一个整理,方便日后手撕面试官,文明的打工人,手撕面试题。 使用 Lock 我们以第一题为例:三个线程分别打印 A,B,C,要求这三个线程一起运行,打印 n 次,输出形如“ABCABCABC....”的字符串。 思路:使用一个取模的判断逻辑 C%M ==N,题为 3 个线程,所以可以按取模结果编号:0、1、2,他们与 3 取模结果仍为本身,则执行打印逻辑。 public class PrintABCUsingLock { private int times; // 控制打印次数 private int state; // 当前状态值:保证三个线程之间交替打印 private Lock lock = new ReentrantLock(); public PrintABCUsingLock(int times) { this.times = times; } private void printLetter(String name, int targetNum) { for (int i = 0; i < times; ) { lock.lock(); if (state % 3 == targetNum) { state++; i++; System.out.print(name); } lock.unlock(); } } public static void main(String[] args) { PrintABCUsingLock loopThread = new PrintABCUsingLock(1); new Thread(() -> { loopThread.printLetter("B", 1); }, "B").start(); new Thread(() -> { loopThread.printLetter("A", 0); }, "A").start(); new Thread(() -> { loopThread.printLetter("C", 2); }, "C").start(); } } main 方法启动后,3 个线程会抢锁,但是 state 的初始值为 0,所以第一次执行 if 语句的内容只能是 线程 A,然后还在 for 循环之内,此时 state = 1,只有 线程 B 才满足 1% 3 == 1,所以第二个执行的是 B,同理只有 线程 C 才满足 2% 3 == 2,所以第三个执行的是 C,执行完 ABC 之后,才去执行第二次 for 循环,所以要把 i++ 写在 for 循环里边,不能写成 for (int i = 0; i < times;i++) 这样。 使用 wait/notify 其实遇到这类型题目,好多同学可能会先想到的就是 join(),或者 wati/notify 这样的思路。算是比较传统且万能的解决方案。也有些面试官会要求不能使用这种方式。 思路:还是以第一题为例,我们用对象监视器来实现,通过 wait 和 notify() 方法来实现等待、通知的逻辑,A 执行后,唤醒 B,B 执行后唤醒 C,C 执行后再唤醒 A,这样循环的等待、唤醒来达到目的。 public class PrintABCUsingWaitNotify { private int state; private int times; private static final Object LOCK = new Object(); public PrintABCUsingWaitNotify(int times) { this.times = times; } public static void main(String[] args) { PrintABCUsingWaitNotify printABC = new PrintABCUsingWaitNotify(10); new Thread(() -> { printABC.printLetter("A", 0); }, "A").start(); new Thread(() -> { printABC.printLetter("B", 1); }, "B").start(); new Thread(() -> { printABC.printLetter("C", 2); }, "C").start(); } private void printLetter(String name, int targetState) { for (int i = 0; i < times; i++) { synchronized (LOCK) { while (state % 3 != targetState) { try { LOCK.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } state++; System.out.print(name); LOCK.notifyAll(); } } } } 同样的思路,来解决下第 2 题:两个线程交替打印奇数和偶数 使用对象监视器实现,两个线程 A、B 竞争同一把锁,只要其中一个线程获取锁成功,就打印 ++i,并通知另一线程从等待集合中释放,然后自身线程加入等待集合并释放锁即可。 public class OddEvenPrinter { private Object monitor = new Object(); private final int limit; private volatile int count; OddEvenPrinter(int initCount, int times) { this.count = initCount; this.limit = times; } public static void main(String[] args) { OddEvenPrinter printer = new OddEvenPrinter(0, 10); new Thread(printer::print, "odd").start(); new Thread(printer::print, "even").start(); } private void print() { synchronized (monitor) { while (count < limit) { try { System.out.println(String.format("线程[%s]打印数字:%d", Thread.currentThread().getName(), ++count)); monitor.notifyAll(); monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //防止有子线程被阻塞未被唤醒,导致主线程不退出 monitor.notifyAll(); } } } 同样的思路,来解决下第 5 题:用两个线程,一个输出字母,一个输出数字,交替输出 1A2B3C4D...26Z public class NumAndLetterPrinter { private static char c = 'A'; private static int i = 0; static final Object lock = new Object(); public static void main(String[] args) { new Thread(() -> printer(), "numThread").start(); new Thread(() -> printer(), "letterThread").start(); } private static void printer() { synchronized (lock) { for (int i = 0; i < 26; i++) { if (Thread.currentThread().getName() == "numThread") { //打印数字1-26 System.out.print((i + 1)); // 唤醒其他在等待的线程 lock.notifyAll(); try { // 让当前线程释放锁资源,进入wait状态 lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else if (Thread.currentThread().getName() == "letterThread") { // 打印字母A-Z System.out.print((char) ('A' + i)); // 唤醒其他在等待的线程 lock.notifyAll(); try { // 让当前线程释放锁资源,进入wait状态 lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } lock.notifyAll(); } } } 使用 Lock/Condition 还是以第一题为例,使用 Condition 来实现,其实和 wait/notify 的思路一样。 Condition 中的 await() 方法相当于 Object 的 wait() 方法,Condition 中的 signal() 方法相当于Object 的 notify() 方法,Condition 中的 signalAll() 相当于 Object 的 notifyAll() 方法。 不同的是,Object 中的 wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而 Condition 是需要与"互斥锁"/"共享锁"捆绑使用的。 public class PrintABCUsingLockCondition { private int times; private int state; private static Lock lock = new ReentrantLock(); private static Condition c1 = lock.newCondition(); private static Condition c2 = lock.newCondition(); private static Condition c3 = lock.newCondition(); public PrintABCUsingLockCondition(int times) { this.times = times; } public static void main(String[] args) { PrintABCUsingLockCondition print = new PrintABCUsingLockCondition(10); new Thread(() -> { print.printLetter("A", 0, c1, c2); }, "A").start(); new Thread(() -> { print.printLetter("B", 1, c2, c3); }, "B").start(); new Thread(() -> { print.printLetter("C", 2, c3, c1); }, "C").start(); } private void printLetter(String name, int targetState, Condition current, Condition next) { for (int i = 0; i < times; ) { lock.lock(); try { while (state % 3 != targetState) { current.await(); } state++; i++; System.out.print(name); next.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } } 使用 Lock 锁的多个 Condition 可以实现精准唤醒,所以碰到那种多个线程交替打印不同次数的题就比较容易想到,比如解决第四题:多线程按顺序调用,A->B->C,AA 打印 5 次,BB 打印10 次,CC 打印 15 次,重复 10 次 代码就不贴了,思路相同。 以上几种方式,其实都会存在一个锁的抢夺过程,如果抢锁的的线程数量足够大,就会出现很多线程抢到了锁但不该自己执行,然后就又解锁或 wait() 这种操作,这样其实是有些浪费资源的。 使用 Semaphore 在信号量上我们定义两种操作: 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。 acquire(获取) 当一个线程调用 acquire 操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。 release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。 先看下如何解决第一题:三个线程循环打印 A,B,C public class PrintABCUsingSemaphore { private int times; private static Semaphore semaphoreA = new Semaphore(1); // 只有A 初始信号量为1,第一次获取到的只能是A private static Semaphore semaphoreB = new Semaphore(0); private static Semaphore semaphoreC = new Semaphore(0); public PrintABCUsingSemaphore(int times) { this.times = times; } public static void main(String[] args) { PrintABCUsingSemaphore printer = new PrintABCUsingSemaphore(1); new Thread(() -> { printer.print("A", semaphoreA, semaphoreB); }, "A").start(); new Thread(() -> { printer.print("B", semaphoreB, semaphoreC); }, "B").start(); new Thread(() -> { printer.print("C", semaphoreC, semaphoreA); }, "C").start(); } private void print(String name, Semaphore current, Semaphore next) { for (int i = 0; i < times; i++) { try { System.out.println("111" + Thread.currentThread().getName()); current.acquire(); // A获取信号执行,A信号量减1,当A为0时将无法继续获得该信号量 System.out.print(name); next.release(); // B释放信号,B信号量加1(初始为0),此时可以获取B信号量 System.out.println("222" + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } } } 如果题目中是多个线程循环打印的话,一般使用信号量解决是效率较高的方案,上一个线程持有下一个线程的信号量,通过一个信号量数组将全部关联起来,这种方式不会存在浪费资源的情况。 接着用信号量的方式解决下第三题:通过 N 个线程顺序循环打印从 0 至 100 public class LoopPrinter { private final static int THREAD_COUNT = 3; static int result = 0; static int maxNum = 10; public static void main(String[] args) throws InterruptedException { final Semaphore[] semaphores = new Semaphore[THREAD_COUNT]; for (int i = 0; i < THREAD_COUNT; i++) { //非公平信号量,每个信号量初始计数都为1 semaphores[i] = new Semaphore(1); if (i != THREAD_COUNT - 1) { System.out.println(i+"==="+semaphores[i].getQueueLength()); //获取一个许可前线程将一直阻塞, for 循环之后只有 syncObjects[2] 没有被阻塞 semaphores[i].acquire(); } } for (int i = 0; i < THREAD_COUNT; i++) { // 初次执行,上一个信号量是 syncObjects[2] final Semaphore lastSemphore = i == 0 ? semaphores[THREAD_COUNT - 1] : semaphores[i - 1]; final Semaphore currentSemphore = semaphores[i]; final int index = i; new Thread(() -> { try { while (true) { // 初次执行,让第一个 for 循环没有阻塞的 syncObjects[2] 先获得令牌阻塞了 lastSemphore.acquire(); System.out.println("thread" + index + ": " + result++); if (result > maxNum) { System.exit(0); } // 释放当前的信号量,syncObjects[0] 信号量此时为 1,下次 for 循环中上一个信号量即为syncObjects[0] currentSemphore.release(); } } catch (Exception e) { e.printStackTrace(); } }).start(); } } } 使用 LockSupport LockSupport 是 JDK 底层的基于 sun.misc.Unsafe 来实现的类,用来创建锁和其他同步工具类的基本线程阻塞原语。它的静态方法unpark()和park()可以分别实现阻塞当前线程和唤醒指定线程的效果,所以用它解决这样的问题会更容易一些。 (在 AQS 中,就是通过调用 LockSupport.park( )和 LockSupport.unpark() 来实现线程的阻塞和唤醒的。) public class PrintABCUsingLockSupport { private static Thread threadA, threadB, threadC; public static void main(String[] args) { threadA = new Thread(() -> { for (int i = 0; i < 10; i++) { // 打印当前线程名称 System.out.print(Thread.currentThread().getName()); // 唤醒下一个线程 LockSupport.unpark(threadB); // 当前线程阻塞 LockSupport.park(); } }, "A"); threadB = new Thread(() -> { for (int i = 0; i < 10; i++) { // 先阻塞等待被唤醒 LockSupport.park(); System.out.print(Thread.currentThread().getName()); // 唤醒下一个线程 LockSupport.unpark(threadC); } }, "B"); threadC = new Thread(() -> { for (int i = 0; i < 10; i++) { // 先阻塞等待被唤醒 LockSupport.park(); System.out.print(Thread.currentThread().getName()); // 唤醒下一个线程 LockSupport.unpark(threadA); } }, "C"); threadA.start(); threadB.start(); threadC.start(); } } 理解了思路,解决其他问题就容易太多了。 比如,我们再解决下第五题:用两个线程,一个输出字母,一个输出数字,交替输出 1A2B3C4D...26Z public class NumAndLetterPrinter { private static Thread numThread, letterThread; public static void main(String[] args) { letterThread = new Thread(() -> { for (int i = 0; i < 26; i++) { System.out.print((char) ('A' + i)); LockSupport.unpark(numThread); LockSupport.park(); } }, "letterThread"); numThread = new Thread(() -> { for (int i = 1; i <= 26; i++) { System.out.print(i); LockSupport.park(); LockSupport.unpark(letterThread); } }, "numThread"); numThread.start(); letterThread.start(); } } 写在最后 好了,以上就是常用的五种实现方案,多练习几次,手撕没问题。 当然,这类问题,解决方式不止是我列出的这些,还会有 join、CountDownLatch、也有放在队列里解决的,思路有很多,面试官想考察的其实只是对多线程的编程功底,其实自己练习的时候,是个很好的巩固理解 JUC 的过程。 以梦为马,越骑越傻。诗和远方,越走越慌。不忘初心是对的,但切记要出发,加油吧,程序员。 在路上的你,可以微信搜「 JavaKeeper 」一起前行,无套路领取 500+ 本电子书和 30+ 视频教学和源码,本文 GitHub github.com/JavaKeeper 已经收录,服务端开发、面试必备技能兵器谱,有你想要的。
队列和阻塞队列 队列 队列(Queue)是一种经常使用的集合。Queue 实际上是实现了一个先进先出(FIFO:First In First Out)的有序表。和 List、Set 一样都继承自 Collection。它和 List 的区别在于,List可以在任意位置添加和删除元素,而Queue 只有两个操作: 把元素添加到队列末尾; 从队列头部取出元素。 超市的收银台就是一个队列: 我们常用的 LinkedList 就可以当队列使用,实现了 Dequeue 接口,还有 ConcurrentLinkedQueue,他们都属于非阻塞队列。 阻塞队列 阻塞队列,顾名思义,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下 线程 1 往阻塞队列中添加元素,而线程 2 从阻塞队列中移除元素 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。 当阻塞队列是满时,从队列中添加元素的操作将会被阻塞。 试图从空的阻塞队列中获取元素的线程将会阻塞,直到其他的线程往空的队列插入新的元素,同样,试图往已满的阻塞队列添加新元素的线程同样也会阻塞,直到其他的线程从列中移除一个或多个元素或者完全清空队列后继续新增。 类似我们去海底捞排队,海底捞爆满情况下,阻塞队列相当于用餐区,用餐区满了的话,就阻塞在候客区等着,可以用餐的话 put 一波去用餐,吃完就 take 出去。 为什么要用阻塞队列,有什么好处吗 在多线程领域:所谓阻塞,是指在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。 那为什么需要 BlockingQueue 呢 好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这些 BlockingQueue 都包办了。 在 concurrent 包发布以前,多线程环境下,我们每个程序员都必须自己去实现这些细节,尤其还要兼顾效率和线程安全,这会给我们的程序带来不小的复杂性。现在有了阻塞队列,我们的操作就从手动挡换成了自动挡。 Java 里的阻塞队列 Collection的子类除了我们熟悉的 List 和 Set,还有一个 Queue,阻塞队列 BlockingQueue 继承自 Queue。 BlockingQueue 是个接口,需要使用它的实现之一来使用 BlockingQueue,java.util.concurrent 包下具有以下 BlockingQueue 接口的实现类: JDK 提供了 7 个阻塞队列。分别是 ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列 LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列 PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列 DelayQueue:一个使用优先级队列实现的无界阻塞队列 SynchronousQueue:一个不存储元素的阻塞队列 LinkedTransferQueue:一个由链表结构组成的无界阻塞队列(实现了继承于 BlockingQueue 的 TransferQueue) LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列 BlockingQueue 核心方法 相比 Queue 接口,BlockingQueue 有四种形式的 API。 方法类型 抛出异常 返回特殊值 一直阻塞 超时退出 插入 add(e) offer(e) put(e) offer(e,time,unit) 移除(取出) remove() poll() take() poll(time,unit) 检查 element() peek() 不可用 不可用 以 ArrayBlockingQueue 为例来看下 Java 阻塞队列提供的常用方法 抛出异常: 当阻塞队列满时,再往队列里 add 插入元素会抛出 java.lang.IllegalStateException: Queue full 异常; 当队列为空时,从队列里 remove 移除元素时会抛出 NoSuchElementException 异常 。 element(),返回队列头部的元素,如果队列为空,则抛出一个 NoSuchElementException 异常 返回特殊值: offer(),插入方法,成功返回 true,失败返回 false; poll(),移除方法,成功返回出队列的元素,队列里没有则返回 null peek() ,返回队列头部的元素,如果队列为空,则返回 null 一直阻塞: 当阻塞队列满时,如果生产线程继续往队列里 put 元素,队列会一直阻塞生产线程,直到拿到数据,或者响应中断退出; 当阻塞队列空时,消费线程试图从队列里 take 元素,队列也会一直阻塞消费线程,直到队列可用。 超时退出: 当阻塞队列满时,队列会阻塞生产线程一定时间,如果超过一定的时间,生产线程就会退出,返回 false 当阻塞队列空时,队列会阻塞消费线程一定时间,如果超过一定的时间,消费线程会退出,返回 null BlockingQueue 实现类 逐个分析下这 7 个阻塞队列,常用的几个顺便探究下源码。 ArrayBlockingQueue ArrayBlockingQueue,一个由数组实现的有界阻塞队列。该队列采用先进先出(FIFO)的原则对元素进行排序添加的。 ArrayBlockingQueue 为有界且固定,其大小在构造时由构造函数来决定,确认之后就不能再改变了。 ArrayBlockingQueue 支持对等待的生产者线程和使用者线程进行排序的可选公平策略,但是在默认情况下不保证线程公平的访问,在构造时可以选择公平策略(fair = true)。公平性通常会降低吞吐量,但是减少了可变性和避免了“不平衡性”。(ArrayBlockingQueue 内部的阻塞队列是通过 ReentrantLock 和 Condition 条件队列实现的, 所以 ArrayBlockingQueue 中的元素存在公平和非公平访问的区别) 所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素,可以保证先进先出,避免饥饿现象。 源码解读 public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { // 通过数组来实现的队列 final Object[] items; //记录队首元素的下标 int takeIndex; //记录队尾元素的下标 int putIndex; //队列中的元素个数 int count; //通过ReentrantLock来实现同步 final ReentrantLock lock; //有2个条件对象,分别表示队列不为空和队列不满的情况 private final Condition notEmpty; private final Condition notFull; //迭代器 transient Itrs itrs; //offer方法用于向队列中添加数据 public boolean offer(E e) { // 可以看出添加的数据不支持null值 checkNotNull(e); final ReentrantLock lock = this.lock; //通过重入锁来实现同步 lock.lock(); try { //如果队列已经满了的话直接就返回false,不会阻塞调用这个offer方法的线程 if (count == items.length) return false; else { //如果队列没有满,就调用enqueue方法将元素添加到队列中 enqueue(e); return true; } } finally { lock.unlock(); } } //多了个等待时间的 offer方法 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; //获取可中断锁 lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; //等待设置的时间 nanos = notFull.awaitNanos(nanos); } //如果等待时间过了,队列有空间的话就会调用enqueue方法将元素添加到队列 enqueue(e); return true; } finally { lock.unlock(); } } //将数据添加到队列中的具体方法 private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; //通过循环数组实现的队列,当数组满了时下标就变成0了 if (++putIndex == items.length) putIndex = 0; count++; //激活因为notEmpty条件而阻塞的线程,比如调用take方法的线程 notEmpty.signal(); } //将数据从队列中取出的方法 private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //将对应的数组下标位置设置为null释放资源 items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //激活因为notFull条件而阻塞的线程,比如调用put方法的线程 notFull.signal(); return x; } //put方法和offer方法不一样的地方在于,如果队列是满的话,它就会把调用put方法的线程阻塞,直到队列里有空间 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; //因为后面调用了条件变量的await()方法,而await()方法会在中断标志设置后抛出InterruptedException异常后退出, // 所以在加锁时候先看中断标志是不是被设置了,如果设置了直接抛出InterruptedException异常,就不用再去获取锁了 lock.lockInterruptibly(); try { while (count == items.length) //如果队列满的话就阻塞等待,直到notFull的signal方法被调用,也就是队列里有空间了 notFull.await(); //队列里有空间了执行添加操作 enqueue(e); } finally { lock.unlock(); } } //poll方法用于从队列中取数据,不会阻塞当前线程 public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //如果队列为空的话会直接返回null,否则调用dequeue方法取数据 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } //有等待时间的 poll 重载方法 public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } } //take方法也是用于取队列中的数据,但是和poll方法不同的是它有可能会阻塞当前的线程 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //当队列为空时,就会阻塞当前线程 while (count == 0) notEmpty.await(); //直到队列中有数据了,调用dequeue方法将数据返回 return dequeue(); } finally { lock.unlock(); } } //返回队首元素 public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } } //获取队列的元素个数,加了锁,所以结果是准确的 public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } } // 此外,还有一些其他方法 //返回队列剩余空间,还能加几个元素 public int remainingCapacity() { final ReentrantLock lock = this.lock; lock.lock(); try { return items.length - count; } finally { lock.unlock(); } } // 判断队列中是否存在当前元素o public boolean contains(Object o){} // 返回一个按正确顺序,包含队列中所有元素的数组 public Object[] toArray(){} // 自动清空队列中的所有元素 public void clear(){} // 移除队列中所有可用元素,并将他们加入到给定的 Collection 中 public int drainTo(Collection<? super E> c){} // 返回此队列中按正确顺序进行迭代的,包含所有元素的迭代器 public Iterator<E> iterator() } LinkedBlockingQueue LinkedBlockingQueue 是一个用单向链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。 如果不是特殊业务,LinkedBlockingQueue 使用时,切记要定义容量 new LinkedBlockingQueue(capacity) ,防止过度膨胀。 源码解读 public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; // 基于链表实现,肯定要有结点类,典型的单链表结构 static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } //容量 private final int capacity; //当前队列元素数量 private final AtomicInteger count = new AtomicInteger(); // 头节点,不存数据 transient Node<E> head; // 尾节点,便于入队 private transient Node<E> last; // take锁,出队锁,只有take,poll方法会持有 private final ReentrantLock takeLock = new ReentrantLock(); // 出队等待条件 // 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒 private final Condition notEmpty = takeLock.newCondition(); // 入队锁,只有put,offer会持有 private final ReentrantLock putLock = new ReentrantLock(); // 入队等待条件 // 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒 private final Condition notFull = putLock.newCondition(); //同样提供三个构造器 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); // 初始化head和last指针为空值节点 this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue() { // 如果没传容量,就使用最大int值初始化其容量 this(Integer.MAX_VALUE); } public LinkedBlockingQueue(Collection<? extends E> c) {} //入队 public void put(E e) throws InterruptedException { // 不允许null元素 if (e == null) throw new NullPointerException(); //规定给当前put方法预留一个本地变量 int c = -1; // 新建一个节点 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 使用put锁加锁 putLock.lockInterruptibly(); try { // 如果队列满了,就阻塞在notFull条件上 // 等待被其它线程唤醒 while (count.get() == capacity) { notFull.await(); } // 队列不满了,就入队 enqueue(node); // 队列长度加1 c = count.getAndIncrement(); // 如果现队列长度小于容量 // 就再唤醒一个阻塞在notFull条件上的线程 // 这里为啥要唤醒一下呢? // 因为可能有很多线程阻塞在notFull这个条件上的 // 而取元素时只有取之前队列是满的才会唤醒notFull // 为什么队列满的才唤醒notFull呢? // 因为唤醒是需要加putLock的,这是为了减少锁的次数 // 所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程 // 说白了,这也是锁分离带来的代价 if (c + 1 < capacity) notFull.signal(); } finally { // 释放锁 putLock.unlock(); } // 如果原队列长度为0,现在加了一个元素后立即唤醒notEmpty条件 if (c == 0) signalNotEmpty(); } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; // 加take锁 takeLock.lock(); try { // 唤醒notEmpty条件 notEmpty.signal(); } finally { takeLock.unlock(); } } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } private void enqueue(Node<E> node) { // 直接加到last后面 last = last.next = node; } public boolean offer(E e) { //用带过期时间的说明 } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); //转换为纳秒 long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //获取入队锁,支持等待锁的过程中被中断 putLock.lockInterruptibly(); try { //队列满了,再看看有没有超时 while (count.get() == capacity) { if (nanos <= 0) //等待时间超时 return false; //进行等待,awaitNanos(long nanos)是AQS中的方法 //在等待过程中,如果被唤醒或超时,则继续当前循环 //如果被中断,则抛出中断异常 nanos = notFull.awaitNanos(nanos); } //进入队尾 enqueue(new Node<E>(e)); c = count.getAndIncrement(); //说明当前元素后面还能再插入一个 //就唤醒一个入队条件队列中阻塞的线程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } //节点数量为0,说明队列是空的 if (c == 0) //唤醒一个出队条件队列阻塞的线程 signalNotEmpty(); return true; } public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 如果队列无元素,则阻塞在notEmpty条件上 while (count.get() == 0) { notEmpty.await(); } // 否则,出队 x = dequeue(); // 获取出队前队列的长度 c = count.getAndDecrement(); // 如果取之前队列长度大于1,则唤醒notEmpty if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 如果取之前队列长度等于容量 // 则唤醒notFull if (c == capacity) signalNotFull(); return x; } private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { //队列为空且已经超时,直接返回空 if (nanos <= 0) return null; //等待过程中可能被唤醒,超时,中断 nanos = notEmpty.awaitNanos(nanos); } //进行出队操作 x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } //如果出队前,队列是满的,则唤醒一个被take()阻塞的线程 if (c == capacity) signalNotFull(); return x; } public E poll() { // } public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } } void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; trail.next = p.next; if (last == p) last = trail; if (count.getAndDecrement() == capacity) notFull.signal(); } public boolean remove(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } } public boolean contains(Object o) { } static final class LBQSpliterator<E> implements Spliterator<E> { } } LinkedBlockingQueue 与 ArrayBlockingQueue 对比 ArrayBlockingQueue 入队出队采用一把锁,导致入队出队相互阻塞,效率低下; LinkedBlockingQueue 入队出队采用两把锁,入队出队互不干扰,效率较高; 二者都是有界队列,如果长度相等且出队速度跟不上入队速度,都会导致大量线程阻塞; LinkedBlockingQueue 如果初始化不传入初始容量,则使用最大 int 值,如果出队速度跟不上入队速度,会导致队列特别长,占用大量内存; PriorityBlockingQueue PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。(虽说是无界队列,但是由于资源耗尽的话,也会OutOfMemoryError,无法添加元素) 默认情况下元素采用自然顺序升序排列。也可以自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。但需要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue 是基于最小二叉堆实现,使用基于 CAS 实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞 take 操作的执行。 DelayQueue DelayQueue 是一个使用优先级队列实现的延迟无界阻塞队列。 队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将 DelayQueue 运用在以下应用场景: 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。 定时任务调度。使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,从比如 Timer 就是使用 DelayQueue 实现的。 SynchronousQueue SynchronousQueue 是一个不存储元素的阻塞队列,也即是单个元素的队列。 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景, 比如在一个线程中使用的数据,传递给另外一个线程使用,SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。 Coding synchronousQueue 是一个没有数据缓冲的阻塞队列,生产者线程对其的插入操作 put() 必须等待消费者的移除操作 take(),反过来也一样。 对应 peek, contains, clear, isEmpty ... 等方法其实是无效的。 但是 poll() 和 offer() 就不会阻塞,举例来说就是 offer 的时候如果有消费者在等待那么就会立马满足返回 true,如果没有就会返回 false,不会等待消费者到来。 public class SynchronousQueueDemo { public static void main(String[] args) { BlockingQueue<String> queue = new SynchronousQueue<>(); //System.out.println(queue.offer("aaa")); //false //System.out.println(queue.poll()); //null System.out.println(queue.add("bbb")); //IllegalStateException: Queue full new Thread(()->{ try { System.out.println("Thread 1 put a"); queue.put("a"); System.out.println("Thread 1 put b"); queue.put("b"); System.out.println("Thread 1 put c"); queue.put("c"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(2); System.out.println("Thread 2 get:"+queue.take()); TimeUnit.SECONDS.sleep(2); System.out.println("Thread 2 get:"+queue.take()); TimeUnit.SECONDS.sleep(2); System.out.println("Thread 2 get:"+queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } Thread 1 put a Thread 2 get:a Thread 1 put b Thread 2 get:b Thread 1 put c Thread 2 get:c 源码解读 不像ArrayBlockingQueue、LinkedBlockingDeque之类的阻塞队列依赖AQS实现并发操作,SynchronousQueue直接使用CAS实现线程的安全访问。 synchronousQueue 提供了两个构造器(公平与否),内部是通过 Transferer 来实现的,具体分为两个Transferer,分别是 TransferStack 和 TransferQueue。 TransferStack:非公平竞争模式使用的数据结构是后进先出栈(LIFO Stack) TransferQueue:公平竞争模式则使用先进先出队列(FIFO Queue) 性能上两者是相当的,一般情况下,FIFO 通常可以支持更大的吞吐量,但 LIFO 可以更大程度的保持线程的本地化。 private transient volatile Transferer<E> transferer; public SynchronousQueue() { this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); } 分析 TransferQueue 的实现 //构造函数中会初始化一个出队的节点,并且首尾都指向这个节点 TransferQueue() { QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; } //队列节点, static final class QNode { volatile QNode next; // next node in queue volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData; QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } // 设置next和item的值,用于进行并发更新, cas 无锁操作 boolean casNext(QNode cmp, QNode val) { return next == cmp && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } boolean casItem(Object cmp, Object val) { return item == cmp && UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } boolean isCancelled() { return item == this; } boolean isOffList() { return next == this; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = QNode.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } 从 put() 方法和 take() 方法可以看出最终调用的都是 TransferQueue 的 transfer() 方法。 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } //transfer方法用于提交数据或者是获取数据 E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed //如果e不为null,就说明是添加数据的入队操作 boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin //如果当前操作和 tail 节点的操作是一样的;或者头尾相同(表明队列中啥都没有)。 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; // 如果 t 和 tail 不一样,说明,tail 被其他的线程改了,重来 if (t != tail) // inconsistent read continue; // 如果 tail 的 next 不是空。就需要将 next 追加到 tail 后面了 if (tn != null) { // lagging tail // 使用 CAS 将 tail.next 变成 tail, advanceTail(t, tn); continue; } // 时间到了,不等待,返回 null,插入失败,获取也是失败的 if (timed && nanos <= 0) // can't wait return null; if (s == null) s = new QNode(e, isData); if (!t.casNext(null, s)) // failed to link in continue; advanceTail(t, s); // swing tail and wait Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; } else { // complementary-mode QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } } LinkedTransferQueue LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。 LinkedTransferQueue采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为“匹配”方式。 队列实现了 TransferQueue 接口重写了 tryTransfer 和 transfer 方法,这组方法和 SynchronousQueue 公平模式的队列类似,具有匹配的功能 LinkedBlockingDeque LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列。 所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以 Last 单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。 在初始化 LinkedBlockingDeque 时可以设置容量防止其过渡膨胀,默认容量也是 Integer.MAX_VALUE。另外双向阻塞队列可以运用在“工作窃取”模式中。 阻塞队列使用场景 我们常用的生产者消费者模式就可以基于阻塞队列实现; 线程池中活跃线程数达到 corePoolSize 时,线程池将会将后续的 task 提交到 BlockingQueue 中; 生产者消费者模式 JDK API文档的 BlockingQueue 给出了一个典型的应用 面试题:一个初始值为 0 的变量,两个线程对齐交替操作,一个+1,一个-1,5 轮 public class ProdCounsume_TraditionDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); new Thread(() -> { for (int i = 0; i <= 5; i++) { shareData.increment(); } }, "T1").start(); new Thread(() -> { for (int i = 0; i <= 5; i++) { shareData.decrement(); } }, "T1").start(); } } //线程操作资源类 class ShareData { private int num = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() { lock.lock(); try { while (num != 0) { //等待,不能生产 condition.await(); } //干活 num++; System.out.println(Thread.currentThread().getName() + "\t" + num); //唤醒 condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement() { lock.lock(); try { while (num == 0) { //等待,不能生产 condition.await(); } //干活 num--; System.out.println(Thread.currentThread().getName() + "\t" + num); //唤醒 condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } 线程池 线程池的核心方法 ThreadPoolExecutor,用 BlockingQueue 存放任务的阻塞队列,被提交但尚未被执行的任务 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 线程池在内部实际也是构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。 不同的线程池实现用的是不同的阻塞队列,newFixedThreadPool 和 newSingleThreadExecutor 用的是LinkedBlockingQueue,newCachedThreadPool 用的是 SynchronousQueue。 文章持续更新,可以微信搜「 JavaKeeper 」第一时间阅读,无套路领取 500+ 本电子书和 30+ 视频教学和源码,本文 GitHub github.com/JavaKeeper 已经收录,Javaer 开发、面试必备技能兵器谱,有你想要的。 参考与感谢 https://www.liaoxuefeng.com/ SynchronousQueue源码 https://juejin.im/post/5ae754c7f265da0ba76f8534
什么是 BloomFilter 布隆过滤器(英语:Bloom Filter)是 1970 年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。主要用于判断一个元素是否在一个集合中。 通常我们会遇到很多要判断一个元素是否在某个集合中的业务场景,一般想到的是将集合中所有元素保存起来,然后通过比较确定。链表、树、散列表(又叫哈希表,Hash table)等等数据结构都是这种思路。但是随着集合中元素的增加,我们需要的存储空间也会呈现线性增长,最终达到瓶颈。同时检索速度也越来越慢,上述三种结构的检索时间复杂度分别为$O(n)$,$O(logn)$,$O(1)$。 这个时候,布隆过滤器(Bloom Filter)就应运而生。 布隆过滤器原理 了解布隆过滤器原理之前,先回顾下 Hash 函数原理。 哈希函数 哈希函数的概念是:将任意大小的输入数据转换成特定大小的输出数据的函数,转换后的数据称为哈希值或哈希编码,也叫散列值。下面是一幅示意图: 所有散列函数都有如下基本特性: 如果两个散列值是不相同的(根据同一函数),那么这两个散列值的原始输入也是不相同的。这个特性是散列函数具有确定性的结果,具有这种性质的散列函数称为单向散列函数。 散列函数的输入和输出不是唯一对应关系的,如果两个散列值相同,两个输入值很可能是相同的,但也可能不同,这种情况称为“散列碰撞(collision)”。 但是用 hash表存储大数据量时,空间效率还是很低,当只有一个 hash 函数时,还很容易发生哈希碰撞。 布隆过滤器数据结构 BloomFilter 是由一个固定大小的二进制向量或者位图(bitmap)和一系列映射函数组成的。 在初始状态时,对于长度为 m 的位数组,它的所有位都被置为0,如下图所示: 当有变量被加入集合时,通过 K 个映射函数将这个变量映射成位图中的 K 个点,把它们置为 1(假定有两个变量都通过 3 个映射函数)。 查询某个变量的时候我们只要看看这些点是不是都是 1 就可以大概率知道集合中有没有它了 如果这些点有任何一个 0,则被查询变量一定不在; 如果都是 1,则被查询变量很可能存在 为什么说是可能存在,而不是一定存在呢?那是因为映射函数本身就是散列函数,散列函数是会有碰撞的。 误判率 布隆过滤器的误判是指多个输入经过哈希之后在相同的bit位置1了,这样就无法判断究竟是哪个输入产生的,因此误判的根源在于相同的 bit 位被多次映射且置 1。 这种情况也造成了布隆过滤器的删除问题,因为布隆过滤器的每一个 bit 并不是独占的,很有可能多个元素共享了某一位。如果我们直接删除这一位的话,会影响其他的元素。(比如上图中的第 3 位) 特性 一个元素如果判断结果为存在的时候元素不一定存在,但是判断结果为不存在的时候则一定不存在。 布隆过滤器可以添加元素,但是不能删除元素。因为删掉元素会导致误判率增加。 添加与查询元素步骤 添加元素 将要添加的元素给 k 个哈希函数 得到对应于位数组上的 k 个位置 将这k个位置设为 1 查询元素 将要查询的元素给k个哈希函数 得到对应于位数组上的k个位置 如果k个位置有一个为 0,则肯定不在集合中 如果k个位置全部为 1,则可能在集合中 优点 相比于其它的数据结构,布隆过滤器在空间和时间方面都有巨大的优势。布隆过滤器存储空间和插入/查询时间都是常数 $O(K)$,另外,散列函数相互之间没有关系,方便由硬件并行实现。布隆过滤器不需要存储元素本身,在某些对保密要求非常严格的场合有优势。 布隆过滤器可以表示全集,其它任何数据结构都不能; 缺点 但是布隆过滤器的缺点和优点一样明显。误算率是其中之一。随着存入的元素数量增加,误算率随之增加。但是如果元素数量太少,则使用散列表足矣。 另外,一般情况下不能从布隆过滤器中删除元素。我们很容易想到把位数组变成整数数组,每插入一个元素相应的计数器加 1, 这样删除元素时将计数器减掉就可以了。然而要保证安全地删除元素并非如此简单。首先我们必须保证删除的元素的确在布隆过滤器里面。这一点单凭这个过滤器是无法保证的。另外计数器回绕也会造成问题。 在降低误算率方面,有不少工作,使得出现了很多布隆过滤器的变种。 布隆过滤器使用场景和实例 在程序的世界中,布隆过滤器是程序员的一把利器,利用它可以快速地解决项目中一些比较棘手的问题。 如网页 URL 去重、垃圾邮件识别、大集合中重复元素的判断和缓存穿透等问题。 布隆过滤器的典型应用有: 数据库防止穿库。 Google Bigtable,HBase 和 Cassandra 以及 Postgresql 使用BloomFilter来减少不存在的行或列的磁盘查找。避免代价高昂的磁盘查找会大大提高数据库查询操作的性能。 业务场景中判断用户是否阅读过某视频或文章,比如抖音或头条,当然会导致一定的误判,但不会让用户看到重复的内容。 缓存宕机、缓存击穿场景,一般判断用户是否在缓存中,如果在则直接返回结果,不在则查询db,如果来一波冷数据,会导致缓存大量击穿,造成雪崩效应,这时候可以用布隆过滤器当缓存的索引,只有在布隆过滤器中,才去查询缓存,如果没查询到,则穿透到db。如果不在布隆器中,则直接返回。 WEB拦截器,如果相同请求则拦截,防止重复被攻击。用户第一次请求,将请求参数放入布隆过滤器中,当第二次请求时,先判断请求参数是否被布隆过滤器命中。可以提高缓存命中率。Squid 网页代理缓存服务器在 cache digests 中就使用了布隆过滤器。Google Chrome浏览器使用了布隆过滤器加速安全浏览服务 Venti 文档存储系统也采用布隆过滤器来检测先前存储的数据。 SPIN 模型检测器也使用布隆过滤器在大规模验证问题时跟踪可达状态空间。 Coding~ 知道了布隆过滤去的原理和使用场景,我们可以自己实现一个简单的布隆过滤器 自定义的 BloomFilter public class MyBloomFilter { /** * 一个长度为10 亿的比特位 */ private static final int DEFAULT_SIZE = 256 << 22; /** * 为了降低错误率,使用加法hash算法,所以定义一个8个元素的质数数组 */ private static final int[] seeds = {3, 5, 7, 11, 13, 31, 37, 61}; /** * 相当于构建 8 个不同的hash算法 */ private static HashFunction[] functions = new HashFunction[seeds.length]; /** * 初始化布隆过滤器的 bitmap */ private static BitSet bitset = new BitSet(DEFAULT_SIZE); /** * 添加数据 * * @param value 需要加入的值 */ public static void add(String value) { if (value != null) { for (HashFunction f : functions) { //计算 hash 值并修改 bitmap 中相应位置为 true bitset.set(f.hash(value), true); } } } /** * 判断相应元素是否存在 * @param value 需要判断的元素 * @return 结果 */ public static boolean contains(String value) { if (value == null) { return false; } boolean ret = true; for (HashFunction f : functions) { ret = bitset.get(f.hash(value)); //一个 hash 函数返回 false 则跳出循环 if (!ret) { break; } } return ret; } /** * 模拟用户是不是会员,或用户在不在线。。。 */ public static void main(String[] args) { for (int i = 0; i < seeds.length; i++) { functions[i] = new HashFunction(DEFAULT_SIZE, seeds[i]); } // 添加1亿数据 for (int i = 0; i < 100000000; i++) { add(String.valueOf(i)); } String id = "123456789"; add(id); System.out.println(contains(id)); // true System.out.println("" + contains("234567890")); //false } } class HashFunction { private int size; private int seed; public HashFunction(int size, int seed) { this.size = size; this.seed = seed; } public int hash(String value) { int result = 0; int len = value.length(); for (int i = 0; i < len; i++) { result = seed * result + value.charAt(i); } int r = (size - 1) & result; return (size - 1) & result; } } What?我们写的这些早有大牛帮我们实现,还造轮子,真是浪费时间,No,No,No,我们学习过程中是可以造轮子的,造轮子本身就是我们自己对设计和实现的具体落地过程,不仅能提高我们的编程能力,在造轮子的过程中肯定会遇到很多我们没有思考过的问题,成长看的见~~ 实际项目使用的时候,领导和我说项目一定要稳定运行,没自信的我放弃了自己的轮子。 Guava 中的 BloomFilter <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>23.0</version> </dependency> public class GuavaBloomFilterDemo { public static void main(String[] args) { //后边两个参数:预计包含的数据量,和允许的误差值 BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), 100000, 0.01); for (int i = 0; i < 100000; i++) { bloomFilter.put(i); } System.out.println(bloomFilter.mightContain(1)); System.out.println(bloomFilter.mightContain(2)); System.out.println(bloomFilter.mightContain(3)); System.out.println(bloomFilter.mightContain(100001)); //bloomFilter.writeTo(); } } 分布式环境中,布隆过滤器肯定还需要考虑是可以共享的资源,这时候我们会想到 Redis,是的,Redis 也实现了布隆过滤器。 当然我们也可以把布隆过滤器通过 bloomFilter.writeTo() 写入一个文件,放入OSS、S3这类对象存储中。 Redis 中的 BloomFilter Redis 提供的 bitMap 可以实现布隆过滤器,但是需要自己设计映射函数和一些细节,这和我们自定义没啥区别。 Redis 官方提供的布隆过滤器到了 Redis 4.0 提供了插件功能之后才正式登场。布隆过滤器作为一个插件加载到 Redis Server 中,给 Redis 提供了强大的布隆去重功能。 在已安装 Redis 的前提下,安装 RedisBloom,有两种方式 直接编译进行安装 git clone https://github.com/RedisBloom/RedisBloom.git cd RedisBloom make #编译 会生成一个rebloom.so文件 redis-server --loadmodule /path/to/rebloom.so #运行redis时加载布隆过滤器模块 redis-cli # 启动连接容器中的 redis 客户端验证 使用Docker进行安装 docker pull redislabs/rebloom:latest # 拉取镜像 docker run -p 6379:6379 --name redis-redisbloom redislabs/rebloom:latest #运行容器 docker exec -it redis-redisbloom bash redis-cli 使用 布隆过滤器基本指令: bf.add 添加元素到布隆过滤器 bf.exists 判断元素是否在布隆过滤器 bf.madd 添加多个元素到布隆过滤器,bf.add 只能添加一个 bf.mexists 判断多个元素是否在布隆过滤器 127.0.0.1:6379> bf.add user Tom (integer) 1 127.0.0.1:6379> bf.add user John (integer) 1 127.0.0.1:6379> bf.exists user Tom (integer) 1 127.0.0.1:6379> bf.exists user Linda (integer) 0 127.0.0.1:6379> bf.madd user Barry Jerry Mars 1) (integer) 1 2) (integer) 1 3) (integer) 1 127.0.0.1:6379> bf.mexists user Barry Linda 1) (integer) 1 2) (integer) 0 我们只有这几个参数,肯定不会有误判,当元素逐渐增多时,就会有一定的误判了,这里就不做这个实验了。 上面使用的布隆过滤器只是默认参数的布隆过滤器,它在我们第一次 add 的时候自动创建。 Redis 还提供了自定义参数的布隆过滤器,bf.reserve 过滤器名 error_rate initial_size error_rate:允许布隆过滤器的错误率,这个值越低过滤器的位数组的大小越大,占用空间也就越大 initial_size:布隆过滤器可以储存的元素个数,当实际存储的元素个数超过这个值之后,过滤器的准确率会下降 但是这个操作需要在 add 之前显式创建。如果对应的 key 已经存在,bf.reserve 会报错 127.0.0.1:6379> bf.reserve user 0.01 100 (error) ERR item exists 127.0.0.1:6379> bf.reserve topic 0.01 1000 OK 我是一名 Javaer,肯定还要用 Java 来实现的,Java 的 Redis 客户端比较多,有些还没有提供指令扩展机制,笔者已知的 Redisson 和 lettuce 是可以使用布隆过滤器的,我们这里用 Redisson public class RedissonBloomFilterDemo { public static void main(String[] args) { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); RedissonClient redisson = Redisson.create(config); RBloomFilter<String> bloomFilter = redisson.getBloomFilter("user"); // 初始化布隆过滤器,预计统计元素数量为55000000,期望误差率为0.03 bloomFilter.tryInit(55000000L, 0.03); bloomFilter.add("Tom"); bloomFilter.add("Jack"); System.out.println(bloomFilter.count()); //2 System.out.println(bloomFilter.contains("Tom")); //true System.out.println(bloomFilter.contains("Linda")); //false } } 扩展 为了解决布隆过滤器不能删除元素的问题,布谷鸟过滤器横空出世。论文《Cuckoo Filter:Better Than Bloom》作者将布谷鸟过滤器和布隆过滤器进行了深入的对比。相比布谷鸟过滤器而言布隆过滤器有以下不足:查询性能弱、空间利用效率低、不支持反向操作(删除)以及不支持计数。 由于使用较少,暂不深入。 文章持续更新,可以微信搜「 JavaKeeper 」第一时间阅读,无套路领取 500+ 本电子书和 30+ 视频教学和源码,本文 GitHub github.com/JavaKeeper 已经收录,Javaer 开发、面试必备技能兵器谱,有你想要的。 参考与感谢 https://www.cs.cmu.edu/~dga/papers/cuckoo-conext2014.pdf http://www.justdojava.com/2019/10/22/bloomfilter/ https://www.cnblogs.com/cpselvis/p/6265825.html https://juejin.im/post/5cc5aa7ce51d456e431adac5
前言 面试常常被要求「熟悉分布式技术」,当年搞 “XXX管理系统” 的时候,我都不知道分布式系统是个啥。分布式系统是一个硬件或软件组件分布在不同的网络计算机中上,彼此之间仅仅通过消息传递进行通信和协调的系统。 计算机系统从集中式到分布式的变革伴随着包括分布式网络、分布式事务、分布式数据一致性等在内的一系列问题和挑战,同时也催生了一大批诸如ACID、CAP和 BASE 等经典理论的快速发展。 为了解决分布式一致性问题,涌现出了一大批经典的一致性协议和算法,最为著名的就是二阶段提交协议(2PC),三阶段提交协议(3PC)和Paxos算法。Zookeeper的一致性是通过基于 Paxos 算法的 ZAB 协议完成的。一致性协议之前的文章也有介绍:「走进分布式一致性协议」从2PC、3PC、Paxos 到 ZAB,这里就不再说了。 1. 概述 1.1 定义 ZooKeeper 官网是这么介绍的:”Apache ZooKeeper 致力于开发和维护一个支持高度可靠的分布式协调的开源服务器“ 1.2 ZooKeeper是个啥 ZooKeeper 是 Apache 软件基金会的一个软件项目,它为大型「分布式计算」提供开源的分布式配置服务、同步服务和命名注册。 Zookeeper 最早起源于雅虎研究院的一个研究小组。在当时,研究人员发现,在雅虎内部很多大型系统基本都需要依赖一个类似的系统来进行分布式协调,但是这些系统往往都存在分布式单点问题。所以,雅虎的开发人员就试图开发一个通用的无单点问题的分布式协调框架,以便让开发人员将精力集中在处理业务逻辑上,Zookeeper 就这样诞生了。后来捐赠给了 Apache ,现已成为 Apache 顶级项目。 关于“ZooKeeper”这个项目的名字,其实也有一段趣闻。在立项初期,考虑到之前内部很多项目都是使用动物的名字来命名的(例如著名的Pig项目),雅虎的工程师希望给这个项目也取一个动物的名字。时任研究院的首席科学家 RaghuRamakrishnan 开玩笑地说:“再这样下去,我们这儿就变成动物园了!”此话一出,大家纷纷表示就叫动物园管理员吧一一一因为各个以动物命名的分布式组件放在一起,雅虎的整个分布式系统看上去就像一个大型的动物园了,而 Zookeeper 正好要用来进行分布式环境的协调一一于是,Zookeeper 的名字也就由此诞生了。 ZooKeeper 是用于维护配置信息,命名,提供分布式同步和提供组服务的集中式服务。所有这些类型的服务都以某种形式被分布式应用程序使用。每次实施它们时,都会进行很多工作来修复不可避免的 bug 和竞争条件。由于难以实现这类服务,因此应用程序最初通常会跳过它们,这会使它们在存在更改的情况下变得脆弱并且难以管理。即使部署正确,这些服务的不同实现也会导致管理复杂。 ZooKeeper 的目标是将这些不同服务的精华提炼为一个非常简单的接口,用于集中协调服务。服务本身是分布式的,并且高度可靠。服务将实现共识,组管理和状态协议,因此应用程序不需要自己实现它们。 1.3 ZooKeeper工作机制 ZooKeeper 从设计模式角度来理解:就是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,ZK 就将负责通知已经在 ZK 上注册的那些观察者做出相应的反应,从而实现集群中类似 Master/Slave 管理模式。 1.4 特性 ZooKeeper:一个领导者(leader),多个跟随者(follower)组成的集群。 Leader 负责进行投票的发起和决议,更新系统状态。 Follower 用于接收客户请求并向客户端返回结果,在选举 Leader 过程中参与投票。 集群中只要有半数以上节点存活,Zookeeper 集群就能正常服务。 全局数据一致(单一视图):每个 Server 保存一份相同的数据副本,Client 无论连接到哪个 Server,数据都是一致的。 顺序一致性: 从同一客户端发起的事务请求,最终将会严格地按照顺序被应用到 ZooKeeper 中去。 原子性: 所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的,也就是说,要么整个集群中所有的机器都成功应用了某一个事务,要么都没有应用。 实时性,在一定时间范围内,client 能读到最新数据。 可靠性: 一旦一次更改请求被应用,更改的结果就会被持久化,直到被下一次更改覆盖。 1.5 设计目标 简单的数据结构 :Zookeeper 使得分布式程序能够通过一个共享的树形结构的名字空间来进行相互协调,即Zookeeper 服务器内存中的数据模型由一系列被称为ZNode的数据节点组成,Zookeeper 将全量的数据存储在内存中,以此来提高服务器吞吐、减少延迟的目的。 可以构建集群 : Zookeeper 集群通常由一组机器构成,组成 Zookeeper 集群的每台机器都会在内存中维护当前服务器状态,并且每台机器之间都相互通信。 顺序访问 : 对于来自客户端的每个更新请求,Zookeeper 都会分配一个全局唯一的递增编号,这个编号反映了所有事务操作的先后顺序。 高性能 :Zookeeper 和 Redis 一样全量数据存储在内存中,100% 读请求压测 QPS 12-13W 1.6 数据结构 Zookeeper 数据模型的结构与 Unix 文件系统的结构相似,整体上可以看做是一棵树,每个节点称作一个 「ZNode」。每个 ZNode 默认能存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识。 1.7 应用场景 ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能 统一命名服务 在分布式系统中,通过使用命名服务,客户端应用能够根据指定名字来获取资源或服务的地址,提供者等信息。被命名的实体通常可以是集群中的机器,提供的服务地址,进程对象等等——这些我们都可以统称他们为名字(Name)。其中较为常见的就是一些分布式服务框架(如RPC、RMI)中的服务地址列表。通过调用 Zookeeper 提供的创建节点的 API,能够很容易创建一个全局唯一的 path,这个 path 就可以作为一个名称。 阿里巴巴开源的分布式服务框架 Dubbo 就使用 ZooKeeper 来作为其命名服务,维护全局的服务地址列表。 数据发布与订阅(配置中心) 发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到 ZooKeeper 节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,服务式服务框架的服务地址列表等就非常适合使用。 分布式环境下,配置文件管理和同步是一个常见问题 一个集群中,所有节点的配置信息是一致的,比如 Hadoop 集群、集群中的数据库配置信息等全局配置 对配置文件修改后,希望能够快速同步到各个节点上。 配置管理可交由 ZooKeeper 实现 可将配置信息写入 ZooKeeper 上的一个 Znode 各个节点监听这个 Znode 一旦 Znode 中的数据被修改,ZooKeeper 将通知各个节点 统一集群管理 所谓集群管理无在乎两点:是否有机器退出和加入、选举 Master。 管理节点 分布式环境中,实时掌握每个节点的状态是必要的,比如我们要知道集群中各机器状态、收集各个机器的运行时状态数据、服务器动态上下线等。 交由 ZooKeeper 实现的方式 可将节点信息写入 ZooKeeper 上的一个 Znode 监听这个 Znode 可获取它的实时状态变化 典型应用:HBase 中 Master 状态监控和选举。 Master选举 在分布式环境中,相同的业务应用分布在不同的机器上,有些业务逻辑(例如一些耗时的计算,网络I/O处理),往往只需要让整个集群中的某一台机器进行执行,其余机器可以共享这个结果,这样可以大大减少重复劳动,提高性能,于是这个master选举便是这种场景下的碰到的主要问题。 利用 Zookeeper 的强一致性,能够很好的保证在分布式高并发情况下节点的创建一定是全局唯一的,即:同时有多个客户端请求创建 /currentMaster 节点,最终一定只有一个客户端请求能够创建成功。Zookeeper 通过这种节点唯一的特性,可以创建一个 Master 节点,其他客户端 Watcher 监控当前 Master 是否存活,一旦 Master 挂了,其他机器再创建这样的一个 Master 节点,用来重新选举。 软负载均衡 分布式系统中,负载均衡是一种很普遍的技术,为了保证高可用性,通常同一个应用或同一个服务的提供方都会部署多份,达到对等服务。可以是硬件的负载均衡,如 F5,也可以是软件的负载,我们熟知的 Nginx,或者这里介绍的 Zookeeper。 分布式协调/通知 Zookeeper 中特有的 「Watcher」 注册与异步通知机制,能够很好的实现分布式环境下不同机器,甚至不同系统之间的协调和通知,从而实现对数据变更的实时处理。 使用方法通常是不同系统都对 ZK 上同一个 znode 进行注册,监听 znode 的变化(包括 znode 本身内容及子节点的),其中一个系统 update 了 znode,那么另一个系统能够收到通知,并作出相应处理。 心跳检测中可以让检测系统和被检测系统之间并不直接关联起来,而是通过 ZK 上某个节点关联,减少系统耦合; 系统调度模式中,假设某系统有控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工作。管理人员在控制台作的一些操作,实际上是修改了 ZK 上某些节点的状态,而 ZK 就把这些变化通知给他们注册 Watcher 的客户端,即推送系统,于是,作出相应的推送任务。 分布式锁 分布式锁,这个主要得益于 ZooKeeper 为我们保证了数据的强一致性。 锁服务可以分为两类,一个是保持独占,另一个是控制时序。 所谓保持独占,就是所有试图来获取这个锁的客户端,最终只有一个可以成功获得这把锁。通常的做法是把 zk 上的一个 znode 看作是一把锁,通过 create znode 的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁。 控制时序,就是所有试图来获取这个锁的客户端,最终都是会被安排执行,只是有个全局时序了。做法和上面基本类似,只是这里 /distribute_lock 已预先存在,客户端在它下面创建临时有序节点(这个可以通过节点的属性控制:CreateMode.EPHEMERAL_SEQUENTIAL来指定)。ZK 的父节点(/distribute_lock)维持一份 sequence,保证子节点创建的时序性,从而也形成了每个客户端的全局时序。 个人感觉还是用 Redis 实现分布式锁更加方便。 PS:阿里中间件团队:“其实,ZK 并非天生就是为这些应用场景设计的,都是后来众多开发者根据其框架的特性,利用其提供的一系列API接口(或者称为原语集),摸索出来的典型使用方法。” 2. Hello ZooKeeper ZooKeeper 的三种部署方式: 单机模式,即部署在单台机器上的一个 ZK 服务,适用于学习、了解 ZK 基础功能 伪分布模式,即部署在一台机器上的多个(原则上大于3个)ZK 服务,伪集群,适用于学习、开发和测试 全分布式模式(复制模式),即在多台机器上部署服务,真正的集群模式,生产环境中使用 计划写三篇的,第二篇会实战 coding,运用各种 API,到时候再装集群,本节先来个单机玩~~ 2.1 本地模式安装部署 2.1.1 安装前准备 安装 Jdk 拷贝或下载 Zookeeper 安装包到 Linux 系统下(这里有个小问题,如果你下载 ZK 版本是3.5+ 的话,要下载 bin.tar.gz,愚笨的我最先没看到官网说明,一顿操作各种报错找不到 Main 方法) 解压到指定目录 tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz 2.1.2 配置修改 将 zookeeper-3.5.7/conf 这个路径下的 zoo_sample.cfg 修改为 zoo.cfg ; mv zoo_sample.cfg zoo.cfg 打开 zoo.cfg 文件,修改 dataDir 路径: dataDir=XXX/zookeeper-3.5.7/zkData 2.1.3 操作 Zookeeper 启动 Zookeeper: bin/zkServer.sh start /usr/local/bin/java ZooKeeper JMX enabled by default Using config: /home/sync360/test/apache-zookeeper-3.5.7-bin/bin/../conf/zoo.cfg Starting zookeeper ... STARTED 查看进程是否启动: jps 4020 Jps 4001 QuorumPeerMain 查看状态:bin/zkServer.sh status /usr/local/bin/java ZooKeeper JMX enabled by default Using config: /home/apache-zookeeper-3.5.7-bin/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: standalone 启动客户端:bin/zkCli.sh Connecting to localhost:2181 2020-03-25 15:41:19,112 [myid:] - INFO [main:Environment@109] - Client environment:zookeeper.version=3.5.7-f0fdd52973d373ffd9c86b81d99842dc2c7f660e, built on 02/10/2020 11:30 GMT ... 2020-03-25 15:41:19,183 [myid:] - INFO [main:ClientCnxn@1653] - zookeeper.request.timeout value is 0. feature enabled= Welcome to ZooKeeper! ... WATCHER:: WatchedEvent state:SyncConnected type:None path:null 退出客户端:quit 停止 Zookeeper: bin/zkServer.sh stop 2.2 常用命令 命令基本语法 功能描述 help 显示所有操作命令 ls path [watch] 使用 ls 命令来查看当前znode中所包含的内容 ls2 path [watch] 查看当前节点数据并能看到更新次数等数据 create 普通创建-s 含有序列-e 临时(重启或者超时消失) get path [watch] 获得节点的值 set 设置节点的具体值 stat 查看节点状态 delete 删除节点 rmr 递归删除节点 ls 查看当前 zk 中所包含的内容 [zk: localhost:2181(CONNECTED) 1] ls / [lazyegg, zookeeper] create 创建一个新的 znode [zk: localhost:2181(CONNECTED) 2] create /test Created /test get 查看新的 znode 的值 [zk: localhost:2181(CONNECTED) 4] get /test null 可以看到值为 null,我们刚才设置了一个没有值的节点,也可以通过 create /zoo dog 直接创建有内容的节点 set 对 zk 所关联的字符串进行设置 set /test hello delete 删除节点 delete /test 2.3 配置参数解读 在 Zookeeper 的设计中,如果是集群模式,那所有机器上的 zoo.cfg 文件内容应该都是一致的。 Zookeeper 中的配置文件 zoo.cfg 中参数含义解读如下: tickTime =2000:通信心跳数 Zookeeper 使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime时间就会发送一个心跳,时间单位为毫秒 它用于心跳机制,并且设置最小的 session 超时时间为两倍心跳时间。(session的最小超时时间是2*tickTime); initLimit =10:主从初始通信时限,集群中的 Follower 跟随者服务器与 Leader 领导者服务器之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的 ZK 服务器连接到 Leader 的时限; syncLimit =5:主从同步通信时限,集群中 Leader 与 Follower 之间的最大响应时间单位,假如响应超过syncLimit * tickTime,Leader 认为 Follwer 死掉,从服务器列表中删除 Follwer; dataDir:数据文件目录+数据持久化路径; clientPort =2181:客户端连接端口 3. 你要知道的概念 ZooKeeper 本身就是一个分布式程序(只要半数以上节点存活,ZooKeeper 就能正常服务)。 为了保证高可用,最好是以集群形态来部署 ZooKeeper,这样只要集群中大部分机器是可用的(能够容忍一定的机器故障),那么 ZooKeeper 本身仍然是可用的。 ZooKeeper 将数据保存在内存中,这也就保证了高吞吐量和低延迟(但是内存限制了能够存储的容量不太大,此限制也是保持 znode 中存储的数据量较小的进一步原因)。 ZooKeeper 是高性能的。 在“读”多于“写”的应用程序中尤其的高性能,因为“写”会导致所有的服务器间同步状态。(“读”多于“写”是协调服务的典型场景。) ZooKeeper 底层其实只提供了两个功能: 管理(存储、读取)用户程序提交的数据 为用户程序提交数据节点监听服务 这里引入一个简单的例子,逐个介绍一些 ZK 中的概念。 在分布式系统中经常会遇到这种情况,多个应用读取同一个配置。例如:Client1,Client2 两个应用都会读取配置 B 中的内容,一旦 B 中的内容出现变化,就会通知 Client1 和 Client2。 一般的做法是在 Client1,Client2 中按照时钟频率询问 B 的变化,或者使用观察者模式来监听 B 的变化,发现变化以后再更新两个客户端。那么 ZooKeeper 如何协调这种场景? 这两个客户端连接到 ZooKeeper 的服务器,并获取其中存放的 B。保存 B 值的地方在 ZooKeeper 服务端中就称为 ZNode。 3.1 数据节点(Znode) 在谈到分布式的时候,我们通常说的“节点"是指组成集群的每一台机器。然而,在 Zookeeper 中,“节点"分为两类,第一类同样是指构成集群的机器,我们称之为「机器节点」;第二类则是指数据模型中的数据单元,我们称之为「数据节点」一一ZNode。上图中的 A、B 就是一个数据结点。 Zookeeper 将所有数据存储在内存中,数据模型是一棵树(Znode Tree),由斜杠(/)进行分割的路径,就是一个 Znode,例如 /Configuration/B。每个 Znode 上都会保存自己的数据内容,同时还会保存一系列属性信息。 在 Zookeeper 中,Znode 可以分为持久节点和临时节点两类。 所谓持久节点是指一旦这个 ZNode 被创建了,除非主动进行 ZNode 的移除操作,否则这个 ZNode 将一直保存在 Zookeeper 上。 而临时节点就不一样了,它的生命周期和客户端会话绑定,一旦客户端会话失效,那么这个客户端创建的所有临时节点都会被移除。 另外,ZooKeeper 还允许用户为每个节点添加一个特殊的属性:SEQUENTIAL。也被叫做 顺序结点,一旦节点被标记上这个属性,那么在这个节点被创建的时候,Zookeeper 会自动在其节点名后面追加上一个整型数字,这个整型数字是一个由父节点维护的自增数字。 3.2 事件监听器(Watcher) 上面说了 ZooKeeper 用来存放数据的 ZNode,并且把 B 的值存储在里面。如果 B 被更新了,两个客户端(Client1、Client2)如何获得通知呢? Zookeeper 允许用户在指定节点上注册一些 Watcher,当 Znode 发生变化时,将触发并删除一个 watch。当 watch 被触发时客户端会收到一个数据包,指示 znode 已经被修改。如果客户端和 ZooKeeper 服务器之间的连接中断,客户端将收到本地通知。该机制是 Zookeeper 实现分布式协调服务的重要特性。 3.6.0中的新增功能:客户端还可以在 znode 上设置永久性的递归监视,这些监视在触发时不会删除,并且会以递归方式触发已注册 znode 以及所有子 znode 的更改。 ZooKeeper 客户端(Client)会在指定的节点(/Configuration/B)上注册一个 Watcher,ZNode 上的 B 被更新的时候,服务端就会通知 Client1 和 Client2。 3.3 版本 有了 Watcher 机制,就可以实现分布式协调/通知了,假设有这样的场景,两个客户端同时对 B 进行写入操作,这两个客户端就会存在竞争关系,通常需要对 B 进行加锁操作,ZK 通过 version 版本号来控制实现乐观锁中的“写入校验”机制。 Zookeeper 的每个 ZNode 上都会存储数据,对应于每个 ZNode,Zookeeper 都会为其维护一个叫作 Stat 的数据结构,Stat 中记录了这个 ZNode 的三个数据版本,分别是 version(当前ZNode的版本)、cversion(当前ZNode 子节点的版本)和 aversion(当前ZNode的ACL版本)。 znode 里都有些啥呢? 3.4 Stat 结构体 Znodes 维护了一个 stat 结构,其中包含数据更改、ACL更改的版本号、时间戳等。 状态属性 说明 czxid 创建节点的事务zxid。 每次修改 ZK 状态都会收到一个zxid形式的时间戳,也就是 ZK 事务ID。事务ID是 ZK 中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生 ctime znode被创建的毫秒数(从1970年开始) mzxid znode最后更新的事务zxid mtime znode最后修改的毫秒数(从1970年开始) pzxid znode最后更新的子节点zxid version 数据节点版本号 cversion 子节点版本号,znode子节点修改次数 aversion znode访问控制列表的变化号 ephemeralOwner 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0 dataLength znode的数据长度 numChildren znode子节点数量 3.5 会话(Session) Session 指的是 ZooKeeper 服务器与客户端会话。 在 ZooKeeper 中,一个客户端连接是指客户端和服务器之间的一个 TCP 长连接。客户端启动的时候,首先会与服务器建立一个 TCP 连接,从第一次连接建立开始,客户端会话的生命周期也开始了。通过这个连接,客户端能够通过心跳检测与服务器保持有效的会话,也能够向 Zookeeper 服务器发送请求并接受响应,同时还能够通过该连接接收来自服务器的 Watch 事件通知。 Session 作为会话实体,用来代表客户端会话,其包括 4 个属性: SessionID,用来全局唯一识别会话; TimeOut,会话超时事件。客户端在创造 Session 实例的时候,会设置一个会话超时的时间。当由于服务器压力太大、网络故障或是客户端主动断开连接等各种原因导致客户端连接断开时,只要在 sessionTimeout 规定的时间内能够重新连接上集群中任意一台服务器,那么之前创建的会话仍然有效; TickTime,下次会话超时时间点; isClosing,当服务端如果检测到会话超时失效了,会通过设置这个属性将会话关闭。 3.6 ACL Zookeeper 采用 ACL(Access Control Lists)策略来进行权限控制,类似于 UNIX 文件系统的权限控制。Zookeeper 定义了如下 5 种权限: CREATE: 创建子节点的权限 READ: 获取节点数据和子节点列表的权限 WRITE: 更新节点数据的权限 DELETE: 删除子节点的权限 ADMIN: 设置节点ACL的权限 其中尤其需要注意的是,CREATE 和 DELETE 这两种权限都是针对子节点的权限控制。 3.7 集群角色 最典型集群模式:Master/Slave 模式(主备模式)。在这种模式中,通常 Master 服务器作为主服务器提供写服务,其他的 Slave 从服务器通过异步复制的方式获取 Master 服务器最新的数据提供读服务。 但是,在 ZooKeeper 中没有选择传统的 Master/Slave 概念,而是引入了Leader、Follower 和 Observer 三种角色。 Leader: 为客户端提供读和写的服务,负责投票的发起和决议,更新系统状态 Follower: 为客户端提供读服务,如果是写服务则转发给 Leader。在选举过程中参与投票 Observer: 为客户端提供读服务器,如果是写服务则转发给 Leader。不参与选举过程中的投票,也不参与“过半写成功”策略。在不影响写性能的情况下提升集群的读性能。此角色是在 zookeeper3.3 系列新增的角色。 server 状态 LOOKING:寻找Leader状态 LEADING:领导者状态,表明当前服务器角色是 Leader FOLLOWING:跟随者状态,表明当前服务器角色是 Follower OBSERVING:观察者状态,表明当前服务器角色是 Observer 选举机制 服务器1启动,此时只有它一台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是LOOKING 状态。 服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以 id 值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1、2还是继续保持 LOOKING 状态。 服务器3启动,根据前面的理论分析,服务器3成为服务器1、2、3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的Leader。 服务器4启动,根据前面的分析,理论上服务器4应该是服务器1、2、3、4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接受当小弟的命了。 服务器5启动,同4一样当小弟。 Watcher 监听器 Zookeeper 中最有特色且最不容易理解的是监视(Watches)。 Zookeeper 所有的读操作——getData(),getChildren(), 和 exists() 都可以设置监视(watch),监视事件可以理解为一次性的触发器, 官方定义如下: a watch event is one-time trigger, sent to the client that set the watch, which occurs when the data for which the watch was set changes。对此需要作出如下理解: One-time trigger(一次性触发) 当设置监视的数据发生改变时,该监视事件会被发送到客户端,例如,如果客户端调用了 getData("/znode1", true) 并且稍后 /znode1 节点上的数据发生了改变或者被删除了,客户端将会获取到 /znode1 发生变化的监视事件,而如果 /znode1 再一次发生了变化,除非客户端再次对 /znode1 设置监视,否则客户端不会收到事件通知。(3.6之后可以设置永久监视) Sent to the client(发送至客户端) Zookeeper 客户端和服务端是通过 socket 进行通信的,由于网络存在故障,所以监视事件很有可能不会成功到达客户端,监视事件是异步发送至监视者的,Zookeeper 本身提供了保序性(ordering guarantee):即客户端只有首先看到了监视事件后,才会感知到它所设置监视的 znode 发生了变化(a client will never see a change for which it has set a watch until it first sees the watch event)。 网络延迟或者其他因素可能导致不同的客户端在不同的时刻感知某一监视事件,但是不同的客户端所看到的一切具有一致的顺序。 The data for which the watch was set(被设置 watch 的数据) 这意味着 znode 节点本身具有不同的改变方式。你也可以想象 Zookeeper 维护了两条监视链表:数据监视和子节点监视(data watches and child watches), getData() 和 exists() 设置数据监视,getChildren() 设置子节点监视。 或者,你也可以想象 Zookeeper 设置的不同监视返回不同的数据,getData() 和 exists() 返回 znode 节点的相关信息,而 getChildren() 返回子节点列表。因此, setData() 会触发设置在某一节点上所设置的数据监视(假定数据设置成功),而一次成功的 create() 操作则会触发当前节点上所设置的数据监视以及父节点的子节点监视。一次成功的 delete() 操作将会触发当前节点的数据监视和子节点监视事件,同时也会触发该节点父节点的 child watch。 Zookeeper 中的监视是轻量级的,因此容易设置、维护和分发。当客户端与 Zookeeper 服务器端失去联系时,客户端并不会收到监视事件的通知,只有当客户端重新连接后,若在必要的情况下,以前注册的监视会重新被注册并触发,对于开发人员来说这通常是透明的。只有一种情况会导致监视事件的丢失,即:通过 exists() 设置了某个 znode 节点的监视,但是如果某个客户端在此 znode 节点被创建和删除的时间间隔内与 zookeeper 服务器失去了联系,该客户端即使稍后重新连接 zookeepe r服务器后也得不到事件通知。 从上图可以看到,Watcher 机制包括三个角色:客户端线程、客户端的 WatchManager 以及 ZooKeeper 服务器。Watcher 机制就是这三个角色之间的交互,整个过程分为注册、存储和通知三个步骤: 客户端向 ZooKeeper 服务器注册一个 Watcher 监听; 把这个监听信息存储到客户端的 WatchManager 中; 当 ZooKeeper 中的节点发生变化时,会通知客户端,客户端会调用相应 Watcher 对象中的回调方法。 文章持续更新,可以微信搜「 JavaKeeper 」第一时间阅读,无套路领取 500+ 本电子书和 30+ 视频教学和源码,本文 GitHub github.com/JavaKeeper 已经收录,Javaer 开发、面试必备技能兵器谱,有你想要的。 参考: 《从Paxos到ZooKeeper 分布式一致性原理与实践》 《阿里中间件团队博客》http://jm.taobao.org/2011/10/08/1232/ 《Zookeeper官方文档》https://zookeeper.apache.org/doc/ 《尚硅谷Zookeeper》 https://cloud.tencent.com/developer/article/1578401
文章已收录在 GitHub JavaKeeper ,N 线互联网开发、面试必备技能兵器谱,笔记自取。 微信搜「 JavaKeeper 」程序员成长充电站,互联网技术武道场。无套路领取 500+ 本电子书和 30+ 视频教学和源码。 前言 循环依赖问题,算是一道烂大街的面试题了,解毒之前,我们先来回顾两个知识点: 初学 Spring 的时候,我们就知道 IOC,控制反转么,它将原本在程序中手动创建对象的控制权,交由 Spring 框架来管理,不需要我们手动去各种 new XXX。 尽管是 Spring 管理,不也得创建对象吗, Java 对象的创建步骤很多,可以 new XXX、序列化、clone() 等等, 只是 Spring 是通过反射 + 工厂的方式创建对象并放在容器的,创建好的对象我们一般还会对对象属性进行赋值,才去使用,可以理解是分了两个步骤。 好了,对这两个步骤有个印象就行,接着我们进入循环依赖,先说下循环依赖的概念 什么是循环依赖 所谓的循环依赖是指,A 依赖 B,B 又依赖 A,它们之间形成了循环依赖。或者是 A 依赖 B,B 依赖 C,C 又依赖 A,形成了循环依赖。更或者是自己依赖自己。它们之间的依赖关系如下: 这里以两个类直接相互依赖为例,他们的实现代码可能如下: public class BeanB { private BeanA beanA; public void setBeanA(BeanA beanA) { this.beanA = beanA; } } public class BeanA { private BeanB beanB; public void setBeanB(BeanB beanB) { this.beanB = beanB; } } 配置信息如下(用注解方式注入同理,只是为了方便理解,用了配置文件): <bean id="beanA" class="priv.starfish.BeanA"> <property name="beanB" ref="beanB"/> </bean> <bean id="beanB" class="priv.starfish.BeanB"> <property name="beanA" ref="beanA"/> </bean> Spring 启动后,读取如上的配置文件,会按顺序先实例化 A,但是创建的时候又发现它依赖了 B,接着就去实例化 B ,同样又发现它依赖了 A ,这尼玛咋整?无限循环呀 Spring “肯定”不会让这种事情发生的,如前言我们说的 Spring 实例化对象分两步,第一步会先创建一个原始对象,只是没有设置属性,可以理解为"半成品"—— 官方叫 A 对象的早期引用(EarlyBeanReference),所以当实例化 B 的时候发现依赖了 A, B 就会把这个“半成品”设置进去先完成实例化,既然 B 完成了实例化,所以 A 就可以获得 B 的引用,也完成实例化了,这其实就是 Spring 解决循环依赖的思想。 不理解没关系,先有个大概的印象,然后我们从源码来看下 Spring 具体是怎么解决的。 源码解毒 代码版本:5.0.16.RELEASE 在 Spring IOC 容器读取 Bean 配置创建 Bean 实例之前, 必须对它进行实例化。只有在容器实例化后,才可以从 IOC 容器里获取 Bean 实例并使用,循环依赖问题也就是发生在实例化 Bean 的过程中的,所以我们先回顾下获取 Bean 的过程。 获取 Bean 流程 Spring IOC 容器中获取 bean 实例的简化版流程如下(排除了各种包装和检查的过程) 大概的流程顺序(可以结合着源码看下,我就不贴了,贴太多的话,呕~呕呕,想吐): 流程从 getBean 方法开始,getBean 是个空壳方法,所有逻辑直接到 doGetBean 方法中 transformedBeanName 将 name 转换为真正的 beanName(name 可能是 FactoryBean 以 & 字符开头或者有别名的情况,所以需要转化下) 然后通过 getSingleton(beanName) 方法尝试从缓存中查找是不是有该实例 sharedInstance(单例在 Spring 的同一容器只会被创建一次,后续再获取 bean,就直接从缓存获取即可) 如果有的话,sharedInstance 可能是完全实例化好的 bean,也可能是一个原始的 bean,所以再经 getObjectForBeanInstance 处理即可返回 当然 sharedInstance 也可能是 null,这时候就会执行创建 bean 的逻辑,将结果返回 第三步的时候我们提到了一个缓存的概念,这个就是 Spring 为了解决单例的循环依赖问题而设计的 三级缓存 /** Cache of singleton objects: bean name --> bean instance */ private final Map<String, Object> singletonObjects = new ConcurrentHashMap<>(256); /** Cache of singleton factories: bean name --> ObjectFactory */ private final Map<String, ObjectFactory<?>> singletonFactories = new HashMap<>(16); /** Cache of early singleton objects: bean name --> bean instance */ private final Map<String, Object> earlySingletonObjects = new HashMap<>(16); 这三级缓存的作用分别是: singletonObjects:完成初始化的单例对象的 cache,这里的 bean 经历过 实例化->属性填充->初始化 以及各种后置处理(一级缓存) earlySingletonObjects:存放原始的 bean 对象(完成实例化但是尚未填充属性和初始化),仅仅能作为指针提前曝光,被其他 bean 所引用,用于解决循环依赖的 (二级缓存) singletonFactories:在 bean 实例化完之后,属性填充以及初始化之前,如果允许提前曝光,Spring 会将实例化后的 bean 提前曝光,也就是把该 bean 转换成 beanFactory 并加入到 singletonFactories(三级缓存) 我们首先从缓存中试着获取 bean,就是从这三级缓存中查找 protected Object getSingleton(String beanName, boolean allowEarlyReference) { // 从 singletonObjects 获取实例,singletonObjects 中的实例都是准备好的 bean 实例,可以直接使用 Object singletonObject = this.singletonObjects.get(beanName); //isSingletonCurrentlyInCreation() 判断当前单例bean是否正在创建中 if (singletonObject == null && isSingletonCurrentlyInCreation(beanName)) { synchronized (this.singletonObjects) { // 一级缓存没有,就去二级缓存找 singletonObject = this.earlySingletonObjects.get(beanName); if (singletonObject == null && allowEarlyReference) { // 二级缓存也没有,就去三级缓存找 ObjectFactory<?> singletonFactory = this.singletonFactories.get(beanName); if (singletonFactory != null) { // 三级缓存有的话,就把他移动到二级缓存,.getObject() 后续会讲到 singletonObject = singletonFactory.getObject(); this.earlySingletonObjects.put(beanName, singletonObject); this.singletonFactories.remove(beanName); } } } } return singletonObject; } 如果缓存没有的话,我们就要创建了,接着我们以单例对象为例,再看下创建 bean 的逻辑(大括号表示内部类调用方法): 创建 bean 从以下代码开始,一个匿名内部类方法参数(总觉得 Lambda 的方式可读性不如内部类好理解) if (mbd.isSingleton()) { sharedInstance = getSingleton(beanName, () -> { try { return createBean(beanName, mbd, args); } catch (BeansException ex) { destroySingleton(beanName); throw ex; } }); bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd); } getSingleton() 方法内部主要有两个方法 public Object getSingleton(String beanName, ObjectFactory<?> singletonFactory) { // 创建 singletonObject singletonObject = singletonFactory.getObject(); // 将 singletonObject 放入缓存 addSingleton(beanName, singletonObject); } getObject() 匿名内部类的实现真正调用的又是 createBean(beanName, mbd, args) 往里走,主要的实现逻辑在 doCreateBean方法,先通过 createBeanInstance 创建一个原始 bean 对象 接着 addSingletonFactory 添加 bean 工厂对象到 singletonFactories 缓存(三级缓存) 通过 populateBean 方法向原始 bean 对象中填充属性,并解析依赖,假设这时候创建 A 之后填充属性时发现依赖 B,然后创建依赖对象 B 的时候又发现依赖 A,还是同样的流程,又去 getBean(A),这个时候三级缓存已经有了 beanA 的“半成品”,这时就可以把 A 对象的原始引用注入 B 对象(并将其移动到二级缓存)来解决循环依赖问题。这时候 getObject() 方法就算执行结束了,返回完全实例化的 bean 最后调用 addSingleton 把完全实例化好的 bean 对象放入 singletonObjects 缓存(一级缓存)中,打完收工 Spring 解决循环依赖 建议搭配着“源码”看下边的逻辑图,更好下饭 流程其实上边都已经说过了,结合着上图我们再看下具体细节,用大白话再捋一捋: Spring 创建 bean 主要分为两个步骤,创建原始 bean 对象,接着去填充对象属性和初始化 每次创建 bean 之前,我们都会从缓存中查下有没有该 bean,因为是单例,只能有一个 当我们创建 beanA 的原始对象后,并把它放到三级缓存中,接下来就该填充对象属性了,这时候发现依赖了 beanB,接着就又去创建 beanB,同样的流程,创建完 beanB 填充属性时又发现它依赖了 beanA,又是同样的流程,不同的是,这时候可以在三级缓存中查到刚放进去的原始对象 beanA,所以不需要继续创建,用它注入 beanB,完成 beanB 的创建 既然 beanB 创建好了,所以 beanA 就可以完成填充属性的步骤了,接着执行剩下的逻辑,闭环完成 这就是单例模式下 Spring 解决循环依赖的流程了。 但是这个地方,不管是谁看源码都会有个小疑惑,为什么需要三级缓存呢,我赶脚二级他也够了呀 革命尚未成功,同志仍需努力 跟源码的时候,发现在创建 beanB 需要引用 beanA 这个“半成品”的时候,就会触发"前期引用",即如下代码: ObjectFactory<?> singletonFactory = this.singletonFactories.get(beanName); if (singletonFactory != null) { // 三级缓存有的话,就把他移动到二级缓存 singletonObject = singletonFactory.getObject(); this.earlySingletonObjects.put(beanName, singletonObject); this.singletonFactories.remove(beanName); } singletonFactory.getObject() 是一个接口方法,这里具体的实现方法在 protected Object getEarlyBeanReference(String beanName, RootBeanDefinition mbd, Object bean) { Object exposedObject = bean; if (!mbd.isSynthetic() && hasInstantiationAwareBeanPostProcessors()) { for (BeanPostProcessor bp : getBeanPostProcessors()) { if (bp instanceof SmartInstantiationAwareBeanPostProcessor) { SmartInstantiationAwareBeanPostProcessor ibp = (SmartInstantiationAwareBeanPostProcessor) bp; // 这么一大段就这句话是核心,也就是当bean要进行提前曝光时, // 给一个机会,通过重写后置处理器的getEarlyBeanReference方法,来自定义操作bean // 值得注意的是,如果提前曝光了,但是没有被提前引用,则该后置处理器并不生效!!! // 这也正式三级缓存存在的意义,否则二级缓存就可以解决循环依赖的问题 exposedObject = ibp.getEarlyBeanReference(exposedObject, beanName); } } } return exposedObject; } 这个方法就是 Spring 为什么使用三级缓存,而不是二级缓存的原因,它的目的是为了后置处理,如果没有 AOP 后置处理,就不会走进 if 语句,直接返回了 exposedObject ,相当于啥都没干,二级缓存就够用了。 所以又得出结论,这个三级缓存应该和 AOP 有关系,继续。 在 Spring 的源码中getEarlyBeanReference 是 SmartInstantiationAwareBeanPostProcessor 接口的默认方法,真正实现这个方法的只有AbstractAutoProxyCreator 这个类,用于提前曝光的 AOP 代理。 @Override public Object getEarlyBeanReference(Object bean, String beanName) throws BeansException { Object cacheKey = getCacheKey(bean.getClass(), beanName); this.earlyProxyReferences.put(cacheKey, bean); // 对bean进行提前Spring AOP代理 return wrapIfNecessary(bean, beanName, cacheKey); } 这么说有点干,来个小 demo 吧,我们都知道 Spring AOP、事务等都是通过代理对象来实现的,而事务的代理对象是由自动代理创建器来自动完成的。也就是说 Spring 最终给我们放进容器里面的是一个代理对象,而非原始对象,假设我们有如下一段业务代码: @Service public class HelloServiceImpl implements HelloService { @Autowired private HelloService helloService; @Override @Transactional public Object hello() { return "Hello JavaKeeper"; } } 此 Service 类使用到了事务,所以最终会生成一个 JDK 动态代理对象 Proxy。刚好它又存在自己引用自己的循环依赖,完美符合我们的场景需求。 我们再自定义一个后置处理,来看下效果: @Component public class HelloProcessor implements SmartInstantiationAwareBeanPostProcessor { @Override public Object getEarlyBeanReference(Object bean, String beanName) throws BeansException { System.out.println("提前曝光了:"+beanName); return bean; } } 可以看到,调用方法栈中有我们自己实现的 HelloProcessor,说明这个 bean 会通过 AOP 代理处理。 再从源码看下这个自己循环自己的 bean 的创建流程: protected Object doCreateBean( ... ){ ... boolean earlySingletonExposure = (mbd.isSingleton() && this.allowCircularReferences && isSingletonCurrentlyInCreation(beanName)); // 需要提前暴露(支持循环依赖),就注册一个ObjectFactory到三级缓存 if (earlySingletonExposure) { // 添加 bean 工厂对象到 singletonFactories 缓存中,并获取原始对象的早期引用 //匿名内部方法 getEarlyBeanReference 就是后置处理器 // SmartInstantiationAwareBeanPostProcessor 的一个方法, // 它的功效为:保证自己被循环依赖的时候,即使被别的Bean @Autowire进去的也是代理对象 addSingletonFactory(beanName, () -> getEarlyBeanReference(beanName, mbd, bean)); } // 此处注意:如果此处自己被循环依赖了 那它会走上面的getEarlyBeanReference,从而创建一个代理对象从 三级缓存转移到二级缓存里 // 注意此时候对象还在二级缓存里,并没有在一级缓存。并且此时后续的这两步操作还是用的 exposedObject,它仍旧是原始对象~~~ populateBean(beanName, mbd, instanceWrapper); exposedObject = initializeBean(beanName, exposedObject, mbd); // 因为事务的AOP自动代理创建器在getEarlyBeanReference 创建代理后,initializeBean 就不会再重复创建了,二选一的) // 所以经过这两大步后,exposedObject 还是原始对象,通过 getEarlyBeanReference 创建的代理对象还在三级缓存呢 ... // 循环依赖校验 if (earlySingletonExposure) { // 注意此处第二个参数传的false,表示不去三级缓存里再去调用一次getObject()方法了~~~,此时代理对象还在二级缓存,所以这里拿出来的就是个 代理对象 // 最后赋值给exposedObject 然后return出去,进而最终被addSingleton()添加进一级缓存里面去 // 这样就保证了我们容器里 最终实际上是代理对象,而非原始对象~~~~~ Object earlySingletonReference = getSingleton(beanName, false); if (earlySingletonReference != null) { if (exposedObject == bean) { exposedObject = earlySingletonReference; } } ... } } 自我解惑: 问:还是不太懂,为什么这么设计呢,即使有代理,在二级缓存代理也可以吧 | 为什么要使用三级缓存呢? 我们再来看下相关代码,假设我们现在是二级缓存架构,创建 A 的时候,我们不知道有没有循环依赖,所以放入二级缓存提前暴露,接着创建 B,也是放入二级缓存,这时候发现又循环依赖了 A,就去二级缓存找,是有,但是如果此时还有 AOP 代理呢,我们要的是代理对象可不是原始对象,这怎么办,只能改逻辑,在第一步的时候,不管3721,所有 Bean 统统去完成 AOP 代理,如果是这样的话,就不需要三级缓存了,但是这样不仅没有必要,而且违背了 Spring 在结合 AOP 跟 Bean 的生命周期的设计。 所以 Spring “多此一举”的将实例先封装到 ObjectFactory 中(三级缓存),主要关键点在 getObject() 方法并非直接返回实例,而是对实例又使用 SmartInstantiationAwareBeanPostProcessor 的 getEarlyBeanReference 方法对 bean 进行处理,也就是说,当 Spring 中存在该后置处理器,所有的单例 bean 在实例化后都会被进行提前曝光到三级缓存中,但是并不是所有的 bean 都存在循环依赖,也就是三级缓存到二级缓存的步骤不一定都会被执行,有可能曝光后直接创建完成,没被提前引用过,就直接被加入到一级缓存中。因此可以确保只有提前曝光且被引用的 bean 才会进行该后置处理。 protected Object getSingleton(String beanName, boolean allowEarlyReference) { Object singletonObject = this.singletonObjects.get(beanName); if (singletonObject == null && isSingletonCurrentlyInCreation(beanName)) { synchronized (this.singletonObjects) { singletonObject = this.earlySingletonObjects.get(beanName); if (singletonObject == null && allowEarlyReference) { // 三级缓存获取,key=beanName value=objectFactory,objectFactory中存储 //getObject()方法用于获取提前曝光的实例 ObjectFactory<?> singletonFactory = this.singletonFactories.get(beanName); if (singletonFactory != null) { // 三级缓存有的话,就把他移动到二级缓存 singletonObject = singletonFactory.getObject(); this.earlySingletonObjects.put(beanName, singletonObject); this.singletonFactories.remove(beanName); } } } } return singletonObject; } boolean earlySingletonExposure = (mbd.isSingleton() && this.allowCircularReferences && isSingletonCurrentlyInCreation(beanName)); if (earlySingletonExposure) { if (logger.isDebugEnabled()) { logger.debug("Eagerly caching bean '" + beanName + "' to allow for resolving potential circular references"); } // 添加 bean 工厂对象到 singletonFactories 缓存中,并获取原始对象的早期引用 //匿名内部方法 getEarlyBeanReference 就是后置处理器 // SmartInstantiationAwareBeanPostProcessor 的一个方法, // 它的功效为:保证自己被循环依赖的时候,即使被别的Bean @Autowire进去的也是代理对象~~~~ AOP自动代理创建器此方法里会创建的代理对象~~~ addSingletonFactory(beanName, () -> getEarlyBeanReference(beanName, mbd, bean)); } 再问:AOP 代理对象提前放入了三级缓存,没有经过属性填充和初始化,这个代理又是如何保证依赖属性的注入的呢? 这个又涉及到了 Spring 中动态代理的实现,不管是cglib代理还是jdk动态代理生成的代理类,代理时,会将目标对象 target 保存在最后生成的代理 $proxy 中,当调用 $proxy 方法时会回调 h.invoke,而 h.invoke 又会回调目标对象 target 的原始方法。所有,其实在 AOP 动态代理时,原始 bean 已经被保存在 提前曝光代理中了,之后 原始 bean 继续完成属性填充和初始化操作。因为 AOP 代理$proxy 中保存着 traget 也就是是 原始bean 的引用,因此后续 原始bean 的完善,也就相当于Spring AOP中的 target 的完善,这样就保证了 AOP 的属性填充与初始化了! 非单例循环依赖 看完了单例模式的循环依赖,我们再看下非单例的情况,假设我们的配置文件是这样的: <bean id="beanA" class="priv.starfish.BeanA" scope="prototype"> <property name="beanB" ref="beanB"/> </bean> <bean id="beanB" class="priv.starfish.BeanB" scope="prototype"> <property name="beanA" ref="beanA"/> </bean> 启动 Spring,结果如下: Error creating bean with name 'beanA' defined in class path resource [applicationContext.xml]: Cannot resolve reference to bean 'beanB' while setting bean property 'beanB'; Error creating bean with name 'beanB' defined in class path resource [applicationContext.xml]: Cannot resolve reference to bean 'beanA' while setting bean property 'beanA'; Caused by: org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name 'beanA': Requested bean is currently in creation: Is there an unresolvable circular reference? 对于 prototype 作用域的 bean,Spring 容器无法完成依赖注入,因为 Spring 容器不进行缓存 prototype 作用域的 bean ,因此无法提前暴露一个创建中的bean 。 原因也挺好理解的,原型模式每次请求都会创建一个实例对象,即使加了缓存,循环引用太多的话,就比较麻烦了就,所以 Spring 不支持这种方式,直接抛出异常: if (isPrototypeCurrentlyInCreation(beanName)) { throw new BeanCurrentlyInCreationException(beanName); } 构造器循环依赖 上文我们讲的是通过 Setter 方法注入的单例 bean 的循环依赖问题,用 Spring 的小伙伴也都知道,依赖注入的方式还有构造器注入、工厂方法注入的方式(很少使用),那如果构造器注入方式也有循环依赖,可以搞不? 我们再改下代码和配置文件 public class BeanA { private BeanB beanB; public BeanA(BeanB beanB) { this.beanB = beanB; } } public class BeanB { private BeanA beanA; public BeanB(BeanA beanA) { this.beanA = beanA; } } <bean id="beanA" class="priv.starfish.BeanA"> <constructor-arg ref="beanB"/> </bean> <bean id="beanB" class="priv.starfish.BeanB"> <constructor-arg ref="beanA"/> </bean> 执行结果,又是异常 看看官方给出的说法 Circular dependencies If you use predominantly constructor injection, it is possible to create an unresolvable circular dependency scenario. For example: Class A requires an instance of class B through constructor injection, and class B requires an instance of class A through constructor injection. If you configure beans for classes A and B to be injected into each other, the Spring IoC container detects this circular reference at runtime, and throws a BeanCurrentlyInCreationException. One possible solution is to edit the source code of some classes to be configured by setters rather than constructors. Alternatively, avoid constructor injection and use setter injection only. In other words, although it is not recommended, you can configure circular dependencies with setter injection. Unlike the typical case (with no circular dependencies), a circular dependency between bean A and bean B forces one of the beans to be injected into the other prior to being fully initialized itself (a classic chicken-and-egg scenario). 大概意思是: 如果您主要使用构造器注入,循环依赖场景是无法解决的。建议你用 setter 注入方式代替构造器注入 其实也不是说只要是构造器注入就会有循环依赖问题,Spring 在创建 Bean 的时候默认是按照自然排序来进行创建的,我们暂且把先创建的 bean 叫主 bean,上文的 A 即主 bean,只要主 bean 注入依赖 bean 的方式是 setter 方式,依赖 bean 的注入方式无所谓,都可以解决,反之亦然 所以上文我们 AB 循环依赖问题,只要 A 的注入方式是 setter ,就不会有循环依赖问题。 面试官问:为什么呢? Spring 解决循环依赖依靠的是 Bean 的“中间态”这个概念,而这个中间态指的是已经实例化,但还没初始化的状态。实例化的过程又是通过构造器创建的,如果 A 还没创建好出来,怎么可能提前曝光,所以构造器的循环依赖无法解决,我一直认为应该先有鸡才能有蛋。 小总结 | 面试这么答 B 中提前注入了一个没有经过初始化的 A 类型对象不会有问题吗? 虽然在创建 B 时会提前给 B 注入了一个还未初始化的 A 对象,但是在创建 A 的流程中一直使用的是注入到 B 中的 A 对象的引用,之后会根据这个引用对 A 进行初始化,所以这是没有问题的。 Spring 是如何解决的循环依赖? Spring 为了解决单例的循环依赖问题,使用了三级缓存。其中一级缓存为单例池(singletonObjects),二级缓存为提前曝光对象(earlySingletonObjects),三级缓存为提前曝光对象工厂(singletonFactories)。 假设A、B循环引用,实例化 A 的时候就将其放入三级缓存中,接着填充属性的时候,发现依赖了 B,同样的流程也是实例化后放入三级缓存,接着去填充属性时又发现自己依赖 A,这时候从缓存中查找到早期暴露的 A,没有 AOP 代理的话,直接将 A 的原始对象注入 B,完成 B 的初始化后,进行属性填充和初始化,这时候 B 完成后,就去完成剩下的 A 的步骤,如果有 AOP 代理,就进行 AOP 处理获取代理后的对象 A,注入 B,走剩下的流程。 为什么要使用三级缓存呢?二级缓存能解决循环依赖吗? 如果没有 AOP 代理,二级缓存可以解决问题,但是有 AOP 代理的情况下,只用二级缓存就意味着所有 Bean 在实例化后就要完成 AOP 代理,这样违背了 Spring 设计的原则,Spring 在设计之初就是通过 AnnotationAwareAspectJAutoProxyCreator 这个后置处理器来在 Bean 生命周期的最后一步来完成 AOP 代理,而不是在实例化后就立马进行 AOP 代理。 参考与感谢: 《Spring 源码深度解析》- 郝佳著 https://developer.aliyun.com/article/766880 http://www.tianxiaobo.com/2018/06/08/Spring-IOC-容器源码分析-循环依赖的解决办法 https://cloud.tencent.com/developer/article/1497692 https://blog.csdn.net/chaitoudaren/article/details/105060882
2020年09月