5:读写锁(ReadWriteLock)
5.1 使用
package main; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /* * 独占锁(写锁) 一次只能被一个线程占有 * 共享锁(读锁) 多个线程可以同时占有 * ReadWriteLock * 读 - 读 可以共存 * 读 - 写 不能共存 * 写 - 写 不能共存 * */ public class ReadWriteLockDemo { public static void main(String[] args) { //MyCache myCache =new MyCache(); MyCacheLock myCache =new MyCacheLock(); //写入 for (int i = 0; i < 5; i++) { final int temp = i; new Thread(()->{ myCache.put(temp+"",temp+""); },String.valueOf(i)).start(); } //读取 for (int i = 0; i < 5; i++) { final int temp = i; new Thread(()->{ myCache.get(temp+""); },String.valueOf(i)).start(); } } } class MyCacheLock{ private volatile Map<String,Object> map = new HashMap<>(); //读写锁:更加细粒度的控制 private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); //存,写 的时候,只希望同时只有一个线程写 public void put(String key,Object value){ readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName()+"写入"+key); map.put(key,value); System.out.println(Thread.currentThread().getName()+"写入OK"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } //取,读 所有人都可以读 public void get(String key){ readWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName()+"读取"+key); map.get(key); System.out.println(Thread.currentThread().getName()+"读取OK"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } } } class MyCache{ private volatile Map<String,Object> map = new HashMap<>(); //存,写 public void put(String key,Object value){ System.out.println(Thread.currentThread().getName()+"写入"+key); map.put(key,value); System.out.println(Thread.currentThread().getName()+"写入OK"); } //取,读 public void get(String key){ System.out.println(Thread.currentThread().getName()+"读取"+key); map.get(key); System.out.println(Thread.currentThread().getName()+"读取OK"); } } 打印结果: 1写入1 1写入OK 2写入2 2写入OK 3写入3 3写入OK 4写入4 4写入OK 5写入5 5写入OK 2读取2 2读取OK 3读取3 3读取OK 5读取5 5读取OK 1读取1 4读取4 4读取OK 1读取OK
6:阻塞队列
6.1:队列
队列(FIFO)先进先出。
写入:如果队列满了,就必须阻塞等待。
读取:如果队列是空的,必须阻塞,等待生产,从而读取消息。
如下图所示:
6.2 :四组API
6.2.1:抛出异常
添加:add()
移除:remove()
判断队首:element()
public class FIFO { public static void main(String[] args) { say(); } public static void say(){ ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); //Exception in thread "main" java.lang.IllegalStateException: Queue full //System.out.println(blockingQueue.add("d")); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); //Exception in thread "main" java.util.NoSuchElementException //System.out.println(blockingQueue.remove()); } }
6.2.2:有返回值,不会抛出异常
添加:offer()
移除:poll()
判断队首:peek()
public class FIFO { public static void main(String[] args) { say2(); } public static void say2(){ ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); //存 System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.offer("d")); //移除 System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); } } 打印: true true true false a b c null
6.2.3:阻塞等待
取:put()
移除:take()
public class FIFO { public static void main(String[] args) throws InterruptedException { say3(); } public static void say3() throws InterruptedException { //队列大小 ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); //队列没有位置了,第四个一直阻塞 //blockingQueue.put("d"); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); // 队列中没有元素了,第四个一直阻塞 //System.out.println(blockingQueue.take()); } }
6.2.4:超时等待
public class FIFO { public static void main(String[] args) throws InterruptedException { say4(); } public static void say4() throws InterruptedException { ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.offer("d",2, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS)); } }
6.2.5 总结
方法摘要 | |
ArrayBlockingQueue(int capacity) | 创建具有给定(固定)容量和默认访问策略的 ArrayBlockingQueue |
boolean add(E e) | 在插入此队列的尾部,如果有可能立即这样做不超过该队列的容量,返回指定的元素 true成功时与抛出 IllegalStateException如果此队列已满。 |
boolean remove(Object o) | 从该队列中删除指定元素的单个实例(如果存在)。 |
boolean offer(E e) | 如果可以在不超过队列容量的情况下立即将其指定的元素插入该队列的尾部,则在成功时 false如果该队列已满,则返回 true |
boolean offer(E e, long timeout, TimeUnit unit) | 在该队列的尾部插入指定的元素,等待指定的等待时间,以使空间在队列已满时变为可用 |
E poll() | 检索并删除此队列的头,如果此队列为空,则返回 null 。 |
E poll(long timeout, TimeUnit unit) | 检索并删除此队列的头,等待指定的等待时间(如有必要)使元素变为可用 |
void put(E e) | 在该队列的尾部插入指定的元素,如果队列已满,则等待空间变为可用 |
E take() | 检索并删除此队列的头,如有必要,等待元素可用 |
E peek() | 检索但不删除此队列的头,如果此队列为空,则返回 null |
6.3: SynchronousQueue(同步队列)
public class 同步队列 { public static void main(String[] args) { BlockingQueue<String> queue = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"执行了a操作"); queue.put("a"); System.out.println(Thread.currentThread().getName()+"执行了b操作"); queue.put("b"); System.out.println(Thread.currentThread().getName()+"执行了c操作"); queue.put("c"); } catch (InterruptedException e) { throw new RuntimeException(e); } },"A").start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"读取了====a操作"); queue.take(); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"读取了====b操作"); queue.take(); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"读取了====c操作"); queue.take(); } catch (InterruptedException e) { throw new RuntimeException(e); } },"B").start(); } } 打印结果: A执行了a操作 B读取了====a操作 A执行了b操作 B读取了====b操作 A执行了c操作 B读取了====c操作
7:线程池
三大方法,7大参数,4种拒绝策略
7.1 线程池的好处
1. 降低资源的消耗
2. 提高响应的速度
3. 方便管理
线程复用,可以控制最大并发数,管理线程。
7.1.1 3大方法
public class pool { public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool();// 缓存线程池(遇强则强) ExecutorService threadPool2 = Executors.newFixedThreadPool(6);//固定线程池大小 ExecutorService threadPool3 = Executors.newSingleThreadExecutor();//单一线程 try { for (int i = 0; i < 100; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()); }); } } catch (Exception e) { throw new RuntimeException(e); } finally { threadPool.shutdown(); } } }
7.1.2 7大参数
ThreadPoolExecutor
构造方法 :
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
创建一个新的 ThreadPoolExecutor与给定的初始参数。
参数 | |
corePoolSize | - (核心线程数)即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut |
maximumPoolSize | - 池中允许的最大线程数 |
keepAliveTime | - 当线程数大于内核时,这是多余的空闲线程在终止前等待新任务的最大时间。 |
unit | - keepAliveTime参数的时间单位 |
workQueue | - 用于在执行任务之前使用的队列。 这个队列将仅保存execute方法提交的Runnable任务。(阻塞队列) |
threadFactory | - 执行程序创建新线程时使用的工厂(线程工厂) |
handler | - 执行被阻止时使用的处理程序,因为达到线程限制和队列容量(拒绝策略) |
7.1.2 四种拒绝策略
RejectedExecutionHandler | |
ThreadPoolExecutor.AbortPolicy | 被拒绝的任务的处理程序,抛出一个 RejectedExecutionException 。(银行满了,还有人进来,不处理这个人的,抛出异常) |
ThreadPoolExecutor.CallerRunsPolicy | 一个被拒绝的任务的处理程序,直接在 execute方法的调用线程中运行被拒绝的任务,除非执行程序已被关闭,否则这个任务被丢弃。(哪来回哪去) |
ThreadPoolExecutor.DiscardOldestPolicy | 被拒绝的任务的处理程序,丢弃最旧的未处理请求,然后重试 execute ,除非执行程序被关闭,在这种情况下,任务被丢弃。 |
ThreadPoolExecutor.DiscardPolicy | 被拒绝的任务的处理程序静默地丢弃被拒绝的任务 |
7.1.2.1 代码示例:
/** * new ThreadPoolExecutor.AbortPolicy() 银行满了,还有人进来,不处理这个人的,抛出异常) * new ThreadPoolExecutor.CallerRunsPolicy() 哪来的回哪去 * new ThreadPoolExecutor.DiscardOldestPolicy() 队列满了不会抛出异常 * new ThreadPoolExecutor.DiscardOldestPolicy() 队列满了。尝试去跟第一个线程竞争。如果没竞争过,还是回丢弃任务 * 不会抛出异常 */ public class newPool { public static void main(String[] args) { ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,5,3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); try { //最大承载:Queue + max = 5 + 3 = 8 //超过RejectedExecution for (int i = 0; i < 9 ; i++) { poolExecutor.execute(()->{ System.out.println(Thread.currentThread().getName()); }); } } catch (Exception e) { throw new RuntimeException(e); } finally { poolExecutor.shutdown(); } } }
8:了解CPU密集型和IO密集型(池的大小)
package main; import java.util.concurrent.*; public class ExecutorServiceDemo { public static void main(String[] args) { /* * 最大线程到底该如何定义 * 1.CPU密集型,几核,最大线程就是几,可以保持CPU的效率最高 * 2.IO密集型 大于你程序中十分耗IO的线程 * */ ExecutorService threadPool = new ThreadPoolExecutor( 2, Runtime.getRuntime().availableProcessors(), 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); try { //最大承载:Queue + max //超过RejectedExecution for (int i = 0; i < 9; i++) { //使用线程池之后,使用线程池来创建线程 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" : ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }