分布式熔断、限流与服务保护:深入 Hystrix 原理及使用

简介: Hystrix 原理及使用

本文不仅仅会讲述 Hystrix 如何使用,还会深入讲解其实现原理。适合读者:任何阶段的 Java 程序猿。

Hystrix 简介:Hystrix 是 Netflix 开源的一款容错系统,能帮助使用者码出具备强大的容错能力和鲁棒性的程序。Hystrix 具备拥有回退机制和断路器功能的线程和信号隔离,请求缓存和请求打包(request collapsing,即自动批处理,译者注),以及监控和配置等功能。

Hystrix 源于 Netflix API 团队在 2011 年启动的弹性工程工作,而目前它在 Netflix 每天处理着数百亿的隔离线程以及数千亿的隔离信号调用。Hystrix 是基于 Apache License 2.0 协议的开源的程序库,目前托管在 GitHub 上。

学习的过程笔者习惯首先将项目跑起来,然后再去看其原理。下面第一节将先带大家看一下hystrix的使用。之前讲了一篇chat: Spring Boot 源码深入分析。下面首先看一下hystrix在springboot中如何使用。

集成到springboot

前提:你要先搭建一个简单的springboot工程

添加依赖

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>1.3.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-core</artifactId>
            <version>1.3.16</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-metrics-event-stream</artifactId>
            <version>1.3.16</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-javanica</artifactId>
            <version>1.3.16</version>
        </dependency>

配置HystrixConfiguration

package com.lbl.springBootDemo;

import org.springframework.boot.context.embedded.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;  

import com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect;  
import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet;  


@Configuration  
public class HystrixConfiguration {  

    //注册Hystrix框架的拦截器,使得框架的注解能够生效  
    @Bean  
    public HystrixCommandAspect hystrixAspect() {  
        return new HystrixCommandAspect();  
    }  

    //注入servlet,拦截url="/hystrix.stream"  
    //测试git dev commit  
    @Bean  
    public ServletRegistrationBean hystrixMetricsStreamServlet() {
        System.out.print("++++++++++++++***************");
        ServletRegistrationBean registration = new ServletRegistrationBean(new HystrixMetricsStreamServlet());  
        registration.addUrlMappings("/hystrix.stream");  
        return registration;  
    }  
}  

编写controller类,并添加Hystrix

我们这里配置的超时时间为100ms,然后ok方法随机sleep 0-200ms

package com.lbl.springBootDemo;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;
import java.util.concurrent.TimeUnit;  

@RestController
@RequestMapping(value="/hystrix")
public class HystrixController {

    private static final Logger LOG = LoggerFactory.getLogger(HystrixController.class);

    @RequestMapping(value = "/ok", method = RequestMethod.GET)
    @HystrixCommand(fallbackMethod = "okFallback", commandProperties = {
                    @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "100"),
                    @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "50")

    })
    public String ok() throws Exception {
        int l = new Random().nextInt(200);  
        LOG.info(String.format("l=%s", l));  
        TimeUnit.MILLISECONDS.sleep(l);  
        return "ok";  
    }  

    public String okFallback(Throwable e) {  
        System.out.println("execute okFallback!" + e.getMessage());  
        LOG.error("error", e);  
        return "fallback";  
    }
    public String okFallback() {
        return "fallbackssssss";
    }

}  

启动应用,访问http://127.0.0.1:8000/hello/hystrix/ok,可以看到,有时返回ok,有时返回fallbackssssss

注意:熔断处理方法(如上okFallback方法)的方法参数要与原方法保持一致,否则会报找不到fallbackMethod异常。

部署hystrixdashboard监控

  • 下载hystrix-dashboard-1.5.9.war
  • 将其部署到Tomcat中,启动。访问http://127.0.0.1:8080/hystrix-dashboard-1.5.9结果如下图:

enter image description here

然后将刚才的应用添加到监控中:

enter image description here

部署成功后的截图:

enter image description here

用jmeter进行压力测试

  • 设置jmeter50个线程,循环20次
  • 配置http请求http://127.0.0.1:8000/hello/hystrix/ok

enter image description here

结果如下:

没有熔断时:

enter image description here

使用jmeter请求,导致熔断时:

enter image description here

过一段时间,你会发现,又变成非熔断状态了

上面讲了在springboot中的应用。可能你的公司并没有使用springboot,那么接下来看一下如何集成到SpringMVC。

集成到springMVC

添加依赖

<dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-core</artifactId>
            <version>1.5.12</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-metrics-event-stream</artifactId>
            <version>1.5.12</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-javanica</artifactId>
            <version>1.5.12</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-servo-metrics-publisher</artifactId>
            <version>1.5.12</version>
        </dependency>

引入Hystrix Aspect

<aop:aspectj-autoproxy/>
<bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect"></bean>
<context:component-scan base-package="com.***.***"/>
<context:annotation-config/>

web.xml中添加如下servlet:

<servlet>
    <description>Hystrix</description>
    <display-name>HystrixMetricsStreamServlet</display-name>
    <servlet-name>HystrixMetricsStreamServlet</servlet-name>
    <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>
</servlet>
<servlet-mapping>
    <servlet-name>HystrixMetricsStreamServlet</servlet-name>
    <url-pattern>/hystrix.stream</url-pattern>
</servlet-mapping>

HystrixController同样适用上面的实例

启动项目运行即可。运行结果与上面springboot相同。

这里没有配置到监控hystrixdashboard。请读者自行配置吧, 与上面讲的一样的。

注解

上面的实例中使用了注解@HystrixCommand。hystrix可是使用代码方式(下面会将),也可是使用注解方式,在实际开发工作中,根据情况选用。笔者本人习惯使用注解,这也是Java体系惯例,使用简单方便。 详细的注解文档请参考官方文档,详见这里

hystrix是什么

上面快速让大家看了一下hystrix的使用,那么接下来分析一下它的实现原理,只有懂得其原理,才能更好的发挥hystrix的能力。

Hystrix是Netflix开源的一款容错系统,能帮助使用者写出具备强大的容错能力和鲁棒性的程序。

在分布式环境中,不可避免地有许多服务依赖将失败,尤其现在流行的微服务。 Hystrix是一个库,可以通过线程隔离、熔断、服务降级等措施来帮助您控制这些分布式服务之间的交互。

