线程的几种状态
新生 运行 阻塞 等待 超时等待 终止
wait和sleep的区别
1.来自不同的类
wait来源与Object类
sleep来源于Thread类
2.关于锁的释放
wait会释放锁
sleep会抱着锁睡觉
3.使用的范围不同
wait是需要在代码块中使用
sleep可以在任何地方使用
Lock锁和Synchronized的区别?
- Synchronized是内置的关键字,Lock是一个Java类。
- Synchronized无法判断锁的状态,Lock可以判断是否获取到锁。
- Synchronized会自动释放锁,但是Lock锁需要手动释放锁,如果不释放锁,则会产生死锁。
- Synchronized中如果线程1获到得锁,线程2就要一直等待;Lock锁就不一定会等待下去。
- Synchronized是一个可重入锁,是不可以中断的、非公平的。Lock是一个可重入锁,可以判断锁,非公平(可以自己手动设置公平锁)。
- Synchronized适合锁少量的代码同步问题。Lock适合锁大量的同步代码。
生产者和消费者问题
package com.example.study.high.pc; /** * 线程间通信: * 生产者与消费者,等待唤醒,通知唤醒 * 线程交替执行A B 操作同一个变量num=0 * A num+1 * B num-1 */ public class TestPC { public static void main(String[] args) { Data data = new Data(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { throw new RuntimeException(e); } } },"线程A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { throw new RuntimeException(e); } } },"线程B").start(); } } /** * 等待、业务、通知 * 判断是否需要等待 * 等待业务执行 * 通知其他线程 */ class Data { private int number = 0; //+1 public synchronized void increment() throws InterruptedException { if(number!=0) { this.wait(); } number++; System.out.println(Thread.currentThread().getName()+"=>"+number); //通知其他线程,我+1完毕了 this.notifyAll(); } //-1 public synchronized void decrement() throws InterruptedException { if(number==0) { this.wait(); } number--; System.out.println(Thread.currentThread().getName()+"=>"+number); //通知其他线程,我-1完毕了 this.notifyAll(); } }
存在A、B、C、D四个线程还安全么?
会出现虚假唤醒?
解决方法:
if改成while
package com.example.study.high.pc; /** * 线程间通信: * 生产者与消费者,等待唤醒,通知唤醒 * 线程交替执行A B 操作同一个变量num=0 * A num+1 * B num-1 */ public class TestPC { public static void main(String[] args) { Data data = new Data(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { throw new RuntimeException(e); } } },"线程A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { throw new RuntimeException(e); } } },"线程B").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { throw new RuntimeException(e); } } },"线程A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { throw new RuntimeException(e); } } },"线程B").start(); } } /** * 等待、业务、通知 * 判断是否需要等待 * 等待业务执行 * 通知其他线程 */ class Data { private int number = 0; //+1 public synchronized void increment() throws InterruptedException { while (number!=0) { this.wait(); } number++; System.out.println(Thread.currentThread().getName()+"=>"+number); //通知其他线程,我+1完毕了 this.notifyAll(); } //-1 public synchronized void decrement() throws InterruptedException { while(number==0) { this.wait(); } number--; System.out.println(Thread.currentThread().getName()+"=>"+number); //通知其他线程,我-1完毕了 this.notifyAll(); } }
JUC版本的生产者与消费者问题
通过Lock可以找到
package com.example.study.high.pc; import ch.qos.logback.classic.pattern.ClassOfCallerConverter; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 线程间通信: * 生产者与消费者,等待唤醒,通知唤醒 * 线程交替执行A B 操作同一个变量num=0 * A num+1 * B num-1 */ public class TestPC2 { public static void main(String[] args) { Data data = new Data(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }, "线程A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }, "线程B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }, "线程A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }, "线程B").start(); } } /** * 等待、业务、通知 * 判断是否需要等待 * 等待业务执行 * 通知其他线程 */ class Data2 { private int number = 0; Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); //+1 public synchronized void increment() throws InterruptedException { try { while (number != 0) { condition.await(); } number++; System.out.println(Thread.currentThread().getName() + "=>" + number); //通知其他线程,我+1完毕了 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } //-1 public synchronized void decrement() throws InterruptedException { try { lock.lock(); while (number == 0) { condition.await(); } number--; System.out.println(Thread.currentThread().getName() + "=>" + number); //通知其他线程,我-1完毕了 condition.signalAll(); } catch (Exception e) { } finally { lock.unlock(); } } }
Condition可以精准的通知和唤醒线程
package com.example.study.high.pc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 线程间通信: * 生产者与消费者,等待唤醒,通知唤醒 * 线程交替执行A B 操作同一个变量num=0 * A num+1 * B num-1 */ public class TestPC3 { /** * 等待、业务、通知 * 判断是否需要等待 * 等待业务执行 * 通知其他线程 */ public static void main(String[] args) { Data4 data = new Data4(); new Thread(() -> { for (int i = 0; i < 10; i++) { data.printA(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { data.printB(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { data.printC(); } }, "C").start(); } } class Data4 { private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); private int number = 1; //1A 2B 3C public void printA() { lock.lock(); try { while(number!=1) { //等待 condition1.await(); } System.out.println(Thread.currentThread().getName() + "执行了A"); number=2; //唤醒指定的人 condition2.signal(); } catch (Exception e) { throw new RuntimeException(e); } finally { lock.unlock(); } } public void printB() { lock.lock(); try { while(number!=2) { //等待 condition2.await(); } System.out.println(Thread.currentThread().getName() + "执行了B"); number=3; //唤醒指定的线程 C condition3.signal(); } catch (Exception e) { throw new RuntimeException(e); } finally { lock.unlock(); } } public void printC() { lock.lock(); try { while(number!=3) { //等待 condition3.await(); } System.out.println(Thread.currentThread().getName() + "执行了C"); number=1; //唤醒指定的线程 A condition1.signal(); } catch (Exception e) { throw new RuntimeException(e); } finally { lock.unlock(); } } }
8锁现象
如何判断锁的是谁?永远的知道什么锁?
集合线程不安全!
package com.example.study.high.unsafe; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; public class ListTest { public static void main(String[] args) { // List<String> list = new ArrayList<>(); /** * 解决方法: * 1.List<String> list = new Vector<>();Vector是线程安全的,add方法都是用synchronized修饰的 * 2.List<String> list = Collections.synchronizedList(new ArrayList<>()); * 3.List<String> list = new CopyOnWriteArrayList<>(); * CopyOnWrite是写入时复制, 计算机程序设计领域的一种优化策略; * 多个线程调用的时候,list读取的时候,固定的写入 * 在写入的时候,避免覆盖,造成数据问题 * CopyOnWriteArrayList比Vector、Collections.synchronizedList性能好的原因是什么? */ List<String> list = new CopyOnWriteArrayList(); for (int i = 0; i < 10; i++) { new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0, 8)); System.out.println(list); }).start(); } } }
CopyOnWriteArrayList的缺点:
- 内存占用问题
- 数据一致性问题,只能保证最终一致性;如果希望写入的数据马上马上可以读出来,那不用使用这个容器。
public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); //上锁,只允许一个线程进入 try { Object[] elements = getArray(); // 获得当前数组对象 int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1);//拷贝到一个新的数组中 newElements[len] = e;//插入数据元素 setArray(newElements);//将新的数组对象设置回去 return true; } finally { lock.unlock();//释放锁 } }
CopyOnWriteArrayList与Collections.synchronizedList的区别?
- CopyOnWrite写操作不仅有lock锁,而且还要进行数组的copy,性能就比Collections.synchronizedList的性能低。
- 在读数据的时候,因为CopyOnWrite直接读的是数组,但是Collections.synchroniedList是有synchronized修饰,所以读性能的话,CopyOnWrite的性能好。
学习的好方法?
1.会用,2.货比三家,3.寻找更好的解决方法
HashSet底层是什么?
package com.example.study.high.unsafe; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; public class SetTest { public static void main(String[] args) { //用这个类型的会出现ConcurrentModificationException 并发修改异常 // Set<String> set = new HashSet<>(); /** * 解决ConcurrentModificationException一场的解决方案: * 1.Set<String> set = Collections.synchronizedSet(new HashSet<>()); * 2.Set<String> set = new CopyOnWriteArraySet<>(); */ Set<String> set = new CopyOnWriteArraySet<>(); for (int i = 0; i < 100; i++) { new Thread(()->{ set.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(set); },String.valueOf(i)).start(); } } }
Map并发条件下的使用
package com.example.study.high.unsafe; import java.util.HashMap; import java.util.Map; import java.util.UUID; public class MapTest { public static void main(String[] args) { //这样在并发多线程的环境下是不安全,会出现ConcurrentModificationException的并发修改异常 Map<String,String> map = new HashMap<>(); for (int i = 0; i < 30; i++) { new Thread(()->{ map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5)); System.out.println(map); },String.valueOf(i)).start(); } } }
HashMap原理
- 数据结构
- JDK7:数组+链表
- JDK8:数组+链表+红黑树
- 为什么要用链表? 因为会出现哈希冲突
HashMap的put实现原理
/** * 通过key进行hash运算,得到hash值, * 然后根据hash值进行取模,得到数组下标, * 根据拿着下标去数组中取对应的Entity, * 如果该下标没有值,那么直接存入对应的key和value, * 如果该下标有值,则使用里链表进行存储,然后返回 * * @return */ @Override public V put(K k, V v) { //根据key进行hash预算,然后取模,得到数组下标 int index = hash(k); Entity<K, V> entity = table[index]; if (entity == null) { table[index] = new Entity<>(k, v, index, null); size++; } else { //通过链接的数据结构来存储 table[index] = new Entity<>(k, v, index, entity); } return table[index].getValue(); }
HashMap的get实现原理
/** * 通过key进行hash运算,得到hash值, * 然后根据hash值进行取模,得到数组下标, * 根据拿着下标去数组中取对应的Entity, * 查询到的key和对象进行比对,判断是否相等 * 如果不相等,判断next是否为空 * 如果不为空,继续比较他们是否相等, * 相等则直接返回,如果不相等,继续比较next * next是否为空,直到为空或相等为止 * 然后返回结果 * * @param V * @return */ @Override public V get(K k) { if (size == 0) { return null; } //根据key进行hash预算,然后取模,得到数组下标 int index = hash(k); Entity<K, V> entity1 = findValue(k, table[index]); return entity1 != null ? entity1.getValue() : null; }
JDK8为什么要用红黑树呢?
- 链接的查询速度太慢,所以引入了红黑树,用红黑树来替换了链表。有一个阈值,阈值为8;
- 如果链表的长度超过8的时候,链表的数据结果就会变成红黑树的数据结构。
- 当红黑树的节点小于6的时候,会将红黑树的数据结构重新转换成单向链表。
- 红黑树引入颜色的原因主要是因为平衡条件可以得到简化,红黑树可以以O(log2n)
- 一上来直接用的是链表的原因是,插入的时候,链表的效率更高,而红黑树在插入数据的时候要不断的维护树,要保持二叉查找树的平衡,这样导致红黑树的插入效率没有链表高的原因。
红黑树阈值为什么是8呢
之所以是8,是因为Java的源码贡献者在进行大量实验发现,hash碰撞发生8次的概率已经降低到了0.00000006,根据概率统计而选择的;
红黑树转链表的阈值为6的原因
红黑树转链表的阈值为6,主要是因为,如果也将该阈值设置于8,那么当hash碰撞在8时,会发生链表和红黑树的不停相互激荡转换,白白浪费资源
红黑树的特点是什么?
- 每个节点非红即黑
- 根节点总是黑色的
- 如果节点是红色的,则它的子节点必须是黑色的(反之不一定)
- 每个叶子节点都是黑色的空节点(NIL节点)
- 从根节点到叶节点或空子节点的每条路径,必须包含相同数目的黑色节点(即相同的黑色高度)
ConcurrentHashmap原理
数据结构
Segment数组,数组中每个元素中有一个HashMap的数据结构
put原理
- 计算出key对应的hashcode。
- 通过移位运算,计算得出一个index,也就是Segement的下标,获取对用的Segement对象。
- 对Segement对象进行上锁,Lock锁。
- 用刚才计算出的hashcode对Segement对象里的Entry数组进行取模运算。计算出一个index。
- 然后找到对应的entry,剩下的逻辑就和hashmap时一样的。
扩容
- concurrentHashmap的扩容和Segement的数组长度没有关系。
- 扩容也只是扩容Segement数组中当前entry的长度。
JUC常用的辅助类
CountDownLatch:减法计数器
package com.example.study.add; import java.util.concurrent.CountDownLatch; /** * 计数器 */ public class CountDownLatchDemo { public static void main(String[] args) { CountDownLatch countDownLatch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+"go ount!"); //线程计数器,数量减1 countDownLatch.countDown(); },String.valueOf(i)).start(); } try { //等待计数器归零以后,才能继续向下执行 countDownLatch.await(); System.out.println("Close door!"); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
原理:
CountDownLatch主要是减法计数器,主要有两个方法,分别是countdown()和await();
countdown()方法的意思是每执行一次,就减1
await()方法的意思只有等计数器归0以后,CountdownLatch.await就会被唤醒,继续执行。
CyclicBarrier加法计数器
package com.example.study.add; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 加法计数器 */ public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(8,()->{ System.out.println("加法计数器执行成功了!"); }); for (int i = 0; i < 7; i++) { final int temp = i; new Thread(()->{ System.out.println(Thread.currentThread().getName()+":"+temp+"个线程"); try { //当线程数量达到8个以后,才会执行加法计数器 cyclicBarrier.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (BrokenBarrierException e) { throw new RuntimeException(e); } }).start(); } } }
Semaphore:信号量
package com.example.study.add; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * 信号量 */ public class SemaphoreDemo { public static void main(String[] args) { //线程数量:停车位!限流 Semaphore semaphore = new Semaphore(3);//模拟三个停车位 for (int i = 0; i < 6; i++) { new Thread(()->{ try { //得到 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"抢到车位"); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+"停车两秒后离开车位"); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { //释放 semaphore.release(); } }).start(); } } }
原理:
semaphore.acquire():获得,假如如果已经满了,等待,等待被释放为止!
semaphore.release():释放,会将当前的信号量+1,然后唤醒等待的线程!
作用:多个共享资源互斥的使用。并发限流,控制最大的线程数!
读写锁 ReadWriteLock
ReadWriteLock中包含了
- readLock:读锁/独占锁;写的时候,一次只能一个线程占有;
- writeLock:写锁/共享锁;读的时候多个线程都可以调用。
package com.example.study.rw; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 独占锁(写锁):写的时候,一次智能由一个线程占有 * 共享锁(读锁):多个线程可以同时调用 * */ public class ReadWriteLockDemo { public static void main(String[] args) { //线程操作资源类 /** * 使用这种方式来实现的话,在并发的情况下,会出现一个线程刚写入且没有写入完成的时候,其他线程就进来了。 */ MyCache myCache = new MyCache(); for (int i = 0; i < 30; i++) { final int temp = i; new Thread(() -> { myCache.put(String.valueOf(temp), Thread.currentThread().getName()); }).start(); } for (int i = 0; i < 30; i++) { final int temp = i; new Thread(()->{ myCache.get(String.valueOf(temp)); }).start(); } } } class MyCache { private volatile Map<String, Object> map = new HashMap<>(); //读写锁 ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); //存,写 public void put(String key, Object value) { try { readWriteLock.writeLock().lock(); //写入的时候,只希望同时只有一个线程往这里面写, System.out.println(Thread.currentThread().getName() + "\t 正在写入:" + key); map.put(key, value); System.out.println(Thread.currentThread().getName() + "\t 写入完成"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } //取,读 public void get(String key) { //进行一个读锁 try { readWriteLock.readLock().lock(); //读的时候,希望所有的线程都可以读,不做限制 System.out.println(Thread.currentThread().getName() + "\t 正在读取"); Object result = map.get(key); System.out.println(Thread.currentThread().getName() + "\t 读取完成:" + result); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } } }
运行结果:
Thread-0 正在写入:0 Thread-0 写入完成 Thread-1 正在写入:1 Thread-1 写入完成 Thread-2 正在写入:2 Thread-2 写入完成 Thread-3 正在写入:3 Thread-3 写入完成 Thread-4 正在写入:4 Thread-39 正在读取 Thread-39 读取完成:Thread-9 Thread-30 正在读取 Thread-34 正在读取 Thread-35 正在读取 Thread-31 正在读取 Thread-33 正在读取 Thread-31 读取完成:Thread-1 Thread-35 读取完成:Thread-5 Thread-34 读取完成:Thread-4 Thread-30 读取完成:Thread-0
阻塞队列:BlockingQueue;父类是Collection
package com.example.study.bq; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) throws InterruptedException { /** * BlcokQueue也是实现的Collection接口 * BlockingQueue是一个接口,它是一个队列,它的实现类有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue * BlockingQueue和Set、List、Map一样,都是Collection的子接口; * BkockingQueue的使用场景: * */ test4(); } /** * 原始长度是3,当添加第四个元素时,会抛出异常java.lang.IllegalStateException: Queue full,标识队列已满 */ public static void test1() { int capacity = 3; ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(capacity); System.out.println(arrayBlockingQueue.add("1")); System.out.println(arrayBlockingQueue.add("2")); System.out.println(arrayBlockingQueue.add("3")); System.out.println(arrayBlockingQueue.remove()); System.out.println(arrayBlockingQueue.remove()); System.out.println(arrayBlockingQueue.remove()); //原始长度是3,当添加第四个元素时,会抛出异常java.lang.IllegalStateException: Queue full,标识队列已满 System.out.println(arrayBlockingQueue.add("4")); } /** * 新增数据的时候,超出长度3的时候,不会抛出异常,而是返回false * 删除数据的时候,超出长度3的时候,不会抛出异常,而是返回null * peek()方法可以打印队列的第一个元素,但是不会删除队列的第一个元素 */ public static void test2() { int capacity = 3; ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(capacity); System.out.println(arrayBlockingQueue.offer("1")); System.out.println(arrayBlockingQueue.offer("2")); System.out.println(arrayBlockingQueue.offer("3")); //新增数据的时候,超出长度3的时候,不会抛出异常,而是返回false System.out.println(arrayBlockingQueue.offer("4")); //打印对手元素 System.out.println("检测对手元素:"+arrayBlockingQueue.peek()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); //删除数据的时候,超出长度3的时候,不会抛出异常,而是返回null System.out.println(arrayBlockingQueue.poll()); } /** * 等待,阻塞(并且是一直阻塞) */ public static void test3() throws InterruptedException { ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3); arrayBlockingQueue.put("1"); arrayBlockingQueue.put("2"); arrayBlockingQueue.put("3"); arrayBlockingQueue.put("4"); } public static void test4() throws InterruptedException { ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3); arrayBlockingQueue.offer("1"); arrayBlockingQueue.offer("2"); arrayBlockingQueue.offer("3"); //等待2秒,如果还是没有空间,就退出 arrayBlockingQueue.offer("4",2, TimeUnit.SECONDS); System.out.println("========================"); arrayBlockingQueue.poll(); arrayBlockingQueue.poll(); arrayBlockingQueue.poll(); //等待2秒,如果还是没有数据,就返回null arrayBlockingQueue.poll(2,TimeUnit.SECONDS); } }
SynchronnousQueue 同步队列
package com.example.study.bq; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** * 同步队列 * 1. 不存储元素 * 2. put一个元素,必须从里面先take取出来,否则不能在put进去值 */ public class SynchronnousQueueDemo { public static void main(String[] args) { SynchronousQueue synchronousQueue = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"\t put 1"); synchronousQueue.put("1"); System.out.println(Thread.currentThread().getName()+"\t put 2"); synchronousQueue.put("2"); System.out.println(Thread.currentThread().getName()+"\t put 3"); synchronousQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } },"AAA").start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(3); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"\t take "+synchronousQueue.take()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"\t take "+synchronousQueue.take()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"\t take "+synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"BBB").start(); } }
线程池
三大方法、七大策略、四种拒绝策略。
在面试过程中不要说用官方的方法创建的,而是说要ThreadPool来创建,因为阿里编码规范中有相关要求
池化技术:事先准备好一些资源,有人要用的时候,就到这里来拿,用完以后要还回来。
线程池的好处
- 降低资源的消耗
- 提高响应的速度和效率
- 方便管理
- 线程复用、可以控制最大的并发数、管理线程
线程的三大方法
package com.example.study.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Executors工具类,三大方法 */ public class Demo01 { public static void main(String[] args) { // ExecutorService executorService = Executors.newSingleThreadExecutor(); // 单个线程 // ExecutorService executorService = Executors.newFixedThreadPool(5); // 创建一个固定的线程池的大小 ExecutorService executorService = Executors.newCachedThreadPool();// 可伸缩的,遇强则强,遇弱则弱 try { for (int i = 0; i < 100; i++) { //使用了线程池之后,使用线程池来创建线程 executorService.execute(()->{ System.out.println(Thread.currentThread().getName()+"\t 办理业务"); }); } } catch (Exception e) { throw new RuntimeException(e); } finally { //关闭线程池 executorService.shutdown(); } } }
7大参数
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
三大方法都调用了同一个方法,源码就是下面的代码:
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小 int maximumPoolSize,//最大核心线程池大小 long keepAliveTime,//超时了没有人调用就会释放 TimeUnit unit,//超时单位 BlockingQueue<Runnable> workQueue,//阻塞队列 ThreadFactory threadFactory,//线程工厂,创建线程的,一般不用动 RejectedExecutionHandler handler//拒绝策略 ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
四种拒绝策略
- AbortPolicy:如果线程池已经满了,再添加线程就会抛出异常;
- CallerRunsPolicy:哪儿来的去哪里
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务
- DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常
线程池的最大的大小应该如何设置?
- CPU密集型:CPU是多少核,那最大线程数就定位为这个,这样可以保证CPU的效率最高。
- IO密集型:判断程序中十分耗IO的线程
package com.example.study.pool; import java.util.concurrent.*; /** * Executors工具类,三大方法 */ public class Demo01 { public static void main(String[] args) { // ExecutorService executorService = Executors.newSingleThreadExecutor(); // 单个线程 // ExecutorService executorService = Executors.newFixedThreadPool(5); // 创建一个固定的线程池的大小 // ExecutorService executorService = Executors.newCachedThreadPool();// 可伸缩的,遇强则强,遇弱则弱 //获取CPU核数 System.out.println(Runtime.getRuntime().availableProcessors()); /** * 自定义线程池 阿里巴巴开发推荐使用 * 这种创建线程池的方法是根据CPU密集型来创建的额 * 通过Runtime.getRuntime().availableProcessors()获取CPU核数 */ ExecutorService executorService = new ThreadPoolExecutor( 3, Runtime.getRuntime().availableProcessors(), 3, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy() //如果线程池已经满了,再添加线程就会抛出异常 ); /** * 四种拒绝策略: * AbortPolicy:如果线程池已经满了,再添加线程就会抛出异常 * 爆出的错误异常:java.util.concurrent.RejectedExecutionException * CallerRunsPolicy:哪儿来的去哪里 * DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务 * DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常 */ try { //最大承载:Deque + max,目前队列中有5个任务,max为3,所以最大承载为Deque+max=8 for (int i = 0; i < 9; i++) { //使用了线程池之后,使用线程池来创建线程 executorService.execute(()->{ System.out.println(Thread.currentThread().getName()+"\t 办理业务"); }); } } catch (Exception e) { throw new RuntimeException(e); } finally { //关闭线程池 executorService.shutdown(); } } }
四大函数式接口(必须掌握)
Function:函数式接口
@FunctionalInterface public interface Runnable { public abstract void run(); } //超级多的FunctionalInterface //foreach就是一个函数式接口
package com.example.study.funtion; import java.util.function.Function; /** * 只要是函数式接口,就可以用lambda表达式简化 */ public class Demo1 { public static void main(String[] args) { Function function = (s)->{ return s; }; System.out.println(function.apply("hello")); } }
Predicate:断定型接口,只有一个输入参数,返回值为boolean型
package com.example.study.funtion; import java.util.function.Predicate; /** * 断定性接口,有一个输入参数,返回值只能是布尔值 */ public class PredicationDemo { public static void main(String[] args) { Predicate<String> predicatie = (String)->{ //判断字符串是否为空 return "".equals(""); }; System.out.println(predicatie.test("")); } }
Consumer:消费型接口,只有输入参数,没有返回值
package com.example.study.funtion; import java.util.function.Consumer; /** * 消费性接口,只有输入参数,乜有返回值 */ public class ComsumerDemo { public static void main(String[] args) { Consumer<String> consumer = (x) -> System.out.println(x); consumer.accept("hello"); } }
Supplier:供给型接口,没有参数,只有返回值
package com.example.study.funtion; import java.util.function.Supplier; /** * 供给型接口,只有返回值,没有参数 */ public class SupplierDemo { public static void main(String[] args) { Supplier<String> supplier = () -> "hello"; System.out.println(supplier.get()); } }
Stream流式计算
- 什么是流式计算?确实通过链式调用函数式接口,要先了解四大函数式接口。
package com.example.study.stream; import java.util.Arrays; import java.util.List; public class Test { public static void main(String[] args) { User user1 = new User(1, "a", 18); User user2 = new User(2, "b", 19); User user3 = new User(3, "c", 20); User user4 = new User(4, "d", 21); User user5 = new User(5, "e",22); User user6 = new User(6, "f",23); List<User> list = Arrays.asList(user1, user2, user3, user4, user5,user6); list.stream() .filter(u->{return u.getId()%2==0;}) .filter(u->{return u.getAge()>20;}) .map(u->{return u.getName().toUpperCase();}) .sorted((uu2,uu1)->{return uu1.compareTo(uu2);}) .limit(1) .forEach(System.out::println); } }
ForkJoin 特点:工作窃取
如何使用ForkJoin?
- 通过ForkJoinPool来执行
package com.example.study.forkjoin; import java.util.concurrent.RecursiveTask; /** * 求和计算的方式: * 1. for循环 * 2. forkjoin * 3. stream并行流 */ public class ForkJoinDemo extends RecursiveTask<Long> { private Long start; private Long end; private Long temp = 10000L; public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { if((end-start)<temp) { //如果小于临界值,就直接通过for循环计算 Long sum = 0L; for (Long i = start; i <= end; i++) { sum += i; } return sum; } else { //如果大于临界值的话, long middle = (start+end)/2; ForkJoinDemo task1 = new ForkJoinDemo(start,middle); task1.fork();//拆分子任务,同时压入线程队列 ForkJoinDemo task2 = new ForkJoinDemo(middle+1,end); task2.fork();//拆分子任务,同时压入线程队列 return task2.join()+task1.join(); } } }
三种计算的方式以及效率的比对
package com.example.study.forkjoin; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { // test1(); //耗费时间为:5832 // test2(); //耗费时间为:9111 test3(); //计算结果=500000000500000000;耗费时间为:711,单位:ms;计算的效率是普通的计算方式的几十倍 } /** * 传统的计算方式 */ public static void test1() { Long sum = 0L; //计算执行时间 long start = System.currentTimeMillis(); for (int i = 0; i < 10_0000_0000; i++) { sum += i; } long end = System.currentTimeMillis(); System.out.println("计算结果="+ sum +";耗费时间为:"+(end-start)); } /** * 基于forkjoin来计算的,这种i计算方式是可以调优的 * @throws ExecutionException * @throws InterruptedException */ public static void test2() throws ExecutionException, InterruptedException { long sum = 0; //计算执行时间 long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L); ForkJoinTask<Long> result =forkJoinPool.submit(task);//提交一个任务 sum = result.get(); long end = System.currentTimeMillis(); System.out.println("计算结果="+ sum +";耗费时间为:"+(end-start)); } /** * 基于stream并行流来计算的 * 计算的效率是普通的计算方式的几十倍 * 效率非常的高 */ public static void test3() { //计算执行时间 long start = System.currentTimeMillis(); // IntStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Integer::sum); long reduce = LongStream.rangeClosed(0L, 10_0000_0000L) .parallel() .reduce(0, Long::sum); long end = System.currentTimeMillis(); System.out.println("计算结果="+ reduce +";耗费时间为:"+(end-start)); } }
异步回调:Future
package com.example.study.future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** * 线程的异步回调思路和ajax相同 */ public class Demo1 { public static void main(String[] args) throws ExecutionException, InterruptedException { //没有返回值的异步线程回调 CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "runAsync=>Void"); }); System.out.println("1111"); completableFuture.join(); //有返回值的异步线程回调 CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(()->{ int aa = 10/0; System.out.println("supplyAsync=>Integer"); return 1024; }); completableFuture1.whenComplete((u,t)->{ //处理成功以后 System.out.println("t=>"+t); System.out.println("u=>"+u); }).exceptionally((t)->{ //处理失败以后 System.out.println(t); return 233; }).get(); } }
JMM
请你谈谈Volatile的理解
volatile是jvm提供的轻量级的同步机制:
- 保证可见性
- 不保证原子性
- 禁止指令重排
什么是JMM?
JMM是一个Java内存模型,是一个不存在的东西,是一个概念!是一个约定!
关于JMM的一些同步的约定:
- 线程解锁前,必须把共享变量立刻刷回主存。
- 线程加锁前,
线程的八种内存交互操作:
- lock(锁定):作用于主内存的变量,把一个变量标识为线程独占状态
- unlock(解锁) :作用于主内存的变量,把一个处于锁定状态的共享变量释放
- read(读取):作用于主内存的变量,把一个变量的值从主内存传输到线程的工作内存中
- load(加载):作用于工作内存的变量,把通过read操作获取的变量值放入工作内存中
- use(使用):作用于工作内存的变量,把工作内存中的变量传输给执行引擎,每当虚拟机遇到需要使用到变量的值,就会使用到这个指令
- assign(赋值):作用于工作内存的变量,把执行引擎传输过来的值放入工作内存
- store(存储):作用于主内存的变量,把一个从线程中的工作内存的变量值传送到主内存中,以便后续的write操作
- write(写入):作用于主内存的变量,将store操作从工作内存获取的变量值放入主内存中
JMM对以上八种内存操作指令做出了如下约束:
- read和load、user和assign、store和write、lock和unlock必须成对出现,不允许单独操作其中一条指令
- 不允许线程丢弃离它最近的assign操作,即 工作内存中的变量值改变之后,必须告知主内存
- 不允许一个线程将没有assign过的数据从工作内存同步会主内存
- 一个新的变量必须在主内存中产生,不允许工作内存私自初始化一个变量来作为共享变量,即 实施use 和 store操作之前 , 必须经过 load 和 assign操作
- 同一变量同一时间只允许一个线程对其进行lock操作;多次lock之后,必须执行相同次数的unlock对其解锁
- 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值, 即 每次获得锁的线程,加锁前必须要重新读取主内存中的变量值,才能提交给执行引擎进行use操作
- 如果一个变量没有被lock,就不能对其进行unlock操作,也不能对一个被其他线程锁住的变量进行unlock
- 对一个变量加锁之前,必须把工作内存中的变量值同步回主内存
Volatile
package com.example.study.tvolatile; public class VDemo1 { /** * volatie关键字保证了可见性,不保证原子性 这里即使加了Volatile也无法抱着原子性 */ private volatile static int num = 0; public static void add() { num++; } public static void main(String[] args) { for (int i = 0; i < 20; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) { add(); } }).start(); } while(Thread.activeCount()>2) { Thread.yield(); } System.out.println(num); } }
如何在不使用Synchronized和Lock的情况下,保证变量的原子性呢?
这些类的底层都是和操作系统挂钩,在内存中直接修改值,!UnSafe类是一个很特殊的存在!
指令重排
什么叫指令重排?
你写的程序,计算机并不是按照我们写的那样去执行的。
源代码--编译器优化----指令并行也可能重排--内存系统也有可能重排--执行
int x = 1;//1 int y=2;//2 x = x+5;//3 y = x+x;//4 我们所期望的是执行顺序是1234,但是系统可能执行的顺序是2134 1324 这就是指令重排
volatile是怎么避免指令重排?
内存屏障,CPU指令,作用:
- 保证特点的操作的执行顺序。
- 可以保证某些变量的内存可见性
volatile是可以保持可见性,不可保证原子性,由于内存屏障,可以避免指令重排的的现象产生。
单例不安全,因为有反射,反射可以破坏单例
package com.example.study.signle; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; /** * 枚举本神也是一个类 */ public enum EnumSingle { INSTANCE; public EnumSingle getInstance() { return INSTANCE; } } class Test { public static void main(String[] args) { EnumSingle enumSingle = EnumSingle.INSTANCE; try { Constructor constructor = EnumSingle.class.getDeclaredConstructor(); EnumSingle enumSingle1 = (EnumSingle) constructor.newInstance(); //通过反射的方式,调用无参构造函数;NoSuchMethodException: com.example.study.signle.EnumSingle.<init>() enumSingle1.getInstance(); } catch (NoSuchMethodException e) { throw new RuntimeException(e); } catch (InvocationTargetException e) { throw new RuntimeException(e); } catch (InstantiationException e) { throw new RuntimeException(e); } catch (IllegalAccessException e) { throw new RuntimeException(e); } } }
深入理解CAS
什么是CAS?
- CAS叫比较并交换,如果这个值是期望的,那么则执行操作,否则不执行。如果不是,就一直循环(底层是一个自旋锁)。
- 缺点:
- 底层是一个循环,会耗时
- 一次性只能保证一个共享变量的原子问题
- 存在ABA问题
Unsafe类的含义
CAS:ABA问题
package com.example.study.cas; import java.util.concurrent.atomic.AtomicInteger; public class CASDemo { //CAS compareAndSet 比较并交换 public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(2020); // public final boolean compareAndSet(int expect, int update) //如果我期望的值达到了,那么就更新,否则,就不更新,CAS是CPU的并发原语 System.out.println(atomicInteger.compareAndSet(2020, 2021)); System.out.println(atomicInteger.get()); System.out.println(atomicInteger.compareAndSet(2021, 2020)); System.out.println(atomicInteger.get()); //乐观锁 System.out.println(atomicInteger.compareAndSet(2020, 6666)); System.out.println(atomicInteger.get()); } }
原子引用
利用原子引用就可以解决原子引用,核心原理是乐观锁。
其中有坑的地方:
package com.example.study.cas; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicStampedReference; public class CASDemo { /** * AtomicStampedReference 如果泛型是包装类,注意对象的引用问题 */ static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1); //CAS compareAndSet 比较并交换 public static void main(String[] args) { new Thread(()->{ int stamp = atomicStampedReference.getStamp(); System.out.println("a1=>"+stamp); try { TimeUnit.SECONDS.sleep(12); } catch (InterruptedException e) { throw new RuntimeException(e); } // 期望值,更新版本号 System.out.println(atomicStampedReference.compareAndSet(1, 2, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1)); System.out.println("a2=>"+atomicStampedReference.getStamp()); // 期望值,更新版本号 System.out.println(atomicStampedReference.compareAndSet(2, 1, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1)); System.out.println("a3=>"+atomicStampedReference.getStamp()); },"a").start(); new Thread(()->{ int stamp = atomicStampedReference.getStamp(); System.out.println("b1=>"+stamp); },"b").start(); // // // public final boolean compareAndSet(int expect, int update) // //如果我期望的值达到了,那么就更新,否则,就不更新,CAS是CPU的并发原语 // System.out.println(atomicInteger.compareAndSet(2020, 2021)); // System.out.println(atomicInteger.get()); // // System.out.println(atomicInteger.compareAndSet(2021, 2020)); // System.out.println(atomicInteger.get()); // // //乐观锁 // System.out.println(atomicInteger.compareAndSet(2020, 6666)); // System.out.println(atomicInteger.get()); } }
各种锁的理解
公平锁、非公平锁
公平锁:非常公平,不能够插队,必须先来后到!
非公平锁:非常不公平,可以插队(默认都是非公平的)
可重入锁
原理机制(加锁次数计数器)
可重入原理:加锁次数计数器
一个线程拿到锁之后,可以继续地持有锁,如果想再次进入由这把锁控制的方法,那么它可以直接进入。它的原理是利用加锁次数计数器来实现的。
1,每重入一次,计数器+1
每个对象自动含有一把锁,JVM负责跟踪对象被加锁的次数。
线程第一次给对象加锁的时候,计数器=0+1=1,每当这个相同的线程在此对象上再次获得锁时,计数器再+1。只有首先获取这把锁的线程,才能继续在这个对象上多次地获取这把锁
2,计数器-1
每当任务结束离开时,计数递减,当计数器减为0,锁被完全释放。
利用这个计数器可以得知这把锁是被当前多次持有,还是如果=0的话就是完全释放了。
可重入的锁(递归锁)
所谓的可重入锁,其实就是当外面锁,和里面也有锁的时候,只要外面的锁解开了,那里面的锁也会解开。
Synchronized版本的可重入锁
package com.example.study.lock; /** * Synchronized */ public class Demo01 { public static void main(String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.sms(); },"A").start(); new Thread(()->{ phone.sms(); },"B").start(); } } class Phone{ public synchronized void sms() { System.out.println(Thread.currentThread().getName()+"sms"); call(); } public synchronized void call() { System.out.println(Thread.currentThread().getName()+"call"); } }
Lock锁版本的可重入锁
package com.example.study.lock; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Demo02 { public static void main(String[] args) { Phone1 phone = new Phone1(); new Thread(()->{ phone.sms(); },"A").start(); new Thread(()->{ phone.sms(); },"B").start(); } } class Phone1 { Lock lock = new ReentrantLock(); public void sms() { lock.lock(); lock.lock(); //只要是lock锁,就必须手动释放锁,否则会造成死锁 try { System.out.println(Thread.currentThread().getName() + "sms"); call(); } catch (Exception e) { throw new RuntimeException(e); } finally { lock.unlock(); lock.unlock(); } } public synchronized void call() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + "call"); } catch (Exception e) { throw new RuntimeException(e); } finally { lock.unlock(); } } }
自旋锁
死锁的排查解决办法
排查方法:
- 在jdk的bin目录下,通过命令来排查
- 使用jps -l 定位进程号,下图的8082就是进程号
- 使用jstack 进程号 查询具体的信息