前言
假如我有钱,我想买一个降噪耳机,我应该哪里买?
答案很简单,可以去京东或者线下实体店。 那如果把这个问题映射到微服务架构中:我打开京东,选中某款耳机进入详情页浏览,我可以看到这款耳机的价格、库存、规格、评价等。以我的理解,这个链路应该是这样的:
暂定这个系统由3个微服务组成:商品详情服务、库存服务、评价服务。
商品详情服务:聚合端上用户看到的所有信息
库存服务:维护商品的库存信息、规格信息、价格信息
评价服务:维护用户对商品的评价
微服务的目的是为了基于松耦合高内聚将单体服务进行拆分,然后将个服务进行多副本部署(我们甚至不知道它会被部署到哪里,实体机?虚拟机?容器?云上?)以达到高可用的目的。这也要付出点代价,商品详情服务需要知道:库存服务和评价服务在哪里?
由此,我们将继续学习 Consul 这款不错的服务发现工具,通前面的学习,我们已经对 Consul 的原理、使用、搭建有了认知。本次将学习:Consul 如何在 gRPC 构建的微服务网络环境中做一名合格的“指路人”。
编写一个 Go gRPC 服务
gRPC 是由 Google 开发并开源的RPC框架,详见官网。我们将通过官网的指导来编写一个简单的 go gRPC 服务
获取样例代码
克隆grpc-go
仓库
$gitclone-bv1.29.1.0https://github.com/grpc/grpc-go
切换到样例代码目录
$cdcdgrpc-go/examples/helloworld
目录结构如下:
├──greeter_client│└──main.go├──helloword│└──helloword.proto└──greeter_server|└──main.go
运行样例代码
编译执行 server 代码:
$gorungreeter_server/main.go
在新开一个终端,编译执行 client 代码,可以看到输出:
$gorungreeter_client/main.go2021/09/1116:28:29Greeting:Helloworld
gRPC 的 Banlancer
在greeter_client/main.go
中,是通过指定 server 地址的方式来实现访问到目标服务的
...const(address="localhost:50051")funcmain(){conn,err:=grpc.Dial(address,grpc.WithInsecure(),grpc.WithBlock())...}
但这种方式在生产环境是不可行的,因为我们并不知道目标服务的地址(目标服务的地址也有可能不只一个)。实际上,gRPC 已经为我们提供来解决方案:Balancer。
首先,看一下 gRPC 客户端负载均衡实现的官方架构图:
从图中,可以看到 Balancer 均衡器位于架构的最右方,内置一个 Picker 模块,Balancer 主要完成下面几个功能:
与 Rersovler 通信(维持通信机制),接收 Resovler 通知的服务端列表更新,维护 Connection Pool 及每个连接的状态
对上一步获取的服务端列表,调用newSubConn
异步建立长连接(每个 Backend 一个长连接),同时,监控连接的状态,及时更新 Connection Pool
创建 Picker,Picker 执行的算法就是真正的 LB 逻辑,当客户端使用conn
初始化 PRC 方法时,通过 Picker 选择一个存活的连接,返回给客户端,然后调用 UpdatePicker 更新 LB 算法的内置状态,为下一次调用做准备
Balaner 是 gRPC 负载均衡最核心的模块
据此,我们可用通过自定义的 Balancer,在 Balaner 基础上通过实现自定义的naming.Resolver
来达到使用 Consul 看发现服务的功能。
大概流程是:
grpc 在 Dial 的时候通过 WithBalancer 传入 Balancer
Balaner 会通过 naming.Resolver 去解析 (Resovle)Dial 传入的 target 得到一个nameing.Watcher
naming.Watcher
持续监视 target 解析到地址列表的变更并通过 Next 返回给 Balancer
实现 Consul Resolver
在grpc-go/naming/naming.go
中可以看到Resolver
接口的声明
typeResolverinterface{//ResolvecreatesaWatcherfortarget.Resolve(targetstring)(Watcher,error)}
需要实现一个Consul Resolver
,在里面返回可用的服务端地址列表,在examples
目录下新建grpcresolver
文件夹,在该文件夹下新建consul.go
文件:
packagegrpcresolverimport("fmt""net""strconv""sync""sync/atomic""github.com/hashicorp/consul/api""google.golang.org/grpc/naming")typewatchEntrystruct{addrstringmodiuint64lastuint64}typeconsulWatcherstruct{downint32c*api.Clientservicestringmusync.Mutexwatchedmap[string]*watchEntrylastIndexuint64}func(w*consulWatcher)Close(){atomic.StoreInt32(&w.down,1)}func(w*consulWatcher)Next()([]*naming.Update,error){w.mu.Lock()deferw.mu.Unlock()watched:=w.watchedlastIndex:=w.lastIndexretry://访问Consul,获取可用的服务列表services,meta,err:=w.c.Catalog().Service(w.service,"",&api.QueryOptions{WaitIndex:lastIndex,})iferr!=nil{returnnil,err}iflastIndex==meta.LastIndex{ifatomic.LoadInt32(&w.down)!=0{returnnil,nil}gotoretry}lastIndex=meta.LastIndexvarupdating[]*naming.Updatefor_,s:=rangeservices{ws:=watched[s.ServiceID]fmt.Println(s.ServiceAddress,s.ServicePort)ifws==nil{//如果是新注册的服务ws=&watchEntry{addr:net.JoinHostPort(s.ServiceAddress,strconv.Itoa(s.ServicePort)),modi:s.ModifyIndex,}watched[s.ServiceID]=wsupdating=append(updating,&naming.Update{Op:naming.Add,Addr:ws.addr,})}elseifws.modi!=s.ModifyIndex{//如果是原来的服务updating=append(updating,&naming.Update{Op:naming.Delete,Addr:ws.addr,})ws.addr=net.JoinHostPort(s.ServiceAddress,strconv.Itoa(s.ServicePort))ws.modi=s.ModifyIndexupdating=append(updating,&naming.Update{Op:naming.Add,Addr:ws.addr,})}ws.last=lastIndex}forid,ws:=rangewatched{ifws.last!=lastIndex{delete(watched,id)updating=append(updating,&naming.Update{Op:naming.Delete,Addr:ws.addr,})}}w.watched=watchedw.lastIndex=lastIndexreturnupdating,nil}typeconsulResolverapi.Clientfunc(r*consulResolver)Resolve(targetstring)(naming.Watcher,error){return&consulWatcher{c:(*api.Client)(r),service:target,watched:make(map[string]*watchEntry),},nil}funcForConsul(reg*api.Client)naming.Resolver{return(*consulResolver)(reg)}
server 端通过 Consul 注册服务
修改examples/helloword/greeter_server/main.go
,在启动服务前,将服务的信息注册到 Consul
packagemainimport("context""encoding/hex""flag""fmt""log""math/rand""net""strconv""time""github.com/hashicorp/consul/api""google.golang.org/grpc"pb"google.golang.org/grpc/examples/helloworld/helloworld")const(//host="192.168.10.102"//port=50051ttl=30*time.Second)//serverisusedtoimplementhelloworld.GreeterServer.typeserverstruct{pb.UnimplementedGreeterServerportint}//SayHelloimplementshelloworld.GreeterServerfunc(s*server)SayHello(ctxcontext.Context,in*pb.HelloRequest)(*pb.HelloReply,error){log.Printf("Received:%v",in.GetName())return&pb.HelloReply{Message:fmt.Sprintf("Hello%s,from%d",in.GetName(),s.port)},nil}funcmain(){host:=flag.String("h","127.0.0.1","host")port:=flag.Int("p",50051,"port")flag.Parse()lis,err:=net.Listen("tcp",net.JoinHostPort(*host,strconv.Itoa(*port)))iferr!=nil{log.Fatalf("failedtolisten:%v",err)}//ConsulClientregistry,err:=api.NewClient(api.DefaultConfig())iferr!=nil{log.Fatalln(err)}varh[16]byterand.Read(h[:])//生成一个全局IDid:=fmt.Sprintf("helloserver-%s-%d",hex.EncodeToString(h[:]),*port)fmt.Println(id)//注册到Consul,包含地址、端口信息,以及健康检查err=registry.Agent().ServiceRegister(&api.AgentServiceRegistration{ID:id,Name:"helloserver",Port:*port,Address:*host,Check:&api.AgentServiceCheck{TTL:(ttl+time.Second).String(),Timeout:time.Minute.String(),},})iferr!=nil{log.Fatalln(err)}gofunc(){checkid:="service:"+idforrangetime.Tick(ttl){err:=registry.Agent().PassTTL(checkid,"")iferr!=nil{log.Fatalln(err)}}}()s:=grpc.NewServer()pb.RegisterGreeterServer(s,&server{port:*port})iferr:=s.Serve(lis);err!=nil{log.Fatalf("failedtoserve:%v",err)}}
client 端通过 Consul 发现服务
packagemainimport("context""log""time""github.com/hashicorp/consul/api""google.golang.org/grpc""google.golang.org/grpc/examples/grpcresolver"pb"google.golang.org/grpc/examples/helloworld/helloworld")const(address="localhost:50051"defaultName="world")funcmain(){//consulregistry,err:=api.NewClient(api.DefaultConfig())iferr!=nil{log.Fatalln(err)}//自定义LB,并使用刚才写的ConsulResolverlbrr:=grpc.RoundRobin(grpcresolver.ForConsul(registry))//Setupaconnectiontotheserver.conn,err:=grpc.Dial("helloserver",grpc.WithInsecure(),grpc.WithBalancer(lbrr))iferr!=nil{log.Fatalf("didnotconnect:%v",err)}deferconn.Close()c:=pb.NewGreeterClient(conn)//调用server端RPC,通过响应观察负载均衡forrangetime.Tick(time.Second){name:=defaultNamer,err:=c.SayHello(context.Background(),&pb.HelloRequest{Name:name})iferr!=nil{log.Fatalf("couldnotgreet:%v",err)continue}log.Printf("serverreply:%s",r.GetMessage())}}
启动 & 查看
启动两个 Server,设置不同的启动端口
$cdcdgrpc-go/examples/helloworld0
$cdcdgrpc-go/examples/helloworld1
通过 Consul Web UI 查看,两个 instance 均是健康的
启动 Client
$cdcdgrpc-go/examples/helloworld2
可以看到是均匀对两个 server 发起调用,当我们将其中一个 instance server2 关掉(模拟不可用的情况),流量全面全部转移到另一台上了
说明失败转移也是正常的。
作者:Zioyi