JUC并发编程【java提高】2

简介: JUC并发编程【java提高】2

CopyOnWriteArrayList比Vector厉害在哪里?

Vector底层是使用synchronized关键字来实现的:效率特别低下。

CopyOnWriteArrayList使用的是Lock锁,效率会更加高效!


Set 不安全

package com.kuang.unsafe;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
/**
 * 同理可证:ConcurrentModificationException
 * 1、Set<String> set = Collections.synchronizedSet(new HashSet<>());
 * 2、Set<String> set = new CopyOnWriteArraySet<>();
 */
public class SetTest {
    public static void main(String[] args) {
//        Set<String> set = new HashSet<>();
//        Set<String> set = Collections.synchronizedSet(new HashSet<>());
        Set<String> set = new CopyOnWriteArraySet<>();
        for (int i = 1; i <= 30; i++) {
            new Thread(()->{
                set.add(UUID.randomUUID().toString().substring(0,5));
                System.out.println(set);
            },String.valueOf(i)).start();
        }
    }
}

HashSet底层是什么?

hashSet底层就是一个HashMap




HashMap 不安全

package com.kuang.unsafe;
import java.util.HashMap;
import java.util.Map;
public class MapTest {
    public static void main(String[] args) {
        //Map是这样用的吗? 不是,工作中不用HashMap
        // 默认等价于什么? new HashMap<>(16,0.75)
        Map<String,String> map = new HashMap<>();
        //加载因子、初始化容量
    }
}


同样的HashMap基础类也存在并发修改异常!

package com.kuang.unsafe;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
//ConcurrentModificationException
public class MapTest {
    public static void main(String[] args) {
        //Map是这样用的吗? 不是,工作中不用HashMap
        // 默认等价于什么? new HashMap<>(16,0.75)
//        Map<String,String> map = new HashMap<>();
        //唯一的一个家庭作业,探究ConcurrentHashMap的源码
        Map<String,String> map = new ConcurrentHashMap<>();
        for (int i = 1; i <= 30; i++) {
            new Thread(()->{
                map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
                System.out.println(map);
            },String.valueOf(i)).start();
        }
    }
}

7、Callable(简单)


1、可以有返回值;

2、可以抛出异常;

3、方法不同,run()/call()

package com.kuang.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableTest {
    public static void main(String[] args) {
//        new Thread(new Runnable()).start();
//        new Thread(new FutureTask<V>()).start();
//        new Thread(new FutureTask<V>(callable)).start();
//        new Thread().start();//怎么启动callable
        MyThread thread = new MyThread();
        //适配类
        FutureTask futureTask = new FutureTask(thread); //适配类
        new Thread(futureTask,"A").start();
        new Thread(futureTask,"B").start();//结果会被缓存,效率高
        try {
            String s = (String) futureTask.get();//callable的返回值,这个get方法可能会产生阻塞,把他放在最后
            //或者使用异步通信来处理
            System.out.println(s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
class MyThread implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("call");//会打印1个call
        return "1234";
    }
}

细节:

1、有缓存

2、结果可能需要等待,会阻塞

8、常用的辅助类(必会)

8.1、CountDownLatch



package com.kuang.add;
import java.util.concurrent.CountDownLatch;
//计数器
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        //总数是6,必须要执行任务的时候,再使用
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <=6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"Go out");
                countDownLatch.countDown();//-1
            },String.valueOf(i)).start();
        }
        countDownLatch.await();// 等待计数器归零 然后向下执行
        System.out.println("Close Door");
    }
}

主要方法:

countDownLatch.countDown(); //减一操作;

countDownLatch.await();// 等待计数器归零

await 等待计数器归零,就唤醒,再继续向下运行

8.2、CyclicBarrier


加法计数器

package com.kuang.add;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        //集齐7颗龙珠召唤神龙
        //召唤龙珠的线程
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("召唤神龙成功");
        });
        for (int i = 1; i <= 7; i++) {
            final int temp=i;
            //lambda表达式操作到i吗?
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"收集"+temp+"个龙珠");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

8.3、Semaphore


抢车位

6车–3个停车位置

