hystrix-go 使用与原理

简介: hystrix-go 使用与原理

开篇


这周在看内部一个熔断限流包时,发现它是基于一个开源项目 hystrix-go 实现了,因此有了这篇文章。


Hystrix


Hystrix 是由 Netflex 开发的一款开源组件,提供了基础的熔断功能。 Hystrix 将降级的策略封装在 Command 中,提供了 run 和 fallback 两个方法,前者表示正常的逻辑,比如微服务之间的调用……,如果发生了故障,再执行 fallback 方法返回结果,我们可以把它理解成保底操作。如果正常逻辑在短时间内频繁发生故障,那么可能会触发短路,也就是之后的请求不再执行 run, 而是直接执行 fallback。更多关于 Hystrix 的信息可以查看 https://github.com/Netflix/Hystrix,而

hystrix-go 则是用 go 实现的 hystrix 版,更确切的说,是简化版。只是上一次更新还是 2018 年 的一次 pr, 也就毕业了?


为什么需要这些工具?

比如一个微服务化的产品线上,每一个服务都专注于自己的业务,并对外提供相应的服务接口,或者依赖于外部服务的某个逻辑接口,就像下面这样。


1668567332880.jpg


假设我们当前是 服务A,有部分逻辑依赖于 服务C,服务C 又依赖于 服务E, 当前微服务之间进行 rpc 或者 http 通信,假设此时 服务C 调用 服务 E 失败,比如由于网络波动导致超时或者服务 E 由于过载,系统 E 已经 down 掉了。


1668567347350.jpg


调用失败,一般会有失败重试等机制。但是再想想,假设服务 E 已然不可用的情况下,此时新的调用不断产生,同时伴随着调用等待和失败重试,会导致 服务 C 对服务 E 的调用而产生大量的积压,慢慢会耗尽服务 C 的资源,进而导致服务 C 也 down 掉,这样恶性循环下,会影响到整个微服务体系,产生雪崩效应。


1668567357045.jpg


虽然导致雪崩的发生不仅仅这一种,但是我们需要采取一定的措施,来保证不让这个噩梦发生。而 hystrix-go 就很好的提供了 熔断和降级的措施。它的主要思想在于,设置一些阀值,比如最大并发数 (当并发数大于设置的并发数,拦截),错误率百分比 (请求数量大于等于设置 的阀值,并且错误率达到设置的百分比时,触发熔断) 以及熔断尝试恢复时间等 。


使用


hystrix-go 的使用非常简单,你可以调用它的 Go 或者 Do 方法,只是 Go 方法是异步的方式。而 Do 方法是同步方式。我们从一个简单的例子开启。

_ = hystrix.Do("wuqq", func() error {
        // talk to other services
        _, err := http.Get("https://www.baidu.com/")
        if err != nil {
            fmt.Println("get error:%v",err)
            return err
        }
        return nil
    }, func(err error) error {
        fmt.Printf("handle  error:%v\n", err)
        return nil
    })

Do 函数需要三个参数,第一个参数 commmand 名称,你可以把每个名称当成一个独立当服务,第二个参数是处理正常的逻辑,比如 http 调用服务,返回参数是 err。如果处理 | 调用失败,那么就执行第三个参数逻辑, 我们称为保底操作。由于服务错误率过高导致熔断器开启,那么之后的请求也直接回调此函数。


既然熔断器是按照配置的规则而进行是否开启的操作,那么我们当然可以设置我们想要的值。

hystrix.ConfigureCommand("wuqq", hystrix.CommandConfig{
        Timeout:                int(3 * time.Second),
        MaxConcurrentRequests:  10,
        SleepWindow:            5000,
        RequestVolumeThreshold: 10,
        ErrorPercentThreshold:  30,
    })
    _ = hystrix.Do("wuqq", func() error {
        // talk to other services
        _, err := http.Get("https://www.baidu.com/")
        if err != nil {
            fmt.Println("get error:%v",err)
            return err
        }
        return nil
    }, func(err error) error {
        fmt.Printf("handle  error:%v\n", err)
        return nil
    })

