合理使用线程池以及线程变量 下

简介: 合理使用线程池以及线程变量

 最佳实践


  • 避免用Executors 的创建线程池


image.gif图片.png


Executors常用方法有以下几个:


  1. newCachedThreadPool():创建一个可缓存的线程池,调用 execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到线程池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。CachedThreadPool适用于并发执行大量短期耗时短的任务,或者负载较轻的服务器;
  2. newFiexedThreadPool(int nThreads):创建固定数目线程的线程池,线程数小于nThreads时,提交新的任务会创建新的线程,当线程数等于nThreads时,提交新的任务后任务会被加入到阻塞队列,正在执行的线程执行完毕后从队列中取任务执行,FiexedThreadPool适用于负载略重但任务不是特别多的场景,为了合理利用资源,需要限制线程数量;
  3. newSingleThreadExecutor() 创建一个单线程化的 Executor,SingleThreadExecutor适用于串行执行任务的场景,每个任务按顺序执行,不需要并发执行;
  4. newScheduledThreadPool(int corePoolSize) 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代 Timer 类。ScheduledThreadPool中,返回了一个ScheduledThreadPoolExecutor实例,而ScheduledThreadPoolExecutor实际上继承了ThreadPoolExecutor。从代码中可以看出,ScheduledThreadPool基于ThreadPoolExecutor,corePoolSize大小为传入的corePoolSize,maximumPoolSize大小为Integer.MAX_VALUE,超时时间为0,workQueue为DelayedWorkQueue。实际上ScheduledThreadPool是一个调度池,其实现了schedule、scheduleAtFixedRate、scheduleWithFixedDelay三个方法,可以实现延迟执行、周期执行等操作;
  5. newSingleThreadScheduledExecutor() 创建一个corePoolSize为1的ScheduledThreadPoolExecutor;
  6. newWorkStealingPool(int parallelism)返回一个ForkJoinPool实例,ForkJoinPool 主要用于实现“分而治之”的算法,适合于计算密集型的任务。


Executors类看起来功能比较强大、用起来还比较方便,但存在如下弊端


  1. FiexedThreadPool和SingleThreadPool任务队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM;
  2. CachedThreadPool和ScheduledThreadPool允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM;


使用线程时,可以直接调用 ThreadPoolExecutor 的构造函数来创建线程池,并根据业务实际场景来设置corePoolSize、blockingQueue、RejectedExecuteHandler等参数。


  • 避免使用局部线程池


使用局部线程池时,若任务执行完后没有执行shutdown()方法或有其他不当引用,极易造成系统资源耗尽。


  • 合理设置线程池参数


在工程实践中,通常使用下述公式来计算核心线程数:

nThreads=(w+c)/c*n*u=(w/c+1)*n*u  


其中,w为等待时间,c为计算时间,n为CPU核心数(通常可通过 Runtime.getRuntime().availableProcessors()方法获取),u为CPU目标利用率(取值区间为[0, 1]);在最大化CPU利用率的情况下,当处理的任务为计算密集型任务时,即等待时间w为0,此时核心线程数等于CPU核心数。


上述计算公式是理想情况下的建议核心线程数,而不同系统/应用在运行不同的任务时可能会有一定的差异,因此最佳线程数参数还需要根据任务的实际运行情况和压测表现进行微调。


  • 增加异常处理


为了更好地发现、分析和解决问题,建议在使用多线程时增加对异常的处理,异常处理通常有下述方案:


  1. 在任务代码处增加try...catch异常处理
  2. 如果使用的Future方式,则可通过Future对象的get方法接收抛出的异常
  3. 为工作线程设置setUncaughtExceptionHandler,在uncaughtException方法中处理异常


  • 优雅关闭线程池


