17 Java多线程(线程创建+线程状态+线程安全+死锁+线程池+Lock接口+线程安全集合)(中):https://developer.aliyun.com/article/1580255
17.7 线程池
17.7.1 为什么需要线程池?
- 如果有非常的多的任务需要多线程来完成,且每个线程执行时间不会太长,这样频繁的创建和销毁线程。
- 频繁创建和销毁线程会比较耗性
17.7.2 线程池原理
线程池用维护者一个队列,队列中保存着处于等待(空闲)状态的线程。不用每次都创建新的线程。
17.7.3 线程池API
常用的线程池接口和类(所在包java.util.concurrent)。
Executor:线程池的顶级接口。
ExecutorService:线程池接口,可通过submit(Runnable task) 提交任务代码。
Executors工厂类:通过此类可以获得一个线程池。
方法名 | 描述 |
newFixedThreadPool(int nThreads) | 获取固定数量的线程池。参数:指定线程池中线程的数量。 |
newCachedThreadPool() | 获得动态数量的线程池,如不够则创建新的,无上限 |
newSingleThreadExecutor() | 创建单个线程的线程池,只有一个线程。 |
newScheduledThreadPool() | 创建固定大小的线程池,可以延迟或定时执行任务。 |
案例演示:测试线程池。
public class TestThreadPool { public static void main(String[] args) { //1.1创建固定线程个数的线程池 //ExecutorService es=Executors.newFixedThreadPool(4); //1.2创建缓存线程池,线程个数由任务个数决定 ExecutorService es=Executors.newCachedThreadPool(); //1.3创建单线程线程池 //Executors.newSingleThreadExecutor(); //1.4创建调度线程池 调度:周期、定时执行 //Executors.newScheduledThreadPool(corePoolSize) Executors.newScheduledThreadPool(3); //2创建任务 Runnable runnable=new Runnable() { private int ticket=100; @Override public void run() { while(true) { if(ticket<=0) { break; } System.out.println(Thread.currentThread().getName()+"买了第"+ticket+"张票"); ticket--; } } }; //3提交任务 for(int i=0;i<5;i++) { es.submit(runnable); } //4关闭线程池 es.shutdown();//等待所有任务执行完毕 然后关闭线程池,不接受新任务。 } }
17.7.4 Callable接口
public interface Callable< V >{
public V call() throws Exception;
}
案例演示:Callable接口的使用。
public class TestCallable { public static void main(String[] args) throws Exception{ //功能需求:使用Callable实现1-100和 //1创建Callable对象 Callable<Integer> callable=new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println(Thread.currentThread().getName()+"开始计算"); int sum=0; for(int i=1;i<=100;i++) { sum+=i; Thread.sleep(100); } return sum; } }; //2把Callable对象 转成可执行任务 FutureTask<Integer> task=new FutureTask<>(callable); //3创建线程 Thread thread=new Thread(task); //4启动线程 thread.start(); //5获取结果(等待call执行完毕,才会返回) Integer sum=task.get(); System.out.println("结果是:"+sum); } }
Runnable接口和Callable接口的区别:
- Callable接口中call方法有返回值,Runnable接口中run方法没有返回值。
- Callable接口中call方法有声明异常,Runnable接口中run方法没有异常。
17.7.5 Future接口
- Future接口表示将要执行完任务的结果。
- get()以阻塞形式等待Future中的异步处理结果(call()的返回值)。
案例演示:计算1-100的和。
public class TestFuture { public static void main(String[] args) throws Exception{ //1创建线程池 ExecutorService es=Executors.newFixedThreadPool(1); //2提交任务 Future:表示将要执行完任务的结果 Future<Integer> future=es.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println(Thread.currentThread().getName()+"开始计算"); int sum=0; for(int i=1;i<=100;i++) { sum+=i; Thread.sleep(10); } return sum; } }); //3获取任务结果,等待任务执行完毕才会返回. System.out.println(future.get()); //4关闭线程池 es.shutdown(); } }
17.7.6 案例
需求:使用两个线程,并发计算150、51100的和,再进行汇总统计。
public class TestFuture2 { public static void main(String[] args) throws Exception{ //1创建线程池 ExecutorService es=Executors.newFixedThreadPool(2); //2提交任务 Future<Integer> future1=es.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { int sum=0; for(int i=1;i<=50;i++) { sum+=i; } System.out.println("1-50计算完毕"); return sum; } }); Future<Integer> future2=es.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { int sum=0; for(int i=51;i<=100;i++) { sum+=i; } System.out.println("51-100计算完毕"); return sum; } }); //3获取结果 int sum=future1.get()+future2.get(); System.out.println("结果是:"+sum); //4关闭线程池 es.shutdown(); } }
17.8 Lock接口
17.8.1 Lock
- JDK5加入,与synchronized比较,显示定义,结构更灵活。
- 提供更多实用性方法,功能更强大、性能更优越。
常用方法:
方法名 | 描述 |
void lock() | 获取锁,如锁被占用,则等待。 |
boolean tryLock() | 尝试获取锁(成功返回true。失败返回false,不阻塞)。 |
void unlock() | 释放锁。 |
17.8.2 重入锁
- Lock接口的实现类,与synchronized一样具有互斥锁功能。
public class MyList { //创建锁 private Lock lock=new ReentrantLock(); private String[] str= {"A","B","","",""}; private int count=2; public void add(String value) { lock.lock(); try { str[count]=value; try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } count++; System.out.println(Thread.currentThread().getName()+"添加了"+value); } finally { lock.unlock(); } } public String[] getStr() { return str; } }
17.8.3 读写锁
ReentrantReadWriteLock:
- 一种支持一写多读的同步锁,读写分离,可分别分配读锁、写锁。
- 支持多次分配读锁,使多个读操作可以并发执行。
互斥规则:
- 写-写:互斥,阻塞。
- 读-写:互斥,读阻塞写、写阻塞读。
- 读-读:不互斥、不阻塞。
- 在读操作远远高于写操作的环境中,可在保障线程安全的情况下,提高运行效率。
ReadWriteDemo类:
public class ReadWriteDemo { //创建读写锁 private ReentrantReadWriteLock rrl=new ReentrantReadWriteLock(); //获取读锁 private ReadLock readLock=rrl.readLock(); //获取写锁 private WriteLock writeLock=rrl.writeLock(); //互斥锁 private ReentrantLock lock=new ReentrantLock(); private String value; //读取 public String getValue() { //使用读锁上锁 lock.lock(); try { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("读取:"+this.value); return this.value; }finally { lock.unlock(); } } //写入 public void setValue(String value) { lock.lock(); try { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("写入:"+value); this.value=value; }finally { lock.unlock(); } } }
TestReadWriteLock类:
public class TestReadWriteLock { public static void main(String[] args) { ReadWriteDemo readWriteDemo=new ReadWriteDemo(); //创建线程池 ExecutorService es=Executors.newFixedThreadPool(20); Runnable read=new Runnable() { @Override public void run() { readWriteDemo.getValue(); } }; Runnable write=new Runnable() { @Override public void run() { readWriteDemo.setValue("张三:"+new Random().nextInt(100)); } }; long start=System.currentTimeMillis(); //分配2个写的任务 for(int i=0;i<2;i++) { es.submit(write); } //分配18读取任务 for(int i=0;i<18;i++) { es.submit(read); } es.shutdown();//关闭 while(!es.isTerminated()) {//空转 } long end=System.currentTimeMillis(); System.out.println("用时:"+(end-start)); } }
17.9 线程安全集合
Collections工具类中提供了多个可以获得线程安全集合的方法。
方法名 |
public static Collection synchronizedCollection(Collection c) |
public static List synchronizedList(List list) |
public static Set synchronizedSet(Set s) |
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) |
public static SortedSet synchronizedSortedSet(SortedSet s) |
public static <K,V> SortedMap<K,V> synchronizedSortedMap(SortedMap<K,V> m) |
注:JDK1.2提供,接口统一、维护性高,但性能没有提升,均以synchonized实现。
17.9.1 CopyOnWriteArrayList
- 线程安全的ArrayList,加强版读写分离。
- 写有锁,读无锁,读写之间不阻塞,优于读写锁。
- 写入时,先copy一个容器副本、再添加新元素,最后替换引用。
- 使用方式与ArrayList无异。
public class TestCopyOnWriteArrayList { public static void main(String[] args) { //1创建集合 CopyOnWriteArrayList<String> list=new CopyOnWriteArrayList<>(); //2使用多线程操作 ExecutorService es=Executors.newFixedThreadPool(5); //3提交任务 for(int i=0;i<5;i++) { es.submit(new Runnable() { @Override public void run() { for(int j=0;j<10;j++) { list.add(Thread.currentThread().getName()+"...."+new Random().nextInt(1000)); } } }); } //4关闭线程池 es.shutdown(); while(!es.isTerminated()) {} //5打印结果 System.out.println("元素个数:"+list.size()); for (String string : list) { System.out.println(string); } } }
17.9.2 CopyOnWriteArraySet
- 线程安全的Set,底层使用CopyOnWriteArrayList实现。
- 唯一不同在于,使用addIfAbsent()添加元素,会遍历数组。
- 如存在元素,则不添加(扔掉副本)。
public class TestCopyOnWriteArraySet { public static void main(String[] args) { //1创建集合 CopyOnWriteArraySet<String> set=new CopyOnWriteArraySet<>(); //2添加元素 set.add("pingguo"); set.add("huawei"); set.add("xiaomi"); set.add("lianxiang"); set.add("pingguo"); //3打印 System.out.println("元素个数:"+set.size()); System.out.println(set.toString()); } }
17.9.3 ConcurrentHashMap
- 初始容量默认为16段(Segment),使用分段锁设计。
- 不对整个Map加锁,而是为每个Segment加锁。
- 当多个对象存入同一个Segment时,才需要互斥。
- 最理想状态为16个对象分别存入16个Segment,并行数量16。
- 使用方式与HashMap无异。
public class TestConcurrentHashMap { public static void main(String[] args) { //1创建集合 ConcurrentHashMap<String, String> hashMap=new ConcurrentHashMap<String, String>(); //2使用多线程添加数据 for(int i=0;i<5;i++) { new Thread(new Runnable() { @Override public void run() { for(int k=0;k<10;k++) { hashMap.put(Thread.currentThread().getName()+"--"+k, k+""); System.out.println(hashMap); } } }).start(); } } }
17.9.4 Queue
- Collection的子接口,表示队列FIFO(First In First Out)。
推荐方法:
方法名 | 描述 |
boolean offer(E e) | 顺序添加一个元素 (到达上限后,再添加则会返回false)。 |
E poll() | 获得第一个元素并移除 (如果队列没有元素时,则返回null)。 |
E keep() | 获得第一个元素但不移除 (如果队列没有元素时,则返回null)。 |
public class TestQueue { public static void main(String[] args) { //1创建队列 Queue<String> queue=new LinkedList<>(); //2入队 queue.offer("苹果"); queue.offer("橘子"); queue.offer("葡萄"); queue.offer("西瓜"); queue.offer("榴莲"); //3出队 System.out.println(queue.peek()); System.out.println("----------------"); System.out.println("元素个数:"+queue.size()); int size=queue.size(); for(int i=0;i<size;i++) { System.out.println(queue.poll()); } System.out.println("出队完毕:"+queue.size()); } }
17.9.5 ConcurrentLinkedQueue
- 线程安全、可高效读写的队列,高并发下性能最好的队列。
- 无锁、CAS比较交换算法,修改的方法包含三个核心参数(V,E,N)。
- V:要更新的变量、E:预期值、N:新值。
- 只有当V==E时,V=N;否则表示已被更新过,则取消当前操作。
public class TestConcsurrentLinkedQueue { public static void main(String[] args) throws Exception { //1创建安全队列 ConcurrentLinkedQueue<Integer> queue=new ConcurrentLinkedQueue<>(); //2入队操作 Thread t1=new Thread(new Runnable() { @Override public void run() { for(int i=1;i<=5;i++) { queue.offer(i); } } }); Thread t2=new Thread(new Runnable() { @Override public void run() { for(int i=6;i<=10;i++) { queue.offer(i); } } }); //3启动线程 t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("-------------出队-------------"); //4出队操作 int size=queue.size(); for(int i=0;i<size;i++) { System.out.println(queue.poll()); } } }
17.9.6 BlockingQueue
- Queue的子接口,阻塞的队列,增加了两个线程状态为无限期等待的方法。
- 可用于解决生产生、消费者问题。
推荐方法:
方法名 | 描述 |
void put(E e) | 将指定元素插入此队列中,如果没有可用空间,则等待。 |
E take() | 获取并移除此队列头部元素,如果没有可用元素,则等待。 |
17.9.6.1 ArrayBlockingQueue
- 数组结构实现,有界队列。
- 手工固定上限。
public class TestArrayBlockingQueue { public static void main(String[] args) throws Exception{ //创建一个有界队列,添加数据 ArrayBlockingQueue<String> queue=new ArrayBlockingQueue<>(5); //添加元素 queue.put("aaa"); queue.put("bbb"); queue.put("ccc"); queue.put("ddd"); queue.put("eee"); //删除元素 queue.take(); System.out.println("已经添加了5个元素"); queue.put("xyz"); System.out.println("已经添加了6个元素"); System.out.println(queue.toString()); } }
17.9.6.2 LinkedBlockingQueue
- 链表结构实现,无界队列。
- 默认上限Integer.MAX_VALUE。
- 使用方法和ArrayBlockingQueue相同。
17.9.6.3 案例
使用阻塞队列实现生产者和消费者。
public class Demo7 { public static void main(String[] args) { //1创建队列 ArrayBlockingQueue<Integer> queue=new ArrayBlockingQueue<>(6); //2创建两个线程 Thread t1=new Thread(new Runnable() { @Override public void run() { for(int i=0;i<30;i++) { try { queue.put(i); System.out.println(Thread.currentThread().getName()+"生产了第"+i+"号面包"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }, "晨晨"); Thread t2=new Thread(new Runnable() { @Override public void run() { for(int i=0;i<30;i++) { try { Integer num=queue.take(); System.out.println(Thread.currentThread().getName()+"消费了第"+i+"号面包"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }, "冰冰"); //启动线程 t1.start(); t2.start(); } }
aaa");
queue.put(“bbb”);
queue.put(“ccc”);
queue.put(“ddd”);
queue.put(“eee”);
//删除元素
queue.take();
System.out.println(“已经添加了5个元素”);
queue.put(“xyz”);
System.out.println(“已经添加了6个元素”);
System.out.println(queue.toString());
}
}
#### 17.9.6.2 LinkedBlockingQueue > + 链表结构实现,无界队列。 > > + 默认上限Integer.MAX_VALUE。 > + 使用方法和ArrayBlockingQueue相同。 #### 17.9.6.3 案例 > 使用阻塞队列实现生产者和消费者。 ```java public class Demo7 { public static void main(String[] args) { //1创建队列 ArrayBlockingQueue<Integer> queue=new ArrayBlockingQueue<>(6); //2创建两个线程 Thread t1=new Thread(new Runnable() { @Override public void run() { for(int i=0;i<30;i++) { try { queue.put(i); System.out.println(Thread.currentThread().getName()+"生产了第"+i+"号面包"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }, "晨晨"); Thread t2=new Thread(new Runnable() { @Override public void run() { for(int i=0;i<30;i++) { try { Integer num=queue.take(); System.out.println(Thread.currentThread().getName()+"消费了第"+i+"号面包"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }, "冰冰"); //启动线程 t1.start(); t2.start(); } }