Hystrix可以做到以下事情:

  • 通过控制延迟和故障来保障第三方服务调用的可靠性
  • 在复杂的分布式系统中防止级联故障,防止雪崩
  • 快速失败、快速恢复
  • 优雅降级
  • 提供近时时监控、报警和操作控制

为什么用Hystrix?什么情况下用?

分布式系统中,或者说微服务,各个系统错综复杂,一个系统依赖的服务比较多,而且会有多级依赖。当其中某一个服务出现问题,在高并发的情况下都有可能导致整个系统的瘫痪,蝴蝶效应在这里表现明显。

也许你会问为什么会这样?如上图,假如服务I出现较严重延迟,这时上层应用访问量tps比较大时, 首先上层应用资源会被占满,并且一般网络请求(http/rpc)都有重试机制,服务I的压力会更大,严重时则会导致应用宕机。

enter image description here

hystrix 工作流程

首先看一下官网的流程图:

这张图已经完美的诠释了hystrix的工作流程。

下面详细解释一下

HystrixCommand 和 HystrixObservableCommand

上图中的 1

要想使用hystrix,只需要继承HystrixCommand或HystrixObservableCommand。

两者主要区别是:

  • 前者的命令逻辑写在run();后者的命令逻辑写在construct()
  • 前者的run()是由新创建的线程执行;后者的construct()是由调用程序线程执行
  • 前者一个实例只能向调用程序发送(emit)单条数据;后者一个实例可以顺序发送多条数据(onNext)

Command的执行

上图中的 2

execute()、queue()、observe()、toObservable()这4个方法用来触发执行run()/construct(),一个实例只能执行一次这4个方法,特别说明的是HystrixObservableCommand没有execute()和queue()。

4个方法的主要区别是:

  • execute():以同步堵塞方式执行run()。
  • queue():以异步非堵塞方式执行run(),类似于java里的future
  • observe():事件注册前执行run()/construct()。事件注册前,先调用observe()自动触发执行run()/construct()(如果继承的是HystrixCommand,hystrix将创建新线程非堵塞执行run();如果继承的是HystrixObservableCommand,将以调用程序线程堵塞执行construct()),第二步是从observe()返回后调用程序调用subscribe()完成事件注册,如果run()/construct()执行成功则触发onNext()和onCompleted(),如果执行异常则触发onError()。
  • toObservable():事件注册后执行run()/construct()。第一步是事件注册前,一调用toObservable()就直接返回一个Observable<String>对象,第二步调用subscribe()完成事件注册后自动触发执行run()/construct()(如果继承的是HystrixCommand,hystrix将创建新线程非堵塞执行run(),调用程序不必等待run();如果继承的是HystrixObservableCommand,将以调用程序线程堵塞执行construct(),调用程序等待construct()执行完才能继续往下走),如果run()/construct()执行成功则触发onNext()和onCompleted(),如果执行异常则触发onError()。

另外需要说明的是,这四个方法,最终都是调用toObservable,从上图中也可以看出。

判断缓存

上图中的 3

如果有缓存,则直接从缓存中取,如果没有,则继续发送请求。

是否熔断

上图中的4

这里包含两层含义:配置了强制熔断;由于error或timeout超过阈值导致熔断。

线程池、信号量、队列满了

上图中的5

容器满了自然需要执行fallback了。

真正执行HystrixObservableCommand.construct() or HystrixCommand.run()

上图中的6

需要注意:没有办法强制停止线程工作,最好解决办法是抛出InterruptedException异常。如果被Command包装的功能代码没有抛出InterruptedException异常,即使出现了timeOut,该线程也会继续工作(和http请求超时类似),这样虽然可以熔断,但是其线程资源还是占用的,并没有真正释放资源。大多数httpclient并没有处理InterruptedException异常,所以要正确配置http客户端的链接和超时时间。

计算系统健康值

上图中的7

根据配置的规则计算是否需要熔断。

服务隔离

enter image description here

服务隔离有两个重要的好处:

  • 我们作为服务消费者,去访问不同的外部服务,如果其中一个服务不稳定,有可能导致线程池或者http请求等资源被过多的占用,导致整个系统垮掉(雪崩)。而我们通过对不同的服务的请求进行隔离,就可以做到互补影响,如上图中依赖A如果异常,不会影响到依赖。
  • 我们作为服务的提供方,我们可以动态调整外部服务的访问情况。假如有A/B两个外部调用,在某个大促期间,B是P0级别的服务,必须保证可用,但是A允许降级。那么我们作为服务提供方,就可以将外部调用隔离开来,A的请求降级,保证B的请求。

hystrix可以做到哪些事情

  • 服务隔离
  • 降级
  • 熔断
  • 限流
  • 请求合并
  • 请求缓存
  • ......

滑动时间窗口

滑动时间窗口使hystrix的核心。Hystrix的Metrics中保存了当前服务的健康状况,包括服务调用总次数和服务调用失败次数等。根据Metrics的计数,熔断器从而能计算出当前服务的调用失败率,用来和设定的阈值比较从而决定熔断器的状态切换逻辑,因此Metrics的实现非常重要。

Hystrix在这些版本中开始使用RxJava的Observable.window()实现滑动窗口。

hystrix中Metrics的配置:

  • metrics.rollingStats.timeInMilliseconds

此属性设置滚动统计窗口分为的桶数。默认10秒。假如设置为10秒,每秒1个桶:

  • metrics.rollingStats.numBuckets

此属性设置滚动统计窗口分为的桶数。注意metrics.rollingStats.timeInMilliseconds % metrics.rollingStats.numBuckets == 0必须成立。默认10个。

自定义值HystrixCommandProperties.Setter().withMetricsRollingStatisticalWindowBuckets(int value)

  • metrics.rollingPercentile.enabled

更多配置请参考 HystrixConfiguration

关于rxjava以及滑动时间窗口的概念超出了本文的范畴,给读者提供一个比较好的文章:Hystrix 1.5 滑动窗口实现原理总结

编写你的第一个helloword程序

要实现hystrix只需要继承HystrixCommand或HystrixObservableCommand即可

public class CommandHelloWorld extends HystrixCommand<String> {

    private final String name;

