剑指JUC原理-20.并发编程实践(中):https://developer.aliyun.com/article/1413700
再换一个例子来详细讲解!
ThreadLocal的值传递需求,往往发生在需要标识一条链路的情景下,例如我们查找日志时,往往携带一个TraceId去查找当时的这次请求链路下的所有日志,但是如果我们的业务代码中使用了线程池,如果不做处理你会发现这个线程池中执行任务的线程打印的日志的TraceId和我们搜索的请求的TraceId并不相同,所以为了定位问题方便我们往往需要保证一个请求的TraceId在异步任务中继续保持一致性,这就涉及了ThreadLocal的值传递。
那么常见的解决该办法的思路呢?
复写submit或者execute方法
一个比较朴素的想法是,在线程池执行任务的时候,把需要传递的值注入进去,因为投放任务的时候是“主线程”做的事情,执行任务是子线程执行的。所以可以这样简单实现:
public class TraceIdTransmitThreadPool extends ThreadPoolTaskExecutor { @Override public void execute(Runnable task) { String traceId = getTraceIdFromContext(); super.execute(()->{ // 其实本质上就是 使用ThreadLocalUtils类将traceId设置到线程的ThreadLocal变量中。 ThreadLocalUtils.set(traceId); try{ task.run(); }finally { ThreadLocalUtils.clear(traceId); } }); } private String getTraceIdFromContext() { return ThreadLocalUtils.get(); } }
利用InheritableThreadLocals
Thread类中除了threadLocals
还有个inheritableThreadLocals
这个ThreadLocalMap的局部变量,这个东西实际作用是什么呢?实际作用是在子线程创建的时候,父线程会把threadLocal拷贝到子线程中。下面我们用一个例子来解释下这个东西的作用
ThreadLocal<String> local = new InheritableThreadLocal<>(); //ThreadLocal<String> local = new ThreadLocal<>(); local.set("hello"); new Thread(() -> { // 仅使用ThreadLocal 这里将取到NUll值 // 使用InheritableThreadLocal 这里将取到主线程设置的线程局部变量 System.out.println("子线程:" + local.get()); }).start(); sleep(1000)
上面的代码输出为
子线程:hello
可以看出来确实主线程中设置的值被带进到子线程中了。下面简单分析下原理,翻开new Thread的构造方法源码时我们会找到下面这行代码:
if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
然后在InheritableThreadLocal
类的实现源码中发现其最主要的就是复写了getMap的实现
ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; }
所以能进行值传递的原因很简单,就是会把父进程的inheritableThreadLocals 进行值拷贝,然后get/set方法在取值的时候不再从Thread类的threadLocals中取值,而是从inheritableThreadLocals取, 但是我们线程池这种环境下面核心线程一般不会频繁的反复销毁重新创建,所以这种方案其实并不适合线程池的环境,此外可能是jdk官方也觉得这种方式设计的不好在jdk9之后就直接拿掉了这个inheritableThreadLocals局部变量
利用TransmittableThreadLocal
TransmittableThreadLocal是一个开源项目,gitee地址位于: gitee.com/mirrors/tra…
在本文开头的时候举了一个例子,现在我们将ThreadLocal更改为TransmittbaleThreadLocal,就可以直接体会到两者的区别,代码如下:
public static void case1() { //ThreadLocal<String> threadLocal = new ThreadLocal<>(); TransmittableThreadLocal<String> threadLocal = new TransmittableThreadLocal<>(); ExecutorService executorService = Executors.newFixedThreadPool(1); // 这里是核心 executorService = TtlExecutors.getTtlExecutorService(executorService); Runnable task1 = () -> { System.out.println("ThreadLocal value in task1: " + threadLocal.get()); threadLocal.set("Task1"); }; Runnable task2 = () -> { System.out.println("ThreadLocal value in task2: " + threadLocal.get()); threadLocal.set("Task2"); }; threadLocal.set("Hello"); executorService.submit(task1); sleep(100); executorService.submit(task2); System.out.println("ThreadLocal value in mainThread: " + threadLocal.get()); executorService.shutdown(); }
最终代码运行的时候如下:
ThreadLocal value in task1: Hello ThreadLocal value in mainThread: Hello ThreadLocal value in task2: Hello
从代码的运行结果可以看出子线程和主线程的线程局部变量的实现了统一,并且很神奇的一点是线程1中执行第一个任务之后对线程局部变量做了修改,丝毫不影响这个线程在执行第二个任务中线程局部变量的值,在执行第二个任务的时候仍然可以取到父线程中的值
那么这个究竟是怎么实现的呢?其实主要就是上面的代码中 executorService = TtlExecutors.getTtlExecutorService(executorService); 这行代码进行了装饰作用
其实TransmittbaleThreadLocal(简称TTL)的源码设计就是一个装饰者设计模式的典型范例
任务修饰:使用TtlRunnable和TtlCallable来修饰传入线程池的Runnable和Callable。
线程池修饰:使用 getTtlExecutorService来包装和修饰接口ExecutorService
我们先从TtlRunnable类开始进行分析,核心也就是看下run方法怎么实现的(这属于框架的基准内容比较重要)
public void run() { /** * capturedRef就是主线程传递下来的ThreadLocal的值。 */ Object captured = capturedRef.get(); if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } /** * 1. backup(备份)是子线程已经存在的ThreadLocal变量; * 这也是做到了上面说的一个线程执行两次任务,从父线程中拿到的局部变量值也不会互相影响的关键 * 2. 将captured的ThreadLocal值在子线程中set进去; */ Object backup = replay(captured); try { // 修饰的目标 runnable.run(); } finally { /** * 在子线程任务中,ThreadLocal可能发生变化,该步骤的目的是 * 回滚{@code runnable.run()}进入前的ThreadLocal的线程 */ restore(backup); } }
从上面的代码来看最重要的就是要知道captured这个变量的值到底是怎么get出来的,首先我们要知道从继承路线来看TransmittableThreadLocal 继承了InheritableThreadLocal所以自然有InheritableThreadLocal的全部能力
captured这个变量实际上是从这个capture方法返回的,这个方法返回的快照然后会被传递到replay方法中进行应用。
源码位置:com.alibaba.ttl.TransmittableThreadLocal.Transmitter#capture @NonNull public static Object capture() { return new Snapshot(captureTtlValues(), captureThreadLocalValues()); }
这个Snapshot看名字就知道是个快照,这个快照到底怎么实现的呢
// 抓取 TransmittableThreadLocal 的快照 private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() { WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); // 从 TransmittableThreadLocal 的 holder 中,遍历所有有值的 TransmittableThreadLocal,将 TransmittableThreadLocal 取出和值复制到 Map 中。 for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) { ttl2Value.put(threadLocal, threadLocal.copyValue()); } return ttl2Value; } // 抓取注册的 ThreadLocal。 private static WeakHashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() { final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value = new WeakHashMap<ThreadLocal<Object>, Object>(); // 从 threadLocalHolder 中,遍历注册的 ThreadLocal,将 ThreadLocal 和 TtlCopier 取出,将值复制到 Map 中。 for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) { final ThreadLocal<Object> threadLocal = entry.getKey(); final TtlCopier<Object> copier = entry.getValue(); threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get())); } return threadLocal2Value; }
上面源码的注释中提到了“注册”过程,这个注册行为则发生在TransmittableThreadLocal的get/set方法内部实现中。 现在我们有了快照,但是我们怎么将快照中的数据内容传递到子线程中呢 这就是TtlRunnable类中run方法中调用的replay方法所做的事情了。其实仔细看源码就会知道这个方法的核心目标就是要把快照中的数据给设置到当前线程的上下文中,这样你在子线程中调用get方法才能取到对应的值。
@NonNull public static Object replay(@NonNull Object captured) { final Snapshot capturedSnapshot = (Snapshot) captured; return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value)); } // 重播 TransmittableThreadLocal,并保存执行线程的原值 @NonNull private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) { WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); backup.put(threadLocal, threadLocal.get()); if (!captured.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 关键: 将 map 中的值,设置到 ThreadLocal 中。 setTtlValuesTo(captured); // TransmittableThreadLocal 的回调方法,在任务执行前执行。 doExecuteCallback(true); return backup; }
所以总结下就是 get/set 方法中完成了TransmittableThreadLocal的注册,然后在执行run方法的时候通过TtlRunnable进行了方法包装,在调用之前进行快照形成,并应用快照到当前线程中,最后在线程执行结束之后,run方法内部对线程局部变量做的修改则会被还原,这也是本节举例中最后三次打印都是一个结果的主要原因
所以TransmittableThreadLocal 就比较适合在多线程环境下作为线程局部变量进行类似traceId这样的参数的传参,此外TransmittableThreadLocal 还支持javaAgent方式启动,这样就不需要在代码中显式的去包装线程池了。
java -javaagent:path/to/transmittable-thread-local-2.x.y.jar \ -cp classes \ com.alibaba.demo.ttl.agent.AgentDemo
模拟高并发
有时候我们写的接口,在低并发的场景下,一点问题都没有。
但如果一旦出现高并发调用,该接口可能会出现一些意想不到的问题。
为了防止类似的事情发生,一般在项目上线前,我们非常有必要对接口做一下压力测试
。
当然,现在已经有比较成熟的压力测试工具,比如:Jmeter
、LoadRunner
等。
如果你觉得下载压测工具比较麻烦,也可以手写一个简单的模拟并发操作的工具,用CountDownLatch
就能实现,例如:
public static void concurrenceTest() { /** * 模拟高并发情况代码 */ final AtomicInteger atomicInteger = new AtomicInteger(0); final CountDownLatch countDownLatch = new CountDownLatch(1000); // 相当于计数器,当所有都准备好了,再一起执行,模仿多并发,保证并发量 final CountDownLatch countDownLatch2 = new CountDownLatch(1000); // 保证所有线程执行完了再打印atomicInteger的值 ExecutorService executorService = Executors.newFixedThreadPool(10); try { for (int i = 0; i < 1000; i++) { executorService.submit(new Runnable() { @Override public void run() { try { countDownLatch.await(); //一直阻塞当前线程,直到计时器的值为0,保证同时并发 } catch (InterruptedException e) { log.error(e.getMessage(),e); } //每个线程增加1000次,每次加1 for (int j = 0; j < 1000; j++) { atomicInteger.incrementAndGet(); } countDownLatch2.countDown(); } }); countDownLatch.countDown(); } countDownLatch2.await();// 保证所有线程执行完 executorService.shutdown(); } catch (Exception e){ log.error(e.getMessage(),e); } }
处理消息队列消息
在高并发的场景中,消息积压问题,可以说如影随形,真的没办法从根本上解决。表面上看,已经解决了,但后面不知道什么时候,就会冒出一次。
参考 《苏三说技术》所举的实际情况
有天下午,产品过来说:有几个商户投诉过来了,他们说菜品有延迟,快查一下原因。
这次问题出现得有点奇怪。
为什么这么说?
首先这个时间点就有点奇怪,平常出问题,不都是中午或者晚上用餐高峰期吗?怎么这次问题出现在下午?
根据以往积累的经验,我直接看了kafka的topic的数据,果然上面消息有积压,但这次每个partition都积压了十几万的消息没有消费,比以往加压的消息数量增加了几百倍。这次消息积压得极不寻常。
我赶紧查服务监控看看消费者挂了没,还好没挂。又查服务日志没有发现异常。这时我有点迷茫,碰运气问了问订单组下午发生了什么事情没?他们说下午有个促销活动,跑了一个JOB批量更新过有些商户的订单信息。
这时,我一下子如梦初醒,是他们在JOB中批量发消息导致的问题。怎么没有通知我们呢?实在太坑了。
虽说知道问题的原因了,倒是眼前积压的这十几万的消息该如何处理呢?
此时,如果直接调大partition数量是不行的,历史消息已经存储到4个固定的partition,只有新增的消息才会到新的partition。我们重点需要处理的是已有的partition。
直接加服务节点也不行,因为kafka允许同组的多个partition被一个consumer消费,但不允许一个partition被同组的多个consumer消费,可能会造成资源浪费。
解释:
在Kafka中,一个消费者组(Consumer Group)可以消费同一个主题(Topic)下的多个分区(Partition),但是同一个分区不能被多个消费者组消费。这是因为在Kafka中,分区是消费者组之间的数据分片单元,每个分区只能被一个消费者组的一个消费者实例消费,以保证数据的一致性和顺序性。如果一个分区被多个消费者组消费,可能会导致以下问题:
- 资源浪费:多个消费者实例会竞争消费同一个分区的消息,可能会导致资源浪费,如CPU、内存等。
- 数据一致性:如果多个消费者实例同时消费同一个分区的消息,可能会导致数据的不一致性。例如,如果一个消费者实例在处理消息时出现故障,其他消费者实例可能会重复处理相同的消息,从而导致数据不一致。
- 性能问题:多个消费者实例同时消费同一个分区的消息,可能会导致网络和磁盘I/O负载增加,从而影响整体性能。
因此,为了保证Kafka系统的稳定性和性能,不建议将同一个分区分配给多个消费者组。在实际应用中,可以根据实际需求和消费者组的数量,合理调整分区数量,以提高系统的并发处理能力和负载能力。
看来只有用多线程处理了。
为了紧急解决问题,我改成了用线程池处理消息,核心线程和最大线程数都配置成了50。
大致用法如下:
- 先定义一个线程池:
@Configuration public class ThreadPoolConfig { @Value("${thread.pool.corePoolSize:5}") private int corePoolSize; @Value("${thread.pool.maxPoolSize:10}") private int maxPoolSize; @Value("${thread.pool.queueCapacity:200}") private int queueCapacity; @Value("${thread.pool.keepAliveSeconds:30}") private int keepAliveSeconds; @Value("${thread.pool.threadNamePrefix:ASYNC_}") private String threadNamePrefix; @Bean("messageExecutor") public Executor messageExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setKeepAliveSeconds(keepAliveSeconds); executor.setThreadNamePrefix(threadNamePrefix); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
- 再定义一个消息的consumer:
@Service public class MyConsumerService { @Autowired private Executor messageExecutor; @KafkaListener(id="test",topics={"topic-test"}) public void listen(String message){ System.out.println("收到消息:" + message); messageExecutor.submit(new MyWork(message); } }
- 在定义的Runable实现类中处理业务逻辑:
public class MyWork implements Runnable { private String message; public MyWork(String message) { this.message = message; } @Override public void run() { System.out.println(message); } }
果然,调整之后消息积压数量确实下降的非常快,大约半小时后,积压的消息就非常顺利的处理完了。
统计数量
剑指JUC原理-10.并发编程大师的原子累加器底层优化原理(与人类的优秀灵魂对话)-CSDN博客
在这里只着重介绍一下 longadder 的 sum操作。
当我们最终获取计数器值时,我们可以使用LongAdder.longValue()
方法,其内部就是使用sum
方法来汇总数据的。
java.util.concurrent.atomic.LongAdder.sum()
:
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
实现很简单,遍历cells
数组中的值,然后累加.
AtomicLong可以弃用了吗?
看上去LongAdder的性能全面超越了AtomicLong,而且阿里巴巴开发手册也提及到 推荐使用 LongAdder 对象,比 AtomicLong 性能更好(减少乐观 锁的重试次数),但是我们真的就可以舍弃掉LongAdder了吗?
当然不是,我们需要看场景来使用,如果是并发不太高的系统,使用AtomicLong可能会更好一些,而且内存需求也会小一些。
我们看过sum()方法后可以知道LongAdder在统计的时候如果有并发更新,可能导致统计的数据有误差。
而在高并发统计计数的场景下,才更适合使用LongAdder。
总结
LongAdder
中最核心的思想就是利用空间来换时间,将热点value
分散成一个Cell列表来承接并发的CAS,以此来提升性能。
LongAdder
的原理及实现都很简单,但其设计的思想值得我们品味和学习。
参考文章
[1] 聊聊并发编程的12种业务场景 - 掘金 (juejin.cn)
[3] 剑指JUC原理-15.ThreadLocal-CSDN博客
[5] ThreadLocal项目实战-TraceId日志 - 掘金 (juejin.cn)
[6] 一文读懂Java中的过滤器和拦截器:实例详解,逐步掌握 - 掘金 (juejin.cn)
[7] 从ThreadLocal到TransmittableThreadLocal,彻底学透ThreadLocal的设计 - 掘金 (juejin.cn)