Java 内存模型(二)

简介: 《读尽源码》

ThreadLocal 的实现原理

一般,类属性中的数据是多个线程共享的,但 ThreadLocal 类型的数据 声明为类属性,却可以为每一个使用它(通过 set(T value)方法)的线程存储 线程私有的数据,通过其源码我们可以发现其中的原理。

public class ThreadLocal<T> {
    /**
     * 下面的 getMap()方法 传入当前线程,获得一个ThreadLocalMap对象,说明每一个线程维护了
     * 自己的一个 map,保证读取出来的value是自己线程的。
     *
     * ThreadLocalMap 是ThreadLocal静态内部类,存储value的键值就是ThreadLocal本身。
     *
     * 因此可以断定,每个线程维护一个ThreadLocalMap的键值对映射Map。不同线程的Map的 key值 是一样的,
     * 都是ThreadLocal,但 value 是不同的。
     */
    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }
    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }
}Copy to clipboardErrorCopied

ThreadLocal 在 Spring 中的使用

Spring 事务处理的设计与实现中大量使用了 ThreadLocal 类,比如,TransactionSynchronizationManager 维护了一系列的 ThreadLocal 变量,用于存储线程私有的 事务属性及资源。源码如下。

/**
 * 管理每个线程的资源和事务同步的中心帮助程序。供资源管理代码使用,但不供典型应用程序代码使用。
 *
 * 资源管理代码应该检查线程绑定的资源,如,JDBC连接 或 Hibernate Sessions。
 * 此类代码通常不应该将资源绑定到线程,因为这是事务管理器的职责。另一个选项是,
 * 如果事务同步处于活动状态,则在首次使用时延迟绑定,以执行跨任意数量资源的事务。
 */
public abstract class TransactionSynchronizationManager {
    /**
     *  一般是一个线程持有一个 独立的事务,以相互隔离地处理各自的事务。
     *  所以这里使用了很多 ThreadLocal对象,为每个线程绑定 对应的事务属性及资源,
     *  以便后续使用时能直接获取。
     */
    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<Map<Object, Object>>("Transactional resources");
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");
    private static final ThreadLocal<String> currentTransactionName =
            new NamedThreadLocal<String>("Current transaction name");
    private static final ThreadLocal<Boolean> currentTransactionReadOnly =
            new NamedThreadLocal<Boolean>("Current transaction read-only status");
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
            new NamedThreadLocal<Integer>("Current transaction isolation level");
    private static final ThreadLocal<Boolean> actualTransactionActive =
            new NamedThreadLocal<Boolean>("Actual transaction active");
    /**
     * 为当前线程 绑定 对应的resource资源
     */
    public static void bindResource(Object key, Object value) throws IllegalStateException {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Assert.notNull(value, "Value must not be null");
        Map<Object, Object> map = resources.get();
        // 如果当前线程的 resources中,绑定的数据map为空,则为 resources 绑定 map
        if (map == null) {
            map = new HashMap<Object, Object>();
            resources.set(map);
        }
        Object oldValue = map.put(actualKey, value);
        if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
            oldValue = null;
        }
        if (oldValue != null) {
            throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
                    actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
                    Thread.currentThread().getName() + "]");
        }
    }
    /**
     * 返回当前线程绑定的所有资源
     */
    public static Map<Object, Object> getResourceMap() {
        Map<Object, Object> map = resources.get();
        return (map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap());
    }
}Copy to clipboardErrorCopied

ThreadLocal 在 Mybatis 中的使用

Mybatis 的 SqlSession 对象 也是各线程私有的资源,所以对其的管理也使用到了 ThreadLocal 类。源码如下。