    public HelloWorldCommand(String name) {
        //最少配置:指定命令组名(CommandGroup)  
        //super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));

/**
         * 这里可以设置GroupKey/CommandKey/ThreadPoolKey.  其中ThreadPoolKey比较重要,后面会讲到
         * 可以
         */
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));

   this.name = name;
    }

    @Override
    protected String run() {
        // 依赖逻辑封装在run()方法中
        return "Hello " + name + " thread:" + Thread.currentThread().getName();
    }
}

测试代码

@Test
        public void test1() {
            HelloWorldCommand helloWorldCommand = new HelloWorldCommand("Synchronous-hystrix");
            //使用execute()同步调用代码,效果等同于:helloWorldCommand.queue().get();可以查看其源代码
            String result = helloWorldCommand.execute();
            System.out.println("result=" + result);

            /**
             * 每个Command对象只能调用一次,不可以重复调用,
             * 重复调用对应异常信息:This instance can only be executed once. Please instantiate a new instance.
             */
//        System.out.println("result=" + helloWorldCommand.execute());
        }

执行结果:

result=Hello Synchronous-hystrix thread:hystrix-ExampleGroup-1

要想异步执行:

@Test
        public void test2() throws ExecutionException, InterruptedException {
            Future<String> future = new HelloWorldCommand("Synchronous-hystrix").queue();
            String result = future.get();
            System.out.println("result=" + result);
        }

执行结果:

com.lbl.springBootDemo.hystrix.HelloWorldCommand.run
result=Hello Synchronous-hystrix thread:hystrix-ExampleGroup-1

通过observe阻塞与非阻塞执行

@Test
        public void test3() throws ExecutionException, InterruptedException {
            Observable<String> observe = new HelloWorldCommand("Synchronous-hystrix").observe();

            //block
            String single = observe.toBlocking().single();
            System.out.println("result=" + single);

            // non-blocking
//            observe.subscribe(new Observer<String>() {
//                    // 在onNext/onError完成之后最后回调
//                @Override
//                public void onCompleted() {
//                    System.out.println("onCompleted: ");
//                }
//
//                @Override
//                public void onError(Throwable e) {
//                   System.out.println("onError " + e.getMessage());
//                    e.printStackTrace();
//                }
//
//                @Override
//                public void onNext(String v) {// 获取结果后回调
//                    System.out.println("onNext: " + v);
//                }
//            });

            //非阻塞,忽略error和completed。相当于只执行onNext
//            observe.subscribe(s -> System.out.println("onNext1111: " + s));
        }

请读者自行运行上面代码。

异常处理

这个比较简单,直接贴代码:

可以使用getFallback进行降级处理,返回兜底数据等。

package com.lbl.hystrix;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Fallback
 */
public class FallbackCommand extends HystrixCommand<String> {
    private final String name;

    public FallbackCommand(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    /**
     * 程序执行异常时可以通过getFallback进行降级操作
     * @return
     */
    @Override
    protected String getFallback() {
        return "exeucute Falled!";
    }

    @Override
    protected String run() throws Exception {
        int i = 1/0;
        return "Hello " + name + " thread:" + Thread.currentThread().getName();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new FallbackCommand("test-Fallback").execute());
    }
}

请读者自行执行代码。

请求结果缓存:Request Cache

有一些请求操作比较频繁,但是这些数据是基本不变的。流行的解决办法就是使用redis等第三方缓存来处理。

这里我们采用hystrix来实现方法级别的缓存,可以将相同参数的请求直接返回cache数据。注意:相同参数的请求才可以被缓存

实现起来也比较简单:

  • 重写getCacheKey方法。
  • 缓存的处理取决于请求上下文,我们必须初始化HystrixRequestContext

比较简单,直接上代码:

package com.lbl.hystrix;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import org.junit.Assert;
import org.junit.Test;

public class CommandUsingRequestCache extends HystrixCommand<Boolean> {

    private final int value;

    protected CommandUsingRequestCache(int value) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.value = value;
    }

    @Override
    protected Boolean run() {
        System.out.println("执行 run ");
        return value == 0 || value % 2 == 0;
    }

    @Override
    protected String getCacheKey() {
        System.out.println("读取cache");
        return String.valueOf(value);
    }


    public static class UnitTest {

        @Test
        public void testWithoutCacheHits() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                Assert.assertTrue(new CommandUsingRequestCache(2).execute());
                Assert.assertFalse(new CommandUsingRequestCache(1).execute());
                Assert.assertTrue(new CommandUsingRequestCache(0).execute());
                Assert.assertTrue(new CommandUsingRequestCache(58672).execute());
            } finally {
                context.shutdown();
            }
        }

        @Test
        public void testWithCacheHits() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);
                CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);

                Assert.assertTrue(command2a.execute());
                //首次执行,肯定不走cache
                Assert.assertFalse(command2a.isResponseFromCache());

                System.out.println("***********");
                Assert.assertTrue(command2b.execute());
                //第二次执行,参数一样,走cache
                Assert.assertTrue(command2b.isResponseFromCache());
            } finally {
                context.shutdown();
            }

            System.out.println("***********");
            // 创建一个新的request context
            context = HystrixRequestContext.initializeContext();
            try {
                CommandUsingRequestCache command3b = new CommandUsingRequestCache(2);
                Assert.assertTrue(command3b.execute());
                // 这是一个新的request context。所以不会从cache中取值
                Assert.assertFalse(command3b.isResponseFromCache());
            } finally {
                context.shutdown();
            }
        }
    }
}

testWithCacheHits的运行结果:

读取cache
读取cache
执行 run 
***********
读取cache
读取cache
***********
读取cache
读取cache
执行 run 

比较奇怪的是:每次都执行了两次getCacheKey方法,暂时还没有找到原因(还没有看源码),在使用的时候要注意。哪位大牛知道原因请指正,不胜感激。

缓存清除

在实际应用中,可能会存在多个command共同操作一个数据的情况,就和多线程的临界区一样,我们这里也暂且叫做临界区。那么我们如何保证一个command对临界区数据的修改,另一个command可以立刻读取到最新的值呢?这里我们可以使用HystrixRequestCache.clear()来清除缓存数据。注意:与多线程编程一样,这里临界区的资源要用volatile修饰,因为本质上这里也是多线程执行的。

示例代码:

