ThreadLocal 的实现原理
一般,类属性中的数据是多个线程共享的,但 ThreadLocal 类型的数据 声明为类属性,却可以为每一个使用它(通过 set(T value)方法)的线程存储 线程私有的数据,通过其源码我们可以发现其中的原理。
public class ThreadLocal<T> { /** * 下面的 getMap()方法 传入当前线程,获得一个ThreadLocalMap对象,说明每一个线程维护了 * 自己的一个 map,保证读取出来的value是自己线程的。 * * ThreadLocalMap 是ThreadLocal静态内部类,存储value的键值就是ThreadLocal本身。 * * 因此可以断定,每个线程维护一个ThreadLocalMap的键值对映射Map。不同线程的Map的 key值 是一样的, * 都是ThreadLocal,但 value 是不同的。 */ public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } }Copy to clipboardErrorCopied
ThreadLocal 在 Spring 中的使用
Spring 事务处理的设计与实现中大量使用了 ThreadLocal 类,比如,TransactionSynchronizationManager 维护了一系列的 ThreadLocal 变量,用于存储线程私有的 事务属性及资源。源码如下。
/** * 管理每个线程的资源和事务同步的中心帮助程序。供资源管理代码使用,但不供典型应用程序代码使用。 * * 资源管理代码应该检查线程绑定的资源,如,JDBC连接 或 Hibernate Sessions。 * 此类代码通常不应该将资源绑定到线程,因为这是事务管理器的职责。另一个选项是, * 如果事务同步处于活动状态,则在首次使用时延迟绑定,以执行跨任意数量资源的事务。 */ public abstract class TransactionSynchronizationManager { /** * 一般是一个线程持有一个 独立的事务,以相互隔离地处理各自的事务。 * 所以这里使用了很多 ThreadLocal对象,为每个线程绑定 对应的事务属性及资源, * 以便后续使用时能直接获取。 */ private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>("Transactional resources"); private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations"); private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<String>("Current transaction name"); private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<Boolean>("Current transaction read-only status"); private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<Integer>("Current transaction isolation level"); private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<Boolean>("Actual transaction active"); /** * 为当前线程 绑定 对应的resource资源 */ public static void bindResource(Object key, Object value) throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null"); Map<Object, Object> map = resources.get(); // 如果当前线程的 resources中,绑定的数据map为空,则为 resources 绑定 map if (map == null) { map = new HashMap<Object, Object>(); resources.set(map); } Object oldValue = map.put(actualKey, value); if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) { oldValue = null; } if (oldValue != null) { throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } if (logger.isTraceEnabled()) { logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" + Thread.currentThread().getName() + "]"); } } /** * 返回当前线程绑定的所有资源 */ public static Map<Object, Object> getResourceMap() { Map<Object, Object> map = resources.get(); return (map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap()); } }Copy to clipboardErrorCopied
ThreadLocal 在 Mybatis 中的使用
Mybatis 的 SqlSession 对象 也是各线程私有的资源,所以对其的管理也使用到了 ThreadLocal 类。源码如下。
public class SqlSessionManager implements SqlSessionFactory, SqlSession { private final ThreadLocal<SqlSession> localSqlSession = new ThreadLocal<>(); public void startManagedSession() { this.localSqlSession.set(openSession()); } public void startManagedSession(boolean autoCommit) { this.localSqlSession.set(openSession(autoCommit)); } public void startManagedSession(Connection connection) { this.localSqlSession.set(openSession(connection)); } public void startManagedSession(TransactionIsolationLevel level) { this.localSqlSession.set(openSession(level)); } public void startManagedSession(ExecutorType execType) { this.localSqlSession.set(openSession(execType)); } public void startManagedSession(ExecutorType execType, boolean autoCommit) { this.localSqlSession.set(openSession(execType, autoCommit)); } public void startManagedSession(ExecutorType execType, TransactionIsolationLevel level) { this.localSqlSession.set(openSession(execType, level)); } public void startManagedSession(ExecutorType execType, Connection connection) { this.localSqlSession.set(openSession(execType, connection)); } public boolean isManagedSessionStarted() { return this.localSqlSession.get() != null; } @Override public Connection getConnection() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot get connection. No managed session is started."); } return sqlSession.getConnection(); } @Override public void clearCache() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot clear the cache. No managed session is started."); } sqlSession.clearCache(); } @Override public void commit() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot commit. No managed session is started."); } sqlSession.commit(); } @Override public void commit(boolean force) { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot commit. No managed session is started."); } sqlSession.commit(force); } @Override public void rollback() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot rollback. No managed session is started."); } sqlSession.rollback(); } @Override public void rollback(boolean force) { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot rollback. No managed session is started."); } sqlSession.rollback(force); } @Override public List<BatchResult> flushStatements() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot rollback. No managed session is started."); } return sqlSession.flushStatements(); } @Override public void close() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot close. No managed session is started."); } try { sqlSession.close(); } finally { localSqlSession.set(null); } } }Copy to clipboardErrorCopied
J.U.C 包的实际应用
线程池 ThreadPoolExecutor
首先通过 ThreadPoolExecutor 的源码 看一下线程池的主要参数及方法。
public class ThreadPoolExecutor extends AbstractExecutorService { /** * 核心线程数 * 当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程, * 也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize */ private volatile int corePoolSize; /** * 最大线程数 * 当队列满了,且已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务。 * 另外,对于无界队列,可忽略该参数 */ private volatile int maximumPoolSize; /** * 线程存活保持时间 * 当线程池中线程数 超出核心线程数,且线程的空闲时间也超过 keepAliveTime时, * 那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数 */ private volatile long keepAliveTime; /** * 任务队列 * 用于传输和保存等待执行任务的阻塞队列 */ private final BlockingQueue<Runnable> workQueue; /** * 线程工厂 * 用于创建新线程。threadFactory 创建的线程也是采用 new Thread() 方式,threadFactory * 创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池中线程的编号 */ private volatile ThreadFactory threadFactory; /** * 线程饱和策略 * 当线程池和队列都满了,再加入的线程会执行此策略 */ private volatile RejectedExecutionHandler handler; /** * 构造方法提供了多种重载,但实际上都使用了最后一个重载 完成了实例化 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } /** * 执行一个任务,但没有返回值 */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } /** * 提交一个线程任务,有返回值。该方法继承自其父类 AbstractExecutorService,有多种重载,这是最常用的一个。 * 通过future.get()获取返回值(阻塞直到任务执行完) */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } /** * 关闭线程池,不再接收新的任务,但会把已有的任务执行完 */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } /** * 立即关闭线程池,已有的任务也会被抛弃 */ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } public boolean isShutdown() { return ! isRunning(ctl.get()); } }Copy to clipboardErrorCopied
Executors 提供的 4 种线程池
Executors 类 通过 ThreadPoolExecutor 封装了 4 种常用的线程池:CachedThreadPool,FixedThreadPool,ScheduledThreadPool 和 SingleThreadExecutor。其功能如下。
- CachedThreadPool:用来创建一个几乎可以无限扩大的线程池(最大线程数为 Integer.MAX_VALUE),适用于执行大量短生命周期的异步任务。
- FixedThreadPool:创建一个固定大小的线程池,保证线程数可控,不会造成线程过多,导致系统负载更为严重。
- SingleThreadExecutor:创建一个单线程的线程池,可以保证任务按调用顺序执行。
- ScheduledThreadPool:适用于执行 延时 或者 周期性 任务。
如何配置线程池
- CPU 密集型任务
尽量使用较小的线程池,一般为 CPU 核心数+1。 因为 CPU 密集型任务 使得 CPU 使用率 很高,若开过多的线程数,会造成 CPU 过度切换。 - IO 密集型任务
可以使用稍大的线程池,一般为 2*CPU 核心数。 IO 密集型任务 CPU 使用率 并不高,因此可以让 CPU 在等待 IO 的时候有其他线程去处理别的任务,充分利用 CPU 时间。
线程池的实际应用
Tomcat 在分发 web 请求 时使用了线程池来处理。