[关闭]
@aliasliyu4 2018-08-13T23:23:08.000000Z 字数 5970 阅读 3594

go-hystrix 源码读解

开局一张图
PgsQlq.md.png

下文是GitHub上go-hystrix对于其功能的阐述。它脱胎于Java版本的Hystrix库,主要目的是在分布式系统中提供对错误(故障)的保护,这一点从其熔断的定义也可以看出。

Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.

I think the Hystrix patterns of programmer-defined fallbacks and adaptive health monitoring are good for any distributed system. Go routines and channels are great concurrency primitives, but don't directly help our application stay available during failures.

hystrix-go aims to allow Go programmers to easily build applications with similar execution semantics of the Java-based Hystrix library.

command config定义

  1. type CommandConfig struct { // per-command settings; every field has a JSON tag
  2. Timeout int `json:"timeout"` // timeout for the command (unit not shown here; presumably milliseconds — confirm)
  3. MaxConcurrentRequests int `json:"max_concurrent_requests"` // maximum number of concurrent requests
  4. RequestVolumeThreshold int `json:"request_volume_threshold"` // minimum number of requests before an unhealthy circuit may trip
  5. SleepWindow int `json:"sleep_window"` // time after tripping before a retry is allowed (unit not shown here — confirm)
  6. ErrorPercentThreshold int `json:"error_percent_threshold"` // threshold on the percentage of failed requests
  7. }

断路器的定义

  1. // CircuitBreaker is created for each ExecutorPool to track whether requests
  2. // should be attempted, or rejected if the Health of the circuit is too low.
  3. type CircuitBreaker struct { // circuit breaker definition
  4. Name string // name of the circuit
  5. open bool // whether the circuit is open. NOTE(review): the original note read as inverted; presumably open == true is the tripped state that rejects requests (cf. ErrCircuitOpen) — confirm
  6. forceOpen bool // force the circuit open
  7. mutex *sync.RWMutex // read-write lock (readers do not block one another; a writer excludes everyone)
  8. openedOrLastTestedTime int64 // time the circuit was opened or last tested; a "test" is the probe, made after the circuit opened, of whether requests can be sent again
  9. executorPool *executorPool // executor pool
  10. metrics *metricExchange // metrics for monitoring the circuit
  11. }
  12. var (
  13. circuitBreakersMutex *sync.RWMutex // lock guarding circuitBreakers
  14. circuitBreakers map[string]*CircuitBreaker // registry holding every circuit breaker, keyed by name
  15. )
  16. func init() {
  17. circuitBreakersMutex = &sync.RWMutex{} // initialise the registry lock
  18. circuitBreakers = make(map[string]*CircuitBreaker) // initialise the (empty) registry
  19. }

获取断路器

  1. // GetCircuit returns the circuit for the given command and whether this call created it.
  2. func GetCircuit(name string) (*CircuitBreaker, bool, error) { // the error result is always nil in this listing
  3. circuitBreakersMutex.RLock()
  4. _, ok := circuitBreakers[name]
  5. if !ok {
  6. circuitBreakersMutex.RUnlock() // release the read lock before asking for the write lock
  7. circuitBreakersMutex.Lock()
  8. defer circuitBreakersMutex.Unlock() // the write lock is taken only after the read lock was released, so the two are never held together; it is held until return, and goroutines arriving later find the circuit already in the map
  9. if cb, ok := circuitBreakers[name]; ok { // check again under the write lock: another goroutine may have registered the circuit between RUnlock and Lock
  10. return cb, false, nil
  11. }
  12. circuitBreakers[name] = newCircuitBreaker(name)
  13. } else {
  14. defer circuitBreakersMutex.RUnlock()
  15. }
  16. return circuitBreakers[name], !ok, nil
  17. }

执行池

  1. type executorPool struct { // executor pool
  2. Name string // name of the pool
  3. Metrics *poolMetrics // metrics for the pool
  4. Max int // maximum number of concurrent requests
  5. Tickets chan *struct{} // tickets; one is taken from this channel per running command and put back afterwards
  6. }
  7. // newExecutorPool creates a new executor pool for name.
  8. func newExecutorPool(name string) *executorPool {
  9. p := &executorPool{}
  10. p.Name = name // name of the pool
  11. p.Metrics = newPoolMetrics(name)
  12. p.Max = getSettings(name).MaxConcurrentRequests // maximum concurrency from the settings for name (per the original note, defaults are used when nothing is configured; getSettings is not shown here)
  13. p.Tickets = make(chan *struct{}, p.Max) // buffered channel with capacity Max
  14. for i := 0; i < p.Max; i++ { // pre-fill the pool with Max tickets
  15. p.Tickets <- &struct{}{}
  16. }
  17. return p
  18. }
  19. func (p *executorPool) Return(ticket *struct{}) { // puts ticket back into the pool
  20. if ticket == nil { // no ticket was acquired; nothing to give back
  21. return
  22. }
  23. p.Metrics.Updates <- poolMetricsUpdate{ // publish the active count, measured before the ticket goes back
  24. activeCount: p.ActiveCount(),
  25. }
  26. p.Tickets <- ticket
  27. }
  28. func (p *executorPool) ActiveCount() int { // number of tickets currently out of the pool
  29. return p.Max - len(p.Tickets)
  30. }

执行函数入口Go:传入自定义的run、fallback函数,该函数会把它们封装成带ctx参数的版本,然后继续调用GoC。

  1. func Go(name string, run runFunc, fallback fallbackFunc) chan error { // wraps run/fallback into their ctx-taking forms and delegates to GoC
  2. runC := func(ctx context.Context) error { // anonymous wrapper around run; accepts ctx but does not use it
  3. return run()
  4. }
  5. var fallbackC fallbackFuncC
  6. if fallback != nil { // fallbackC stays nil when no fallback was supplied
  7. fallbackC = func(ctx context.Context, err error) error { // anonymous wrapper around fallback; accepts ctx but does not use it
  8. return fallback(err)
  9. }
  10. }
  11. return GoC(context.Background(), name, runC, fallbackC)
  12. }