package com.lbl.hystrix;

import static org.junit.Assert.*;

import org.junit.Test;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixRequestCache;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

/**
 * Example {@link HystrixCommand} implementation for handling the get-set-get use case within
 * a single request context so that the "set" can invalidate the cached "get".
 */
public class CommandUsingRequestCacheInvalidation {

    /* 临界区资源。volatile修饰 */
    private static volatile String prefixStoredOnRemoteDataStore = "ValueBeforeSet_";

    public static class GetterCommand extends HystrixCommand<String> {

        private static final HystrixCommandKey GETTER_KEY = HystrixCommandKey.Factory.asKey("GetterCommand");
        private final int id;

        public GetterCommand(int id) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetSetGet"))
                    .andCommandKey(GETTER_KEY));
            this.id = id;
        }

        @Override
        protected String run() {
            return prefixStoredOnRemoteDataStore + id;
        }

        @Override
        protected String getCacheKey() {
            return String.valueOf(id);
        }

        /**
         * 清除缓存代码
         * 
         * @param id
         *            argument that would normally be passed to the command
         *
         *
         */
        public static void flushCache(int id) {
            //这里是根据key删除缓存,与上面getCacheKey方法返回值要一致才行
            HystrixRequestCache.getInstance(GETTER_KEY,
                    HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id));
        }

    }

    public static class SetterCommand extends HystrixCommand<Void> {

        private final int id;
        private final String prefix;

        public SetterCommand(int id, String prefix) {
            super(HystrixCommandGroupKey.Factory.asKey("GetSetGet"));
            this.id = id;
            this.prefix = prefix;
        }

        @Override
        protected Void run() {
            // 给prefixStoredOnRemoteDataStore赋新值
            prefixStoredOnRemoteDataStore = prefix;
            // 清除缓存
            GetterCommand.flushCache(id);
            // no return value
            return null;
        }
    }

    public static class UnitTest {

        @Test
        public void getGetSetGet() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                assertEquals("ValueBeforeSet_1", new GetterCommand(1).execute());
                GetterCommand commandAgainstCache = new GetterCommand(1);
                assertEquals("ValueBeforeSet_1", commandAgainstCache.execute());
                //第二次请求,参数与第一次相同,走缓存
                assertTrue(commandAgainstCache.isResponseFromCache());
                // 调用SetterCommand改变临界区数据的值,这里会清除缓存
                new SetterCommand(1, "ValueAfterSet_").execute();

                GetterCommand commandAfterSet = new GetterCommand(1);
                // 由于上面已经清除了缓存,所以下面会执行run方法,而不是从缓存中取
                assertFalse(commandAfterSet.isResponseFromCache());
                assertEquals("ValueAfterSet_1", commandAfterSet.execute());
            } finally {
                context.shutdown();
            }
        }
    }

}

上面代码中,GetterCommand负责读取数据,并提供删除缓存的方法flushCacheSetterCommand负责修改数据,修改完数据后会调用GetterCommand的清缓存的方法。清读者自行执行代码。

合并请求 :Request Collapsing

Request Collapsing 的含义是指将多个请求合并为一个请求。

  • Hystrix支持2种请求合并方式:请求范围和全局范围( request-scoped and globally-scoped)。这是在collapser构造中配置的,默认为request-scoped。请求范围是指在一个HystrixRequestContexts上下文中的请求。全局范围指垮HystrixRequestContexts的请求。
  • 如果你的两个请求能自动合并的前提是两者足够“近”,即两者启动执行的间隔时长要足够小,默认为10ms,即超过10ms将不自动合并。
  • 实际应用中主要目的是:节省网络开销。

上代码:

package com.lbl.hystrix;

import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;

import org.junit.Test;

import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixInvokableInfo;
import com.netflix.hystrix.HystrixRequestLog;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {

    private final Integer key;

    public CommandCollapserGetValueForKey(Integer key) {
        this.key = key;
    }

    @Override
    public Integer getRequestArgument() {
        return key;
    }

    @Override
    protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
        return new BatchCommand(requests);
    }

    @Override
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
        int count = 0;
        for (CollapsedRequest<String, Integer> request : requests) {
            request.setResponse(batchResponse.get(count++));
        }
    }

    private static final class BatchCommand extends HystrixCommand<List<String>> {
        private final Collection<CollapsedRequest<String, Integer>> requests;

        private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
            this.requests = requests;
        }

        @Override
        protected List<String> run() {
            System.out.println("真正执行.");

            ArrayList<String> response = new ArrayList<String>();
            for (CollapsedRequest<String, Integer> request : requests) {
                //在一个请求中收到批量request,也就是request被合并以后传递到这里,然后依次执行
                response.add("ValueForKey: " + request.getArgument());
            }
            return response;
        }
    }

    public static class UnitTest {

        @Test
        public void testCollapser() throws Exception {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
                Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();

//                Thread.sleep(11);// 这里休眠11毫秒,超过了两个请求允许的最长时间间隔(10毫秒),所以会执行两次

                Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
                Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();

                assertEquals("ValueForKey: 1", f1.get());
                assertEquals("ValueForKey: 2", f2.get());
                assertEquals("ValueForKey: 3", f3.get());
                assertEquals("ValueForKey: 4", f4.get());

                int numExecuted = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size();
                System.err.println("num executed: " + numExecuted);//获取真正执行的command的数量

                // assert that the batch command 'GetValueForKey' was in fact executed and that it executed only 
                // once or twice (due to non-determinism of scheduler since this example uses the real timer)
                if (numExecuted > 2) {
                    System.err.println("some of the commands should have been collapsed");
                }

                System.err.println("HystrixRequestLog.getCurrentRequest().getAllExecutedCommands(): " + HystrixRequestLog.getCurrentRequest().getAllExecutedCommands());

                int numLogs = 0;
                for (HystrixInvokableInfo<?> command : HystrixRequestLog.getCurrentRequest().getAllExecutedCommands()) {
                    numLogs++;

                    //判断执行的command是不是我们执行的command
                    assertEquals("GetValueForKey", command.getCommandKey().name());

                    System.err.println(command.getCommandKey().name() + " => command.getExecutionEvents(): " + command.getExecutionEvents());

                    // confirm that it was a COLLAPSED command execution
                    assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
                    assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
                }

                assertEquals(numExecuted, numLogs);
            } finally {
                context.shutdown();
            }
        }
    }
}

