牛逼!一文看懂 Java 并发编程在各主流框架中的应用

简介: Spring、Netty、Mybatis 等框架的代码中大量运用了 Java 多线程编程技巧。并发编程处理的恰当与否,将直接影响架构的性能。 本文通过对这些框架源码的分析,结合并发编程的常用技巧,来讲解多线程编程在这些主流框架中的应用。

本文选自 Doocs 开源社区旗下“源码猎人”项目,作者 AmyliaY。


项目将会持续更新,欢迎 Star 关注。


项目地址:https://github.com/doocs/source-code-hunter


Spring、Netty、Mybatis 等框架的代码中大量运用了 Java 多线程编程技巧。并发编程处理的恰当与否,将直接影响架构的性能。 本文通过对这些框架源码的分析,结合并发编程的常用技巧,来讲解多线程编程在这些主流框架中的应用


Java 内存模型


JVM 规范定义了 Java 内存模型来屏蔽掉各种操作系统、虚拟机实现厂商和硬件的内存访问差异,以确保 Java 程序在所有操作系统和平台上能够达到一致的内存访问效果。


工作内存和主内存


Java 内存模型规定所有的变量都存储在主内存中,每个线程都有自己独立的工作内存,工作内存保存了对应该线程使用的变量的主内存副本拷贝。 线程对这些变量的操作都在自己的工作内存中进行,不能直接操作主内存和其他工作内存中存储的变量或者变量副本。线程间的变量传递需通过主内存来完成,三者的关系如下图所示。


4.png


Java 内存操作协议


Java 内存模型定义了 8 种操作来完成主内存和工作内存的变量访问,具体如下。


5.png


read:把一个变量的值从主内存传输到线程的工作内存中,以便随后的 load 动作使用。

load:把从主内存中读取的变量值载入工作内存的变量副本中。

use:把工作内存中一个变量的值传递给 Java 虚拟机执行引擎。

assign:把从执行引擎接收到的变量的值赋值给工作内存中的变量。

store:把工作内存中一个变量的值传送到主内存中,以便随后的 write 操作。write:工作内存传递过来的变量值放入主内存中。

lock:把主内存的一个变量标识为某个线程独占的状态。

unlock:把主内存中 一个处于锁定状态的变量释放出来,被释放后的变量才可以被其他线程锁定。


内存模型三大特性


1、原子性


这个概念与事务中的原子性大概一致,表明此操作是不可分割,不可中断的,要么全部执行,要么全部不执行。Java 内存模型直接保证的原子性操作包括 read、load、use、assign、store、write、lock、unlock 这八个。


2、可见性


可见性是指当一个线程修改了共享变量的值,其他线程能够立即得知这个修改。 Java 内存模型是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值这种依赖主内存作为传递媒介的方式来实现可见性的,无论是普通变量还是 volatile 变量都是如此,普通变量与 volatile 变量的区别是,volatile 的特殊规则保证了新值能立即同步到主内存,以及每次使用前立即从主内存刷新。因此,可以说 volatile 保证了多线程操作时变量的可见性,而普通变量则不能保证这一点。除了 volatile 外,synchronized 也提供了可见性,synchronized 的可见性是由 “对一个变量执行 unlock 操作 之前,必须先把此变量同步回主内存中(执行 store、write 操作)” 这条规则获得。


3、有序性


单线程环境下,程序会 “有序的”执行,即:线程内表现为串行语义。但是在多线程环境下,由于指令重排,并发执行的正确性会受到影响。在 Java 中使用 volatile 和 synchronized 关键字,可以保证多线程执行的有序性。volatile 通过加入内存屏障指令来禁止内存的重排序。synchronized 通过加锁,保证同一时刻只有一个线程来执行同步代码。


volatile 的应用


打开 NioEventLoop 的代码中,有一个控制 IO 操作 和 其他任务运行比例的,用 volatile 修饰的 int 类型字段 ioRatio,代码如下。


private volatile int ioRatio = 50;


这里为什么要用 volatile 修饰呢?我们首先对 volatile 关键字进行说明,然后再结合 Netty 的代码进行分析。