稍微解释一下上面配置的值含义:


Timeout: 执行 command 的超时时间。

MaxConcurrentRequests:command 的最大并发量 。

SleepWindow:当熔断器被打开后,SleepWindow 的时间就是控制过多久后去尝试服务是否可用了。

RequestVolumeThreshold: 一个统计窗口 10 秒内请求数量。达到这个请求数量后才去判断是否要开启熔断

ErrorPercentThreshold:错误百分比,请求数量大于等于 RequestVolumeThreshold 并且错误率到达这个百分比后就会启动熔断

当然你不设置的话,那么自动走的默认值。


1668567408696.jpg

我们再来看一个简单的例子:

package main
import (
   "fmt"
 "github.com/afex/hystrix-go/hystrix" "net/http" "time")
type Handle struct{}
func (h *Handle) ServeHTTP(r http.ResponseWriter, request *http.Request) {
   h.Common(r, request)
}
func (h *Handle) Common(r http.ResponseWriter, request *http.Request) {
   hystrix.ConfigureCommand("mycommand", hystrix.CommandConfig{
      Timeout:                int(3 * time.Second),
      MaxConcurrentRequests:  10,
      SleepWindow:            5000,
      RequestVolumeThreshold: 20,
      ErrorPercentThreshold:  30,
   })
   msg := "success"
  _ = hystrix.Do("mycommand", func() error {
      _, err := http.Get("https://www.baidu.com")
      if err != nil {
         fmt.Printf("请求失败:%v", err)
         return err
  }
      return nil
  }, func(err error) error {
      fmt.Printf("handle  error:%v\n", err)
      msg = "error"
  return nil
  })
   r.Write([]byte(msg))
}
func main() {
   http.ListenAndServe(":8090", &Handle{})
}

我们开启了一个 http 服务,监听端口号 8090,所有请求的处理逻辑都在 Common 方法中,在这个方法中,我们主要是发起一次 http 请求,请求成功响应 success, 如果失败,响应失败原因。


我们再写另一个简单程序,并发 11 次的请求 8090 端口。

package main
import (
    "fmt"
    "io/ioutil"
    "net/http"
    "sync"
    "time"
)
var client *http.Client
func init() {
    tr := &http.Transport{
        MaxIdleConns:    100,
        IdleConnTimeout: 1 * time.Second,
    }
    client = &http.Client{Transport: tr}
}
type info struct {
    Data interface{} `json:"data"`
}
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 11; i++ {
        wg.Add(1)
        go func(int2 int) {
            defer wg.Done()
            req, err := http.NewRequest("GET", "http://localhost:8090", nil)
            if err != nil {
                fmt.Printf("初始化http客户端处错误:%v", err)
                return
            }
            resp, err := client.Do(req)
            if err != nil {
                fmt.Printf("初始化http客户端处错误:%v", err)
                return
            }
            defer resp.Body.Close()
            nByte, err := ioutil.ReadAll(resp.Body)
            if err != nil {
                fmt.Printf("读取http数据失败:%v", err)
                return
            }
            fmt.Printf("接收到到值:%v\n", string(nByte))
        }(i)
    }
    wg.Wait()
    fmt.Printf("请求完毕\n")
}

由于我们配置 MaxConcurrentRequests 为 10,那么意味着还有个 g 请求会失败:

1668567444859.jpg

和我们想的一样。


接着我们把网络断开,并发请求改成 10 次。再次运行程序并发请求 8090 端口,此时由于网络已关闭,导致请求百度失败:

1668567456913.jpg

接着继续请求:

1668567468906.jpg

熔断器已开启,上面我们配置的 RequestVolumeThreshold 和 ErrorPercentThreshold 生效。


然后我们把网连上,五秒后 (SleepWindow 的值) 继续并发调用,当前熔断器处于半开的状态,此时请求允许调用依赖,如果成功则关闭,失败则继续开启熔断器。

1668567480487.jpg