执行结果:

真正执行.
num executed: 1
HystrixRequestLog.getCurrentRequest().getAllExecutedCommands(): [com.lbl.hystrix.CommandCollapserGetValueForKey$BatchCommand@2f0a87b3]
GetValueForKey => command.getExecutionEvents(): [SUCCESS, COLLAPSED]

快速失败

快速失败就是指没有重写getFallback,遇到异常后直接抛出,程序停止运行。

package com.lbl.hystrix;

import static org.junit.Assert.*;

import org.junit.Test;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.exception.HystrixRuntimeException;

/**
 * Sample {@link HystrixCommand} that does not have a fallback implemented
 * so will "fail fast" when failures, rejections, short-circuiting etc occur.
 */
public class CommandThatFailsFast extends HystrixCommand<String> {

    private final boolean throwException;

    public CommandThatFailsFast(boolean throwException) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.throwException = throwException;
    }

    @Override
    protected String run() {
        if (throwException) {
            throw new RuntimeException("failure from CommandThatFailsFast");
        } else {
            return "success";
        }
    }

    public static class UnitTest {

        @Test
        public void testSuccess() {
            assertEquals("success", new CommandThatFailsFast(false).execute());
        }

        @Test
        public void testFailure() {
            try {
                new CommandThatFailsFast(true).execute();
                fail("we should have thrown an exception");
            } catch (HystrixRuntimeException e) {
                assertEquals("failure from CommandThatFailsFast", e.getCause().getMessage());
                e.printStackTrace();
            }
        }
    }
}

运行结果 (省略部分错误日志):

20:18:26.279 [hystrix-ExampleGroup-1] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
java.lang.RuntimeException: failure from CommandThatFailsFast
    at com.lbl.hystrix.CommandThatFailsFast.run(CommandThatFailsFast.java:27)
    at com.lbl.hystrix.CommandThatFailsFast.run(CommandThatFailsFast.java:15)
20:18:26.279 [hystrix-ExampleGroup-1] DEBUG com.netflix.hystrix.AbstractCommand - No fallback for HystrixCommand. 
java.lang.UnsupportedOperationException: No fallback available.
    at com.netflix.hystrix.HystrixCommand.getFallback(HystrixCommand.java:293)
    at com.netflix.hystrix.HystrixCommand$3.call(HystrixCommand.java:322)

上面可以看到,会有一个No fallback for HystrixCommand的异常。

我们一般会重写getFallback来处理异常,进行降级等操作。

当降级处理遇到网络请求

上面getFallback都是本地模拟数据(也可以是对象),但是实际项目中,可能会牵扯到网络访问,比如请求另外一个网址,或者从redis等缓存中取值。这种情况下在getFallBack中就有可能会再次出现异常,所以在getFallback的外部请求依然要包装成HystrixCommandor HystrixObservableCommand ,并且再次实现getFallback,再次实现的getFallback就需要考虑降级策略了。

package com.lbl.hystrix;

import static org.junit.Assert.*;

import org.junit.Test;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixInvokableInfo;
import com.netflix.hystrix.HystrixRequestLog;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

/**
 * Sample {@link HystrixCommand} that implements fallback logic that requires
 * network traffic and thus executes another {@link HystrixCommand} from the {@link #getFallback()} method.
 * <p>
 * Note also that the fallback command uses a separate thread-pool as well even though
 * it's in the same command group.
 * <p>
 * It needs to be on a separate thread-pool otherwise the first command could saturate it
 * and the fallback command never have a chance to execute.
 */
public class CommandWithFallbackViaNetwork extends HystrixCommand<String> {
    private final int id;

    protected CommandWithFallbackViaNetwork(int id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand")));
        this.id = id;
    }

    @Override
    protected String run() {
        //模拟异常
        throw new RuntimeException("force failure for example");
    }

    @Override
    protected String getFallback() {
        //这里访问另一个command,并且要实现getFallback避免外部访问异常
        return new FallbackViaNetwork(id).execute();
    }

    private static class FallbackViaNetwork extends HystrixCommand<String> {
        private final int id;

        public FallbackViaNetwork(int id) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand"))
                    // use a different threadpool for the fallback command
                    // so saturating the RemoteServiceX pool won't prevent
                    // fallbacks from executing
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")));
            this.id = id;
        }

        @Override
        protected String run() {
            //            MemCacheClient.getValue(id);
            throw new RuntimeException("the fallback also failed");
        }

        @Override
        protected String getFallback() {
            // the fallback also failed
            // so this fallback-of-a-fallback will 
            // fail silently and return null
            System.out.println("第二次执行失败。");
            return null;
        }
    }

    public static class UnitTest {

        @Test
        public void test() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                assertEquals(null, new CommandWithFallbackViaNetwork(1).execute());

                HystrixInvokableInfo<?> command1 = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixInvokableInfo<?>[2])[0];
                assertEquals("GetValueCommand", command1.getCommandKey().name());
                assertTrue(command1.getExecutionEvents().contains(HystrixEventType.FAILURE));

                HystrixInvokableInfo<?> command2 = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixInvokableInfo<?>[2])[1];
                assertEquals("GetValueFallbackCommand", command2.getCommandKey().name());
                assertTrue(command2.getExecutionEvents().contains(HystrixEventType.FAILURE));
            } finally {
                context.shutdown();
            }
        }
    }
}

执行结果:

20:30:47.454 [hystrix-RemoteServiceXFallback-1] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
java.lang.RuntimeException: the fallback also failed
    at com.lbl.hystrix.CommandWithFallbackViaNetwork$FallbackViaNetwork.run(CommandWithFallbackViaNetwork.java:64)
第二次执行失败。

信号量

在执行网络请求时,当遇到网络延迟后者线程开销不可接受时,可以将executionIsolationStrategy属性设置为ExecutionIsolationStrategy.SEMAPHORE,Hystrix将使用信号量隔离。默认是线程池隔离。

信号量的使用比较简单:

super(Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand"))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));

主备模式或者AB测试

这里标题不太好想,可能你看到上面标题不明白啥意思。简单解释一下:我们平时在系统升级时,有可能会有一些功能需要线上验证,但是又想保留老的逻辑,过去的逻辑可能是写个if...else...,然后通过某个配置来控制。这里我们采用hystrix来解决这个问题。

enter image description here

上图是官网的一张图片,最左侧的Command是主调度器,右边的primary Comand和Secondary Command是两个不同逻辑的command代码。通过一个配置来在两个command中进行切换。

代码如下:

package com.lbl.hystrix;

import static org.junit.Assert.*;

import org.junit.Test;

import com.netflix.config.ConfigurationManager;
import com.netflix.config.DynamicBooleanProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

/**
 * Sample {@link HystrixCommand} pattern using a semaphore-isolated command
 * that conditionally invokes thread-isolated commands.
 */
public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> {

    private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true);

    private final int id;

    public CommandFacadeWithPrimarySecondary(int id) {
        super(Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand"))
                .andCommandPropertiesDefaults(
                        // 这里采用信号量
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
        this.id = id;
    }

    /**
     * 通过DynamicPropertyFactory来路由到不同的command
     * @return
     */
    @Override
    protected String run() {
        if (usePrimary.get()) {
            return new PrimaryCommand(id).execute();
        } else {
            return new SecondaryCommand(id).execute();
        }
    }

    @Override
    protected String getFallback() {
        return "static-fallback-" + id;
    }

    @Override
    protected String getCacheKey() {
        return String.valueOf(id);
    }

    private static class PrimaryCommand extends HystrixCommand<String> {

        private final int id;

        private PrimaryCommand(int id) {
            super(Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))
                    .andCommandPropertiesDefaults(
                            // we default to a 600ms timeout for primary
                            HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600)));
            this.id = id;
        }

        @Override
        protected String run() {
            // perform expensive 'primary' service call
            return "responseFromPrimary-" + id;
        }

    }

    private static class SecondaryCommand extends HystrixCommand<String> {

        private final int id;

        private SecondaryCommand(int id) {
            super(Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))
                    .andCommandPropertiesDefaults(
                            // we default to a 100ms timeout for secondary
                            HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100)));
            this.id = id;
        }

        @Override
        protected String run() {
            // perform fast 'secondary' service call
            return "responseFromSecondary-" + id;
        }

    }

    public static class UnitTest {

        @Test
        public void testPrimary() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                //将属性"primarySecondary.usePrimary"设置为true,则走PrimaryCommand;设置为false,则走SecondaryCommand
                ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true);
                assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute());
            } finally {
                context.shutdown();
                ConfigurationManager.getConfigInstance().clear();
            }
        }

        @Test
        public void testSecondary() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                //将属性"primarySecondary.usePrimary"设置为true,则走PrimaryCommand;设置为false,则走SecondaryCommand
                ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false);
                assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute());
            } finally {
                context.shutdown();
                ConfigurationManager.getConfigInstance().clear();
            }
        }
    }
}

本文中的内容主要依赖于Configuration

线程池、信号量

hystrix提供两种模式:线程池和信号量模式。

  • 线程池隔离模式:使用一个线程池来存储当前的请求,线程池对请求作处理,设置任务返回处理超时时间,堆积的请求堆积入线程池队列。这种方式需要为每个依赖的服务申请线程池,有一定的资源消耗,好处是可以应对突发流量(流量洪峰来临时,处理不完可将数据存储到线程池队里慢慢处理)。

  • 信号量隔离模式:使用一个原子计数器(或信号量)来记录当前有多少个线程在运行,请求来先判断计数器的数值,若超过设置的最大线程个数则丢弃改类型的新请求,若不超过则执行计数操作请求来计数器+1,请求返回计数器-1。这种方式是严格的控制线程且立即返回模式,无法应对突发流量(流量洪峰来临时,处理的线程超过数量,其他的请求会直接返回,不继续去请求依赖的服务)。

模式使用线程池模式,除非任务比较耗时,导致单线程任务过于繁重的情况。

Generally the only time you should use semaphore isolation for HystrixCommands is when the call is so high volume (hundreds per second, per instance) that the overhead of separate threads is too high; this typically only applies to non-network calls.

超时熔断、降级

在网络访问中,为了优化用户体验,遇到超时的情况,可以直接放弃本次请求,不等待结果的返回,直接返回用户默认数据或者降级为从另一个服务或者缓存等默认数据(具体业务具体分析)。

HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(int value)

默认超时时间是1000毫秒。

package com.lbl.hystrix;

import com.netflix.hystrix.*;

public class TimeOutCommand extends HystrixCommand<String> {
    private final String name;

    public TimeOutCommand(String name) {
        super(Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("LblCommandKey"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("LblThreadPoolKey"))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
//                                .withExecutionTimeoutEnabled(false)//是否启用超时中断
                                .withExecutionTimeoutInMilliseconds(100)//配置超时时间
                                .withExecutionIsolationThreadInterruptOnTimeout(true)));//超时后是否中断
        this.name = name;
    }

    /**
     * 程序执行异常时可以通过getFallback进行降级操作
     *
     * @return
     */
    @Override
    protected String getFallback() {
        return "exeucute Falled!";
    }

    @Override
    protected String run() throws Exception {
        try {
            Thread.sleep(200);
            System.out.println("我没有被中断哦…………");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Hello " + name + " thread:" + Thread.currentThread().getName();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new TimeOutCommand("test-Fallback").execute());

        System.in.read();


    }
}

说明:withExecutionIsolationThreadInterruptOnTimeout用于配置超时后是否中断run方法的执行。这个需要根据具体业务逻辑具体分析,如果你的代码允许中断,那么最好中断,以节省开销。反之则禁止中断。

请读者自行尝试修改上述几个配置运行代码,查看结果。

配置项说明

  • execution.timeout.enabled

是否启用超时配置。默认true 自定义:HystrixCommandProperties.Setter().withExecutionTimeoutEnabled(boolean value)

  • execution.isolation.thread.timeoutInMilliseconds

配置超时时间。默认一秒 自定义HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(int value)

  • execution.isolation.thread.interruptOnTimeout

超时以后,是否中断run的执行。默认false 自定义:HystrixCommandProperties.Setter().withExecutionIsolationThreadInterruptOnCancel(boolean value)

  • execution.isolation.semaphore.maxConcurrentRequests

