背景
伴随着微服务架构被宣传得如火如茶,一些概念也被推到了我们的面前。一提到微服务,就离不开这几个字:高内聚低耦合;微服务的架构设计最终目的也就是实现这几个字。在微服务架构中,微服务就是完成一个单一的业务功能,每个微服务可以独立演进,一个应用可能会有多个微服务组成,微服务之间的数据交可以通过远程调用来完成,这样在一个微服务架构下就会形成这样的依赖关系:
微服务A调用微服务C、D,微服务B又依赖微服务B、E,微服务D依赖于服务F,这只是一个简单的小例子,实际业务中服务之间的依赖关系比这还复杂,这样在调用链路上如果某个微服务的调用响应时间过长或者不可用,那么对上游服务(按调用关系命名)的调用就会占用越来越多的系统资源,进而引起系统崩溃,这就是微服务的雪蹦效应。
为了解决微服务的雪蹦效应,提出来使用熔断机制为微服务链路提供保护机制。熔断机制大家应该都不陌生,电路的中保险丝就是一种熔断机制,在微服务中的熔断机制是什么样的呢?
当链路中的某个微服务不可用或者响应的时间太长时,会进行服务的降级,进而熔断该节点微服务的调用,快速返回错误的响应信息,当检测到该节点微服务调用响应正常后,恢复调用链路。
本文我们就介绍一个开源熔断框架:hystrix-go。
熔断框架(hystrix-go)
Hystrix是一个延迟和容错库,旨在隔离对远程系统、服务和第三方服务的访问点,停止级联故障并在故障不可避免的复杂分布式系统中实现弹性。hystrix-go 旨在允许 Go 程序员轻松构建具有与基于 Java 的 Hystrix 库类似的执行语义的应用程序。所以本文就从使用开始到源码分析一下hystrix-go。
快速安装
goget-ugithub.com/afex/hystrix-go/hystrix
快速使用
hystrix-go真的是开箱即用,使用还是比较简单的,主要分为两个步骤:
配置熔断规则,否则将使用默认配置。可以调用的方法
funcConfigure(cmdsmap[string]CommandConfig)funcConfigureCommand(namestring,configCommandConfig)
Configure
方法内部也是调用的ConfigureCommand
方法,就是传参数不一样,根据自己的代码风格选择。
定义依赖于外部系统的应用程序逻辑 - runFunc
和服务中断期间执行的逻辑代码 - fallbackFunc
,可以调用的方法:
funcGo(namestring,runrunFunc,fallbackfallbackFunc)//内部调用Goc方法funcGoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)funcDo(namestring,runrunFunc,fallbackfallbackFunc)//内部调用的是Doc方法funcDoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)//内部调用Goc方法,处理了异步过程
Go
和Do
的区别在于异步还是同步,Do
方法在调用Doc
方法内处理了异步过程,他们最终都是调用的Goc
方法。后面我们进行分析。
举一个例子:我们在Gin
框架上加一个接口级的熔断中间件
//代码已上传github:文末查看地址varCircuitBreakerName="api_%s_circuit_breaker"funcCircuitBreakerWrapper(ctx*gin.Context){name:=fmt.Sprintf(CircuitBreakerName,ctx.Request.URL)hystrix.Do(name,func()error{ctx.Next()code:=ctx.Writer.Status()ifcode!=http.StatusOK{returnerrors.New(fmt.Sprintf("statuscode%d",code))}returnnil},func(errerror)error{iferr!=nil{//监控上报(未实现)_,_=io.WriteString(f,fmt.Sprintf("circuitBreakeranderris%s\n",err.Error()))//写入文件(字符串)fmt.Printf("circuitBreakeranderris%s\n",err.Error())//返回熔断错误ctx.JSON(http.StatusServiceUnavailable,gin.H{"msg":err.Error(),})}returnnil})}funcinit(){hystrix.ConfigureCommand(CircuitBreakerName,hystrix.CommandConfig{Timeout:int(3*time.Second),//执行command的超时时间为3sMaxConcurrentRequests:10,//command的最大并发量RequestVolumeThreshold:100,//统计窗口10s内的请求数量,达到这个请求数量后才去判断是否要开启熔断SleepWindow:int(2*time.Second),//当熔断器被打开后,SleepWindow的时间就是控制过多久后去尝试服务是否可用了ErrorPercentThreshold:20,//错误百分比,请求数量大于等于RequestVolumeThreshold并且错误率到达这个百分比后就会启动熔断})ifcheckFileIsExist(filename){//如果文件存在f,errfile=os.OpenFile(filename,os.O_APPEND,0666)//打开文件}else{f,errfile=os.Create(filename)//创建文件}}funcmain(){deferf.Close()hystrixStreamHandler:=hystrix.NewStreamHandler()hystrixStreamHandler.Start()gohttp.ListenAndServe(net.JoinHostPort("","81"),hystrixStreamHandler)r:=gin.Default()r.GET("/api/ping/baidu",func(c*gin.Context){_,err:=http.Get("https://www.baidu.com")iferr!=nil{c.JSON(http.StatusInternalServerError,gin.H{"msg":err.Error()})return}c.JSON(http.StatusOK,gin.H{"msg":"success"})},CircuitBreakerWrapper)r.Run()//listenandserveon0.0.0.0:8080(forwindows"localhost:8080")}funccheckFileIsExist(filenamestring)bool{if_,err:=os.Stat(filename);os.IsNotExist(err){returnfalse}returntrue}
指令:wrk -t100 -c100 -d1s http://127.0.0.1:8080/api/ping/baidu
运行结果:
circuitBreakeranderrisstatuscode500circuitBreakeranderrisstatuscode500.....circuitBreakeranderrishystrix:maxconcurrencycircuitBreakeranderrishystrix:maxconcurrency.....circuitBreakeranderrishystrix:circuitopencircuitBreakeranderrishystrix:circuitopen.....
对错误进行分析:
circuitBreaker and err is status code 500
:因为我们关闭了网络,所以请求是没有响应的
circuitBreaker and err is hystrix: max concurrency
:我们设置的最大并发量MaxConcurrentRequests
是10
,我们的压测工具使用的是100并发,所有会触发这个熔断
circuitBreaker and err is hystrix: circuit open
:我们设置熔断开启的请求数量RequestVolumeThreshold
是100
,所以当10
s内的请求数量大于100
时就会触发熔断。
简单对上面的例子做一个解析:
添加接口级的熔断中间件
初始化熔断相关配置
开启dashboard
可视化hystrix的上报信息,浏览器打开http://localhost:81
,可以看到如下结果:
hystrix-go
流程分析
本来想对源码进行分析,代码量有点大,所以就针对流程来分析,顺便看一些核心代码。
配置熔断规则
既然是熔断,就要有熔断规则,我们可以调用两个方法配置熔断规则,不会最终调用的都是ConfigureCommand
,这里没有特别的逻辑,如果我们没有配置,系统将使用默认熔断规则:
var(//DefaultTimeoutishowlongtowaitforcommandtocomplete,inmillisecondsDefaultTimeout=1000//DefaultMaxConcurrentishowmanycommandsofthesametypecanrunatthesametimeDefaultMaxConcurrent=10//DefaultVolumeThresholdistheminimumnumberofrequestsneededbeforeacircuitcanbetrippedduetohealthDefaultVolumeThreshold=20//DefaultSleepWindowishowlong,inmilliseconds,towaitafteracircuitopensbeforetestingforrecoveryDefaultSleepWindow=5000//DefaultErrorPercentThresholdcausescircuitstoopenoncetherollingmeasureoferrorsexceedsthispercentofrequestsDefaultErrorPercentThreshold=50//DefaultLoggeristhedefaultloggerthatwillbeusedintheHystrixpackage.Bydefaultprintsnothing.DefaultLogger=NoopLogger{})
配置规则如下:
Timeout
:定义执行command的超时时间,时间单位是ms
,默认时间是1000ms
;
MaxConcurrnetRequests
:定义command的最大并发量,默认值是10
并发量;
SleepWindow
:熔断器被打开后使用,在熔断器被打开后,根据SleepWindow
设置的时间控制多久后尝试服务是否可用,默认时间为5000ms
;
RequestVolumeThreshold
:判断熔断开关的条件之一,统计10s
(代码中写死了)内请求数量,达到这个请求数量后再根据错误率判断是否要开启熔断;
ErrorPercentThreshold
:判断熔断开关的条件之一,统计错误百分比,请求数量大于等于RequestVolumeThreshold
并且错误率到达这个百分比后就会启动熔断
默认值是50
;
这些规则根据command的name进行区分存放到一个map
中。
执行command
执行command
主要可以调用四个方法,分别是:
funcGo(namestring,runrunFunc,fallbackfallbackFunc)funcGoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)funcDo(namestring,runrunFunc,fallbackfallbackFunc)funcDoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)
Do
内部调用的Doc
方法,Go
内部调用的是Goc
方法,在Doc
方法内部最终调用的还是Goc
方法,只是在Doc
方法内做了同步逻辑:
funcDoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)error{.....省略部分封装代码varerrChanchanerroriffallback==nil{errChan=GoC(ctx,name,r,nil)}else{errChan=GoC(ctx,name,r,f)}select{case<-done:returnnilcaseerr:=<-errChan:returnerr}}
因为他们最终都是调用的Goc
方法,所以我们执行分析Goc
方法的内部逻辑;代码有点长,我们分逻辑来分析:
创建command
对象
cmd:=&command{run:run,fallback:fallback,start:time.Now(),errChan:make(chanerror,1),finished:make(chanbool,1),}//获取熔断器circuit,_,err:=GetCircuit(name)iferr!=nil{cmd.errChan<-errreturncmd.errChan}
介绍一下command
的数据结构:
typecommandstruct{sync.Mutexticket*struct{}starttime.TimeerrChanchanerrorfinishedchanboolcircuit*CircuitBreakerrunrunFuncCfallbackfallbackFuncCrunDurationtime.Durationevents[]string}
字段介绍:
ticket
:用来做最大并发量控制,这个就是一个令牌
start
:记录command
执行的开始时间
errChan
:记录command
执行错误
finished
:标志command
执行结束,用来做协程同步
circuit
:存储熔断器相关信息
run
:应用程序
fallback
:应用程序执行失败后要执行的函数
runDuration
:记录command
执行消耗时间
events
:events
主要是存储事件类型信息,比如执行成功的success
,或者失败的timeout
、context_canceled
等
上段代码重点是GetCircuit
方法,这一步的目的就是获取熔断器,使用动态加载的方式,如果没有就创建一个熔断器,熔断器结构如下:
funcConfigure(cmdsmap[string]CommandConfig)funcConfigureCommand(namestring,configCommandConfig)0
解释一下这几个字段:
name
:熔断器的名字,其实就是创建的command名字
open
:判断熔断器是否打开的标志
forceopen
:手动触发熔断器的开关,单元测试使用
mutex
:使用读写锁保证并发安全
openedOrLastTestedTime
:记录上一次打开熔断器的时间,因为要根据这个时间和SleepWindow
时间来做恢复尝试
executorPool
:用来做流量控制,因为我们有一个最大并发量控制,就是根据这个来做的流量控制,每次请求都要获取令牌
metrics
:用来上报执行状态的事件,通过它把执行状态信息存储到实际熔断器执行各个维度状态 (成功次数,失败次数,超时……) 的数据集合中。
后面会单独分析executorPool
、metrics
的实现逻辑。
定义令牌相关的方法和变量
因为我们有一个条件是最大并发控制,采用的是令牌的方式进行流量控制,每一个请求都要获取一个令牌,使用完毕要把令牌还回去,先看一下这段代码:
funcConfigure(cmdsmap[string]CommandConfig)funcConfigureCommand(namestring,configCommandConfig)1
使用sync.NewCond
创建一个条件变量,用来协调通知你可以归还令牌了。
然后定义一个返回令牌的方法,调用Return
方法归还令牌。
定义上报执行事件的方法
前面我们也提到了,我们的熔断器会上报执行状态的事件,通过它把执行状态信息存储到实际熔断器执行各个维度状态 (成功次数,失败次数,超时……) 的数据集合中。所以要定义一个上报的方法:
funcConfigure(cmdsmap[string]CommandConfig)funcConfigureCommand(namestring,configCommandConfig)2
开启协程一:执行应用程序逻辑 - runFunc
协程一的主要目的就是执行应用程序逻辑:
funcConfigure(cmdsmap[string]CommandConfig)funcConfigureCommand(namestring,configCommandConfig)3
总结一下这个协程:
判断熔断器是否打开,如果打开了熔断器直接进行熔断,不在进行后面的请求
运行应用程序逻辑
开启协程二:同步协程一并监听错误
先看代码:
funcConfigure(cmdsmap[string]CommandConfig)funcConfigureCommand(namestring,configCommandConfig)4
这个协程的逻辑比较清晰明了,目的就是监听业务执行被取消以及超时。
画图总结command执行流程
上面我们都是通过代码来进行分析的,看起来还是有点乱,最后画个图总结一下:
上面我们分析了整个具体流程,接下来我们针对一些核心点就行分析
上报状态事件
hystrix-go
为每一个Command
设置了一个默认统计控制器,用来保存熔断器的所有状态,包括调用次数、失败次数、被拒绝次数等,存储指标结构如下:
funcConfigure(cmdsmap[string]CommandConfig)funcConfigureCommand(namestring,configCommandConfig)5
使用rolling.Number
结构保存状态指标,使用rolling.Timing
保存时间指标。
最终监控上报都依靠metricExchange
来实现,数据结构如下:
funcConfigure(cmdsmap[string]CommandConfig)funcConfigureCommand(namestring,configCommandConfig)6
上报command
的信息结构:
funcConfigure(cmdsmap[string]CommandConfig)funcConfigureCommand(namestring,configCommandConfig)7
说了这么多,大家还是有点懵,其实用一个类图就能表明他们之间的关系:
我们可以看到类mertricExchange
提供了一个Monitor
方法,这个方法主要逻辑就是监听状态事件,然后写入指标,所以整个上报流程就是这个样子:
流量控制
hystrix-go
对流量控制采用的是令牌算法,能得到令牌的就可以执行后继的工作,执行完后要返还令牌。 结构体executorPool
就是hystrix-go
流量控制
的具体实现。字段Max
就是每秒最大的并发值。
funcConfigure(cmdsmap[string]CommandConfig)funcConfigureCommand(namestring,configCommandConfig)8
这里还有一个上报指标,这个又单独实现一套方法用来统计执行数量,比如执行的总数量、最大并发数等,我们依赖画一个类图来表示:
上报执行数量逻辑与上报状态事件的逻辑是一样的,使用channel
进行数据通信的,上报与返还令牌都在Return
方法中:
funcConfigure(cmdsmap[string]CommandConfig)funcConfigureCommand(namestring,configCommandConfig)9
主要逻辑两步:
上报当前可用的令牌数
返回令牌
熔断器
我们最后来分析熔断器中一个比较重要的方法:AllowRequest
,我们在执行Command
是会根据这个方法来判断是否可以执行command
,接下来我们就来看一下这个判断的主要逻辑:
funcGo(namestring,runrunFunc,fallbackfallbackFunc)//内部调用Goc方法funcGoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)funcDo(namestring,runrunFunc,fallbackfallbackFunc)//内部调用的是Doc方法funcDoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)//内部调用Goc方法,处理了异步过程0
内部就是调用IsOpen()
、allowSingleTest
这两个方法:
IsOpen()
funcGo(namestring,runrunFunc,fallbackfallbackFunc)//内部调用Goc方法funcGoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)funcDo(namestring,runrunFunc,fallbackfallbackFunc)//内部调用的是Doc方法funcDoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)//内部调用Goc方法,处理了异步过程1
allowSingleTest()
先解释一下为什么要有这个方法,还记得我们之前设置了一个熔断规则中的SleepWindow
吗,如果在开启熔断的情况下,在SleepWindow
时间后进行尝试,这个方法的目的就是干这个的:
funcGo(namestring,runrunFunc,fallbackfallbackFunc)//内部调用Goc方法funcGoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)funcDo(namestring,runrunFunc,fallbackfallbackFunc)//内部调用的是Doc方法funcDoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)//内部调用Goc方法,处理了异步过程2
这里只看到了熔断器被开启的设置了,但是没有关闭熔断器的逻辑,因为关闭熔断器的逻辑是在上报状态指标的方法ReportEvent
内实现,我们最后再看一下ReportEvent
的实现:
funcGo(namestring,runrunFunc,fallbackfallbackFunc)//内部调用Goc方法funcGoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)funcDo(namestring,runrunFunc,fallbackfallbackFunc)//内部调用的是Doc方法funcDoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)//内部调用Goc方法,处理了异步过程3
可视化hystrix的上报信息
通过上面的分析我们知道hystrix-go
上报了状态事件、执行数量事件,那么这些指标我们可以怎么查看呢?
设计者早就想到了这个问题,所以他们做了一个dashborad
,可以查看hystrix
的上报信息,使用方法只需在服务启动时添加如下代码:
funcGo(namestring,runrunFunc,fallbackfallbackFunc)//内部调用Goc方法funcGoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)funcDo(namestring,runrunFunc,fallbackfallbackFunc)//内部调用的是Doc方法funcDoC(ctxcontext.Context,namestring,runrunFuncC,fallbackfallbackFuncC)//内部调用Goc方法,处理了异步过程4
然后打开浏览器:http://127.0.0.1:81/hystrix-dashboard,进行观测吧。
总结
故事终于接近尾声了,一个熔断机制的实现确实不简单,要考虑的因素也是方方面面,尤其在微服务架构下,熔断机制是必不可少的,不仅要在框架层面实现熔断机制,还要根据具体业务场景使用熔断机制,这些都是值得我们深思熟虑的。本文介绍的熔断框架实现的还是比较完美的,这种优秀的设计思路值得我们学习。
文中代码已上传github
:github地址
作者:Golang梦工厂