public void destroy() {
        try {
            poolExecutor.shutdown();
            if (!poolExecutor.awaitTermination(AWAIT_TIMEOUT, TimeUnit.SECONDS)) {
                poolExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // 如果当前线程被中断,重新取消所有任务
            pool.shutdownNow();
            // 保持中断状态
            Thread.currentThread().interrupt();
        }
    }


为了实现优雅停机的目标,我们应当先调用shutdown方法,调用这个方法也就意味着,这个线程池不会再接收任何新的任务,但是已经提交的任务还会继续执行。之后我们还应当调用awaitTermination方法,这个方法可以设定线程池在关闭之前的最大超时时间,如果在超时时间结束之前线程池能够正常关闭则会返回true,否则,超时会返回false。通常我们需要根据业务场景预估一个合理的超时时间,然后调用该方法。


如果awaitTermination方法返回false,但又希望尽可能在线程池关闭之后再做其他资源回收工作,可以考虑再调用一下shutdownNow方法,此时队列中所有尚未被处理的任务都会被丢弃,同时会设置线程池中每个线程的中断标志位。shutdownNow并不保证一定可以让正在运行的线程停止工作,除非提交给线程的任务能够正确响应中断。


  • 鹰眼上下文参数传递


/**
* 在主线程中,开启鹰眼异步模式,并将ctx传递给多线程任务
**/
// 防止鹰眼链路丢失,需要传递
RpcContext_inner ctx = EagleEye.getRpcContext();
// 开启异步模式
ctx.setAsyncMode(true);
/**
* 在线程池任务线程中,设置鹰眼rpc环境
**/
private void runTask() {
    try {
        EagleEye.setRpcContext(ctx);
        // do something...
    } catch (Exception e) {
        log.error("requestError, params: {}", this.params, e);
    } finally {
        // 判断当前任务是否是主线程在运行,当Rejected策略为CallerRunsPolicy的时候,核对当前线程
        if (mainThread != Thread.currentThread()) {
            EagleEye.clearRpcContext();
        }
    }
}



ThreadLocal线程变量概述

 什么是ThreadLocal


ThreadLocal类提供了线程本地变量(thread-local variables),这些变量不同于普通的变量,访问线程本地变量的每个线程(通过其get或set方法)都有其自己的独立初始化的变量副本,因此ThreadLocal没有多线程竞争的问题,不需要单独进行加锁。


 ThreadLocal使用场景

  1. 每个线程都需要有属于自己的实例数据(线程隔离);
  2. 框架跨层数据的传递;
  3. 需要参数全局传递的复杂调用链路的场景;
  4. 数据库连接的管理,在AOP的各种嵌套调用中保证事务的一致性;


ThreadLocal的原理与实践

对于ThreadLocal而言,常用的方法有get/set/initialValue 3个方法。


众所周知,在java中SimpleDateFormat有线程安全问题,为了安全地使用SimpleDateFormat,除了1)创建SimpleDateFormat局部变量;和2)加同步锁 两种方案外,我们还可以使用3)ThreadLocal的方案:


/**
* 使用 ThreadLocal 定义一个全局的 SimpleDateFormat
*/
private static ThreadLocal<SimpleDateFormat> simpleDateFormatThreadLocal = new
ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
};
// 用法
String dateString = simpleDateFormatThreadLocal.get().format(calendar.getTime());

 ThreadLocal原理


Thread 内部维护了一个  ThreadLocal.ThreadLocalMap 实例(threadLocals),ThreadLocal 的操作都是围绕着 threadLocals 来操作的。


  • threadLocal.get()方法


/**
     * Returns the value in the current thread's copy of this
     * thread-local variable.  If the variable has no value for the
     * current thread, it is first initialized to the value returned
     * by an invocation of the {@link #initialValue} method.
     *
     * @return the current thread's value of this thread-local
     */