package com.kuang.add;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
    public static void main(String[] args) {
        //线程数量:停车位!限流
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                //acquire()
                try {
                    semaphore.acquire();//获得,如果满了,会等待被释放为止
                    System.out.println(Thread.currentThread().getName()+"抢到车位");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName()+"离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();//释放
                }
            },String.valueOf(i)).start();
        }
    }
}

原理:

semaphore.acquire();//获得资源,如果资源已经使用完了,就等待资源释放后再进行使用!

semaphore.release();//释放,会将当前的信号量释放+1,然后唤醒等待的线程!

作用: 多个共享资源互斥的使用! 并发限流,控制最大的线程数!

9、读写锁



package com.kuang.rw;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
//ReadWriteLock
public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCache myCatch = new MyCache();
        //写入
        for (int i = 0; i <= 5; i++) {
            final int temp = i;
            new Thread(()->{
                myCatch.put(temp+"",temp+"");
            },String.valueOf(i)).start();
        }
        //读取
        for (int i = 0; i <=5; i++) {
            final int temp = i;
            new Thread(()->{
                myCatch.get(temp+"");
            },String.valueOf(i)).start();
        }
    }
}
//    自定义缓存
class MyCache {
    private volatile Map<String,Object> map = new HashMap<>(0);
    //存
    public void put(String key,Object value) {
        System.out.println(Thread.currentThread().getName()+"写入"+value);
        map.put(key, value);
        System.out.println(Thread.currentThread().getName()+"写入成功");
    }
    //取
    public void get(String key) {
        System.out.println(Thread.currentThread().getName()+"读取"+key);
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName()+"读取成功");
    }
}
结果:
同时写的问题

所以如果我们不加锁的情况,多线程的读写会造成数据不可靠的问题。

我们也可以采用synchronized这种重量锁和轻量锁 lock去保证数据的可靠。

但是这次我们采用更细粒度的锁:ReadWriteLock 读写锁来保证

package com.kuang.rw;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * 独占锁(写锁) 一次只能被一个线程占有
 * 共享锁(读锁) 可以同时被多个线程占有
 * ReadWriteLock
 * 读--读 可以共存
 * 读--写 不能共存
 * 写--写 不能共存
 */
public class ReadWriteLockDemo {
    public static void main(String[] args) {
//        MyCache myCatch = new MyCache();
        MyCacheLock myCatch = new MyCacheLock();
        //写入
        for (int i = 0; i <= 5; i++) {
            final int temp = i;
            new Thread(()->{
                myCatch.put(temp+"",temp+"");
            },String.valueOf(i)).start();
        }
        //读取
        for (int i = 0; i <=5; i++) {
            final int temp = i;
            new Thread(()->{
                myCatch.get(temp+"");
            },String.valueOf(i)).start();
        }
    }
}
//    自定义缓存
class MyCache {
    private volatile Map<String,Object> map = new HashMap<>(0);
    //存
    public void put(String key,Object value) {
        System.out.println(Thread.currentThread().getName()+"写入"+value);
        map.put(key, value);
        System.out.println(Thread.currentThread().getName()+"写入成功");
    }
    //取
    public void get(String key) {
        System.out.println(Thread.currentThread().getName()+"读取"+key);
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName()+"读取成功");
    }
}
//加锁的
class MyCacheLock {
    private volatile Map<String,Object> map = new HashMap<>(0);
    //读写锁:更加细粒度的控制
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//    private Lock lock=new ReentrantLock();
    //存,写的时候,只希望同时有一个线程写
    public void put(String key,Object value) {
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+"写入"+key);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName()+"写入成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
    //取,读,所有的人都可以读
    public void get(String key) {
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+"读取"+key);
            Object o = map.get(key);
            System.out.println(Thread.currentThread().getName()+"读取成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}
结果:
只有一个写

独占锁(写锁) 一次只能被一个线程占有

共享锁(读锁) 可以同时被多个线程占有

10、阻塞队列



BlockingQueue 是Collection的一个子类


什么情况下我们会使用阻塞队列

多线程并发处理、线程池

学会使用队列

添加、移除

四组API

1.抛出异常

2.不会抛出异常

3.阻塞等待

4.超时等待

抛出异常

package com.kuang.bq;
import java.util.concurrent.ArrayBlockingQueue;
public class Test {
    public static void main(String[] args) {
       test1();
    }
    /**
     * 抛出异常
     */
    public static void test1(){
        //队列的大小
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
        //抛出异常 IllegalStateException
//        System.out.println(blockingQueue.add("c"));
        System.out.println(blockingQueue.element());//查看队首元素是谁
        System.out.println("======================");
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        //抛出异常 NoSuchElementException
        System.out.println(blockingQueue.remove());
    }
}

不抛出异常

   /**
     *有返回值,没有异常
     */
    public static void test2(){
        //队列的大小
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
//        System.out.println(blockingQueue.offer("d"));//false 不抛出异常
      System.out.println(blockingQueue.peek());//查看队首元素是谁
        System.out.println("======================");
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());//null 不抛出异常
    }

等待,阻塞(一直阻塞)

