Java 内存模型(二)

简介: 《读尽源码》

ThreadLocal 的实现原理

一般,类属性中的数据是多个线程共享的,但 ThreadLocal 类型的数据 声明为类属性,却可以为每一个使用它(通过 set(T value)方法)的线程存储 线程私有的数据,通过其源码我们可以发现其中的原理。

public class ThreadLocal<T> {
    /**
     * 下面的 getMap()方法 传入当前线程,获得一个ThreadLocalMap对象,说明每一个线程维护了
     * 自己的一个 map,保证读取出来的value是自己线程的。
     *
     * ThreadLocalMap 是ThreadLocal静态内部类,存储value的键值就是ThreadLocal本身。
     *
     * 因此可以断定,每个线程维护一个ThreadLocalMap的键值对映射Map。不同线程的Map的 key值 是一样的,
     * 都是ThreadLocal,但 value 是不同的。
     */
    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }
    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }
}Copy to clipboardErrorCopied

ThreadLocal 在 Spring 中的使用

Spring 事务处理的设计与实现中大量使用了 ThreadLocal 类,比如,TransactionSynchronizationManager 维护了一系列的 ThreadLocal 变量,用于存储线程私有的 事务属性及资源。源码如下。

/**
 * 管理每个线程的资源和事务同步的中心帮助程序。供资源管理代码使用,但不供典型应用程序代码使用。
 *
 * 资源管理代码应该检查线程绑定的资源,如,JDBC连接 或 Hibernate Sessions。
 * 此类代码通常不应该将资源绑定到线程,因为这是事务管理器的职责。另一个选项是,
 * 如果事务同步处于活动状态,则在首次使用时延迟绑定,以执行跨任意数量资源的事务。
 */
public abstract class TransactionSynchronizationManager {
    /**
     *  一般是一个线程持有一个 独立的事务,以相互隔离地处理各自的事务。
     *  所以这里使用了很多 ThreadLocal对象,为每个线程绑定 对应的事务属性及资源,
     *  以便后续使用时能直接获取。
     */
    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<Map<Object, Object>>("Transactional resources");
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");
    private static final ThreadLocal<String> currentTransactionName =
            new NamedThreadLocal<String>("Current transaction name");
    private static final ThreadLocal<Boolean> currentTransactionReadOnly =
            new NamedThreadLocal<Boolean>("Current transaction read-only status");
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
            new NamedThreadLocal<Integer>("Current transaction isolation level");
    private static final ThreadLocal<Boolean> actualTransactionActive =
            new NamedThreadLocal<Boolean>("Actual transaction active");
    /**
     * 为当前线程 绑定 对应的resource资源
     */
    public static void bindResource(Object key, Object value) throws IllegalStateException {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Assert.notNull(value, "Value must not be null");
        Map<Object, Object> map = resources.get();
        // 如果当前线程的 resources中,绑定的数据map为空,则为 resources 绑定 map
        if (map == null) {
            map = new HashMap<Object, Object>();
            resources.set(map);
        }
        Object oldValue = map.put(actualKey, value);
        if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
            oldValue = null;
        }
        if (oldValue != null) {
            throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
                    actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
                    Thread.currentThread().getName() + "]");
        }
    }
    /**
     * 返回当前线程绑定的所有资源
     */
    public static Map<Object, Object> getResourceMap() {
        Map<Object, Object> map = resources.get();
        return (map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap());
    }
}Copy to clipboardErrorCopied

ThreadLocal 在 Mybatis 中的使用

Mybatis 的 SqlSession 对象 也是各线程私有的资源,所以对其的管理也使用到了 ThreadLocal 类。源码如下。

