JUC并发编程【java提高】2

简介: JUC并发编程【java提高】2

CopyOnWriteArrayList比Vector厉害在哪里?

Vector底层是使用synchronized关键字来实现的:效率特别低下。

CopyOnWriteArrayList使用的是Lock锁,效率会更加高效!


Set 不安全

package com.kuang.unsafe;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
/**
 * 同理可证:ConcurrentModificationException
 * 1、Set<String> set = Collections.synchronizedSet(new HashSet<>());
 * 2、Set<String> set = new CopyOnWriteArraySet<>();
 */
public class SetTest {
    public static void main(String[] args) {
//        Set<String> set = new HashSet<>();
//        Set<String> set = Collections.synchronizedSet(new HashSet<>());
        Set<String> set = new CopyOnWriteArraySet<>();
        for (int i = 1; i <= 30; i++) {
            new Thread(()->{
                set.add(UUID.randomUUID().toString().substring(0,5));
                System.out.println(set);
            },String.valueOf(i)).start();
        }
    }
}

HashSet底层是什么?

hashSet底层就是一个HashMap




HashMap 不安全

package com.kuang.unsafe;
import java.util.HashMap;
import java.util.Map;
public class MapTest {
    public static void main(String[] args) {
        //Map是这样用的吗? 不是,工作中不用HashMap
        // 默认等价于什么? new HashMap<>(16,0.75)
        Map<String,String> map = new HashMap<>();
        //加载因子、初始化容量
    }
}


同样的HashMap基础类也存在并发修改异常!

package com.kuang.unsafe;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
//ConcurrentModificationException
public class MapTest {
    public static void main(String[] args) {
        //Map是这样用的吗? 不是,工作中不用HashMap
        // 默认等价于什么? new HashMap<>(16,0.75)
//        Map<String,String> map = new HashMap<>();
        //唯一的一个家庭作业,探究ConcurrentHashMap的源码
        Map<String,String> map = new ConcurrentHashMap<>();
        for (int i = 1; i <= 30; i++) {
            new Thread(()->{
                map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
                System.out.println(map);
            },String.valueOf(i)).start();
        }
    }
}

7、Callable(简单)


1、可以有返回值;

2、可以抛出异常;

3、方法不同,run()/call()

package com.kuang.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableTest {
    public static void main(String[] args) {
//        new Thread(new Runnable()).start();
//        new Thread(new FutureTask<V>()).start();
//        new Thread(new FutureTask<V>(callable)).start();
//        new Thread().start();//怎么启动callable
        MyThread thread = new MyThread();
        //适配类
        FutureTask futureTask = new FutureTask(thread); //适配类
        new Thread(futureTask,"A").start();
        new Thread(futureTask,"B").start();//结果会被缓存,效率高
        try {
            String s = (String) futureTask.get();//callable的返回值,这个get方法可能会产生阻塞,把他放在最后
            //或者使用异步通信来处理
            System.out.println(s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
class MyThread implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("call");//会打印1个call
        return "1234";
    }
}

细节:

1、有缓存

2、结果可能需要等待,会阻塞

8、常用的辅助类(必会)

8.1、CountDownLatch



package com.kuang.add;
import java.util.concurrent.CountDownLatch;
//计数器
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        //总数是6,必须要执行任务的时候,再使用
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <=6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"Go out");
                countDownLatch.countDown();//-1
            },String.valueOf(i)).start();
        }
        countDownLatch.await();// 等待计数器归零 然后向下执行
        System.out.println("Close Door");
    }
}

主要方法:

countDownLatch.countDown(); //减一操作;

countDownLatch.await();// 等待计数器归零

await 等待计数器归零,就唤醒,再继续向下运行

8.2、CyclicBarrier


加法计数器

package com.kuang.add;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        //集齐7颗龙珠召唤神龙
        //召唤龙珠的线程
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("召唤神龙成功");
        });
        for (int i = 1; i <= 7; i++) {
            final int temp=i;
            //lambda表达式操作到i吗?
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"收集"+temp+"个龙珠");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

8.3、Semaphore


抢车位

6车–3个停车位置

package com.kuang.add;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
    public static void main(String[] args) {
        //线程数量:停车位!限流
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                //acquire()
                try {
                    semaphore.acquire();//获得,如果满了,会等待被释放为止
                    System.out.println(Thread.currentThread().getName()+"抢到车位");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName()+"离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();//释放
                }
            },String.valueOf(i)).start();
        }
    }
}