  /**
     * 等待,阻塞(一直阻塞)
     */
    public static void test3() throws InterruptedException {
        //队列的大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        arrayBlockingQueue.put("a");
        arrayBlockingQueue.put("b");
        arrayBlockingQueue.put("c");
//        arrayBlockingQueue.put("d");队列没有位置,一直阻塞
        System.out.println("=============");
        //队列移除顺序
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());//没有这个元素,一直阻塞
    }

等待,阻塞(等待超时)

  /**
     * 等待,阻塞(等待超时)
     */
    public static void test4() throws InterruptedException {
        //队列的大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueue.offer("a"));
        System.out.println(arrayBlockingQueue.offer("b"));
        System.out.println(arrayBlockingQueue.offer("c"));
        //等待超过两秒退出
        arrayBlockingQueue.offer("d", 2, TimeUnit.SECONDS);
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
        //等待超过两秒就退出
        System.out.println(arrayBlockingQueue.poll(2, TimeUnit.SECONDS));
    }

SynchronousQueue 同步队列

同步队列 没有容量,也可以视为容量为1的队列;

put方法 和 take方法;

package com.kuang.bq;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * 同步队列 没有容量,也可以视为容量为1的队列;
 * Synchronized 和 其他的BlockingQueue 不一样 它不存储元素;
 *
 * put了一个元素,就必须从里面先take出来,否则不能再put进去值!
 */
public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> synchronousQueue = new java.util.concurrent.SynchronousQueue<>();
        // 往queue中添加元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " put 1");
                synchronousQueue.put("1");
                System.out.println(Thread.currentThread().getName() + " put 2");
                synchronousQueue.put("2");
                System.out.println(Thread.currentThread().getName() + " put 3");
                synchronousQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"T1").start();
        // 取出元素
        new Thread(()-> {
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"T2").start();
    }
}

结果

T1 put 1
T2 take 1
T1 put 2
T2 take 2
T1 put 3
T2 take 3

11、线程池(重点)

线程池:三大方式、七大参数、四种拒绝策略

池化技术

程序的运行,本质:占用系统的资源!我们需要去优化资源的使用 ===> 池化技术

线程池、JDBC的连接池、内存池、对象池 等等。。。。

池化技术:事先准备好一些资源,如果有人要用,就来我这里拿,用完之后还给我,以此来提高效率。
线程池的好处

1.降低资源的消耗

2.提高响应速度

3.方便管理

线程可以复用,可以控制最大并发量,管理线程
线程池:三大方法


ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程

ExecutorService threadPool2 = Executors.newFixedThreadPool(5); //创建一个固定的线程池的大小

ExecutorService threadPool3 = Executors.newCachedThreadPool(); //可伸缩的