主要的执行逻辑GoC,提供了异步的执行。

  1. func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error { // starts the command asynchronously and returns cmd.errChan
  2. cmd := &command{ // the command being executed
  3. run: run, // function to run
  4. fallback: fallback, // fallback function
  5. start: time.Now(), // start time
  6. errChan: make(chan error, 1), // error channel, buffered with capacity 1
  7. finished: make(chan bool, 1), // completion signal, buffered with capacity 1
  8. }
  9. // dont have methods with explicit params and returns
  10. // let data come in and out naturally, like with any closure
  11. // explicit error return to give place for us to kill switch the operation (fallback)
  12. circuit, _, err := GetCircuit(name) // look up (or create) the circuit breaker
  13. if err != nil {
  14. cmd.errChan <- err
  15. return cmd.errChan
  16. }
  17. cmd.circuit = circuit
  18. ticketCond := sync.NewCond(cmd) // condition variable using cmd as its Locker; see the references below for usage
  19. ticketChecked := false
  20. returnTicket := func() { // gives the ticket back once the acquisition attempt has been settled
  21. cmd.Lock()
  22. // Avoid releasing before a ticket is acquired.
  23. for !ticketChecked {
  24. ticketCond.Wait() // blocks until signalled; Wait releases cmd's lock while waiting and re-acquires it before returning (unlike select{}, it does not block forever)
  25. }
  26. cmd.circuit.executorPool.Return(cmd.ticket) // put the ticket back into the pool
  27. cmd.Unlock()
  28. }
  29. returnOnce := &sync.Once{} // ensures the finishing logic runs exactly once across the competing goroutines
  30. reportAllEvent := func() { // reports the collected events (per the original note, consumed later by the dashboard)
  31. err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
  32. if err != nil {
  33. log.Printf(err.Error()) // NOTE(review): the error text is used as the format string, so a '%' in it would be read as a verb
  34. }
  35. }
  36. // g1: checks the breaker and acquires a ticket, then runs the command; when rejected or failed it tries the fallback. Events met along the way are reported.
  37. go func() {
  38. defer func() { cmd.finished <- true }()
  39. if !cmd.circuit.AllowRequest() {
  40. cmd.Lock()
  41. // It's safe for another goroutine to go ahead releasing a nil ticket.
  42. ticketChecked = true
  43. ticketCond.Signal()
  44. cmd.Unlock()
  45. returnOnce.Do(func() {
  46. returnTicket()
  47. cmd.errorWithFallback(ctx, ErrCircuitOpen)
  48. reportAllEvent()
  49. })
  50. return
  51. }
  52. cmd.Lock()
  53. select {
  54. case cmd.ticket = <-circuit.executorPool.Tickets: // take a ticket from the pool
  55. ticketChecked = true
  56. ticketCond.Signal() // wake a goroutine blocked in ticketCond.Wait()
  57. cmd.Unlock()
  58. default:
  59. ticketChecked = true
  60. ticketCond.Signal()
  61. cmd.Unlock()
  62. returnOnce.Do(func() {
  63. returnTicket()
  64. cmd.errorWithFallback(ctx, ErrMaxConcurrency) // no ticket available: concurrency is too high
  65. reportAllEvent()
  66. })
  67. return
  68. }
  69. runStart := time.Now()
  70. runErr := run(ctx)
  71. returnOnce.Do(func() {
  72. defer reportAllEvent()
  73. cmd.runDuration = time.Since(runStart) // how long run took
  74. returnTicket() // give the ticket back
  75. if runErr != nil {
  76. cmd.errorWithFallback(ctx, runErr) // run failed: fall back
  77. return
  78. }
  79. cmd.reportEvent("success") // run succeeded
  80. })
  81. }()
  82. go func() {
  83. timer := time.NewTimer(getSettings(name).Timeout)
  84. defer timer.Stop()
  85. select {
  86. case <-cmd.finished: // g1 finished; leave the select
  87. // returnOnce has been executed in another goroutine
  88. case <-ctx.Done(): // ctx is done: fall back with ctx.Err()
  89. returnOnce.Do(func() {
  90. returnTicket()
  91. cmd.errorWithFallback(ctx, ctx.Err())
  92. reportAllEvent()
  93. })
  94. return
  95. case <-timer.C: // the timeout fired
  96. returnOnce.Do(func() {
  97. returnTicket()
  98. cmd.errorWithFallback(ctx, ErrTimeout)
  99. reportAllEvent()
  100. })
  101. return
  102. }
  103. }()
  104. return cmd.errChan // hand the error channel back to the caller
  105. }

如果需要同步的执行方式,可以使用Do。

  1. func Do(name string, run runFunc, fallback fallbackFunc) error { // wraps run/fallback into their ctx-taking forms and delegates to DoC
  2. runC := func(ctx context.Context) error { // wrapper around run; accepts ctx but does not use it
  3. return run()
  4. }
  5. var fallbackC fallbackFuncC
  6. if fallback != nil { // fallbackC stays nil when no fallback was supplied
  7. fallbackC = func(ctx context.Context, err error) error { // wrapper around fallback; accepts ctx but does not use it
  8. return fallback(err)
  9. }
  10. }
  11. return DoC(context.Background(), name, runC, fallbackC) // per the original note, the logic mirrors GoC above; DoC itself is not shown in this listing
  12. }

参考:

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注