原理:

semaphore.acquire();//获得资源,如果资源已经使用完了,就等待资源释放后再进行使用!

semaphore.release();//释放,会将当前的信号量释放+1,然后唤醒等待的线程!

作用: 多个共享资源互斥的使用! 并发限流,控制最大的线程数!

9、读写锁



package com.kuang.rw;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
//ReadWriteLock
public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCache myCatch = new MyCache();
        //写入
        for (int i = 0; i <= 5; i++) {
            final int temp = i;
            new Thread(()->{
                myCatch.put(temp+"",temp+"");
            },String.valueOf(i)).start();
        }
        //读取
        for (int i = 0; i <=5; i++) {
            final int temp = i;
            new Thread(()->{
                myCatch.get(temp+"");
            },String.valueOf(i)).start();
        }
    }
}
//    自定义缓存
class MyCache {
    private volatile Map<String,Object> map = new HashMap<>(0);
    //存
    public void put(String key,Object value) {
        System.out.println(Thread.currentThread().getName()+"写入"+value);
        map.put(key, value);
        System.out.println(Thread.currentThread().getName()+"写入成功");
    }
    //取
    public void get(String key) {
        System.out.println(Thread.currentThread().getName()+"读取"+key);
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName()+"读取成功");
    }
}
结果:
同时写的问题

所以如果我们不加锁的情况,多线程的读写会造成数据不可靠的问题。

我们也可以采用synchronized这种重量锁和轻量锁 lock去保证数据的可靠。

但是这次我们采用更细粒度的锁:ReadWriteLock 读写锁来保证

package com.kuang.rw;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * 独占锁(写锁) 一次只能被一个线程占有
 * 共享锁(读锁) 可以同时被多个线程占有
 * ReadWriteLock
 * 读--读 可以共存
 * 读--写 不能共存
 * 写--写 不能共存
 */
public class ReadWriteLockDemo {
    public static void main(String[] args) {
//        MyCache myCatch = new MyCache();
        MyCacheLock myCatch = new MyCacheLock();
        //写入
        for (int i = 0; i <= 5; i++) {
            final int temp = i;
            new Thread(()->{
                myCatch.put(temp+"",temp+"");
            },String.valueOf(i)).start();
        }
        //读取
        for (int i = 0; i <=5; i++) {
            final int temp = i;
            new Thread(()->{
                myCatch.get(temp+"");
            },String.valueOf(i)).start();
        }
    }
}
//    自定义缓存
class MyCache {
    private volatile Map<String,Object> map = new HashMap<>(0);
    //存
    public void put(String key,Object value) {
        System.out.println(Thread.currentThread().getName()+"写入"+value);
        map.put(key, value);
        System.out.println(Thread.currentThread().getName()+"写入成功");
    }
    //取
    public void get(String key) {
        System.out.println(Thread.currentThread().getName()+"读取"+key);
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName()+"读取成功");
    }
}
//加锁的
class MyCacheLock {
    private volatile Map<String,Object> map = new HashMap<>(0);
    //读写锁:更加细粒度的控制
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//    private Lock lock=new ReentrantLock();
    //存,写的时候,只希望同时有一个线程写
    public void put(String key,Object value) {
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+"写入"+key);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName()+"写入成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
    //取,读,所有的人都可以读
    public void get(String key) {
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+"读取"+key);
            Object o = map.get(key);
            System.out.println(Thread.currentThread().getName()+"读取成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}
结果:
只有一个写

独占锁(写锁) 一次只能被一个线程占有

共享锁(读锁) 可以同时被多个线程占有

10、阻塞队列



BlockingQueue 是Collection的一个子类


什么情况下我们会使用阻塞队列

多线程并发处理、线程池

学会使用队列

添加、移除

四组API

1.抛出异常

2.不会抛出异常

3.阻塞等待

4.超时等待

抛出异常

package com.kuang.bq;
import java.util.concurrent.ArrayBlockingQueue;
public class Test {
    public static void main(String[] args) {
       test1();
    }
    /**
     * 抛出异常
     */
    public static void test1(){
        //队列的大小
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
        //抛出异常 IllegalStateException
//        System.out.println(blockingQueue.add("c"));
        System.out.println(blockingQueue.element());//查看队首元素是谁
        System.out.println("======================");
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        //抛出异常 NoSuchElementException
        System.out.println(blockingQueue.remove());
    }
}

不抛出异常

   /**
     *有返回值,没有异常
     */
    public static void test2(){
        //队列的大小
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
//        System.out.println(blockingQueue.offer("d"));//false 不抛出异常
      System.out.println(blockingQueue.peek());//查看队首元素是谁
        System.out.println("======================");
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());//null 不抛出异常
    }

等待,阻塞(一直阻塞)