public T get() {
    // 1. 获取当前线程
    Thread t = Thread.currentThread();
    // 2. 获取当前线程内部的ThreadLocalMap变量t.threadLocals;
    ThreadLocalMap map = getMap(t);
    // 3. 判断map是否为null
    if (map != null) {
        // 4. 使用当前threadLocal变量获取entry
        ThreadLocalMap.Entry e = map.getEntry(this);
        // 5. 判断entry是否为null
        if (e != null) {
            // 6.返回Entry.value
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // 7. 如果map/entry为null设置初始值
    return setInitialValue();
}
 /**
     * Variant of set() to establish initialValue. Used instead
     * of set() in case user has overridden the set() method.
     *
     * @return the initial value
     */
private T setInitialValue() {
    // 1. 初始化value,如果重写就用重写后的value,默认null
    T value = initialValue();
    // 2. 获取当前线程
    Thread t = Thread.currentThread();
    // 3. 获取当前线程内部的ThreadLocalMap变量
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 4. 不为null就set, key: threadLocal, value: value
        map.set(this, value);
    else
        // 5. map若为null则创建ThreadLocalMap对象
        createMap(t, value);
    return value;
}
/**
 * Create the map associated with a ThreadLocal. Overridden in
 * InheritableThreadLocal.
 *
 * @param t the current thread
 * @param firstValue value for the initial entry of the map
 */
void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}
/**
 * Construct a new map initially containing (firstKey, firstValue).
 * ThreadLocalMaps are constructed lazily, so we only create
 * one when we have at least one entry to put in it.
 */
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    // 1. 初始化entry数组,size: 16
    table = new Entry[INITIAL_CAPACITY];
    // 2. 计算value的index
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    // 3. 在对应index位置赋值
    table[i] = new Entry(firstKey, firstValue);
    // 4. entry size
    size = 1;
    // 5. 设置threshold: threshold = len * 2 / 3;
    setThreshold(INITIAL_CAPACITY);
}
/**
 * Set the resize threshold to maintain at worst a 2/3 load factor.
 */
private void setThreshold(int len) {
    threshold = len * 2 / 3;
}


  • threadLocal.set()方法


/**
     * Sets the current thread's copy of this thread-local variable
     * to the specified value.  Most subclasses will have no need to
     * override this method, relying solely on the {@link #initialValue}
     * method to set the values of thread-locals.
     *
     * @param value the value to be stored in the current thread's copy of
     *        this thread-local.
     */
    public void set(T value) {
        // 1. 获取当前线程
        Thread t = Thread.currentThread();
        // 2. 获取当前线程内部的ThreadLocalMap变量
        ThreadLocalMap map = getMap(t);
        if (map != null)
            // 3. 设置value
            map.set(this, value);
        else
            // 4. 若map为null则创建ThreadLocalMap
            createMap(t, value);
    }


  • ThreadLocalMap


从JDK源码可见,ThreadLocalMap中的Entry是弱引用类型的,这就意味着如果这个ThreadLocal只被这个Entry引用,而没有被其他对象强引用时,就会在下一次GC的时候回收掉。


static class ThreadLocalMap {
        /**
         * The entries in this hash map extend WeakReference, using
         * its main ref field as the key (which is always a
         * ThreadLocal object).  Note that null keys (i.e. entry.get()
         * == null) mean that the key is no longer referenced, so the
         * entry can be expunged from table.  Such entries are referred to
         * as "stale entries" in the code that follows.
         */
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;
            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }
    // ...
}

 ThreadLocal示例


  • 鹰眼链路ThreadLocal的使用


EagleEye(鹰眼)作为全链路监控系统在集团内部被广泛使用,traceId、rpcId、压测标等信息存储在EagleEye的ThreadLocal变量中,并在HSF/Dubbo服务调用间进行传递。EagleEye通过Filter将数据初始化到ThreadLocal中,部分相关代码如下:


EagleEyeHttpRequest eagleEyeHttpRequest = this.convertHttpRequest(httpRequest);
// 1. 初始化,将traceId、rpcId等数据存储到鹰眼的ThreadLocal变量中
EagleEyeRequestTracer.startTrace(eagleEyeHttpRequest, false);
try {
    chain.doFilter(httpRequest, httpResponse);
} finally {
    // 2. 清理ThreadLocal变量值
    EagleEyeRequestTracer.endTrace(this.convertHttpResponse(httpResponse));
}


在EagleEyeFilter中,通过EagleEyeRequestTracer.startTrace方法进行初始化,在前置入参转换后,通过startTrace重载方法将鹰眼上下文参数存入ThreadLocal中,相关代码如下:


图片.png


图片.png

EagleEyeFilter在finally代码块中,通过EagleEyeRequestTracer.endTrace方法结束调用链,通过clear方法将ThreadLocal中的数据进行清理,相关代码实现如下:


图片.png


  • Bad case:XX项目权益领取失败问题


在某权益领取原有链路中,通过app打开一级页面后才能发起权益领取请求,请求经过淘系无线网关(Mtop)后到达服务端,服务端通过mtop sdk获取当前会话信息。


在XX项目中,对权益领取链路进行了升级改造,在一级页面请求时,通过服务端同时发起权益领取请求。具体地,服务端在处理一级页面请求时,同时通过调用hsf/dubbo接口来进行权益领取,因此在发起rpc调用时需要携带用户当前会话信息,在服务提供端将会话信息进行提取并注入到mtop上下文,从而才能通过mtop sdk获取到会话id等信息。某开发同学在实现时,因ThreadLocal使用不当造成下述问题:


  1. 问题1:因ThreadLocal初始化时机不当,造成获取不到会话信息,进而导致权益领取失败;
  2. 问题2:请求完成时,因未清理ThreadLocal中的变量值,导致脏数据;

【问题1:权益领取失败分析】

在权益领取服务中,该应用构建了一套高效和线程安全的依赖注入框架,基于该框架的业务逻辑模块通常抽象为xxxModule形式,Module间为网状依赖关系,框架会按依赖关系自动调用init方法(其中,被依赖的module 的init方法先执行)。

在应用中,权益领取接口的主入口为CommonXXApplyModule类,CommonXXApplyModule依赖XXSessionModule。当请求来临时,会按依赖关系依次调用init方法,因此XXSessionModule的init方法会优先执行;而开发同学在CommonXXApplyModule类中的init方法中通过调用recoverMtopContext()方法来期望恢复mtop上下文,因recoverMtopContext()方法的调用时机过晚,从而导致XXSessionModule模块获取不到正确的会话id等信息而导致权益领取失败。


图片.png


【问题2:脏数据分析】

权益领取服务在处理请求时,若当前线程曾经处理过权益领取请求,因ThreadLocal变量值未被清理,此时XXSessionModule通过mtop SDK获取会话信息时得到的是前一次请求的会话信息,从而造成脏数据。


【解决方案】

在依赖注入框架入口处AbstractGate#visit(或在XXSessionModule中)通过recoverMtopContext方法注入mtop上下文信息,并在入口方法的finally代码块清理当前请求的threadlocal变量值。


 思考&小结


  1. ThreadLocalMap中的Entry为什么要设计为弱引用类型?

若使用强引用类型,则threadlocal的引用链为:Thread -> ThreadLocal.ThreadLocalMap -> Entry[] -> Entry -> key(threadLocal对象)和value;在这种场景下,只要这个线程还在运行(如线程池场景),若不调用remove方法,则该对象及关联的所有强引用对象都不会被垃圾回收器回收。


  1. 使用static和不使用static修饰threadlocal变量有和区别?

若使用static关键字进行修饰,则一个线程仅对应一个线程变量;否则,threadlocal语义变为perThread-perInstance,容易引发内存泄漏,如下述示例:


public class ThreadLocalTest {
    public static class ThreadLocalDemo {
        private ThreadLocal<String> threadLocalHolder = new ThreadLocal();
        public void setValue(String value) {
            threadLocalHolder.set(value);
        }
        public String getValue() {
            return threadLocalHolder.get();
        }
    }
    public static void main(String[] args) {
        int count = 3;
        List<ThreadLocalDemo> list = new LinkedList<>();
        for (int i = 0; i < count; i++) {
            ThreadLocalDemo demo = new ThreadLocalDemo();
            demo.setValue("demo-" + i);
            list.add(demo);
        }
        System.out.println();
    }
}


在上述main方法第22行debug,可见线程的threadLocals变量中有3个threadlocal实例。在工程实践中,使用threadlocal时通常期望一个线程只有一个threadlocal实例,因此,若不使用static修饰,期望的语义发生了变化,同时易引起内存泄漏。