package com.kuang.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
//工具类 Executors 三大方法;
//使用了线程池之后,使用线程池创建线程
public class Demo01 {
    public static void main(String[] args) {
//        ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
//        ExecutorService threadPool = Executors.newFixedThreadPool(5); //创建一个固定的线程池的大小
        ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸缩的
        //线程池用完必须要关闭线程池
        try {
            for (int i = 1; i <=10 ; i++) {
                //通过线程池创建线程
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+ " ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

// ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程

pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok

// ExecutorService threadPool = Executors.newFixedThreadPool(5);//创建一个固定大小得线程池

pool-1-thread-2 ok
pool-1-thread-2 ok
pool-1-thread-2 ok
pool-1-thread-2 ok
pool-1-thread-2 ok
pool-1-thread-4 ok
pool-1-thread-3 ok
pool-1-thread-1 ok
pool-1-thread-2 ok
pool-1-thread-5 ok

ExecutorService threadPool = Executors.newCachedThreadPool();//可伸缩,线程数可变

pool-1-thread-3 ok
pool-1-thread-4 ok
pool-1-thread-1 ok
pool-1-thread-5 ok
pool-1-thread-2 ok
pool-1-thread-6 ok
pool-1-thread-8 ok
pool-1-thread-9 ok
pool-1-thread-10 ok
pool-1-thread-7 ok

7大参数

newSingleThreadExecutor()源码分析

 /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newFixedThreadPool()源码分析

 /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newCachedThreadPool()源码分析

 /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

可以看到,三个方法的底层都是new ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,  //核心线程池大小
                          int maximumPoolSize, //最大的线程池大小
                          long keepAliveTime,  //超时了没有人调用就会释放
                          TimeUnit unit, //超时单位
                          BlockingQueue<Runnable> workQueue, //阻塞队列
                          ThreadFactory threadFactory, //线程工厂 创建线程的 一般不用动
                          RejectedExecutionHandler handler //拒绝策略
                         ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}


阿里巴巴的Java操作手册中明确说明:对于Integer.MAX_VALUE初始值较大,所以一般情况我们要使用底层的ThreadPoolExecutor来创建线程池。


手动创建一个线程池


模拟上面的银行业务

核心线程大小设为2:就是一直工作的窗口

最大线程设为5:就是银行最多的工作窗口

keepAliveTime设置为1小时:如果1小时都没有业务,就关闭窗口

候客区:new LinkedBlockingQueue(3),假设候客区最多3个人

线程工厂:就用默认的,Executors.defaultThreaFactory()

拒绝策略: 可以发现有4种拒绝策略,用默认的AbortPolicy()//银行满了,但是还有人进来,就不处理这个人,并抛出异常

package com.kuang.pool;
import java.util.concurrent.*;
public class PollDemo {
    public static void main(String[] args) {
        //自定义线程池!工作 ThreadPoolExecutor
        ExecutorService threadPool =new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()//银行满了,还有人精力,不处理这个人,抛出异常
        );
        try {
            for (int i = 1; i <= 8; i++) {  // 1 5 6 7 8 9(RejectedExecutionException)
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "ok");
                });
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            threadPool.shutdown();
        }
    }
}

4种拒绝策略

  1. new ThreadPoolExecutor.AbortPolicy(): //该拒绝策略为:银行满了,还有人进来,不处理这个人的,并抛出异常
    超出最大承载,就会抛出异常:队列容量大小+maxPoolSize
pool-1-thread-1ok
pool-1-thread-3ok
pool-1-thread-3ok
pool-1-thread-3ok
pool-1-thread-2ok
pool-1-thread-4ok
pool-1-thread-5ok
pool-1-thread-1ok
java.util.concurrent.RejectedExecutionException: Task com.kuang.pool.PollDemo$$Lambda$1/1096979270@7ba4f24f rejected from java.util.concurrent.ThreadPoolExecutor@3b9a45b3[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 8]
  at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
  at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
  at com.kuang.pool.PollDemo.main(PollDemo.java:19)
Process finished with exit code 0
  1. new ThreadPoolExecutor.CallerRunsPolicy(): //该拒绝策略为:哪来的去哪里 main线程进行处理
pool-1-thread-1ok
pool-1-thread-4ok
mainok
pool-1-thread-3ok
pool-1-thread-4ok
pool-1-thread-2ok
pool-1-thread-3ok
pool-1-thread-1ok
pool-1-thread-5ok
Process finished with exit code 0
  1. new ThreadPoolExecutor.DiscardPolicy(): //该拒绝策略为:队列满了,丢掉任务,不会抛出异常。
pool-1-thread-1ok
pool-1-thread-4ok
pool-1-thread-3ok
pool-1-thread-2ok
pool-1-thread-4ok
pool-1-thread-3ok
pool-1-thread-5ok
pool-1-thread-1ok
Process finished with exit code 0
  1. new ThreadPoolExecutor.DiscardOldestPolicy(): //该拒绝策略为:队列满了,尝试去和最早的进程竞争,不会抛出异常