public class SqlSessionManager implements SqlSessionFactory, SqlSession {
  private final ThreadLocal<SqlSession> localSqlSession = new ThreadLocal<>();
  public void startManagedSession() {
    this.localSqlSession.set(openSession());
  }
  public void startManagedSession(boolean autoCommit) {
    this.localSqlSession.set(openSession(autoCommit));
  }
  public void startManagedSession(Connection connection) {
    this.localSqlSession.set(openSession(connection));
  }
  public void startManagedSession(TransactionIsolationLevel level) {
    this.localSqlSession.set(openSession(level));
  }
  public void startManagedSession(ExecutorType execType) {
    this.localSqlSession.set(openSession(execType));
  }
  public void startManagedSession(ExecutorType execType, boolean autoCommit) {
    this.localSqlSession.set(openSession(execType, autoCommit));
  }
  public void startManagedSession(ExecutorType execType, TransactionIsolationLevel level) {
    this.localSqlSession.set(openSession(execType, level));
  }
  public void startManagedSession(ExecutorType execType, Connection connection) {
    this.localSqlSession.set(openSession(execType, connection));
  }
  public boolean isManagedSessionStarted() {
    return this.localSqlSession.get() != null;
  }
  @Override
  public Connection getConnection() {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot get connection.  No managed session is started.");
    }
    return sqlSession.getConnection();
  }
  @Override
  public void clearCache() {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot clear the cache.  No managed session is started.");
    }
    sqlSession.clearCache();
  }
  @Override
  public void commit() {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot commit.  No managed session is started.");
    }
    sqlSession.commit();
  }
  @Override
  public void commit(boolean force) {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot commit.  No managed session is started.");
    }
    sqlSession.commit(force);
  }
  @Override
  public void rollback() {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot rollback.  No managed session is started.");
    }
    sqlSession.rollback();
  }
  @Override
  public void rollback(boolean force) {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot rollback.  No managed session is started.");
    }
    sqlSession.rollback(force);
  }
  @Override
  public List<BatchResult> flushStatements() {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot rollback.  No managed session is started.");
    }
    return sqlSession.flushStatements();
  }
  @Override
  public void close() {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot close.  No managed session is started.");
    }
    try {
      sqlSession.close();
    } finally {
      localSqlSession.set(null);
    }
  }
}Copy to clipboardErrorCopied

J.U.C 包的实际应用

线程池 ThreadPoolExecutor

