这篇文章介绍的transport不是我们学习的重点,因为你可能用不到他,但是作为学习go-micro的一部分,还是得提一下。
transport用于服务间通信,基于socket的send/recv语义。其接口的方法集如下:
typeTransportinterface{Init(...Option)errorOptions()OptionsDial(addrstring,opts...DialOption)(Client,error)Listen(addrstring,opts...ListenOption)(Listener,error)String()string}var(DefaultTransportTransport=NewHTTPTransport()DefaultDialTimeout=time.Second*5)
主要的作用体现在Dial
和Listen
方法中,
func(h*httpTransport)Dial(addrstring,opts...DialOption)(Client,error){dopts:=DialOptions{Timeout:DefaultDialTimeout,}for_,opt:=rangeopts{opt(&dopts)}varconnnet.Connvarerrerror//TODO:supportdialoptionhereratherthanusinginternalconfigifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigifconfig==nil{config=&tls.Config{InsecureSkipVerify:true,}}config.NextProtos=[]string{"http/1.1"}conn,err=newConn(func(addrstring)(net.Conn,error){returntls.DialWithDialer(&net.Dialer{Timeout:dopts.Timeout},"tcp",addr,config)})(addr)}else{conn,err=newConn(func(addrstring)(net.Conn,error){returnnet.DialTimeout("tcp",addr,dopts.Timeout)})(addr)}iferr!=nil{returnnil,err}return&httpTransportClient{ht:h,addr:addr,conn:conn,buff:bufio.NewReader(conn),dialOpts:dopts,r:make(chan*http.Request,1),local:conn.LocalAddr().String(),remote:conn.RemoteAddr().String(),},nil}
在net/http
源码中,我们也学习过Dial
方法,他的作用是连接服务端,并返回一个代表连接的conn.
其实这里也是差不多的,只不过代表连接的conn放在一个client结构体中,这个结构体实现了transport.Client
接口
typeSocketinterface{Recv(*Message)errorSend(*Message)errorClose()errorLocal()stringRemote()string}typeClientinterface{Socket}
Listen方法根据addr和ListenOption参数进行了处理,然后组装成httpTransportListener
。该结构体实现了
typeListenerinterface{Addr()stringClose()errorAccept(func(Socket))error}
func(h*httpTransport)Listen(addrstring,opts...ListenOption)(Listener,error){varoptionsListenOptionsfor_,o:=rangeopts{o(&options)}varlnet.Listenervarerrerror//TODO:supportuseoflistenoptionsifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigfn:=func(addrstring)(net.Listener,error){ifconfig==nil{hosts:=[]string{addr}//checkifitsavalidhost:portifhost,_,err:=net.SplitHostPort(addr);err==nil{iflen(host)==0{hosts=maddr.IPs()}else{hosts=[]string{host}}}//generateacertificatecert,err:=mls.Certificate(hosts...)iferr!=nil{returnnil,err}config=&tls.Config{Certificates:[]tls.Certificate{cert}}}returntls.Listen("tcp",addr,config)}l,err=mnet.Listen(addr,fn)}else{fn:=func(addrstring)(net.Listener,error){returnnet.Listen("tcp",addr)}l,err=mnet.Listen(addr,fn)}iferr!=nil{returnnil,err}return&httpTransportListener{ht:h,listener:l,},nil}
在httpTransportListener
的Accept
方法中,启动了一个http server来监听请求并处理。
func(h*httpTransportListener)Accept(fnfunc(Socket))error{//createhandlermuxmux:=http.NewServeMux()//registerourtransporthandlermux.HandleFunc("/",func(whttp.ResponseWriter,r*http.Request){varbuf*bufio.ReadWritervarconnet.Conn//readaregularrequestifr.ProtoMajor==1{b,err:=ioutil.ReadAll(r.Body)iferr!=nil{http.Error(w,err.Error(),http.StatusInternalServerError)return}r.Body=ioutil.NopCloser(bytes.NewReader(b))//hijacktheconnhj,ok:=w.(http.Hijacker)if!ok{//we'rescrewedhttp.Error(w,"cannotserveconn",http.StatusInternalServerError)return}conn,bufrw,err:=hj.Hijack()iferr!=nil{http.Error(w,err.Error(),http.StatusInternalServerError)return}deferconn.Close()buf=bufrwcon=conn}//bufferedreaderbufr:=bufio.NewReader(r.Body)//savetherequestch:=make(chan*http.Request,1)ch<-r//createanewtransportsocketsock:=&httpTransportSocket{ht:h.ht,w:w,r:r,rw:buf,buf:bufr,ch:ch,conn:con,local:h.Addr(),remote:r.RemoteAddr,closed:make(chanbool),}//executethesocketfn(sock)})//getoptionalhandlersifh.ht.opts.Context!=nil{handlers,ok:=h.ht.opts.Context.Value("http_handlers").(map[string]http.Handler)ifok{forpattern,handler:=rangehandlers{mux.Handle(pattern,handler)}}}//defaulthttp2serversrv:=&http.Server{Handler:mux,}//insecureconnectionuseh2cif!(h.ht.opts.Secure||h.ht.opts.TLSConfig!=nil){srv.Handler=h2c.NewHandler(mux,&http2.Server{})}//beginservingreturnsrv.Serve(h.listener)}
在Accept
方法中,启动的Server劫持了客户端请求,并重新创建了一个httpTransportSocket
,然后根据Accept
方法的函数参数去执行该socket
.
我们自定义一个服务,来测试一下,如何使用transport
.
//Packagemainpackagemainimport("context""time"hello"github.com/asim/go-micro/examples/v3/greeter/srv/proto/hello""github.com/asim/go-micro/v3""github.com/asim/go-micro/v3/util/log""google.golang.org/grpc")typeSaystruct{}func(s*Say)Hello(ctxcontext.Context,req*hello.Request,rsp*hello.Response)error{log.Log("ReceivedSay.Hellorequest")rsp.Msg="Hello"+req.Namereturnnil}funcmain(){gofunc(){for{grpc.DialContext(context.TODO(),"127.0.0.1:9091")time.Sleep(time.Second)}}()service:=micro.NewService(micro.Name("go.micro.srv.greeter"),)//optionallysetupcommandlineusageservice.Init()//RegisterHandlershello.RegisterSayHandler(service.Server(),new(Say))//Runserveriferr:=service.Run();err!=nil{log.Fatal(err)}}
有关服务间通信使用的proto结构体
syntax="proto3";packagego.micro.srv.greeter;serviceSay{rpcHello(Request)returns(Response){}}messageRequest{stringname=1;}messageResponse{stringmsg=1;}
启动服务的时候,通过日志输出,可以大致的了解启动过程
2021-09-2715:47:40file=v3@v3.5.2-0.20210630062103-c13bb07171bc/service.go:199level=infoStarting[service]go.micro.srv.greeter2021-09-2715:47:40file=server/rpc_server.go:820level=infoTransport[http]Listeningon[::]:648732021-09-2715:47:40file=server/rpc_server.go:840level=infoBroker[http]Connectedto127.0.0.1:648742021-09-2715:47:40file=server/rpc_server.go:654level=infoRegistry[mdns]Registeringnode:go.micro.srv.greeter-042f3737-1410-4a86-9fe5-8f23fc5cc05b
service.Init()
做初始化的时候,会把所有的无提供Options的服务进行默认的处理 service.Run()
启动service,
iflogger.V(logger.InfoLevel,logger.DefaultLogger){logger.Infof("Starting[service]%s",s.Name())}
然后启动server.默认的server是newRpcServer
.rpcServer
启动的过程如下:
在transport上进行监听
swap address
连接broker
注册前检查
然后就是for循环监听conn上的请求,并处理
ts.Accept
方法我们在之前的内容中说过,在我们的server劫持到请求后,组装新的socket让rpcServer.ServeConn
方法来进行处理,在服务移除的时候,会关闭transport
的监听
func(h*httpTransport)Dial(addrstring,opts...DialOption)(Client,error){dopts:=DialOptions{Timeout:DefaultDialTimeout,}for_,opt:=rangeopts{opt(&dopts)}varconnnet.Connvarerrerror//TODO:supportdialoptionhereratherthanusinginternalconfigifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigifconfig==nil{config=&tls.Config{InsecureSkipVerify:true,}}config.NextProtos=[]string{"http/1.1"}conn,err=newConn(func(addrstring)(net.Conn,error){returntls.DialWithDialer(&net.Dialer{Timeout:dopts.Timeout},"tcp",addr,config)})(addr)}else{conn,err=newConn(func(addrstring)(net.Conn,error){returnnet.DialTimeout("tcp",addr,dopts.Timeout)})(addr)}iferr!=nil{returnnil,err}return&httpTransportClient{ht:h,addr:addr,conn:conn,buff:bufio.NewReader(conn),dialOpts:dopts,r:make(chan*http.Request,1),local:conn.LocalAddr().String(),remote:conn.RemoteAddr().String(),},nil}0
在ServeConn
中涉及的sock参数的接收和发送都是我们httpTransportSocket
中定义的动作,具体操作,可以详细的看看源码。
然后实现一个简单的客户端请求
func(h*httpTransport)Dial(addrstring,opts...DialOption)(Client,error){dopts:=DialOptions{Timeout:DefaultDialTimeout,}for_,opt:=rangeopts{opt(&dopts)}varconnnet.Connvarerrerror//TODO:supportdialoptionhereratherthanusinginternalconfigifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigifconfig==nil{config=&tls.Config{InsecureSkipVerify:true,}}config.NextProtos=[]string{"http/1.1"}conn,err=newConn(func(addrstring)(net.Conn,error){returntls.DialWithDialer(&net.Dialer{Timeout:dopts.Timeout},"tcp",addr,config)})(addr)}else{conn,err=newConn(func(addrstring)(net.Conn,error){returnnet.DialTimeout("tcp",addr,dopts.Timeout)})(addr)}iferr!=nil{returnnil,err}return&httpTransportClient{ht:h,addr:addr,conn:conn,buff:bufio.NewReader(conn),dialOpts:dopts,r:make(chan*http.Request,1),local:conn.LocalAddr().String(),remote:conn.RemoteAddr().String(),},nil}1
在客户端调用服务端方法的时候,这就涉及到client方法调用流程,默认的Client实现是rpcClient
,最主要的部分是它的call方法
func(h*httpTransport)Dial(addrstring,opts...DialOption)(Client,error){dopts:=DialOptions{Timeout:DefaultDialTimeout,}for_,opt:=rangeopts{opt(&dopts)}varconnnet.Connvarerrerror//TODO:supportdialoptionhereratherthanusinginternalconfigifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigifconfig==nil{config=&tls.Config{InsecureSkipVerify:true,}}config.NextProtos=[]string{"http/1.1"}conn,err=newConn(func(addrstring)(net.Conn,error){returntls.DialWithDialer(&net.Dialer{Timeout:dopts.Timeout},"tcp",addr,config)})(addr)}else{conn,err=newConn(func(addrstring)(net.Conn,error){returnnet.DialTimeout("tcp",addr,dopts.Timeout)})(addr)}iferr!=nil{returnnil,err}return&httpTransportClient{ht:h,addr:addr,conn:conn,buff:bufio.NewReader(conn),dialOpts:dopts,r:make(chan*http.Request,1),local:conn.LocalAddr().String(),remote:conn.RemoteAddr().String(),},nil}2
该方法首先组装transport.Message
消息体,该消息体包含两部分,一部分为Header
,一部分为Body
. 组装完成后进行消息编码,然后通过rpcStream
方式进行发送请求和接收响应。 以rpcStream
的Recv
方法为例,我们可以发现,其读取请求的消息是通过codec
,
func(h*httpTransport)Dial(addrstring,opts...DialOption)(Client,error){dopts:=DialOptions{Timeout:DefaultDialTimeout,}for_,opt:=rangeopts{opt(&dopts)}varconnnet.Connvarerrerror//TODO:supportdialoptionhereratherthanusinginternalconfigifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigifconfig==nil{config=&tls.Config{InsecureSkipVerify:true,}}config.NextProtos=[]string{"http/1.1"}conn,err=newConn(func(addrstring)(net.Conn,error){returntls.DialWithDialer(&net.Dialer{Timeout:dopts.Timeout},"tcp",addr,config)})(addr)}else{conn,err=newConn(func(addrstring)(net.Conn,error){returnnet.DialTimeout("tcp",addr,dopts.Timeout)})(addr)}iferr!=nil{returnnil,err}return&httpTransportClient{ht:h,addr:addr,conn:conn,buff:bufio.NewReader(conn),dialOpts:dopts,r:make(chan*http.Request,1),local:conn.LocalAddr().String(),remote:conn.RemoteAddr().String(),},nil}3
codec
读取消息其实就是调用client
的Recv
,这个client
就是我们开头提到的httpTransportClient
.
func(h*httpTransport)Dial(addrstring,opts...DialOption)(Client,error){dopts:=DialOptions{Timeout:DefaultDialTimeout,}for_,opt:=rangeopts{opt(&dopts)}varconnnet.Connvarerrerror//TODO:supportdialoptionhereratherthanusinginternalconfigifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigifconfig==nil{config=&tls.Config{InsecureSkipVerify:true,}}config.NextProtos=[]string{"http/1.1"}conn,err=newConn(func(addrstring)(net.Conn,error){returntls.DialWithDialer(&net.Dialer{Timeout:dopts.Timeout},"tcp",addr,config)})(addr)}else{conn,err=newConn(func(addrstring)(net.Conn,error){returnnet.DialTimeout("tcp",addr,dopts.Timeout)})(addr)}iferr!=nil{returnnil,err}return&httpTransportClient{ht:h,addr:addr,conn:conn,buff:bufio.NewReader(conn),dialOpts:dopts,r:make(chan*http.Request,1),local:conn.LocalAddr().String(),remote:conn.RemoteAddr().String(),},nil}4