该属性仅在信号量模式下有效,该属性配置访问run方法的最大并发请求数。默认10。 自定义:HystrixCommandProperties.Setter().withExecutionIsolationSemaphoreMaxConcurrentRequests(int value)

  • fallback.isolation.semaphore.maxConcurrentRequests

该属性配置了fallback方法的直达并发请求数。默认10。该属性同时支持线程池默认和信号量模式。 自定义:HystrixCommandProperties.Setter().withFallbackIsolationSemaphoreMaxConcurrentRequests(int value)

并发熔断

有时候我们需要控制并发访问量,房子服务器由于高并发导致宕机。 使用到的配置项就是上面刚刚讲的maxConcurrentRequestsmaxConcurrentRequests。实例如下:

package com.lbl.hystrix;

import com.netflix.hystrix.*;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import org.junit.Test;

import java.io.IOException;

/**
 * 并发测试
 * 默认执行run()用的是主线程,为了模拟并行执行command,这里我们自己创建多个线程来执行command
 * 设置ExecutionIsolationSemaphoreMaxConcurrentRequests为3,意味着信号量最多允许执行run的并发数为3,超过则触发降级,即不执行run而执行getFallback
 * 设置FallbackIsolationSemaphoreMaxConcurrentRequests为2,意味着信号量最多允许执行fallback的并发数为2,超过则抛异常fallback execution rejected
 */
public class ConcurrentTest extends HystrixCommand<String> {

    private final String name;

    public ConcurrentTest(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SemaphoreTestGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("SemaphoreTestKey"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SemaphoreTestThreadPoolKey"))
                .andCommandPropertiesDefaults(    // 配置信号量隔离
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)    // 信号量隔离
                                .withExecutionIsolationSemaphoreMaxConcurrentRequests(3)
                                .withFallbackIsolationSemaphoreMaxConcurrentRequests(2)
                )
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        Thread.sleep(100);//模拟执行时间
        return "run(): name=" + name + ",线程名是" + Thread.currentThread().getName();
    }