  /**
     * 等待,阻塞(一直阻塞)
     */
    public static void test3() throws InterruptedException {
        //队列的大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        arrayBlockingQueue.put("a");
        arrayBlockingQueue.put("b");
        arrayBlockingQueue.put("c");
//        arrayBlockingQueue.put("d");队列没有位置,一直阻塞
        System.out.println("=============");
        //队列移除顺序
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());//没有这个元素,一直阻塞
    }

等待,阻塞(等待超时)

  /**
     * 等待,阻塞(等待超时)
     */
    public static void test4() throws InterruptedException {
        //队列的大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueue.offer("a"));
        System.out.println(arrayBlockingQueue.offer("b"));
        System.out.println(arrayBlockingQueue.offer("c"));
        //等待超过两秒退出
        arrayBlockingQueue.offer("d", 2, TimeUnit.SECONDS);
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
        //等待超过两秒就退出
        System.out.println(arrayBlockingQueue.poll(2, TimeUnit.SECONDS));
    }

SynchronousQueue 同步队列

同步队列 没有容量,也可以视为容量为1的队列;

put方法 和 take方法;

package com.kuang.bq;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * 同步队列 没有容量,也可以视为容量为1的队列;
 * Synchronized 和 其他的BlockingQueue 不一样 它不存储元素;
 *
 * put了一个元素,就必须从里面先take出来,否则不能再put进去值!
 */
public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> synchronousQueue = new java.util.concurrent.SynchronousQueue<>();
        // 往queue中添加元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " put 1");
                synchronousQueue.put("1");
                System.out.println(Thread.currentThread().getName() + " put 2");
                synchronousQueue.put("2");
                System.out.println(Thread.currentThread().getName() + " put 3");
                synchronousQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"T1").start();
        // 取出元素
        new Thread(()-> {
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"T2").start();
    }
}

结果

T1 put 1
T2 take 1
T1 put 2
T2 take 2
T1 put 3
T2 take 3

11、线程池(重点)

线程池:三大方式、七大参数、四种拒绝策略

池化技术

程序的运行,本质:占用系统的资源!我们需要去优化资源的使用 ===> 池化技术

线程池、JDBC的连接池、内存池、对象池 等等。。。。

池化技术:事先准备好一些资源,如果有人要用,就来我这里拿,用完之后还给我,以此来提高效率。
线程池的好处

1.降低资源的消耗

2.提高响应速度

3.方便管理

线程可以复用,可以控制最大并发量,管理线程
线程池:三大方法


ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程

ExecutorService threadPool2 = Executors.newFixedThreadPool(5); //创建一个固定的线程池的大小

ExecutorService threadPool3 = Executors.newCachedThreadPool(); //可伸缩的

