八、并发包
在实际开发中如果不需要考虑线程安全问题,大家不需要做线程安全,因为如果做了反而性能不好!但是开发中有很多业务是需要考虑线程安全问题的,此时就必须考虑了。否则业务出现问题。 Java为很多业务场景提供了性能优异,且线程安全的并发包,程序员可以选择使用!
8.1、ConcurrentHashMap
Map集合中的经典集合:HashMap它是线程不安全的,性能好。如果在要求线程安全的业务情况下就不能用这个集合做Map集合,否则业务会崩溃。
为了保证线程安全,可以使用Hashtable。Hashtable是线程安全的Map集合,但是性能较差!(已经被淘汰了,虽然安全,但是性能差)
为什么说HashTable的性能差呢?我们看看源码可以得知,HashTable的每一个方法都用synchronized
修饰了,实在是过于悲观。
ConcurrentHashMap不止线程安全,而且效率高,性能好,最新最好用的线程安全的Map集。
8.1.1、HashMap线程不安全演示
public class Const { public static HashMap<String,String> map = new HashMap<>(); }
public void run() { for (int i = 0; i < 500000; i++) { Const.map.put(this.getName() + (i + 1), this.getName() + i + 1); } System.out.println(this.getName() + " 结束!"); }
public class Demo { public static void main(String[] args) throws InterruptedException { Thread1A a1 = new Thread1A(); Thread1A a2 = new Thread1A(); a1.setName("线程1-"); a2.setName("线程2-"); a1.start(); a2.start(); //休息10秒,确保两个线程执行完毕 Thread.sleep(1000 * 5); //打印集合大小 System.out.println("Map大小:" + Const.map.size()); } }
我们执行后可以发现出来的错误是有以下三种:
- 没有达到预期的效果
- 抛出异常
- 结果错误
8.1.2、Hashtable演示
public class Const { public static Hashtable<String,String> map = new Hashtable<>(); }
public void run() { long start = System.currentTimeMillis(); for (int i = 0; i < 500000; i++) { Const.map.put(this.getName() + (i + 1), this.getName() + i + 1); } long end = System.currentTimeMillis(); System.out.println(this.getName() + " 结束!用时:" + (end - start) + " 毫秒"); }
public class Demo { public static void main(String[] args) throws InterruptedException { Thread1A a1 = new Thread1A(); Thread1A a2 = new Thread1A(); a1.setName("线程1-"); a2.setName("线程2-"); a1.start(); a2.start(); //休息10秒,确保两个线程执行完毕 Thread.sleep(1000 * 5); //打印集合大小 System.out.println("Map大小:" + Const.map.size()); } }
8.1.3、ConcurrentHashMap演示
public class Const { public static ConcurrentHashMap<String,String> map = new ConcurrentHashMap<>(); }
public void run() { long start = System.currentTimeMillis(); for (int i = 0; i < 500000; i++) { Const.map.put(this.getName() + (i + 1), this.getName() + i + 1); } long end = System.currentTimeMillis(); System.out.println(this.getName() + " 结束!用时:" + (end - start) + " 毫秒"); }
public class Demo { public static void main(String[] args) throws InterruptedException { Thread1A a1 = new Thread1A(); Thread1A a2 = new Thread1A(); a1.setName("线程1-"); a2.setName("线程2-"); a1.start(); a2.start(); //休息10秒,确保两个线程执行完毕 Thread.sleep(1000 * 5); //打印集合大小 System.out.println("Map大小:" + Const.map.size()); } }
8.1.4、HashTable效率低下的原因
我们查看HashTable的源码我们可以发现他的每一个方法都用synchronized
修饰了,实在是过于悲观。
public synchronized V put(K key, V value) public synchronized V get(Object key)
在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。
8.1.5、ConcurrentHashMap高效的原因
ConcurrentHashMap上锁机制:CAS + 局部(synchronized)锁定(分段式锁)
8.1.6、总结
- HashMap是线程不安全的。
- Hashtable线程安全基于synchronized,综合性能差,被淘汰了。
- ConcurrentHashMap:线程安全的,分段式锁,综合性能最好,线程安全开发中推荐使用
8.2、CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作,再执行自己。
需求
线程1要执行打印:A和C,线程2要执行打印:B,但线程1在打印A后,要线程2打印B之后才能打印C,所以:线程1在打印A后,必须等待线程2打印完B之后才能继续执行。
CountDownLatch构造器以及方法
public CountDownLatch(int count)// 初始化唤醒需要的down几步。count相当于一个计数器
方法 | 详解 |
public void await() throws InterruptedException | 让当前线程等待,必须down完初始化的数字才可以被唤醒,否则进入无限等待 |
public void countDown() | 计数器进行减1 (down 1) |
8.2.1、例子
public class ThreadA extends Thread { private CountDownLatch down ; public ThreadA(CountDownLatch down) { this.down = down; } @Override public void run() { System.out.println("A"); try { down.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("C"); } }
public class ThreadB extends Thread { private CountDownLatch down ; public ThreadB(CountDownLatch down) { this.down = down; } @Override public void run() { System.out.println("B"); down.countDown(); } }
public class Demo { public static void main(String[] args) { CountDownLatch down = new CountDownLatch(1);//创建1个计数器 new ThreadA(down).start(); new ThreadB(down).start(); } }
8.2.2、总结
- CountDownLatch中count down是倒数的意思,latch则是门闩的含义。整体含义可以理解为倒数的门栓,似乎有一点“三二一,芝麻开门”的感觉。
- CountDownLatch是通过一个计数器来实现的,每当一个线程完成了自己的任务后,可以调用countDown()方法让计数器-1,当计数器到达0时,调用CountDownLatch的wait()方法的线程阻塞状态解除,继续执行。
8.3、CyclicBarrier
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。这里的屏障相当于需要达到的要求。
CyclicBarrier构造方法:
public CyclicBarrier(int parties, Runnable barrierAction)// 用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景
CyclicBarrier重要方法:
public int await()// 每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞
8.3.1、例子
制作员工线程
public class PersonThread extends Thread { private CyclicBarrier cbRef; public PersonThread(CyclicBarrier cbRef) { this.cbRef = cbRef; } @Override public void run() { try { Thread.sleep((int) (Math.random() * 1000)); System.out.println(Thread.currentThread().getName() + " 到了! "); cbRef.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
制作开会线程
public class MeetingThread extends Thread { @Override public void run() { System.out.println("好了,人都到了,开始开会......"); } }
制作测试类
public class Demo { public static void main(String[] args) { CyclicBarrier cbRef = new CyclicBarrier(5, new MeetingThread());//等待5个线程执行完毕,再执行MeetingThread PersonThread p1 = new PersonThread(cbRef); PersonThread p2 = new PersonThread(cbRef); PersonThread p3 = new PersonThread(cbRef); PersonThread p4 = new PersonThread(cbRef); PersonThread p5 = new PersonThread(cbRef); p1.start(); p2.start(); p3.start(); p4.start(); p5.start(); } }
8.3.2、使用场景
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。例如:使用两个线程读取2个文件中的数据,当两个文件中的数据都读取完毕以后,进行数据的汇总操作。
8.4、Semaphore
Semaphore(发信号)的主要作用是控制线程的并发数量。他的机制和synchronized
一样都是上锁,但是,但某个时间段内,synchronized
只能有一个线程允许执行。Semaphore可以设置同时允许几个线程执行。它的作用是控制访问特定资源的线程数目。
Semaphore构造方法:
public Semaphore(int permits) //permits 表示许可线程的数量 public Semaphore(int permits, boolean fair) //fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程
Semaphore重要方法:
public void acquire() throws InterruptedException //表示获取许可 public void release() //release() 表示释放许可
8.4.1、示范一
我们测试一下只允许一个线程的案例。
制作一个Service类
public class Service { private Semaphore semaphore = new Semaphore(1);//1表示许可的意思,表示最多允许1个线程执行acquire()和release()之间的内容 public void testMethod() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " 进入 时间=" + System.currentTimeMillis()); Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " 结束 时间=" + System.currentTimeMillis()); semaphore.release(); //acquire()和release()方法之间的代码为"同步代码" } catch (InterruptedException e) { e.printStackTrace(); } } }
制作线程类
public class ThreadA extends Thread { private Service service; public ThreadA(Service service) { super(); this.service = service; } @Override public void run() { service.testMethod(); } }
测试类
public class Demo { public static void main(String[] args) { Service service = new Service(); //启动5个线程 for (int i = 1; i <= 5; i++) { ThreadA a = new ThreadA(service); a.setName("线程 " + i); a.start();//5个线程会同时执行Service的testMethod方法,而某个时间段只能有1个线程执行 } } }
8.4.2、示范二
我们测试一下只允许两个线程的案例。
修改Service类
public class Service { private Semaphore semaphore = new Semaphore(2);//2表示许可的意思,表示最多允许2个线程执行acquire()和release()之间的内容 public void testMethod() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " 进入 时间=" + System.currentTimeMillis()); Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + " 结束 时间=" + System.currentTimeMillis()); semaphore.release(); //acquire()和release()方法之间的代码为"同步代码" } catch (InterruptedException e) { e.printStackTrace(); } } }
8.5、Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。
这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
Exchanger构造方法:
public Exchanger();
Exchanger重要方法:
public V exchange(V x)
8.5.1、exchange方法的阻塞特性
- 制作线程A,并能够接收一个Exchanger对象:
public class ThreadA extends Thread { private Exchanger<String> exchanger; public ThreadA(Exchanger<String> exchanger) { super(); this.exchanger = exchanger; } @Override public void run() { try { System.out.println("线程A欲传递值'礼物A'给线程B,并等待线程B的值..."); System.out.println("在线程A中得到线程B的值=" + exchanger.exchange("礼物A")); } catch (InterruptedException e) { e.printStackTrace(); } } }
- 制作main()方法
public class Demo { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<String>(); ThreadA a = new ThreadA(exchanger); a.start(); } }
8.5.2、exchange方法执行交换
- 制作线程A
public class ThreadA extends Thread { private Exchanger<String> exchanger; public ThreadA(Exchanger<String> exchanger) { super(); this.exchanger = exchanger; } @Override public void run() { try { System.out.println("线程A欲传递值'礼物A'给线程B,并等待线程B的值..."); System.out.println("在线程A中得到线程B的值=" + exchanger.exchange("礼物A")); } catch (InterruptedException e) { e.printStackTrace(); } } }
- 制作线程B
public class ThreadB extends Thread { private Exchanger<String> exchanger; public ThreadB(Exchanger<String> exchanger) { super(); this.exchanger = exchanger; } @Override public void run() { try { System.out.println("线程B欲传递值'礼物B'给线程A,并等待线程A的值..."); System.out.println("在线程B中得到线程A的值=" + exchanger.exchange("礼物B")); } catch (InterruptedException e) { e.printStackTrace(); } } }
- 制作测试类
public class Demo { public static void main(String[] args) throws InterruptedException { Exchanger<String> exchanger = new Exchanger<String>(); ThreadA a = new ThreadA(exchanger); ThreadB b = new ThreadB(exchanger); a.start(); b.start(); } }
8.5.3、exchange方法超时
exchange方法我们可以设置不一直等待,可以设置一个超时时间。
- 制作线程A
public class ThreadA extends Thread { private Exchanger<String> exchanger; public ThreadA(Exchanger<String> exchanger) { super(); this.exchanger = exchanger; } @Override public void run() { try { System.out.println("线程A欲传递值'礼物A'给线程B,并等待线程B的值,只等5秒..."); System.out.println("在线程A中得到线程B的值 =" + exchanger.exchange("礼物A",5, TimeUnit.SECONDS)); System.out.println("线程A结束!"); } catch (InterruptedException e) { e.printStackTrace(); } catch (TimeoutException e) { System.out.println("5秒钟没等到线程B的值,线程A结束!"); } } } 复制代码
- 制作测试类
public class Run { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<String>(); ThreadA a = new ThreadA(exchanger); a.start(); } } 复制代码
8.5.4、使用场景
我们可以做可以做数据校对工作,比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水。为了避免错误,采用AB岗两人进行录入,录入到两个文件中,系统需要加载这两个文件,并对两个文件数据进行校对,看看是否录入一致。
九、线程池
我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间,线程也属于宝贵的系统资源。
于是Java提供了一种思想:线程池,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务。
9.1、线程池的概念
线程池其实就是一个容纳多个线程的容器,其中的线程可以反复使用,省去了频繁创建线程对象的操作,无需反复创建线程而消耗过多资源。
合理利用线程池能够带来三个好处:
- 降低资源消耗。减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
9.2、线程池的实现
Java中线程池的顶级接口是java.util.concurrent.Executor
,但是严格意义上讲Executor
并不是一个线程池,而是一个执行线程的工具。真正的线程池接java.util.concurrent.ExecutorService
。
要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在java.util.concurrent.Executors
线程工厂类里面提供了一些静态工厂,生成一些常用的线程池。官方建议使用Executors工程类来创建线程池对象。
Executors类中有个创建线程池的方法有:
public static ExecutorService newFixedThreadPool(int nThreads)
:返回线程池对象。(创建的是有界线程池,也就是池中的线程个数可以指定最大数量)public Future<?> submit(Runnable task)
:获取线程池中的某一个线程对象,并执行
Future
接口:用来记录线程任务执行完毕后产生的结果。
使用线程池中线程对象的步骤:
- 创建线程池对象。
- 创建Runnable接口子类对象。
- 提交Runnable接口子类对象。
- 关闭线程池(一般不做)。
9.2.1、Runnable
public class MyRunnable implements Runnable { @Override public void run() { System.out.println("我要一个教练"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("教练来了: " + Thread.currentThread().getName()); System.out.println("教我游泳,交完后,教练回到了游泳池"); } }
public class ThreadPoolDemo { public static void main(String[] args) { // 创建线程池对象 ExecutorService service = Executors.newFixedThreadPool(2);//包含2个线程对象 // 创建Runnable实例对象 MyRunnable r = new MyRunnable(); //自己创建线程对象的方式 // Thread t = new Thread(r); // t.start(); ---> 调用MyRunnable中的run() // 从线程池中获取线程对象,然后调用MyRunnable中的run() service.submit(r); // 再获取个线程对象,调用MyRunnable中的run() service.submit(r); service.submit(r); // 注意:submit方法调用结束后,程序并不终止,是因为线程池控制了线程的关闭。 // 将使用完的线程又归还到了线程池中 // 关闭线程池 //service.shutdown(); } }、
9.2.2、Callable
<T> Future<T> submit(Callable<T> task)
: 获取线程池中的某一个线程对象,并执行.
Future : 表示计算的结果。
V get()
: 获取计算完成的结果。
public class ThreadPoolDemo2 { public static void main(String[] args) throws Exception { // 创建线程池对象 ExecutorService service = Executors.newFixedThreadPool(2);//包含2个线程对象 // 创建Runnable实例对象 Callable<Double> c = new Callable<Double>() { @Override public Double call() throws Exception { return Math.random(); } }; // 从线程池中获取线程对象,然后调用Callable中的call() Future<Double> f1 = service.submit(c); // Futur 调用get() 获取运算结果 System.out.println(f1.get()); Future<Double> f2 = service.submit(c); System.out.println(f2.get()); Future<Double> f3 = service.submit(c); System.out.println(f3.get()); } }
十、死锁
多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。
举个例子:客户(占用资金,等待经销商的货品资源) 经销商(占用货品资源,等待客户的资金)
10.1、死锁产生的必要条件
- 互斥:某种资源一次只允许一个进程访问,即该资源一旦分配给某个进程,其他进程就不能再访问,直到该进程访问结束。
- 不可抢占: 别人已经占有了某项资源,你不能因为自己也需要该资源,就去把别人的资源抢过来。资源请求者不能强制从资源占有者手中夺取资源,资源只能由资源占有者主动释放。
- 请求和保持,即当资源请求者在请求其他的资源的同时保持对原有资源的占有。
- 循环等待,即存在一个等待循环队列:p1要p2的资源,p2要p1的资源。这样就形成了一个等待环路
当上述四个条件都成立的时候,便形成死锁。当然,死锁的情况下如果打破上述任何一个条件,便可让死锁消失
10.2、死锁代码实现
死锁是多个线程满足上述四个条件才会形成,死锁需要尽量避免,且死锁一般存在资源的嵌套请求!
package test; /** * @author Xiao_Lin * @date 2020/12/30 19:10 */ public class DeadlockDemo { // 1.至少需要两个资源,每个资源只需要1份。 public static Object resources1 = new Object(); public static Object resources2 = new Object();; public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { // 线程1:占用资源1 ,请求资源2 synchronized (resources1){ System.out.println("线程1抢占资源1,等待资源2"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (resources2){ System.out.println("线程1成功抢占资源2"); } }; } }).start(); new Thread(new Runnable() { @Override public void run() { synchronized (resources2){ System.out.println("线程2抢占资源2,等待资源1"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (resources1){ System.out.println("线程2成功抢占资源1"); } } } }).start(); } }
作者:XiaoLin_Java
链接:https://juejin.cn/post/6984190169708494879
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。