可以看到,有一个成功了,那么此时熔断器已关闭,接下来继续运行函数并发调用:

1668567492084.jpg

可以看到,10 个都已经是正常成功的状态了。


那么问题来了,为什么最上面的图只有一个是成功的?5 秒已经过了,并且当前网络正常,应该是 10 个请求都成功,但是我们看到的只有一个是成功状态。通过源码我们可以找到答案:

具体逻辑在判断当前请求是否可以调用依赖

if !cmd.circuit.AllowRequest() {
            ......
            return
        }
func (circuit *CircuitBreaker) AllowRequest() bool {
    return !circuit.IsOpen() || circuit.allowSingleTest()
}
func (circuit *CircuitBreaker) allowSingleTest() bool {
    circuit.mutex.RLock()
    defer circuit.mutex.RUnlock()
    now := time.Now().UnixNano()
    openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)
    if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
    /
        swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now) //这一句才是关键
        if swapped {
            log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name)
        }
        return swapped
    }
    return false
}

这段代码首先判断了熔断器是否开启,并且当前时间大于 上一次开启熔断器的时间 + SleepWindow 的时间,如果条件都符合的话,更新此熔断器最新的 openedOrLastTestedTime , 是通过 CompareAndSwapInt64 原子操作完成的,意外着必然只会有一个成功。

此时熔断器还是半开的状态,接着如果能拿到令牌,执行 run 函数(也就是 Do 传入的第二个简单封装后的函数),发起 http 请求,如果成功,上报成功状态,关闭熔断器。如果失败,那么熔断器依旧开启。

1668567522210.jpg

1668567529170.jpg

相关文章
|
7月前
|
缓存 Go API
Go 实现一个支持多种过期、淘汰机制的本地缓存的核心原理
本文旨在探讨实现一个支持多种 过期、淘汰 机制的 go 本地缓存的核心原理,我将重点讲解如何支持多样化的过期和淘汰策略。
170 0
|
7月前
|
存储 安全 Java
Go 基础数据结构的底层原理(slice,channel,map)
Go 基础数据结构的底层原理(slice,channel,map)
103 0
|
存储 设计模式 编译器
详解Go语言类型与接口关系:从原理到应用全解密
详解Go语言类型与接口关系:从原理到应用全解密
95 0
|
测试技术 Go 开发工具
100天精通Golang(基础入门篇)——第3天:Go语言的执行原理及常用命令、编码规范和常用工具
100天精通Golang(基础入门篇)——第3天:Go语言的执行原理及常用命令、编码规范和常用工具
247 1
|
4月前
|
缓存 Java 编译器
Go 中的内存布局和分配原理
Go 中的内存布局和分配原理
|
4月前
|
存储 人工智能 JSON
深入理解 go reflect - 反射基本原理
深入理解 go reflect - 反射基本原理
63 0
|
4月前
|
缓存 安全 测试技术
深入理解 go sync.Map - 基本原理
深入理解 go sync.Map - 基本原理
37 0
|
6月前
|
Unix Go 开发者
探索Go语言并发模型:原理与实践
本文深入探讨了Go语言的并发模型,包括其设计原理、Goroutine和Channel的基本使用,以及如何在实际项目中高效地应用这些概念来提高程序的性能和可维护性。
|
7月前
|
存储 Java Go
Go 语言切片如何扩容?(全面解析原理和过程)
Go 语言切片如何扩容?(全面解析原理和过程)
135 2
|
7月前
|
负载均衡 监控 Go
Golang深入浅出之-Go语言中的服务网格(Service Mesh)原理与应用
【5月更文挑战第5天】服务网格是处理服务间通信的基础设施层,常由数据平面(代理,如Envoy)和控制平面(管理配置)组成。本文讨论了服务发现、负载均衡和追踪等常见问题及其解决方案,并展示了使用Go语言实现Envoy sidecar配置的例子,强调Go语言在构建服务网格中的优势。服务网格能提升微服务的管理和可观测性,正确应对问题能构建更健壮的分布式系统。
468 1