package com.kuang.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
//工具类 Executors 三大方法;
//使用了线程池之后,使用线程池创建线程
public class Demo01 {
    public static void main(String[] args) {
//        ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
//        ExecutorService threadPool = Executors.newFixedThreadPool(5); //创建一个固定的线程池的大小
        ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸缩的
        //线程池用完必须要关闭线程池
        try {
            for (int i = 1; i <=10 ; i++) {
                //通过线程池创建线程
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+ " ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

// ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程

pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok

// ExecutorService threadPool = Executors.newFixedThreadPool(5);//创建一个固定大小得线程池

pool-1-thread-2 ok
pool-1-thread-2 ok
pool-1-thread-2 ok
pool-1-thread-2 ok
pool-1-thread-2 ok
pool-1-thread-4 ok
pool-1-thread-3 ok
pool-1-thread-1 ok
pool-1-thread-2 ok
pool-1-thread-5 ok

ExecutorService threadPool = Executors.newCachedThreadPool();//可伸缩,线程数可变

pool-1-thread-3 ok
pool-1-thread-4 ok
pool-1-thread-1 ok
pool-1-thread-5 ok
pool-1-thread-2 ok
pool-1-thread-6 ok
pool-1-thread-8 ok
pool-1-thread-9 ok
pool-1-thread-10 ok
pool-1-thread-7 ok

7大参数

newSingleThreadExecutor()源码分析

 /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newFixedThreadPool()源码分析

 /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newCachedThreadPool()源码分析

 /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

可以看到,三个方法的底层都是new ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,  //核心线程池大小
                          int maximumPoolSize, //最大的线程池大小
                          long keepAliveTime,  //超时了没有人调用就会释放
                          TimeUnit unit, //超时单位
                          BlockingQueue<Runnable> workQueue, //阻塞队列
                          ThreadFactory threadFactory, //线程工厂 创建线程的 一般不用动
                          RejectedExecutionHandler handler //拒绝策略
                         ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}


阿里巴巴的Java操作手册中明确说明:对于Integer.MAX_VALUE初始值较大,所以一般情况我们要使用底层的ThreadPoolExecutor来创建线程池。


手动创建一个线程池


模拟上面的银行业务

核心线程大小设为2:就是一直工作的窗口

最大线程设为5:就是银行最多的工作窗口

keepAliveTime设置为1小时:如果1小时都没有业务,就关闭窗口

候客区:new LinkedBlockingQueue(3),假设候客区最多3个人

线程工厂:就用默认的,Executors.defaultThreaFactory()

拒绝策略: 可以发现有4种拒绝策略,用默认的AbortPolicy()//银行满了,但是还有人进来,就不处理这个人,并抛出异常

package com.kuang.pool;
import java.util.concurrent.*;
public class PollDemo {
    public static void main(String[] args) {
        //自定义线程池!工作 ThreadPoolExecutor
        ExecutorService threadPool =new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()//银行满了,还有人精力,不处理这个人,抛出异常
        );
        try {
            for (int i = 1; i <= 8; i++) {  // 1 5 6 7 8 9(RejectedExecutionException)
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "ok");
                });
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            threadPool.shutdown();
        }
    }
}

4种拒绝策略

  1. new ThreadPoolExecutor.AbortPolicy(): //该拒绝策略为:银行满了,还有人进来,不处理这个人的,并抛出异常
    超出最大承载,就会抛出异常:队列容量大小+maxPoolSize
pool-1-thread-1ok
pool-1-thread-3ok
pool-1-thread-3ok
pool-1-thread-3ok
pool-1-thread-2ok
pool-1-thread-4ok
pool-1-thread-5ok
pool-1-thread-1ok
java.util.concurrent.RejectedExecutionException: Task com.kuang.pool.PollDemo$$Lambda$1/1096979270@7ba4f24f rejected from java.util.concurrent.ThreadPoolExecutor@3b9a45b3[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 8]
  at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
  at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
  at com.kuang.pool.PollDemo.main(PollDemo.java:19)
Process finished with exit code 0
  1. new ThreadPoolExecutor.CallerRunsPolicy(): //该拒绝策略为:哪来的去哪里 main线程进行处理
pool-1-thread-1ok
pool-1-thread-4ok
mainok
pool-1-thread-3ok
pool-1-thread-4ok
pool-1-thread-2ok
pool-1-thread-3ok
pool-1-thread-1ok
pool-1-thread-5ok
Process finished with exit code 0
  1. new ThreadPoolExecutor.DiscardPolicy(): //该拒绝策略为:队列满了,丢掉任务,不会抛出异常。
pool-1-thread-1ok
pool-1-thread-4ok
pool-1-thread-3ok
pool-1-thread-2ok
pool-1-thread-4ok
pool-1-thread-3ok
pool-1-thread-5ok
pool-1-thread-1ok
Process finished with exit code 0
  1. new ThreadPoolExecutor.DiscardOldestPolicy(): //该拒绝策略为:队列满了,尝试去和最早的进程竞争,不会抛出异常
pool-1-thread-2ok
pool-1-thread-4ok
pool-1-thread-3ok
pool-1-thread-4ok
pool-1-thread-2ok
pool-1-thread-1ok
pool-1-thread-3ok
pool-1-thread-5ok
Process finished with exit code 0

小结和扩展

如何设置线程池的最大大小maximumPoolSize

了解CPU密集型I/O密集型