图片.png


 最佳实践


  • ThreadLocal变量值初始化和清理建议成对出现


如果不执行清理操作,则可能会出现:


  1. 内存泄漏:由于ThreadLocalMap的中key是弱引用,而Value是强引用。这就导致了一个问题,ThreadLocal在没有外部对象强引用时,发生GC时弱引用Key会被回收,而Value不会回收,从而Entry里面的元素出现<null,value>的情况。如果创建ThreadLocal的线程一直持续运行,那么这个Entry对象中的value就有可能一直得不到回收,这样可能会导致内存泄露。
  2. 脏数据:由于线程复用,在用户1请求时,可能保存了业务数据在ThreadLocal中,若不清理,则用户2的请求进来时,可能会读到用户1的数据。


建议使用try...finally 进行清理。


  • ThreadLocal变量建议使用static进行修饰


我们在使用ThreadLocal时,通常期望的语义是perThread,若不使用static进行修饰,则语义变为perThread-perInstance;在线程池场景下,若不用static进行修饰,创建的线程相关实例可能会达到 M * N个(其中M为线程数,N为对应类的实例数),易造成内存泄漏(https://errorprone.info/bugpattern/ThreadLocalUsage)。


  • 谨慎使用ThreadLocal.withInitial


在应用中,谨慎使用ThreadLocal.withInitial(Supplier<? extends S> supplier)这个工厂方法创建ThreadLocal对象,一旦不同线程的ThreadLocal使用了同一个Supplier对象,那么隔离也就无从谈起了,如:


// 反例,实际上使用了共享对象obj而并未隔离,
private static ThreadLocal<Obj> threadLocal = ThreadLocal.withIntitial(() -> obj);

总结


在java工程实践中,线程池和线程变量被广泛使用,因线程池和线程变量的不当使用经常造成安全生产事故,因此,正确使用线程池和线程变量是每一位开发人员必须修炼的基本功。本文从线程池和线程变量的使用出发,简要介绍了线程池和线程变量的原理和使用实践,各开发人员可结合最佳实践和实际应用场景,正确地使用线程和线程变量,构建出稳定、高效的java应用服务。


团队介绍


我们来自大淘宝技术-淘宝交易平台,负责淘宝业务的商品详情、购物车、下单、订单、物流、退款等从购前、购中到购后履约的基础链路相关业务。这里有百亿级别的数据、有超过百万QPS的高并发流量、有丰富的业务场景,服务于10亿级的消费者,支撑淘宝天猫前后端各种行业的基础业务、玩法、规则及业务拓展。这里有巨大的挑战等着你来,若有兴趣可将简历发至lican.lc@alibaba-inc.com,期待您的加入!

相关文章
|
29天前
|
存储 算法 Java
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
72 0
|
1天前
|
安全 算法 Java
JavaSE&多线程&线程池
JavaSE&多线程&线程池
15 7
|
26天前
|
Java 测试技术 Python
Python开启线程和线程池的方法
Python开启线程和线程池的方法
17 0
Python开启线程和线程池的方法
|
29天前
|
安全 Java 调度
【C/C++ 线程池设计思路 】设计与实现支持优先级任务的C++线程池 简要介绍
【C/C++ 线程池设计思路 】设计与实现支持优先级任务的C++线程池 简要介绍
44 2
|
1月前
|
负载均衡 Java 数据处理
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(三)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
55 2
|
1月前
|
存储 监控 Java
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(二)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
43 1
|
1月前
|
负载均衡 安全 Java
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(一)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
57 2
|
16天前
|
存储 Java 数据库连接
java多线程之线程通信
java多线程之线程通信
|
27天前
|
存储 缓存 NoSQL
Redis单线程已经很快了6.0引入多线程
Redis单线程已经很快了6.0引入多线程
31 3
|
30天前
|
消息中间件 安全 Linux
线程同步与IPC:单进程多线程环境下的选择与权衡
线程同步与IPC:单进程多线程环境下的选择与权衡
58 0