Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失(续)

本文涉及的产品
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
简介: Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失(续)

前言

上篇文《Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失》我们对ThreadLocal数据丢失进行了详细的分析,并通过代码的方式复现了这个问题。

在上篇文章的末尾我也说了思路给大家提供了,如果需要能够在Hystrix 为线程隔离模式也能正确传递数据的话,需要我们自己去修改。

我这边以Zuul中自定义负载均衡策略来进行讲解,在Zuul中需要实现灰度发布的功能,需要在Filter中将请求的用户信息传递到自定的负载策略中,Zuul中整合了Hystrix,从Zuul Filter的请求到Ribbon的策略类中,线程已经发生了变化,变成了Hystrix提供的线程池来执行(配置隔离模式为线程)。这个时用ThreadLocal就会出问题了,数据传输会错乱。也就是我们前面分析的问题。

关于修改我说下自己分析问题的一些思路,ransmittable-thread-local可以解决这个问题,可以对线程或者线程池进行修饰,其实最终的原理就是对线程进行包装,在线程run之前和之后做一些处理来保证数据的正确传递。

改造思路

首先我想的就是改掉Hystrix中的线程池或者线程,只有这样才能让ransmittable-thread-local来接管线程中数据的传递。

通过调试的方式找到com.netflix.hystrix.HystrixThreadPool是Hystrix线程池的接口,里面定义了一个获取ExecutorService方法,代码如下:

public interface HystrixThreadPool {
    /**
     * Implementation of {@link ThreadPoolExecutor}.
     *
     * @return ThreadPoolExecutor
     */
    public ExecutorService getExecutor();
}

通过查找接口的实现类,发现只有一个默认的实现com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault,实现也在接口中,是一个静态类。实现的方法如下:

@Override
 public ThreadPoolExecutor getExecutor() {
     touchConfig();
     return threadPool;
 }

threadPool是类中的一个变量,主要是通过touchConfig方法来设置线程的参数,touchConfig代码如下:

private void touchConfig() {
      final int dynamicCoreSize = properties.coreSize().get();
      final int configuredMaximumSize = properties.maximumSize().get();
      int dynamicMaximumSize = properties.actualMaximumSize();
      final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
      boolean maxTooLow = false;
      if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
          //if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum
          dynamicMaximumSize = dynamicCoreSize;
          maxTooLow = true;
      }
      //......
}

这是最外层获取线程池的地方,可以根据代码一步步进去看,最终获取线程池的代码在com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy.getThreadPool方法中。

上面是线程池的源码分析,我们可以改造源码,将线程池用ransmittable-thread-local进行修饰。

改造线程方式

另外一种是改造线程的方式,在Hystrix将命令丢入线程池的时候对线程进行修饰也可以解决此问题,因为ransmittable-thread-local对线程池进行修饰,其原理也是改造了线程,通过源码可以看出:

public static ExecutorService getTtlExecutorService(ExecutorService executorService) {
        if (executorService == null || executorService instanceof ExecutorServiceTtlWrapper) {
            return executorService;
        }
        return new ExecutorServiceTtlWrapper(executorService);
}
class ExecutorServiceTtlWrapper extends ExecutorTtlWrapper implements ExecutorService {
    private final ExecutorService executorService;
    ExecutorServiceTtlWrapper(ExecutorService executorService) {
        super(executorService);
        this.executorService = executorService;
    }
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return executorService.submit(TtlCallable.get(task));
    }
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return executorService.submit(TtlRunnable.get(task), result);
    }
    @Override
    public Future<?> submit(Runnable task) {
        return executorService.submit(TtlRunnable.get(task));
    }
    // ...........
}

重点在TtlRunnable.get()

改造Hystrix中线程的方式,可以通过HystrixContextScheduler进行入手,Hystrix通过HystrixContextScheduler的ThreadPoolScheduler把命令submit到ThreadPoolExecutor中去执行。

通过上面的分析,最终可以定位到提交命令的代码如下:

 private static class ThreadPoolWorker extends Worker {
        private final HystrixThreadPool threadPool;
        private final CompositeSubscription subscription = new CompositeSubscription();
        private final Func0<Boolean> shouldInterruptThread;
        public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }
        @Override
        public void unsubscribe() {
            subscription.unsubscribe();
        }
        @Override
        public boolean isUnsubscribed() {
            return subscription.isUnsubscribed();
        }
        @Override
        public Subscription schedule(final Action0 action) {
            if (subscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();
            }
            // This is internal RxJava API but it is too useful.
            ScheduledAction sa = new ScheduledAction(action);
            subscription.add(sa);
            sa.addParent(subscription);
            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
            return sa;
        }
        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            throw new IllegalStateException("Hystrix does not support delayed scheduling");
        }
}

核心代码在schedule方法中,只需要将schedule中的sa进行修饰即可。

改造后的代码如下:

public Subscription schedule(final Action0 action) {
     if (subscription.isUnsubscribed()) {
            // don't schedule, we are unsubscribed
            return Subscriptions.unsubscribed();
     }
     // This is internal RxJava API but it is too useful.
     ScheduledAction sa = new ScheduledAction(action);
     subscription.add(sa);
     sa.addParent(subscription);
     ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
     FutureTask<?> f = (FutureTask<?>) executor.submit(TtlRunnable.get(sa));
     sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
     return sa;
}

改源码还涉及到重新打包等问题,每个项目都得用修改后的jar包,比较麻烦,最简单的做法就是在项目中建一个同样的HystrixContextScheduler类,包名也要和之前一样,让jvm优先加载,这样就能用这个修改的类来代替Hystrix原始的类。

最后我们来验证下这样的改动是否正确,首先我们在Zuul的Filter中进行值的传递:

RibbonFilterContextHolder是基于InheritableThreadLocal做的值传递,代码如下:

public class RibbonFilterContextHolder {
    private static final ThreadLocal<RibbonFilterContext> contextHolder = new InheritableThreadLocal<RibbonFilterContext>() {
        @Override
        protected RibbonFilterContext initialValue() {
            return new DefaultRibbonFilterContext();
        }
    };
    public static RibbonFilterContext getCurrentContext() {
        return contextHolder.get();
    }
    public static void clearCurrentContext() {
        contextHolder.remove();
    }
}

完整源码请参考:

https://github.com/yinjihuan/spring-cloud/blob/master/fangjia-common/src/main/java/com/fangjia/common/support/RibbonFilterContextHolder.java

private static AtomicInteger ac = new AtomicInteger();
   @Override
   public Object run() {
       RequestContext ctx = RequestContext.getCurrentContext();
       RibbonFilterContextHolder.getCurrentContext().add("servers",ac.addAndGet(1)+"");
       return null;
   }

通过AtomicInteger 进行数字的累加操作,后面测试的时候用10个线程并发测试,如如果在Ribbon的自定义负载策略中接收的值是0-9的话表示正确,否则错误。

接下来定义一个负载策略类,输出接收的值:

public class GrayPushRule extends AbstractLoadBalancerRule {
    private AtomicInteger nextServerCyclicCounter;
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;
    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);
    public GrayPushRule() {
        this.nextServerCyclicCounter = new AtomicInteger(0);
    }
    public GrayPushRule(ILoadBalancer lb) {
        this();
        this.setLoadBalancer(lb);
    }
    public Server choose(ILoadBalancer lb, Object key) {
        String servers = RibbonFilterContextHolder.getCurrentContext().get("servers");
        System.out.println(Thread.currentThread().getName()+":"+servers);  
        return null;
    }
    public Server choose(Object key) {
        return this.choose(this.getLoadBalancer(), key);
    }
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}

然后增加配置,使用自定义的策略,还需要将Hystrix的线程池数量改小一点,这样才可以线程复用

fsh-house.ribbon.NFLoadBalancerRuleClassName=com.fangjia.fsh.api.rule.GrayPushRule
# 线程隔离模式
zuul.ribbon-isolation-strategy=thread
hystrix.threadpool.default.coreSize=3

启动服务,用ab进行测试:

ab -n 10 -c 10 http://192.168.10.170:2103/fsh-house/house/1

输出结果如下:

hystrix-RibbonCommand-3:10
hystrix-RibbonCommand-2:3
hystrix-RibbonCommand-1:8
hystrix-RibbonCommand-3:10
hystrix-RibbonCommand-2:3
hystrix-RibbonCommand-1:8
hystrix-RibbonCommand-3:10
hystrix-RibbonCommand-2:3
hystrix-RibbonCommand-1:8
hystrix-RibbonCommand-3:10

很多数据都重复了,这就是线程复用导致的问题,接下来我们用上面讲的方式进行改造

需要将RibbonFilterContextHolder中的InheritableThreadLocal改成TransmittableThreadLocal

private static final TransmittableThreadLocal<RibbonFilterContext> contextHolder = new TransmittableThreadLocal<RibbonFilterContext>() {
    @Override
    protected RibbonFilterContext initialValue() {
        return new DefaultRibbonFilterContext();
    }
};

然后在项目中新建一个HystrixContextScheduler类,包名必须是com.netflix.hystrix.strategy.concurrency,代码就按上面贴的进行改,主要是对线程进行修饰:

FutureTask<?> f = (FutureTask<?>) executor.submit(TtlRunnable.get(sa));

再次启动服务,进行测试,结果如下:

hystrix-RibbonCommand-2:10
hystrix-RibbonCommand-1:1
hystrix-RibbonCommand-3:7
hystrix-RibbonCommand-3:8
hystrix-RibbonCommand-1:2
hystrix-RibbonCommand-2:4
hystrix-RibbonCommand-3:5
hystrix-RibbonCommand-1:9
hystrix-RibbonCommand-2:3
hystrix-RibbonCommand-3:6

现在的结果已经是正确的

改造线程池方式

