go

时间:2023-11-30 本站 点击:0

在微服务体系中,任何一个服务都可以使用多个实例进行部署,那么对于一个请求发送到哪个实例来进行处理,需要一个策略来选择处理节点。

Selector是基于注册中心registry来进行节点选择和状态标记的。在选择过程中可以使用不同的算法进行选择。

Selector接口定义为

//Selectorbuildsontheregistryasamechanismtopicknodes//andmarktheirstatus.Thisallowshostpoolsandotherthings//tobebuiltusingvariousalgorithms.typeSelectorinterface{Init(opts...Option)errorOptions()Options//Select返回一个函数,返回值函数应该返回下一个节点Select(servicestring,opts...SelectOption)(Next,error)//Marksetsthesuccess/erroragainstanodeMark(servicestring,node*registry.Node,errerror)//ResetreturnsstatebacktozeroforaserviceReset(servicestring)//CloserenderstheselectorunusableClose()error//NameoftheselectorString()string}

如果需要不同的选择策略, 可以自定义选择策略

//Nextisafunctionthatreturnsthenextnode//basedontheselector'sstrategytypeNextfunc()(*registry.Node,error)//FilterisusedtofilteraserviceduringtheselectionprocesstypeFilterfunc([]*registry.Service)[]*registry.Service//Strategyisaselectionstrategye.grandom,roundrobintypeStrategyfunc([]*registry.Service)Next

selector默认的实现

funcNewSelector(opts...Option)Selector{sopts:=Options{Strategy:Random,}for_,opt:=rangeopts{opt(&sopts)}ifsopts.Registry==nil{sopts.Registry=registry.DefaultRegistry}s:=&registrySelector{so:sopts,}s.rc=s.newCache()returns}

在默认的实现中,节点选择策略为随机选择

funcinit(){rand.Seed(time.Now().UnixNano())}//RandomisarandomstrategyalgorithmfornodeselectionfuncRandom(services[]*registry.Service)Next{nodes:=make([]*registry.Node,0,len(services))for_,service:=rangeservices{nodes=append(nodes,service.Nodes...)}returnfunc()(*registry.Node,error){iflen(nodes)==0{returnnil,ErrNoneAvailable}i:=rand.Int()%len(nodes)returnnodes[i],nil}}

节点选择流程如下:

根据指定的服务名,获取服务列表

根据服务选项中的过滤器,进行过滤

如果还存在多个节点,那么根据选择策略,选择一个节点返回