public class SqlSessionManager implements SqlSessionFactory, SqlSession {
  private final ThreadLocal<SqlSession> localSqlSession = new ThreadLocal<>();
  public void startManagedSession() {
    this.localSqlSession.set(openSession());
  }
  public void startManagedSession(boolean autoCommit) {
    this.localSqlSession.set(openSession(autoCommit));
  }
  public void startManagedSession(Connection connection) {
    this.localSqlSession.set(openSession(connection));
  }
  public void startManagedSession(TransactionIsolationLevel level) {
    this.localSqlSession.set(openSession(level));
  }
  public void startManagedSession(ExecutorType execType) {
    this.localSqlSession.set(openSession(execType));
  }
  public void startManagedSession(ExecutorType execType, boolean autoCommit) {
    this.localSqlSession.set(openSession(execType, autoCommit));
  }
  public void startManagedSession(ExecutorType execType, TransactionIsolationLevel level) {
    this.localSqlSession.set(openSession(execType, level));
  }
  public void startManagedSession(ExecutorType execType, Connection connection) {
    this.localSqlSession.set(openSession(execType, connection));
  }
  public boolean isManagedSessionStarted() {
    return this.localSqlSession.get() != null;
  }
  @Override
  public Connection getConnection() {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot get connection.  No managed session is started.");
    }
    return sqlSession.getConnection();
  }
  @Override
  public void clearCache() {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot clear the cache.  No managed session is started.");
    }
    sqlSession.clearCache();
  }
  @Override
  public void commit() {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot commit.  No managed session is started.");
    }
    sqlSession.commit();
  }
  @Override
  public void commit(boolean force) {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot commit.  No managed session is started.");
    }
    sqlSession.commit(force);
  }
  @Override
  public void rollback() {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot rollback.  No managed session is started.");
    }
    sqlSession.rollback();
  }
  @Override
  public void rollback(boolean force) {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot rollback.  No managed session is started.");
    }
    sqlSession.rollback(force);
  }
  @Override
  public List<BatchResult> flushStatements() {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot rollback.  No managed session is started.");
    }
    return sqlSession.flushStatements();
  }
  @Override
  public void close() {
    final SqlSession sqlSession = localSqlSession.get();
    if (sqlSession == null) {
      throw new SqlSessionException("Error:  Cannot close.  No managed session is started.");
    }
    try {
      sqlSession.close();
    } finally {
      localSqlSession.set(null);
    }
  }
}Copy to clipboardErrorCopied

J.U.C 包的实际应用

线程池 ThreadPoolExecutor