pool-1-thread-2ok
pool-1-thread-4ok
pool-1-thread-3ok
pool-1-thread-4ok
pool-1-thread-2ok
pool-1-thread-1ok
pool-1-thread-3ok
pool-1-thread-5ok
Process finished with exit code 0

小结和扩展

如何设置线程池的最大大小maximumPoolSize

了解CPU密集型I/O密集型

1、CPU密集型:电脑的核数是几核就选择几;选择maximunPoolSize的大小

    // 获取cpu 的核数
        int max = Runtime.getRuntime().availableProcessors();
        ExecutorService service =new ThreadPoolExecutor(
                2,
                max,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

2、I/O密集型:

在程序中有15个大型任务,io十分占用资源;I/O密集型就是判断我们程序中十分耗I/O的线程数量,大约是最大I/O数的一倍到两倍之间。

相关文章
|
1月前
|
Java 编译器 开发者
深入理解Java内存模型(JMM)及其对并发编程的影响
【9月更文挑战第37天】在Java的世界里,内存模型是隐藏在代码背后的守护者,它默默地协调着多线程环境下的数据一致性和可见性问题。本文将揭开Java内存模型的神秘面纱,带领读者探索其对并发编程实践的深远影响。通过深入浅出的方式,我们将了解内存模型的基本概念、工作原理以及如何在实际开发中正确应用这些知识,确保程序的正确性和高效性。
|
2月前
|
安全 Java API
JAVA并发编程JUC包之CAS原理
在JDK 1.5之后,Java API引入了`java.util.concurrent`包(简称JUC包),提供了多种并发工具类,如原子类`AtomicXX`、线程池`Executors`、信号量`Semaphore`、阻塞队列等。这些工具类简化了并发编程的复杂度。原子类`Atomic`尤其重要,它提供了线程安全的变量更新方法,支持整型、长整型、布尔型、数组及对象属性的原子修改。结合`volatile`关键字,可以实现多线程环境下共享变量的安全修改。
|
26天前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
8天前
|
存储 缓存 安全
Java内存模型(JMM):深入理解并发编程的基石####
【10月更文挑战第29天】 本文作为一篇技术性文章,旨在深入探讨Java内存模型(JMM)的核心概念、工作原理及其在并发编程中的应用。我们将从JMM的基本定义出发,逐步剖析其如何通过happens-before原则、volatile关键字、synchronized关键字等机制,解决多线程环境下的数据可见性、原子性和有序性问题。不同于常规摘要的简述方式,本摘要将直接概述文章的核心内容,为读者提供一个清晰的学习路径。 ####
25 2
|
2月前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
2月前
|
Java 开发者
深入探索Java中的并发编程
本文将带你领略Java并发编程的奥秘,揭示其背后的原理与实践。通过深入浅出的解释和实例,我们将探讨Java内存模型、线程间通信以及常见并发工具的使用方法。无论是初学者还是有一定经验的开发者,都能从中获得启发和实用的技巧。让我们一起开启这场并发编程的奇妙之旅吧!
30 5
|
2月前
|
算法 安全 Java
Java中的并发编程是如何实现的?
Java中的并发编程是通过多线程机制实现的。Java提供了多种工具和框架来支持并发编程。
18 1
|
2月前
|
缓存 监控 Java
Java中的并发编程:理解并应用线程池
在Java的并发编程中,线程池是提高应用程序性能的关键工具。本文将深入探讨如何有效利用线程池来管理资源、提升效率和简化代码结构。我们将从基础概念出发,逐步介绍线程池的配置、使用场景以及最佳实践,帮助开发者更好地掌握并发编程的核心技巧。
|
数据采集 Java 大数据
java高并发系列 - 第14天:JUC中的LockSupport工具类,必备技能
java高并发系列 - 第14天:JUC中的LockSupport工具类,必备技能这是java高并发系列第14篇文章。 本文主要内容: 讲解3种让线程等待和唤醒的方法,每种方法配合具体的示例介绍LockSupport主要用法对比3种方式,了解他们之间的区别LockSupport位于java.util.concurrent(简称juc)包中,算是juc中一个基础类,juc中很多地方都会使用LockSupport,非常重要,希望大家一定要掌握。
1239 0
|
3天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。