一.rpcx介绍
1.1 rpc是什么
远程过程调用的通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用。简单地说就是能使应用像调用本地方法一样的调用远程的过程或服务。很显然,这是一种client-server的交互形式,调用者是client,执行者是server。
一个完整的rpc的调用过程如下:
一次完整的RPC调用流程(同步调用,异步另说)如下:
- 服务消费方(client)调用以本地调用方式调用服务;
- client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
- client stub找到服务地址,并将消息发送到服务端;
- server stub收到消息后进行解码;
- server stub根据解码结果调用本地的服务;
- 本地服务执行并将结果返回给server stub;
- server stub将返回结果打包成消息并发送至消费方;
- client stub接收到消息,并进行解码;
- 服务消费方得到最终结果。
1.2 rpcx介绍
RPC只是描绘了 Client 与 Server 之间的点对点调用流程,包括 stub、通信、RPC 消息解析等部分,在实际应用中,还需要考虑服务的高可用、负载均衡等问题,所以产品级的 RPC 框架除了点对点的 RPC 协议的具体实现外,还应包括服务的发现与注销、提供服务的多台 Server 的负载均衡、服务的高可用等更多的功能。目前的 RPC 框架大致有两种不同的侧重方向,一种偏重于服务治理,另一种偏重于跨语言调用。 然而rpcx 属于服务治理类型,是一个基于 Go 开发的高性能的轻量级 RPC 框架。
1.2.1 rpcx特点
- 基于net/rpc,可以将net/rpc实现的RPC项目轻松的转换为分布式的RPC
- 插件式设计,可以配置所需的插件,比如服务发现、日志、统计分析等
- 基于TCP长连接,只需很小的额外的消息头
- 支持多种编解码协议,如Gob、Json、MessagePack、gencode、ProtoBuf等
- 服务发现:服务发布、订阅、通知等,支持多种发现方式如ZooKeeper、Etcd等
- 高可用策略:失败重试(Failover)、快速失败(Failfast)
- 负载均衡:支持随机请求、轮询、低并发优先、一致性 Hash等
- 规模可扩展,可以根据性能的需求增减服务器
- 其他:调用统计、访问日志等
- 功能都可以通过插件的方式完成。
1.2.2 rpcx架构
rpcx中有服务提供者 RPC Server,服务调用者 RPC Client 和服务注册中心 Registry 三个角色。
- Server 向 Registry 注册服务,并向注册中心发送心跳汇报状态(基于不同的registry有不同的实现)。
- Client 需要向注册中心查询 RPC 服务者列表,Client 根据 Registry 返回的服务者列表,选取其中一个 Sever进行 RPC 调用。
- 当 Server 发生宕机时,Registry 会监测到服务者不可用(zookeeper session机制或者手工心跳),Client感知后会对本地的服务列表作相应调整。client可能被动感知(zookeeper)或者主动定时拉取。
可选地,Server可以定期向Registry汇报调用统计信息,Client可以根据调用次数选择压力最小的Server。
当前rpcx支持zookeeper, etcd等注册中心。rpcx基于Go net/rpc的底层实现, Client和Server之间通讯是通过TCP进行通讯的,它们之间通过Client发送Request,Server返回Response实现。Request和Response消息的格式都是Header+Body的格式。Header和Body具体的格式根据编码方式的不同而不同,可以是二进制,也可以是结构化数据如JSON。
1.2.3 容错
Client提供了两种容错方式: Failfast、Failover、Failtry:
- Failfast: 如果Client调用失败,立即返回,不会重试。
- Failover: 如果Client调用失败,会尝试从服务列表中选择另外一个服务器调用,直到成功或者到达重试次数。
- Failtry: 如果Client调用失败,会继续这个服务器重试,直到成功或者到达重试次数。
1.2.4 重选算法
- 随机选择: 随机选择一个服务器并返回,可能和上一次的重复
- RoundRobin: 按顺序选择一个服务器。
- 一致性哈希 :使用Jump Consistent Hash algorithm。
- CallLeast : 根据调用次数选择压力最小的服务器。
1.2.5 序列化
- gob: 官方提供的序列化方式,基于一个包含元数据的流
- jsonrpc :也是官方提供的编码库,以JSON格式传输
- msgp: 类似json格式的编码,但是更小更快,可以直接编码struct
- gencode: 一个超级快的序列化库,需要定义schema,但是定义方式和struct类似
- protobuf: Google推出的广受关注的序列化库,推荐使用gogo-protobuf,可以获得更高的性能
二. rpcx-server
2.1 server结构
先看一下官方Server定义的结构体:
// Server is rpcx server that use TCP or UDP. type Server struct { ln net.Listener //监听 readTimeout time.Duration //读取client数据的超时时间 writeTimeout time.Duration //写入client数据的超时时间 gatewayHTTPServer *http.Server DisableHTTPGateway bool //使用HTTP网关 DisableJSONRPC bool //使用json-rpc serviceMapMu sync.RWMutex serviceMap map[string]*service //server端提供的service的记录表 mu sync.RWMutex activeConn map[net.Conn]struct{} // 存活的连接 doneChan chan struct{} // server完成管道 seq uint64 // server端编号 inShutdown int32 onShutdown []func(s *Server) //禁止一个套接字的IO // TLSConfig for creating tls tcp connection. tlsConfig *tls.Config // tcp连接的配置 // BlockCrypt for kcp.BlockCrypt options map[string]interface{} //kip协议时提供的一些限制 // CORS options corsOptions *CORSOptions Plugins PluginContainer //插件管理,通过实现插件注册插件,增加server的特性 // AuthFunc can be used to auth. AuthFunc func(ctx context.Context, req *protocol.Message, token string) error //认证 handlerMsgNum int32 //处理消息量 }
2.2 server 启动
接下来,定义服务,并启动:
首先你应使用 NewServer 来创建一个服务器实例。其次你可以调用 Serve 或者 ServeHTTP 来监听请求。
package main import ( "flag" example "github.com/rpcx-ecosystem/rpcx-examples3" "github.com/smallnest/rpcx/server" ) var ( addr = flag.String("addr", "localhost:8972", "server address") ) func main() { flag.Parse() s := server.NewServer() //s.RegisterName("Arith", new(example.Arith), "") s.Register(new(example.Arith), "") s.Serve("tcp", *addr) }
在你定义完服务后,你会想将它暴露出去来使用。你应该通过启动一个TCP或UDP服务器来监听请求。
服务器支持以如下这些方式启动,监听请求和关闭:
func NewServer(options ...OptionFn) *Server func (s *Server) Close() error func (s *Server) RegisterOnShutdown(f func()) func (s *Server) Serve(network, address string) (err error) func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)
rpcx 提供了 3个 OptionFn 来设置启动选项:
func WithReadTimeout(readTimeout time.Duration) OptionFn func WithTLSConfig(cfg *tls.Config) OptionFn func WithWriteTimeout(writeTimeout time.Duration) OptionFn
可以设置 读超时、写超时和tls证书。
- ServeHTTP 将服务通过HTTP暴露出去。
- Serve 通过TCP或UDP协议与客户端通信。
2.3.rpcx-server类图
三. rpcx-client
3.1 xClient 及其初始化
type xClient struct { failMode FailMode selectMode SelectMode cachedClient map[string]RPCClient breakers sync.Map servicePath string option Option mu sync.RWMutex servers map[string]string discovery ServiceDiscovery selector Selector isShutdown bool auth string Plugins PluginContainer ch chan []*KVPair serverMessageChan chan<- *protocol.Message }
客户端在初始化时会根据参数FailMode、SelectMode、Discovery、Option来确定调用失败后处理模式、路由选择的模式、发现服务器列表以及可选配置项。FailMode和SelectMode为服务治理 (失败模式与路由选择)的选项定义。在大规模的RPC系统中,许多服务节点在提供同一个服务。
3.2 FailMode
如果调用失败,客户端应该选择另一个节点或者立即返回错误,失败处理模式FailMode仅对同步调用有效(xClient.Call),而异步调用(xClient.Go)无效,FailMode一共有下面几种值可选择:
type FailMode int const ( // 自动选择另一台服务器 Failover FailMode = iota // 立即返回错误 Failfast // 再次使用当前客户端 Failtry // 如果第一台服务器在指定时间内没有快速响应,则选择另一台服务器 Failbackup )
3.3 SelectMode
路由选择模式SelectMode则有下面几种情况可选择:
// SelectMode 定义从候选者中选择服务的算法 type SelectMode int const ( // 随机选择:从服务节点中随机选择一个节点。由于节点是随机选择,所以并不能保证节点之间负载的均匀 RandomSelect SelectMode = iota // 轮询模式:从服务节点列表中逐个选择依次使用,能保证每个节点均匀被访问,在节点服务能力相差不大时适用。 RoundRobin // 加权轮询模式:使用基于权重的轮询算法 WeightedRoundRobin // 加权网络质量优先,客户端会基于ping(ICMP) 探测各个节点的网络质量,网络质量越好则节点的权重也就越高。 WeightedICMP // 一致性Hash:使用 JumpConsistentHash 选择节点, 相同的servicePath, serviceMethod 和参数会路由到同一个节点上。 JumpConsistentHash 是一个快速计算一致性哈希的算法,但是有一个缺陷是它不能删除节点,如果删除节点,路由需要重新计算一致性哈希。 ConsistentHash // 最近的服务器:它要求服务在注册的时候要设置它所在的地理经纬度 Closest // 通过用户进行选择 SelectByUser = 1000 )
3.4 Option
一些其他的配置选项:
type Option struct { Group string Retries int //重试次数 TLSConfig *tls.Config Block interface{} RPCPath string ConnectTimeout time.Duration //超时时间 ReadTimeout time.Duration WriteTimeout time.Duration BackupLatency time.Duration GenBreaker func() Breaker SerializeType protocol.SerializeType //默认通信协议 CompressType protocol.CompressType Heartbeat bool //是否启动心跳 HeartbeatInterval time.Duration //心跳的超时时间 }
注意:TCP有保活机制,为什么还需要在应用层维持心跳包,这个跟服务端虽然使用的TCP连接IRC服务器,但依然在上层封装Ping和Pong的原理是一样的。tcp的keep-alive默认是7200秒,也就是2小时,首先是检测时间太长,这么长的时间只能检测连接是否存在并不能检测数据是否能正常收发。而且keep-alive的数据包如果碰到四层负载均衡的中继设备,TCP内部的包会被中继设备接收并不会传到对端,TCP内部的数据包才会被转发。
3.5 调用
func (c *xClient) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error func (c *xClient) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{},done chan *Call) (*Call, error) func (c *xClient) Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error func (c *xClient) Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
- Go()方法是异步调用,在异步调用中,失败模式FailMode将会不起作用,它即时返回一个Call结构体实例。
- Call()方法是同步调用,也是最常用的调用方式,它会根据选择器确定服务器,支持失败模式FailMode,可以设置Option可选项,来进行远程调用,直到服务器返回数据或者超时。
- Broadcast()方法将请求发送到该服务的所有节点。如果所有的节点都正常返回才算成功。只有在所有节点没有错误的情况下, Broadcast()方法将返回其中的一个节点的返回信息。 如果有节点返回错误的话,Broadcast()方法将返回这些错误信息中的一个。失败模式FailMode和路由选择SelectMode在该方法中都不会生效,最好设置超时避免程序挂起。
- Fork()方法将请求发送到该服务的所有节点。如果有任何一个节点正常返回,则成功,Fork()方法将返回其中的一个节点的返回结果。 如果所有节点返回错误的话,Fork()方法将返回这些错误信息中的一个。失败模式FailMode和路由选择SelectMode在该方法中都不会生效。
3.6 示例
package main import ( "context" "flag" "log" example "github.com/rpcx-ecosystem/rpcx-examples3" "github.com/smallnest/rpcx/client" ) var ( addr = flag.String("addr", "localhost:8972", "server address") ) func main() { flag.Parse() d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "") xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption) defer xclient.Close() args := &example.Args{ A: 10, B: 20, } reply := &example.Reply{} err := xclient.Call(context.Background(), "Mul", args, reply) if err != nil { log.Fatalf("failed to call: %v", err) } log.Printf("%d * %d = %d", args.A, args.B, reply.C) }