func(c*registrySelector)Select(servicestring,opts...SelectOption)(Next,error){sopts:=SelectOptions{Strategy:c.so.Strategy,}for_,opt:=rangeopts{opt(&sopts)}//gettheservice//trythecachefirst//ifthatfailsgodirectlytotheregistryservices,err:=c.rc.GetService(service)iferr!=nil{iferr==registry.ErrNotFound{returnnil,ErrNotFound}returnnil,err}//applythefiltersfor_,filter:=rangesopts.Filters{services=filter(services)}//ifthere'snothingleft,returniflen(services)==0{returnnil,ErrNoneAvailable}returnsopts.Strategy(services),nil}

从examples中介绍的客户端负载案例中,我们可以看看如何去使用

funcmain(){cmd.Init()client.DefaultClient=client.NewClient(client.Selector(FirstNodeSelector()),)fmt.Println("\n---Callexample---")fori:=0;i<10;i++{call(i)}}

FirstNodeSelector的返回值是一个firstNodeSelector负载器,他是Selector接口的实现者,逻辑非常简单,实现的效果就是永远选择服务列表中的第一个

//BuiltinrandomhashednodeselectortypefirstNodeSelectorstruct{optsselector.Options}func(n*firstNodeSelector)Init(opts...selector.Option)error{for_,o:=rangeopts{o(&n.opts)}returnnil}func(n*firstNodeSelector)Options()selector.Options{returnn.opts}func(n*firstNodeSelector)Select(servicestring,opts...selector.SelectOption)(selector.Next,error){services,err:=n.opts.Registry.GetService(service)iferr!=nil{returnnil,err}iflen(services)==0{returnnil,selector.ErrNotFound}varsoptsselector.SelectOptionsfor_,opt:=rangeopts{opt(&sopts)}for_,filter:=rangesopts.Filters{services=filter(services)}iflen(services)==0{returnnil,selector.ErrNotFound}iflen(services[0].Nodes)==0{returnnil,selector.ErrNotFound}returnfunc()(*registry.Node,error){returnservices[0].Nodes[0],nil},nil}func(n*firstNodeSelector)Mark(servicestring,node*registry.Node,errerror){return}func(n*firstNodeSelector)Reset(servicestring){return}func(n*firstNodeSelector)Close()error{returnnil}func(n*firstNodeSelector)String()string{return"first"}

继续看client的代码, 在实例化client的时候,初始化我们的选择器,这也体现了,只要我们是实现了Selector接口,自己定义选择器实现,也是没问题的。

client.DefaultClient=client.NewClient(client.Selector(FirstNodeSelector()),)

在服务调用时,从下面的client.Call方法实现,可以看看整个流程是如何运作的

funccall(iint){//Createnewrequesttoservicego.micro.srv.example,methodExample.Callreq:=client.NewRequest("go.micro.srv.example","Example.Call",&example.Request{Name:"John",})rsp:=&example.Response{}//Callserviceiferr:=client.Call(context.Background(),req,rsp);err!=nil{fmt.Println("callerr:",err,rsp)return}fmt.Println("Call:",i,"rsp:",rsp.Msg)}

如果你使用的grpc,

func(g*grpcClient)Call(ctxcontext.Context,reqclient.Request,rspinterface{},opts...client.CallOption)error{ifreq==nil{returnerrors.InternalServerError("go.micro.client","reqisnil")}elseifrsp==nil{returnerrors.InternalServerError("go.micro.client","rspisnil")}//makeacopyofcalloptscallOpts:=g.opts.CallOptionsfor_,opt:=rangeopts{opt(&callOpts)}next,err:=g.next(req,callOpts)iferr!=nil{returnerr}//checkifwealreadyhaveadeadlined,ok:=ctx.Deadline()if!ok{//nodeadlinesowecreateanewonevarcancelcontext.CancelFuncctx,cancel=context.WithTimeout(ctx,callOpts.RequestTimeout)defercancel()}else{//gotadeadlinesononeedtosetupcontext//butweneedtosetthetimeoutwepassalongopt:=client.WithRequestTimeout(time.Until(d))opt(&callOpts)}//shouldwenooprighthere?select{case<-ctx.Done():returnerrors.New("go.micro.client",fmt.Sprintf("%v",ctx.Err()),408)default:}//makecopyofcallmethodgcall:=g.call//wrapthecallinreversefori:=len(callOpts.CallWrappers);i>0;i--{gcall=callOpts.CallWrappers[i-1](gcall)}//returnerrors.New("go.micro.client","requesttimeout",408)call:=func(iint)error{//callbackofffirst.Someonemaywantaninitialstartdelayt,err:=callOpts.Backoff(ctx,req,i)iferr!=nil{returnerrors.InternalServerError("go.micro.client",err.Error())}//onlysleepifgreaterthan0ift.Seconds()>0{time.Sleep(t)}//selectnextnodenode,err:=next()service:=req.Service()iferr!=nil{iferr==selector.ErrNotFound{returnerrors.InternalServerError("go.micro.client","service%s:%s",service,err.Error())}returnerrors.InternalServerError("go.micro.client","errorselecting%snode:%s",service,err.Error())}//makethecallerr=gcall(ctx,node,req,rsp,callOpts)g.opts.Selector.Mark(service,node,err)ifverr,ok:=err.(*errors.Error);ok{returnverr}returnerr}ch:=make(chanerror,callOpts.Retries+1)vargerrerrorfori:=0;i<=callOpts.Retries;i++{gofunc(iint){ch<-call(i)}(i)select{case<-ctx.Done():returnerrors.New("go.micro.client",fmt.Sprintf("%v",ctx.Err()),408)caseerr:=<-ch://ifthecallsucceededletsbailearlyiferr==nil{returnnil}retry,rerr:=callOpts.Retry(ctx,req,i,err)ifrerr!=nil{returnrerr}if!retry{returnerr}gerr=err}}returngerr}

他会调用next方法,来获取节点

//Nextisafunctionthatreturnsthenextnode//basedontheselector'sstrategytypeNextfunc()(*registry.Node,error)//FilterisusedtofilteraserviceduringtheselectionprocesstypeFilterfunc([]*registry.Service)[]*registry.Service//Strategyisaselectionstrategye.grandom,roundrobintypeStrategyfunc([]*registry.Service)Next0

在真正的调用之前,会进行节点选择

//Nextisafunctionthatreturnsthenextnode//basedontheselector'sstrategytypeNextfunc()(*registry.Node,error)//FilterisusedtofilteraserviceduringtheselectionprocesstypeFilterfunc([]*registry.Service)[]*registry.Service//Strategyisaselectionstrategye.grandom,roundrobintypeStrategyfunc([]*registry.Service)Next1

以上就是节点选择器的大致内容,如果想要了解更详细的内容,可以去这里go-micro看更详细的代码。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/Golang/4532.html