关键字 volatile 是 Java 提供的最轻量级的同步机制,Java 内存模型对 volatile 专门定义了一些特殊的访问规则。下面我们就看它的规则。当一个变量被 volatile 修饰后,它将具备以下两种特性。


线程可见性:当一个线程修改了被 volatile 修饰的变量后,无论是否加锁,其他线程都可以立即看到最新的修改,而普通变量却做不到这点。禁止指令重排序优化:普通的变量仅仅保证在该方法的执行过程中所有依赖赋值结果的地方都能获取正确的结果,而不能保证变量赋值操作的顺序与程序代码的执行顺序一致。举个简单的例子说明下指令重排序优化问题,代码如下。


public class ThreadStopExample {    private static boolean stop;    public static void main(String[] args) throws InterruptedException {        Thread workThread = new Thread(new Runnable() {            public void run() {                int i= 0;                while (!stop) {                    i++;                    try{                        TimeUnit.SECONDS.sleep(1);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }        });        workThread.start();        TimeUnit.SECONDS.sleep(3);        stop = true;    }}


我们预期程序会在 3s 后停止,但是实际上它会一直执行下去,原因就是虚拟机对代码进行了指令重排序和优化,优化后的指令如下。


if (!stop)    While(true)        ......


workThread 线程在执行重排序后的代码时,是无法发现变量 stop 被其它线程修改的,因此无法停止运行。要解决这个问题,只要将 stop 前增加 volatile 修饰符即可。volatile 解决了如下两个问题。第一,主线程对 stop 的修改在 workThread 线程 中可见,也就是说 workThread 线程 立即看到了其他线程对于 stop 变量 的修改。第二,禁止指令重排序,防止因为重排序导致的并发访问逻辑混乱。


一些人认为使用 volatile 可以代替传统锁,提升并发性能,这个认识是错误的。volatile 仅仅解决了可见性的问题,但是它并不能保证互斥性,也就是说多个线程并发修改某个变量时,依旧会产生多线程问题。因此,不能靠 volatile 来完全替代传统的锁。根据经验总结,volatile 最适用的场景是 “ 一个线程写,其他线程读 ”,如果有多个线程并发写操作,仍然需要使用锁或者线程安全的容器或者原子变量来代替。下面我们继续对 Netty 的源码做分析。上面讲到了 ioRatio 被定义成 volatile,下面看看代码为什么要这样定义。


final long ioTime = System.nanoTime() - ioStartTime;    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);


通过代码分析我们发现,在 NioEventLoop 线程 中,ioRatio 并没有被修改,它是只读操作。既然没有修改,为什么要定义成 volatile 呢?继续看代码,我们发现 NioEventLoop 提供了重新设置 IO 执行时间比例的公共方法。


public void setIoRatio(int ioRatio) {        if (ioRatio <= 0 || ioRatio > 100) {            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");        }        this.ioRatio = ioRatio;    }


首先,NioEventLoop 线程 没有调用该 set 方法,说明调整 IO 执行时间比例 是外部发起的操作,通常是由业务的线程调用该方法,重新设置该参数。这样就形成了一个线程写、一个线程读。根据前面针对 volatile 的应用总结,此时可以使用 volatile 来代替传统的 synchronized 关键字,以提升并发访问的性能。


ThreadLocal 的应用及源码解析


ThreadLocal 又称为线程本地存储区(Thread Local Storage,简称为 TLS),每个线程都有自己的私有的本地存储区域,不同线程之间彼此不能访问对方的 TLS 区域。使用 ThreadLocal 变量 的 set(T value) 方法 可以将数据存入该线程本地存储区,使用 get() 方法 可以获取到之前存入的值。


ThreadLocal 的常见应用


不使用 ThreadLocal。


public class SessionBean {    public static class Session {        private String id;        public String getId() {            return id;        }        public void setId(String id) {            this.id = id;        }    }    public Session createSession() {        return new Session();    }    public void setId(Session session, String id) {        session.setId(id);    }    public String getId(Session session) {        return session.getId();    }    public static void main(String[] args) {    //没有使用ThreadLocal,在方法间共享session需要进行session在方法间的传递        new Thread(() -> {            SessionBean bean = new SessionBean();            Session session = bean.createSession();            bean.setId(session, "susan");            System.out.println(bean.getId(session));        }).start();    }}


上述代码中,session 需要在方法间传递才可以修改和读取,保证线程中各方法操作的是一个。下面看一下使用 ThreadLocal 的代码。


public class SessionBean {//定义一个静态ThreadLocal变量session,就能够保证各个线程有自己的一份,并且方法可以方便获取,不用传递    private static ThreadLocal<Session> session = new ThreadLocal<>();    public static class Session {        private String id;        public String getId() {            return id;        }        public void setId(String id) {            this.id = id;        }    }    public void createSession() {        session.set(new Session());    }    public void setId(String id) {        session.get().setId(id);    }    public String getId() {        return session.get().getId();    }    public static void main(String[] args) {        new Thread(() -> {            SessionBean bean = new SessionBean();            bean.createSession();            bean.setId("susan");            System.out.println(bean.getId());        }).start();    }}


在方法的内部实现中,直接可以通过 session.get() 获取到当前线程的 session,省掉了参数在方法间传递的环节。


ThreadLocal 的实现原理


一般,类属性中的数据是多个线程共享的,但 ThreadLocal 类型的数据 声明为类属性,却可以为每一个使用它(通过 set(T value)方法)的线程存储线程私有的数据,通过其源码我们可以发现其中的原理。


public class ThreadLocal<T> {    /**     * 下面的 getMap()方法 传入当前线程,获得一个ThreadLocalMap对象,说明每一个线程维护了     * 自己的一个 map,保证读取出来的value是自己线程的。     *     * ThreadLocalMap 是ThreadLocal静态内部类,存储value的键值就是ThreadLocal本身。     *     * 因此可以断定,每个线程维护一个ThreadLocalMap的键值对映射Map。不同线程的Map的 key值 是一样的,     * 都是ThreadLocal,但 value 是不同的。     */    public T get() {        Thread t = Thread.currentThread();        ThreadLocalMap map = getMap(t);        if (map != null) {            ThreadLocalMap.Entry e = map.getEntry(this);            if (e != null) {                @SuppressWarnings("unchecked")                T result = (T)e.value;                return result;            }        }        return setInitialValue();    }    public void set(T value) {        Thread t = Thread.currentThread();        ThreadLocalMap map = getMap(t);        if (map != null)            map.set(this, value);        else            createMap(t, value);    }}


ThreadLocal 在 Spring 中的使用


Spring 事务处理的设计与实现中大量使用了 ThreadLocal 类,比如,TransactionSynchronizationManager 维护了一系列的 ThreadLocal 变量,用于存储线程私有的 事务属性及资源。源码如下。


/** * 管理每个线程的资源和事务同步的中心帮助程序。供资源管理代码使用,但不供典型应用程序代码使用。 * * 资源管理代码应该检查线程绑定的资源,如,JDBC连接 或 Hibernate Sessions。 * 此类代码通常不应该将资源绑定到线程,因为这是事务管理器的职责。另一个选项是, * 如果事务同步处于活动状态,则在首次使用时延迟绑定,以执行跨任意数量资源的事务。 */public abstract class TransactionSynchronizationManager {    /**     *  一般是一个线程持有一个 独立的事务,以相互隔离地处理各自的事务。     *  所以这里使用了很多 ThreadLocal对象,为每个线程绑定 对应的事务属性及资源,     *  以便后续使用时能直接获取。     */    private static final ThreadLocal<Map<Object, Object>> resources =            new NamedThreadLocal<Map<Object, Object>>("Transactional resources");    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =            new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");    private static final ThreadLocal<String> currentTransactionName =            new NamedThreadLocal<String>("Current transaction name");    private static final ThreadLocal<Boolean> currentTransactionReadOnly =            new NamedThreadLocal<Boolean>("Current transaction read-only status");    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =            new NamedThreadLocal<Integer>("Current transaction isolation level");    private static final ThreadLocal<Boolean> actualTransactionActive =            new NamedThreadLocal<Boolean>("Actual transaction active");    /**     * 为当前线程 绑定 对应的resource资源     */    public static void bindResource(Object key, Object value) throws IllegalStateException {        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);        Assert.notNull(value, "Value must not be null");        Map<Object, Object> map = resources.get();        // 如果当前线程的 resources中,绑定的数据map为空,则为 resources 绑定 map        if (map == null) {            map = new HashMap<Object, Object>();            resources.set(map);        }        Object oldValue = map.put(actualKey, value);        if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {            oldValue = null;        }        if (oldValue != null) {            throw new IllegalStateException("Already value [" + oldValue + "] for key [" +                    actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");        }        if (logger.isTraceEnabled()) {            logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +                    Thread.currentThread().getName() + "]");        }    }    /**     * 返回当前线程绑定的所有资源     */    public static Map<Object, Object> getResourceMap() {        Map<Object, Object> map = resources.get();        return (map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap());    }}


ThreadLocal 在 Mybatis 中的使用


Mybatis 的 SqlSession 对象 也是各线程私有的资源,所以对其的管理也使用到了 ThreadLocal 类。源码如下。


public class SqlSessionManager implements SqlSessionFactory, SqlSession {  private final ThreadLocal<SqlSession> localSqlSession = new ThreadLocal<>();  public void startManagedSession() {    this.localSqlSession.set(openSession());  }  public void startManagedSession(boolean autoCommit) {    this.localSqlSession.set(openSession(autoCommit));  }  public void startManagedSession(Connection connection) {    this.localSqlSession.set(openSession(connection));  }  public void startManagedSession(TransactionIsolationLevel level) {    this.localSqlSession.set(openSession(level));  }  public void startManagedSession(ExecutorType execType) {    this.localSqlSession.set(openSession(execType));  }  public void startManagedSession(ExecutorType execType, boolean autoCommit) {    this.localSqlSession.set(openSession(execType, autoCommit));  }  public void startManagedSession(ExecutorType execType, TransactionIsolationLevel level) {    this.localSqlSession.set(openSession(execType, level));  }  public void startManagedSession(ExecutorType execType, Connection connection) {    this.localSqlSession.set(openSession(execType, connection));  }  public boolean isManagedSessionStarted() {    return this.localSqlSession.get() != null;  }  @Override  public Connection getConnection() {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot get connection.  No managed session is started.");    }    return sqlSession.getConnection();  }  @Override  public void clearCache() {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot clear the cache.  No managed session is started.");    }    sqlSession.clearCache();  }  @Override  public void commit() {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot commit.  No managed session is started.");    }    sqlSession.commit();  }  @Override  public void commit(boolean force) {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot commit.  No managed session is started.");    }    sqlSession.commit(force);  }  @Override  public void rollback() {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot rollback.  No managed session is started.");    }    sqlSession.rollback();  }  @Override  public void rollback(boolean force) {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot rollback.  No managed session is started.");    }    sqlSession.rollback(force);  }  @Override  public List<BatchResult> flushStatements() {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot rollback.  No managed session is started.");    }    return sqlSession.flushStatements();  }  @Override  public void close() {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot close.  No managed session is started.");    }    try {      sqlSession.close();    } finally {      localSqlSession.set(null);    }  }}