首先通过 ThreadPoolExecutor 的源码 看一下线程池的主要参数及方法。

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * 核心线程数
     * 当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,
     * 也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize
     */
    private volatile int corePoolSize;
    /**
     * 最大线程数
     * 当队列满了,且已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务。
     * 另外,对于无界队列,可忽略该参数
     */
    private volatile int maximumPoolSize;
    /**
     * 线程存活保持时间
     * 当线程池中线程数 超出核心线程数,且线程的空闲时间也超过 keepAliveTime时,
     * 那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数
     */
    private volatile long keepAliveTime;
    /**
     * 任务队列
     * 用于传输和保存等待执行任务的阻塞队列
     */
    private final BlockingQueue<Runnable> workQueue;
    /**
     * 线程工厂
     * 用于创建新线程。threadFactory 创建的线程也是采用 new Thread() 方式,threadFactory
     * 创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池中线程的编号
     */
    private volatile ThreadFactory threadFactory;
    /**
     * 线程饱和策略
     * 当线程池和队列都满了,再加入的线程会执行此策略
     */
    private volatile RejectedExecutionHandler handler;
    /**
     * 构造方法提供了多种重载,但实际上都使用了最后一个重载 完成了实例化
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    /**
     * 执行一个任务,但没有返回值
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    /**
     * 提交一个线程任务,有返回值。该方法继承自其父类 AbstractExecutorService,有多种重载,这是最常用的一个。
     * 通过future.get()获取返回值(阻塞直到任务执行完)
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    /**
     * 关闭线程池,不再接收新的任务,但会把已有的任务执行完
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    /**
     * 立即关闭线程池,已有的任务也会被抛弃
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    public boolean isShutdown() {
        return ! isRunning(ctl.get());
    }
}Copy to clipboardErrorCopied

Executors 提供的 4 种线程池

Executors 类 通过 ThreadPoolExecutor 封装了 4 种常用的线程池:CachedThreadPool,FixedThreadPool,ScheduledThreadPool 和 SingleThreadExecutor。其功能如下。

  1. CachedThreadPool:用来创建一个几乎可以无限扩大的线程池(最大线程数为 Integer.MAX_VALUE),适用于执行大量短生命周期的异步任务。
  2. FixedThreadPool:创建一个固定大小的线程池,保证线程数可控,不会造成线程过多,导致系统负载更为严重。
  3. SingleThreadExecutor:创建一个单线程的线程池,可以保证任务按调用顺序执行。
  4. ScheduledThreadPool:适用于执行 延时 或者 周期性 任务。

如何配置线程池

  • CPU 密集型任务
    尽量使用较小的线程池,一般为 CPU 核心数+1。 因为 CPU 密集型任务 使得 CPU 使用率 很高,若开过多的线程数,会造成 CPU 过度切换。
  • IO 密集型任务
    可以使用稍大的线程池,一般为 2*CPU 核心数。 IO 密集型任务 CPU 使用率 并不高,因此可以让 CPU 在等待 IO 的时候有其他线程去处理别的任务,充分利用 CPU 时间。

线程池的实际应用

Tomcat 在分发 web 请求 时使用了线程池来处理。

相关文章
|
11天前
|
存储 Java 编译器
Java内存模型(JMM)深度解析####
本文深入探讨了Java内存模型(JMM)的工作原理,旨在帮助开发者理解多线程环境下并发编程的挑战与解决方案。通过剖析JVM如何管理线程间的数据可见性、原子性和有序性问题,本文将揭示synchronized关键字背后的机制,并介绍volatile关键字和final关键字在保证变量同步与不可变性方面的作用。同时,文章还将讨论现代Java并发工具类如java.util.concurrent包中的核心组件,以及它们如何简化高效并发程序的设计。无论你是初学者还是有经验的开发者,本文都将为你提供宝贵的见解,助你在Java并发编程领域更进一步。 ####
|
22天前
|
缓存 easyexcel Java
Java EasyExcel 导出报内存溢出如何解决
大家好,我是V哥。使用EasyExcel进行大数据量导出时容易导致内存溢出,特别是在导出百万级别的数据时。以下是V哥整理的解决该问题的一些常见方法,包括分批写入、设置合适的JVM内存、减少数据对象的复杂性、关闭自动列宽设置、使用Stream导出以及选择合适的数据导出工具。此外,还介绍了使用Apache POI的SXSSFWorkbook实现百万级别数据量的导出案例,帮助大家更好地应对大数据导出的挑战。欢迎一起讨论!
134 1
|
6天前
|
缓存 算法 Java
本文聚焦于Java内存管理与调优,介绍Java内存模型、内存泄漏检测与预防、高效字符串拼接、数据结构优化及垃圾回收机制
在现代软件开发中,性能优化至关重要。本文聚焦于Java内存管理与调优,介绍Java内存模型、内存泄漏检测与预防、高效字符串拼接、数据结构优化及垃圾回收机制。通过调整垃圾回收器参数、优化堆大小与布局、使用对象池和缓存技术,开发者可显著提升应用性能和稳定性。
21 6
|
10天前
|
存储 缓存 安全
Java内存模型(JMM):深入理解并发编程的基石####
【10月更文挑战第29天】 本文作为一篇技术性文章,旨在深入探讨Java内存模型(JMM)的核心概念、工作原理及其在并发编程中的应用。我们将从JMM的基本定义出发,逐步剖析其如何通过happens-before原则、volatile关键字、synchronized关键字等机制,解决多线程环境下的数据可见性、原子性和有序性问题。不同于常规摘要的简述方式,本摘要将直接概述文章的核心内容,为读者提供一个清晰的学习路径。 ####
31 2
|
11天前
|
存储 安全 Java
什么是 Java 的内存模型?
Java内存模型(Java Memory Model, JMM)是Java虚拟机(JVM)规范的一部分,它定义了一套规则,用于指导Java程序中变量的访问和内存交互方式。
28 1
|
17天前
|
存储 运维 Java
💻Java零基础:深入了解Java内存机制
【10月更文挑战第18天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
25 1
|
20天前
|
存储 算法 Java
Java虚拟机(JVM)的内存管理与性能优化
本文深入探讨了Java虚拟机(JVM)的内存管理机制,包括堆、栈、方法区等关键区域的功能与作用。通过分析垃圾回收算法和调优策略,旨在帮助开发者理解如何有效提升Java应用的性能。文章采用通俗易懂的语言,结合具体实例,使读者能够轻松掌握复杂的内存管理概念,并应用于实际开发中。
|
27天前
|
存储 监控 算法
Java中的内存管理与垃圾回收机制解析
本文深入探讨了Java编程语言中的内存管理方式,特别是垃圾回收机制。我们将了解Java的自动内存管理是如何工作的,它如何帮助开发者避免常见的内存泄漏问题。通过分析不同垃圾回收算法(如标记-清除、复制和标记-整理)以及JVM如何选择合适的垃圾回收策略,本文旨在帮助Java开发者更好地理解和优化应用程序的性能。
|
29天前
|
存储 Java
Java内存模型
【10月更文挑战第11天】Java 内存模型(JMM)是 Java 虚拟机规范中定义的多线程内存访问机制,解决内存可见性、原子性和有序性问题。它定义了主内存和工作内存的概念,以及可见性、原子性和有序性的规则,确保多线程环境下的数据一致性和操作正确性。使用 `synchronized` 和 `volatile` 等同步机制可有效避免数据竞争和不一致问题。
29 3
|
29天前
|
缓存 安全 Java
使用 Java 内存模型解决多线程中的数据竞争问题
【10月更文挑战第11天】在 Java 多线程编程中,数据竞争是一个常见问题。通过使用 `synchronized` 关键字、`volatile` 关键字、原子类、显式锁、避免共享可变数据、合理设计数据结构、遵循线程安全原则和使用线程池等方法,可以有效解决数据竞争问题,确保程序的正确性和稳定性。
35 2