上面介绍了改造线程的方式,并且通过建一个同样的Java类来覆盖Jar包中的实现,感觉有点投机取巧,其实不用这么麻烦,Hystrix默认提供了HystrixPlugins类,可以让用户自定义线程池,下面来看看怎么使用:

在启动之前调用进行注册自定义实现的逻辑:

HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());

ThreadLocalHystrixConcurrencyStrategy就是我们自定义的创建线程池的类,需要继承HystrixConcurrencyStrategy,前面也有讲到通过调试代码发现最终获取线程池的代码就在HystrixConcurrencyStrategy中。

我们只需要重写getThreadPool方法即可完成对线程池的改造,由于TtlExecutors只能修饰ExecutorService和Executor,而HystrixConcurrencyStrategy中返回的是ThreadPoolExecutor,我们需要对ThreadPoolExecutor进行包装一层,最终在execute方法中对线程修饰,也就相当于改造了线程池。

public class ThreadLocalHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    private final static Logger logger = LoggerFactory.getLogger(ThreadLocalHystrixConcurrencyStrategy.class);
    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,
            HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        return this.doGetThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
            HystrixThreadPoolProperties threadPoolProperties) {
        return this.doGetThreadPool(threadPoolKey, threadPoolProperties);
    }
}

在doGetThreadPool方法中就返回包装的线程池,代码如下:

return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue,
                    threadFactory);

最后就是ThreadLocalThreadPoolExecutor的代码:

public class ThreadLocalThreadPoolExecutor extends ThreadPoolExecutor {
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
    public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
    }
    @Override
    public void execute(Runnable command) {
        super.execute(TtlRunnable.get(command));
    }
}
相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
4月前
|
Java UED 开发者
Spring Boot 降级功能的神秘面纱:Hystrix 与 Resilience4j 究竟藏着怎样的秘密?
【8月更文挑战第29天】在分布式系统中,服务稳定性至关重要。为应对故障,Spring Boot 提供了 Hystrix 和 Resilience4j 两种降级工具。Hystrix 作为 Netflix 的容错框架,通过隔离依赖、控制并发及降级机制增强系统稳定性;Resilience4j 则是一个轻量级库,提供丰富的降级策略。两者均可有效提升系统可靠性,具体选择取决于需求与场景。在面对服务故障时,合理运用这些工具能确保系统基本功能正常运作,优化用户体验。以上简介包括了两个工具的简单示例代码,帮助开发者更好地理解和应用。
81 0
|
3月前
|
Java Spring
spring多线程实现+合理设置最大线程数和核心线程数
本文介绍了手动设置线程池时的最大线程数和核心线程数配置方法,建议根据CPU核数及程序类型(CPU密集型或IO密集型)来合理设定。对于IO密集型,核心线程数设为CPU核数的两倍;CPU密集型则设为CPU核数加一。此外,还讨论了`maxPoolSize`、`keepAliveTime`、`allowCoreThreadTimeout`和`queueCapacity`等参数的设置策略,以确保线程池高效稳定运行。
281 10
spring多线程实现+合理设置最大线程数和核心线程数
|
3月前
|
XML 监控 Java
Spring Cloud全解析:熔断之Hystrix简介
Hystrix 是由 Netflix 开源的延迟和容错库,用于提高分布式系统的弹性。它通过断路器模式、资源隔离、服务降级及限流等机制防止服务雪崩。Hystrix 基于命令模式,通过 `HystrixCommand` 封装对外部依赖的调用逻辑。断路器能在依赖服务故障时快速返回备选响应,避免长时间等待。此外,Hystrix 还提供了监控功能,能够实时监控运行指标和配置变化。依赖管理方面,可通过 `@EnableHystrix` 启用 Hystrix 支持,并配置全局或局部的降级策略。结合 Feign 可实现客户端的服务降级。
180 23
|
5月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
14989 29
|
5月前
|
负载均衡 Java Spring
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
515 15
|
5月前
|
Java Spring
spring boot 中默认最大线程连接数,线程池数配置查看
spring boot 中默认最大线程连接数,线程池数配置查看
360 4
|
5月前
|
Java Spring
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
115 3
|
5月前
|
Java Spring 容器
Spring boot 自定义ThreadPoolTaskExecutor 线程池并进行异步操作
Spring boot 自定义ThreadPoolTaskExecutor 线程池并进行异步操作
236 3
|
4月前
|
安全 Java C#
Spring创建的单例对象,存在线程安全问题吗?
Spring框架提供了多种Bean作用域,包括单例(Singleton)、原型(Prototype)、请求(Request)、会话(Session)、全局会话(GlobalSession)等。单例是默认作用域,保证每个Spring容器中只有一个Bean实例;原型作用域则每次请求都会创建一个新的Bean实例;请求和会话作用域分别与HTTP请求和会话绑定,在Web应用中有效。 单例Bean在多线程环境中可能面临线程安全问题,Spring容器虽然确保Bean的创建过程是线程安全的,但Bean的使用安全性需开发者自行保证。保持Bean无状态是最简单的线程安全策略;