    @Override
    protected String getFallback() {
        try {
            Thread.sleep(100);//模拟执行时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "getFallback(): name=" + name + ",线程名是" + Thread.currentThread().getName();
    }

    public static class UnitTest {

        @Test
        public void testSynchronous() throws IOException {

            try {
                for (int i = 0; i < 10; i++) {
                    final int j = i;
                    // 自主创建线程来执行command,创造并发场景
                    Thread thread = new Thread(() ->
                            System.out.println("===========" + new ConcurrentTest("HLX" + j).execute())    // 被信号量拒绝的线程从这里抛出异常
                    );
                    thread.start();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            System.in.read();
        }
    }

}

执行结果:

Exception in thread "Thread-7" com.netflix.hystrix.exception.HystrixRuntimeException: SemaphoreTestKey fallback execution rejected.
    at com.netflix.hystrix.AbstractCommand.handleFallbackRejectionByEmittingError(AbstractCommand.java:1043)
    at com.netflix.hystrix.AbstractCommand.getFallbackOrThrowException(AbstractCommand.java:875)
===========run(): name=HLX0,线程名是Thread-0
===========getFallback(): name=HLX3,线程名是Thread-3
===========run(): name=HLX9,线程名是Thread-9
===========run(): name=HLX5,线程名是Thread-5
===========getFallback(): name=HLX1,线程名是Thread-1

通过sleep模拟的程序执行时间,以便于伪造并发。结果中我们可以看到,run执行了3次,fallback执行了2次。其余的报异常HystrixRuntimeException: SemaphoreTestKey fallback execution rejected

熔断

熔断是指错误达到某个设定的阈值,或者请求量超过阈值后,系统自动(或手动)阻止代码或服务的执行调用,从而达到系统整体保护的效果。当检测到系统可用时,需要恢复访问。

下面我们模拟一个由于错误Exception导致的熔断实例:

配置一个时间窗口内失败2次则进行熔断,之后过8秒,进行重试,检测服务是否恢复。

package com.lbl.hystrix;

import com.netflix.hystrix.*;
import org.junit.Test;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class CircuitBreakerTest extends HystrixCommand<String> {

    private final String name;

    public CircuitBreakerTest(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CircuitBreakerTestGroup"))
                        .andCommandKey(HystrixCommandKey.Factory.asKey("CircuitBreakerTestKey"))
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("CircuitBreakerTest"))
                        .andThreadPoolPropertiesDefaults(    // 配置线程池
                                HystrixThreadPoolProperties.Setter()
                                        .withCoreSize(200)    // 配置线程池里的线程数,设置足够多线程,以防未熔断却打满threadpool
                        )
                        .andCommandPropertiesDefaults(    // 配置熔断器
                                HystrixCommandProperties.Setter()
                                        .withCircuitBreakerEnabled(true) //默认true
                                        .withCircuitBreakerErrorThresholdPercentage(20)     //(出错百分比阈值,当达到此阈值后,开始短路。默认50%)
                                        .withCircuitBreakerRequestVolumeThreshold(3)        //// 在统计数据之前,必须在10秒内发出3个请求。  默认是20.withCircuitBreakerSleepWindowInMilliseconds(8000)  //(断路多久以后开始尝试是否恢复,默认5s)
                        )
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        if(Integer.parseInt(name) == 1 || Integer.parseInt(name) == 17){
            System.out.println(1 / 0);
        }
        return name;
    }

    @Override
    protected String getFallback() {
        return "CircuitBreaker fallback: " + name;
    }

    public static class UnitTest {
        @Test
        public void testxianliu() throws IOException, InterruptedException {
            for (int i = 0; i < 20; i++) {
                CircuitBreakerTest circuitBreakerTest = new CircuitBreakerTest(String.valueOf(i));
                String execute = circuitBreakerTest.execute();
                boolean responseFromFallback = circuitBreakerTest.isResponseFromFallback();
                System.out.println(execute + "; responseFromFallback: " + responseFromFallback + ".isCircuitBreakerOpen:" + circuitBreakerTest.isCircuitBreakerOpen());
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }

}

执行结果:

0; responseFromFallback: false.isCircuitBreakerOpen:false
19:13:28.272 [hystrix-CircuitBreakerTest-2] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
java.lang.ArithmeticException: / by zero
    //胜率部分错误日志
CircuitBreaker fallback: 1; responseFromFallback: true.isCircuitBreakerOpen:false
2; responseFromFallback: false.isCircuitBreakerOpen:false
CircuitBreaker fallback: 3; responseFromFallback: true.isCircuitBreakerOpen:true
CircuitBreaker fallback: 4; responseFromFallback: true.isCircuitBreakerOpen:true
CircuitBreaker fallback: 5; responseFromFallback: true.isCircuitBreakerOpen:true
CircuitBreaker fallback: 6; responseFromFallback: true.isCircuitBreakerOpen:true
CircuitBreaker fallback: 7; responseFromFallback: true.isCircuitBreakerOpen:true
CircuitBreaker fallback: 8; responseFromFallback: true.isCircuitBreakerOpen:true
CircuitBreaker fallback: 9; responseFromFallback: true.isCircuitBreakerOpen:true
CircuitBreaker fallback: 10; responseFromFallback: true.isCircuitBreakerOpen:true
11; responseFromFallback: false.isCircuitBreakerOpen:false
12; responseFromFallback: false.isCircuitBreakerOpen:false
13; responseFromFallback: false.isCircuitBreakerOpen:false
14; responseFromFallback: false.isCircuitBreakerOpen:false
15; responseFromFallback: false.isCircuitBreakerOpen:false
16; responseFromFallback: false.isCircuitBreakerOpen:false
19:11:44.541 [hystrix-CircuitBreakerTest-10] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
java.lang.ArithmeticException: / by zero
    //省略部分错误日志
CircuitBreaker fallback: 17; responseFromFallback: true.isCircuitBreakerOpen:false
18; responseFromFallback: false.isCircuitBreakerOpen:false
19; responseFromFallback: false.isCircuitBreakerOpen:false

结果分析:

i=1时报错,但是还未超过withCircuitBreakerRequestVolumeThreshold的配置的值(3)。所以不会熔断。但是当i=3时,过去【0,1,2】错了一个,错误率是33.3%,超过了阈值20%,所以熔断。

i=11时,过了8秒(withCircuitBreakerSleepWindowInMilliseconds配置项),再次尝试请求原服务,发现服务可用,解除熔断。i=17时,再次出错一次。这时,i=[11,17]出错一次,错误率14.2%,未超阈值,不进行熔断。

读者可以尝试在增加出错的次数,看熔断的情况。

配置项说明

  • circuitBreaker.enabled

断路器是否可用。默认值为true。自定义值:HystrixCommandProperties.Setter().withCircuitBreakerEnabled(boolean value)

  • circuitBreaker.requestVolumeThreshold

这个属性指一个时间窗口内,达到多少次失败则会进行熔断。默认值是20

假如采用默认值,但是一个时间窗口内,只请求了19次,即使都失败了,也不会熔断。

自定义值:HystrixCommandProperties.Setter().withCircuitBreakerRequestVolumeThreshold(int value)

  • circuitBreaker.sleepWindowInMilliseconds

熔断之后,间隔多长时间,检测服务是否恢复。默认值5秒 自定义值:HystrixCommandProperties.Setter()withCircuitBreakerSleepWindowInMilliseconds(int value)

  • circuitBreaker.errorThresholdPercentage

该属性配置错误百分比。当错误百分比超过阈值时熔断。默认50%

自定义值:HystrixCommandProperties.Setter().withCircuitBreakerErrorThresholdPercentage(int value)

  • circuitBreaker.forceOpen

强制熔断。如果该属性设置为true,则所有请求走熔断模式,直接到fallback。默认false 自定义值HystrixCommandProperties.Setter().withCircuitBreakerForceOpen(boolean value)

  • circuitBreaker.forceClosed

强制通过,禁止熔断。如果该属性设置为true,则所有请求都请求到run。默认false 注意:forceOpen优先。如果forceOpen设置为true,则forceClosed失效。 自定义值HystrixCommandProperties.Setter().withCircuitBreakerForceOpen(boolean value)

另外,如果线程池被打满,即使没有出现上面的异常,或者达到某些阈值,也会降级。

相关文章
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
82 3
|
6月前
springCloud之服务降级熔断Hystrix、OpenFeign
springCloud之服务降级熔断Hystrix、OpenFeign
356 0
|
3月前
|
存储 NoSQL 调度
|
3月前
|
数据采集 分布式计算 MaxCompute
MaxCompute 分布式计算框架 MaxFrame 服务正式商业化公告
MaxCompute 分布式计算框架 MaxFrame 服务于北京时间2024年09月27日正式商业化!
99 3
|
4月前
|
存储 NoSQL 算法
Go 分布式令牌桶限流 + 兜底保障
Go 分布式令牌桶限流 + 兜底保障
|
6月前
|
消息中间件 传感器 Cloud Native
事件驱动作为分布式异步服务架构
【6月更文挑战第25天】本文介绍事件驱动架构(EDA)是异步分布式设计的关键模式,适用于高扩展性需求。EDA提升服务韧性,支持CQRS、数据通知、开放式接口和事件流处理。然而,其脆弱性包括组件控制、数据交换、逻辑关系复杂性、潜在死循环和高并发挑战。EDA在云原生环境,如Serverless,中尤其适用。
206 2
事件驱动作为分布式异步服务架构
|
4月前
|
Java 应用服务中间件 数据库
SpringCloud:服务保护和分布式事务详解
SpringCloud:服务保护和分布式事务详解
133 0
|
5月前
|
存储 缓存 NoSQL
高并发架构设计三大利器:缓存、限流和降级问题之Redis用于搭建分布式缓存集群问题如何解决
高并发架构设计三大利器:缓存、限流和降级问题之Redis用于搭建分布式缓存集群问题如何解决
102 1
|
6月前
|
监控 Java API
深入解析 Spring Cloud Sentinel:分布式系统流量控制与熔断降级的全面指南
深入解析 Spring Cloud Sentinel:分布式系统流量控制与熔断降级的全面指南
141 0
深入解析 Spring Cloud Sentinel:分布式系统流量控制与熔断降级的全面指南
|
7月前
|
监控 Java API
Spring cloud Hystrix 、Dashboard、API(zuul)相关报错
Spring cloud Hystrix 、Dashboard、API(zuul)相关报错
75 2