1、CPU密集型:电脑的核数是几核就选择几;选择maximunPoolSize的大小

    // 获取cpu 的核数
        int max = Runtime.getRuntime().availableProcessors();
        ExecutorService service =new ThreadPoolExecutor(
                2,
                max,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

2、I/O密集型:

在程序中有15个大型任务,io十分占用资源;I/O密集型就是判断我们程序中十分耗I/O的线程数量,大约是最大I/O数的一倍到两倍之间。

相关文章
|
22小时前
|
安全 Java 调度
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第12天】 在现代软件开发中,多线程编程是提升应用程序性能和响应能力的关键手段之一。特别是在Java语言中,由于其内置的跨平台线程支持,开发者可以轻松地创建和管理线程。然而,随之而来的并发问题也不容小觑。本文将探讨Java并发编程的核心概念,包括线程安全策略、锁机制以及性能优化技巧。通过实例分析与性能比较,我们旨在为读者提供一套既确保线程安全又兼顾性能的编程指导。
|
2天前
|
安全 Java
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第11天】在Java并发编程中,线程安全和性能优化是两个重要的主题。本文将深入探讨这两个方面,包括线程安全的基本概念,如何实现线程安全,以及如何在保证线程安全的同时进行性能优化。我们将通过实例和代码片段来说明这些概念和技术。
2 0
|
2天前
|
Java 调度
Java并发编程:深入理解线程池
【5月更文挑战第11天】本文将深入探讨Java中的线程池,包括其基本概念、工作原理以及如何使用。我们将通过实例来解释线程池的优点,如提高性能和资源利用率,以及如何避免常见的并发问题。我们还将讨论Java中线程池的实现,包括Executor框架和ThreadPoolExecutor类,并展示如何创建和管理线程池。最后,我们将讨论线程池的一些高级特性,如任务调度、线程优先级和异常处理。
|
2天前
|
缓存 Java 数据库
Java并发编程学习11-任务执行演示
【5月更文挑战第4天】本篇将结合任务执行和 Executor 框架的基础知识,演示一些不同版本的任务执行Demo,并且每个版本都实现了不同程度的并发性。
20 4
Java并发编程学习11-任务执行演示
|
3天前
|
Java
【Java多线程】面试常考 —— JUC(java.util.concurrent) 的常见类
【Java多线程】面试常考 —— JUC(java.util.concurrent) 的常见类
13 0
|
4天前
|
缓存 Java 数据库
Java并发编程中的锁优化策略
【5月更文挑战第9天】 在高负载的多线程应用中,Java并发编程的高效性至关重要。本文将探讨几种常见的锁优化技术,旨在提高Java应用程序在并发环境下的性能。我们将从基本的synchronized关键字开始,逐步深入到更高效的Lock接口实现,以及Java 6引入的java.util.concurrent包中的高级工具类。文中还会介绍读写锁(ReadWriteLock)的概念和实现原理,并通过对比分析各自的优势和适用场景,为开发者提供实用的锁优化策略。
5 0
|
4天前
|
算法 安全 Java
深入探索Java中的并发编程:CAS机制的原理与应用
总之,CAS机制是一种用于并发编程的原子操作,它通过比较内存中的值和预期值来实现多线程下的数据同步和互斥,从而提供了高效的并发控制。它在Java中被广泛应用于实现线程安全的数据结构和算法。
19 0
|
SQL 缓存 监控
Java JUC 简介
本系列文章旨在介绍 Java 并发相关的知识,本文作为开篇主要介绍了 JDK 中常用的并发库(JUC)的使用方式, 后续的文章中我会自上而下地剖析了 JUC 中各个部门的实现原理,从直接下级框架 AbstractQueuedSynchronizer 也就是大家常说的 AQS,再到其中使用的 CAS, Wait,Park,最后到操作系统层面的 Mutex,Condition,希望通过这篇文章,大家能够对整个 Java 并发有一个清晰全面的认识,而且把这些内容串在一起你会发现它们本质上都是相通的。
|
3天前
|
Java 数据库
【Java多线程】对线程池的理解并模拟实现线程池
【Java多线程】对线程池的理解并模拟实现线程池
13 1
|
1天前
|
Java
Java一分钟:线程协作:wait(), notify(), notifyAll()
【5月更文挑战第11天】本文介绍了Java多线程编程中的`wait()`, `notify()`, `notifyAll()`方法,它们用于线程间通信和同步。这些方法在`synchronized`代码块中使用,控制线程执行和资源访问。文章讨论了常见问题,如死锁、未捕获异常、同步使用错误及通知错误,并提供了生产者-消费者模型的示例代码,强调理解并正确使用这些方法对实现线程协作的重要性。
10 3