首先通过 ThreadPoolExecutor 的源码 看一下线程池的主要参数及方法。

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * 核心线程数
     * 当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,
     * 也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize
     */
    private volatile int corePoolSize;
    /**
     * 最大线程数
     * 当队列满了,且已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务。
     * 另外,对于无界队列,可忽略该参数
     */
    private volatile int maximumPoolSize;
    /**
     * 线程存活保持时间
     * 当线程池中线程数 超出核心线程数,且线程的空闲时间也超过 keepAliveTime时,
     * 那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数
     */
    private volatile long keepAliveTime;
    /**
     * 任务队列
     * 用于传输和保存等待执行任务的阻塞队列
     */
    private final BlockingQueue<Runnable> workQueue;
    /**
     * 线程工厂
     * 用于创建新线程。threadFactory 创建的线程也是采用 new Thread() 方式,threadFactory
     * 创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池中线程的编号
     */
    private volatile ThreadFactory threadFactory;
    /**
     * 线程饱和策略
     * 当线程池和队列都满了,再加入的线程会执行此策略
     */
    private volatile RejectedExecutionHandler handler;
    /**
     * 构造方法提供了多种重载,但实际上都使用了最后一个重载 完成了实例化
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    /**
     * 执行一个任务,但没有返回值
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    /**
     * 提交一个线程任务,有返回值。该方法继承自其父类 AbstractExecutorService,有多种重载,这是最常用的一个。
     * 通过future.get()获取返回值(阻塞直到任务执行完)
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    /**
     * 关闭线程池,不再接收新的任务,但会把已有的任务执行完
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    /**
     * 立即关闭线程池,已有的任务也会被抛弃
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    public boolean isShutdown() {
        return ! isRunning(ctl.get());
    }
}Copy to clipboardErrorCopied

Executors 提供的 4 种线程池

Executors 类 通过 ThreadPoolExecutor 封装了 4 种常用的线程池:CachedThreadPool,FixedThreadPool,ScheduledThreadPool 和 SingleThreadExecutor。其功能如下。

  1. CachedThreadPool:用来创建一个几乎可以无限扩大的线程池(最大线程数为 Integer.MAX_VALUE),适用于执行大量短生命周期的异步任务。
  2. FixedThreadPool:创建一个固定大小的线程池,保证线程数可控,不会造成线程过多,导致系统负载更为严重。
  3. SingleThreadExecutor:创建一个单线程的线程池,可以保证任务按调用顺序执行。
  4. ScheduledThreadPool:适用于执行 延时 或者 周期性 任务。

如何配置线程池

  • CPU 密集型任务
    尽量使用较小的线程池,一般为 CPU 核心数+1。 因为 CPU 密集型任务 使得 CPU 使用率 很高,若开过多的线程数,会造成 CPU 过度切换。
  • IO 密集型任务
    可以使用稍大的线程池,一般为 2*CPU 核心数。 IO 密集型任务 CPU 使用率 并不高,因此可以让 CPU 在等待 IO 的时候有其他线程去处理别的任务,充分利用 CPU 时间。

线程池的实际应用

Tomcat 在分发 web 请求 时使用了线程池来处理。

相关文章
|
24天前
|
存储 缓存 安全
Java内存模型深度解析:从理论到实践####
【10月更文挑战第21天】 本文深入探讨了Java内存模型(JMM)的核心概念与底层机制,通过剖析其设计原理、内存可见性问题及其解决方案,结合具体代码示例,帮助读者构建对JMM的全面理解。不同于传统的摘要概述,我们将直接以故事化手法引入,让读者在轻松的情境中领略JMM的精髓。 ####
33 6
|
15天前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
21 0
|
25天前
|
存储 算法 Java
Java内存管理深度剖析与优化策略####
本文深入探讨了Java虚拟机(JVM)的内存管理机制,重点分析了堆内存的分配策略、垃圾回收算法以及如何通过调优提升应用性能。通过案例驱动的方式,揭示了常见内存泄漏的根源与解决策略,旨在为开发者提供实用的内存管理技巧,确保应用程序既高效又稳定地运行。 ####
|
17天前
|
存储 监控 算法
Java内存管理深度剖析:从垃圾收集到内存泄漏的全面指南####
本文深入探讨了Java虚拟机(JVM)中的内存管理机制,特别是垃圾收集(GC)的工作原理及其调优策略。不同于传统的摘要概述,本文将通过实际案例分析,揭示内存泄漏的根源与预防措施,为开发者提供实战中的优化建议,旨在帮助读者构建高效、稳定的Java应用。 ####
31 8
|
15天前
|
存储 监控 算法
深入探索Java虚拟机(JVM)的内存管理机制
本文旨在为读者提供对Java虚拟机(JVM)内存管理机制的深入理解。通过详细解析JVM的内存结构、垃圾回收算法以及性能优化策略,本文不仅揭示了Java程序高效运行背后的原理,还为开发者提供了优化应用程序性能的实用技巧。不同于常规摘要仅概述文章大意,本文摘要将简要介绍JVM内存管理的关键点,为读者提供一个清晰的学习路线图。
|
19天前
|
存储 算法 Java
Java 内存管理与优化:掌控堆与栈,雕琢高效代码
Java内存管理与优化是提升程序性能的关键。掌握堆与栈的运作机制,学习如何有效管理内存资源,雕琢出更加高效的代码,是每个Java开发者必备的技能。
46 5
|
17天前
|
存储 算法 Java
Java内存管理深度解析####
本文深入探讨了Java虚拟机(JVM)中的内存分配与垃圾回收机制,揭示了其高效管理内存的奥秘。文章首先概述了JVM内存模型,随后详细阐述了堆、栈、方法区等关键区域的作用及管理策略。在垃圾回收部分,重点介绍了标记-清除、复制算法、标记-整理等多种回收算法的工作原理及其适用场景,并通过实际案例分析了不同GC策略对应用性能的影响。对于开发者而言,理解这些原理有助于编写出更加高效、稳定的Java应用程序。 ####
|
17天前
|
安全 Java 程序员
Java内存模型的深入理解与实践
本文旨在深入探讨Java内存模型(JMM)的核心概念,包括原子性、可见性和有序性,并通过实例代码分析这些特性在实际编程中的应用。我们将从理论到实践,逐步揭示JMM在多线程编程中的重要性和复杂性,帮助读者构建更加健壮的并发程序。
|
22天前
|
算法 Java 开发者
Java内存管理与垃圾回收机制深度剖析####
本文深入探讨了Java虚拟机(JVM)的内存管理机制,特别是其垃圾回收机制的工作原理、算法及实践优化策略。不同于传统的摘要概述,本文将以一个虚拟的“城市环卫系统”为比喻,生动形象地揭示Java内存管理的奥秘,旨在帮助开发者更好地理解并调优Java应用的性能。 ####
|
23天前
|
Java
java内存区域
1)栈内存:保存所有的对象名称 2)堆内存:保存每个对象的具体属性 3)全局数据区:保存static类型的属性 4)全局代码区:保存所有的方法定义
21 1