J.U.C 包的实际应用


线程池 ThreadPoolExecutor


首先通过 ThreadPoolExecutor 的源码 看一下线程池的主要参数及方法。


public class ThreadPoolExecutor extends AbstractExecutorService {    /**     * 核心线程数     * 当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,     * 也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize     */    private volatile int corePoolSize;    /**     * 最大线程数     * 当队列满了,且已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务。     * 另外,对于无界队列,可忽略该参数     */    private volatile int maximumPoolSize;    /**     * 线程存活保持时间     * 当线程池中线程数 超出核心线程数,且线程的空闲时间也超过 keepAliveTime时,     * 那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数     */    private volatile long keepAliveTime;    /**     * 任务队列     * 用于传输和保存等待执行任务的阻塞队列     */    private final BlockingQueue<Runnable> workQueue;    /**     * 线程工厂     * 用于创建新线程。threadFactory 创建的线程也是采用 new Thread() 方式,threadFactory     * 创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池中线程的编号     */    private volatile ThreadFactory threadFactory;    /**     * 线程饱和策略     * 当线程池和队列都满了,再加入的线程会执行此策略     */    private volatile RejectedExecutionHandler handler;    /**     * 构造方法提供了多种重载,但实际上都使用了最后一个重载 完成了实例化     */    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             Executors.defaultThreadFactory(), defaultHandler);    }    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             threadFactory, defaultHandler);    }    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              RejectedExecutionHandler handler) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             Executors.defaultThreadFactory(), handler);    }    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {        if (corePoolSize < 0 ||            maximumPoolSize <= 0 ||            maximumPoolSize < corePoolSize ||            keepAliveTime < 0)            throw new IllegalArgumentException();        if (workQueue == null || threadFactory == null || handler == null)            throw new NullPointerException();        this.corePoolSize = corePoolSize;        this.maximumPoolSize = maximumPoolSize;        this.workQueue = workQueue;        this.keepAliveTime = unit.toNanos(keepAliveTime);        this.threadFactory = threadFactory;        this.handler = handler;    }    /**     * 执行一个任务,但没有返回值     */    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }    /**     * 提交一个线程任务,有返回值。该方法继承自其父类 AbstractExecutorService,有多种重载,这是最常用的一个。     * 通过future.get()获取返回值(阻塞直到任务执行完)     */    public <T> Future<T> submit(Callable<T> task) {        if (task == null) throw new NullPointerException();        RunnableFuture<T> ftask = newTaskFor(task);        execute(ftask);        return ftask;    }    /**     * 关闭线程池,不再接收新的任务,但会把已有的任务执行完     */    public void shutdown() {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            checkShutdownAccess();            advanceRunState(SHUTDOWN);            interruptIdleWorkers();            onShutdown(); // hook for ScheduledThreadPoolExecutor        } finally {            mainLock.unlock();        }        tryTerminate();    }    /**     * 立即关闭线程池,已有的任务也会被抛弃     */    public List<Runnable> shutdownNow() {        List<Runnable> tasks;        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            checkShutdownAccess();            advanceRunState(STOP);            interruptWorkers();            tasks = drainQueue();        } finally {            mainLock.unlock();        }        tryTerminate();        return tasks;    }    public boolean isShutdown() {        return ! isRunning(ctl.get());    }}


线程池执行流程,如下图所示。


6.png


Executors 提供的 4 种线程池


Executors 类 通过 ThreadPoolExecutor 封装了 4 种常用的线程池:CachedThreadPool,FixedThreadPool,ScheduledThreadPool 和 SingleThreadExecutor。其功能如下。


1.CachedThreadPool:用来创建一个几乎可以无限扩大的线程池(最大线程数为 Integer.MAX_VALUE),适用于执行大量短生命周期的异步任务。2.FixedThreadPool:创建一个固定大小的线程池,保证线程数可控,不会造成线程过多,导致系统负载更为严重。3.SingleThreadExecutor:创建一个单线程的线程池,可以保证任务按调用顺序执行。4.ScheduledThreadPool:适用于执行 延时 或者 周期性 任务。


如何配置线程池


CPU 密集型任务


尽量使用较小的线程池,一般为 CPU 核心数+1。因为 CPU 密集型任务 使得 CPU 使用率 很高,若开过多的线程数,会造成 CPU 过度切换。


IO 密集型任务


可以使用稍大的线程池,一般为 2*CPU 核心数。IO 密集型任务 CPU 使用率 并不高,因此可以让 CPU 在等待 IO 的时候有其他线程去处理别的任务,充分利用 CPU 时间。


线程池的实际应用


Tomcat 在分发 web 请求时使用了线程池来处理。


BlockingQueue


核心方法


public interface BlockingQueue<E> extends Queue<E> {    // 将给定元素设置到队列中,如果设置成功返回true, 否则返回false。如果是往限定了长度的队列中设置值,推荐使用offer()方法。    boolean add(E e);    // 将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。    boolean offer(E e);    // 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。    void put(E e) throws InterruptedException;    // 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.    boolean offer(E e, long timeout, TimeUnit unit)        throws InterruptedException;    // 从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。    E take() throws InterruptedException;    // 在给定的时间里,从队列中获取值,时间到了直接调用普通的 poll()方法,为null则直接返回null。    E poll(long timeout, TimeUnit unit)        throws InterruptedException;    // 获取队列中剩余的空间。    int remainingCapacity();    // 从队列中移除指定的值。    boolean remove(Object o);    // 判断队列中是否拥有该值。    public boolean contains(Object o);    // 将队列中值,全部移除,并发设置到给定的集合中。    int drainTo(Collection<? super E> c);    // 指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。    int drainTo(Collection<? super E> c, int maxElements);}


主要实现类


ArrayBlockingQueue


基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。


ArrayBlockingQueue 在生产者放入数据 和 消费者获取数据时,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于 LinkedBlockingQueue。ArrayBlockingQueue 和 LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的 Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于 GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。


LinkedBlockingQueue


基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。


需要注意的是,如果构造一个 LinkedBlockingQueue 对象,而没有指定其容量大小,LinkedBlockingQueue 会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。


PriorityBlockingQueue


基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),但需要注意的是 PriorityBlockingQueue 并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁。


CAS 指令和原子类(应用比较多的就是计数器)


互斥同步最主要的问题就是进行线程阻塞和唤醒所带来的性能的额外损耗,因此这种同步被称为阻塞同步,它属于一种悲观的并发策略,我们称之为悲观锁。随着硬件和操作系统指令集的发展和优化,产生了非阻塞同步,被称为乐观锁。简单地说,就是先进行操作,操作完成之后再判断操作是否成功,是否有并发问题,如果有则进行失败补偿,如果没有就算操作成功,这样就从根本上避免了同步锁的弊端。


目前,在 Java 中应用最广泛的非阻塞同步就是 CAS。从 JDK1.5 以后,可以使用 CAS 操作,该操作由 sun.misc.Unsafe 类里的 compareAndSwapInt() 和 compareAndSwapLong() 等方法实现。通常情况下 sun.misc.Unsafe 类 对于开发者是不可见的,因此,JDK 提供了很多 CAS 包装类 简化开发者的使用,如 AtomicInteger。使用 Java 自带的 Atomic 原子类,可以避免同步锁带来的并发访问性能降低的问题,减少犯错的机会。


全文完!


希望本文对大家有所帮助。如果感觉本文有帮助,有劳转发或点一下“在看”!让更多人收获知识!


推荐阅读


深挖 Redis 6.0 源码—— SDS

从 Spring 及 Mybatis 框架源码中学习设计模式——创建型

从 Spring 及 Mybatis 框架源码中学习设计模式——行为型

目录
相关文章
|
15天前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
21 0
|
17天前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
1天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
24 12
|
14天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
14天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
38 3
|
19天前
|
监控 Java 数据库连接
Java线程管理:守护线程与用户线程的区分与应用
在Java多线程编程中,线程可以分为守护线程(Daemon Thread)和用户线程(User Thread)。这两种线程在行为和用途上有着明显的区别,了解它们的差异对于编写高效、稳定的并发程序至关重要。
26 2
|
19天前
|
开发框架 安全 Java
Java 反射机制:动态编程的强大利器
Java反射机制允许程序在运行时检查类、接口、字段和方法的信息,并能操作对象。它提供了一种动态编程的方式,使得代码更加灵活,能够适应未知的或变化的需求,是开发框架和库的重要工具。
35 2
|
20天前
|
安全 Java 开发者
Java中的多线程编程:从基础到实践
本文深入探讨了Java多线程编程的核心概念和实践技巧,旨在帮助读者理解多线程的工作原理,掌握线程的创建、管理和同步机制。通过具体示例和最佳实践,本文展示了如何在Java应用中有效地利用多线程技术,提高程序性能和响应速度。
54 1
|
前端开发 Java 数据库连接
Java三大主流框架概述
Struts、Hibernate和Spring是我们Java开发中的常用关键,他们分别针对不同的应用场景给出最合适的解决方案。但你是否知道,这些知名框架最初是怎样产生的? 我们知道,传统的Java Web应用程序是采用JSP+Servlet+Javabean来实现的,这种模式实现了最基本的MVC分层,使的程序结构分为几层,有负责前台展示的JSP、负责流程逻辑控制的Servle
1982 0
|
6天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
36 6
下一篇
DataWorks