Rpcx (一):详解【介绍、基础示例 demo】

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
网络型负载均衡 NLB,每月750个小时 15LCU
简介: Rpcx (一):详解【介绍、基础示例 demo】

一.rpcx介绍

1.1 rpc是什么

  远程过程调用的通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用。简单地说就是能使应用像调用本地方法一样的调用远程的过程或服务。很显然,这是一种client-server的交互形式,调用者是client,执行者是server。

一个完整的rpc的调用过程如下:

一次完整的RPC调用流程(同步调用,异步另说)如下:

  1. 服务消费方(client)调用以本地调用方式调用服务;
  2. client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
  3. client stub找到服务地址,并将消息发送到服务端;
  4. server stub收到消息后进行解码;
  5. server stub根据解码结果调用本地的服务;
  6. 本地服务执行并将结果返回给server stub;
  7. server stub将返回结果打包成消息并发送至消费方;
  8. client stub接收到消息,并进行解码;
  9. 服务消费方得到最终结果。

1.2 rpcx介绍

  RPC只是描绘了 Client 与 Server 之间的点对点调用流程,包括 stub、通信、RPC 消息解析等部分,在实际应用中,还需要考虑服务的高可用、负载均衡等问题,所以产品级的 RPC 框架除了点对点的 RPC 协议的具体实现外,还应包括服务的发现与注销、提供服务的多台 Server 的负载均衡、服务的高可用等更多的功能。目前的 RPC 框架大致有两种不同的侧重方向,一种偏重于服务治理,另一种偏重于跨语言调用。 然而rpcx 属于服务治理类型,是一个基于 Go 开发的高性能的轻量级 RPC 框架。

1.2.1 rpcx特点

  • 基于net/rpc,可以将net/rpc实现的RPC项目轻松的转换为分布式的RPC
  • 插件式设计,可以配置所需的插件,比如服务发现、日志、统计分析等
  • 基于TCP长连接,只需很小的额外的消息头
  • 支持多种编解码协议,如Gob、Json、MessagePack、gencode、ProtoBuf等
  • 服务发现:服务发布、订阅、通知等,支持多种发现方式如ZooKeeper、Etcd等
  • 高可用策略:失败重试(Failover)、快速失败(Failfast)
  • 负载均衡:支持随机请求、轮询、低并发优先、一致性 Hash等
  • 规模可扩展,可以根据性能的需求增减服务器
  • 其他:调用统计、访问日志等
  • 功能都可以通过插件的方式完成。


1.2.2 rpcx架构

rpcx中有服务提供者 RPC Server,服务调用者 RPC Client 和服务注册中心 Registry 三个角色。

  • Server 向 Registry 注册服务,并向注册中心发送心跳汇报状态(基于不同的registry有不同的实现)。
  • Client 需要向注册中心查询 RPC 服务者列表,Client 根据 Registry 返回的服务者列表,选取其中一个 Sever进行 RPC 调用。
  • 当 Server 发生宕机时,Registry 会监测到服务者不可用(zookeeper session机制或者手工心跳),Client感知后会对本地的服务列表作相应调整。client可能被动感知(zookeeper)或者主动定时拉取。

可选地,Server可以定期向Registry汇报调用统计信息,Client可以根据调用次数选择压力最小的Server。

当前rpcx支持zookeeper, etcd等注册中心。rpcx基于Go net/rpc的底层实现, Client和Server之间通讯是通过TCP进行通讯的,它们之间通过Client发送Request,Server返回Response实现。Request和Response消息的格式都是Header+Body的格式。Header和Body具体的格式根据编码方式的不同而不同,可以是二进制,也可以是结构化数据如JSON。

1.2.3 容错

Client提供了两种容错方式: Failfast、Failover、Failtry:

  • Failfast: 如果Client调用失败,立即返回,不会重试。
  • Failover: 如果Client调用失败,会尝试从服务列表中选择另外一个服务器调用,直到成功或者到达重试次数。
  • Failtry: 如果Client调用失败,会继续这个服务器重试,直到成功或者到达重试次数。

1.2.4 重选算法

  • 随机选择: 随机选择一个服务器并返回,可能和上一次的重复
  • RoundRobin: 按顺序选择一个服务器。
  • 一致性哈希 :使用Jump Consistent Hash algorithm。
  • CallLeast : 根据调用次数选择压力最小的服务器。

1.2.5 序列化

  • gob: 官方提供的序列化方式,基于一个包含元数据的流
  • jsonrpc :也是官方提供的编码库,以JSON格式传输
  • msgp: 类似json格式的编码,但是更小更快,可以直接编码struct
  • gencode: 一个超级快的序列化库,需要定义schema,但是定义方式和struct类似
  • protobuf: Google推出的广受关注的序列化库,推荐使用gogo-protobuf,可以获得更高的性能

二. rpcx-server

2.1 server结构

先看一下官方Server定义的结构体:

// Server is rpcx server that use TCP or UDP.
type Server struct {
  ln                 net.Listener   //监听
  readTimeout        time.Duration  //读取client数据的超时时间
  writeTimeout       time.Duration  //写入client数据的超时时间
  gatewayHTTPServer  *http.Server   
  DisableHTTPGateway bool //使用HTTP网关
  DisableJSONRPC     bool //使用json-rpc
 
  serviceMapMu sync.RWMutex
  serviceMap   map[string]*service   //server端提供的service的记录表
 
  mu         sync.RWMutex            
  activeConn map[net.Conn]struct{}   // 存活的连接
  doneChan   chan struct{}           // server完成管道
  seq        uint64                  // server端编号
 
  inShutdown int32
  onShutdown []func(s *Server)      //禁止一个套接字的IO
 
  // TLSConfig for creating tls tcp connection.
  tlsConfig *tls.Config            // tcp连接的配置
  // BlockCrypt for kcp.BlockCrypt
  options map[string]interface{}  //kip协议时提供的一些限制
 
  // CORS options
  corsOptions *CORSOptions
 
  Plugins PluginContainer  //插件管理,通过实现插件注册插件,增加server的特性
 
  // AuthFunc can be used to auth.
  AuthFunc func(ctx context.Context, req *protocol.Message, token string) error //认证
 
  handlerMsgNum int32     //处理消息量
}

2.2 server 启动

接下来,定义服务,并启动:

首先你应使用 NewServer 来创建一个服务器实例。其次你可以调用 Serve 或者 ServeHTTP 来监听请求。

package main

import (
    "flag"

    example "github.com/rpcx-ecosystem/rpcx-examples3"
    "github.com/smallnest/rpcx/server"
)

var (
    addr = flag.String("addr", "localhost:8972", "server address")
)

func main() {
    flag.Parse()

    s := server.NewServer()
    //s.RegisterName("Arith", new(example.Arith), "")
    s.Register(new(example.Arith), "")
    s.Serve("tcp", *addr)
}

在你定义完服务后,你会想将它暴露出去来使用。你应该通过启动一个TCP或UDP服务器来监听请求。

服务器支持以如下这些方式启动,监听请求和关闭:

func NewServer(options ...OptionFn) *Server
func (s *Server) Close() error
func (s *Server) RegisterOnShutdown(f func())
func (s *Server) Serve(network, address string) (err error)
func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)

rpcx 提供了 3个 OptionFn 来设置启动选项:

func WithReadTimeout(readTimeout time.Duration) OptionFn
func WithTLSConfig(cfg *tls.Config) OptionFn
func WithWriteTimeout(writeTimeout time.Duration) OptionFn

可以设置 读超时、写超时和tls证书。

  • ServeHTTP 将服务通过HTTP暴露出去。
  • Serve 通过TCP或UDP协议与客户端通信。

2.3.rpcx-server类图

三. rpcx-client

3.1 xClient 及其初始化

type xClient struct {
  failMode     FailMode
  selectMode   SelectMode
  cachedClient map[string]RPCClient
  breakers     sync.Map
  servicePath  string
  option       Option
  mu        sync.RWMutex
  servers   map[string]string
  discovery ServiceDiscovery
  selector  Selector
 
  isShutdown bool
  auth string
  Plugins PluginContainer
  ch chan []*KVPair
  serverMessageChan chan<- *protocol.Message
}

 客户端在初始化时会根据参数FailMode、SelectMode、Discovery、Option来确定调用失败后处理模式、路由选择的模式、发现服务器列表以及可选配置项。FailMode和SelectMode为服务治理 (失败模式与路由选择)的选项定义。在大规模的RPC系统中,许多服务节点在提供同一个服务。

3.2 FailMode

  如果调用失败,客户端应该选择另一个节点或者立即返回错误,失败处理模式FailMode仅对同步调用有效(xClient.Call),而异步调用(xClient.Go)无效,FailMode一共有下面几种值可选择:

type FailMode int
 
const (
  // 自动选择另一台服务器
  Failover FailMode = iota
  // 立即返回错误
  Failfast
  // 再次使用当前客户端
  Failtry
  // 如果第一台服务器在指定时间内没有快速响应,则选择另一台服务器
  Failbackup
)

3.3 SelectMode

  路由选择模式SelectMode则有下面几种情况可选择:

// SelectMode 定义从候选者中选择服务的算法
type SelectMode int
const (
  // 随机选择:从服务节点中随机选择一个节点。由于节点是随机选择,所以并不能保证节点之间负载的均匀
  RandomSelect SelectMode = iota
  // 轮询模式:从服务节点列表中逐个选择依次使用,能保证每个节点均匀被访问,在节点服务能力相差不大时适用。
  RoundRobin
  // 加权轮询模式:使用基于权重的轮询算法
  WeightedRoundRobin
  // 加权网络质量优先,客户端会基于ping(ICMP) 探测各个节点的网络质量,网络质量越好则节点的权重也就越高。
  WeightedICMP
  // 一致性Hash:使用 JumpConsistentHash 选择节点, 相同的servicePath, serviceMethod 和参数会路由到同一个节点上。 JumpConsistentHash 是一个快速计算一致性哈希的算法,但是有一个缺陷是它不能删除节点,如果删除节点,路由需要重新计算一致性哈希。
  ConsistentHash
  // 最近的服务器:它要求服务在注册的时候要设置它所在的地理经纬度
  Closest
 
  // 通过用户进行选择
  SelectByUser = 1000
)

3.4 Option

一些其他的配置选项:

type Option struct {
  Group string
  Retries int   //重试次数
  TLSConfig *tls.Config
  Block interface{}
  RPCPath string
  ConnectTimeout time.Duration  //超时时间
  ReadTimeout time.Duration
  WriteTimeout time.Duration
  BackupLatency time.Duration
  GenBreaker func() Breaker
  SerializeType protocol.SerializeType  //默认通信协议
  CompressType  protocol.CompressType
  Heartbeat         bool       //是否启动心跳
  HeartbeatInterval time.Duration  //心跳的超时时间
}

注意:TCP有保活机制,为什么还需要在应用层维持心跳包,这个跟服务端虽然使用的TCP连接IRC服务器,但依然在上层封装Ping和Pong的原理是一样的。tcp的keep-alive默认是7200秒,也就是2小时,首先是检测时间太长,这么长的时间只能检测连接是否存在并不能检测数据是否能正常收发。而且keep-alive的数据包如果碰到四层负载均衡的中继设备,TCP内部的包会被中继设备接收并不会传到对端,TCP内部的数据包才会被转发。

3.5 调用

func (c *xClient) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
func (c *xClient) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{},done chan *Call) (*Call, error)
func (c *xClient) Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
func (c *xClient) Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
  • Go()方法是异步调用,在异步调用中,失败模式FailMode将会不起作用,它即时返回一个Call结构体实例。
  • Call()方法是同步调用,也是最常用的调用方式,它会根据选择器确定服务器,支持失败模式FailMode,可以设置Option可选项,来进行远程调用,直到服务器返回数据或者超时。
  • Broadcast()方法将请求发送到该服务的所有节点。如果所有的节点都正常返回才算成功。只有在所有节点没有错误的情况下, Broadcast()方法将返回其中的一个节点的返回信息。 如果有节点返回错误的话,Broadcast()方法将返回这些错误信息中的一个。失败模式FailMode和路由选择SelectMode在该方法中都不会生效,最好设置超时避免程序挂起。
  • Fork()方法将请求发送到该服务的所有节点。如果有任何一个节点正常返回,则成功,Fork()方法将返回其中的一个节点的返回结果。 如果所有节点返回错误的话,Fork()方法将返回这些错误信息中的一个。失败模式FailMode和路由选择SelectMode在该方法中都不会生效。

3.6 示例

package main

import (
    "context"
    "flag"
    "log"

    example "github.com/rpcx-ecosystem/rpcx-examples3"
    "github.com/smallnest/rpcx/client"
)

var (
    addr = flag.String("addr", "localhost:8972", "server address")
)

func main() {
    flag.Parse()

    d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

    args := &example.Args{
        A: 10,
        B: 20,
    }

    reply := &example.Reply{}
    err := xclient.Call(context.Background(), "Mul", args, reply)
    if err != nil {
        log.Fatalf("failed to call: %v", err)
    }

    log.Printf("%d * %d = %d", args.A, args.B, reply.C)

}

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
8月前
|
网络性能优化 调度 网络虚拟化
配置HQoS示例
HQoS简介 HQoS通过多级队列进一步细化区分业务流量,对多个用户、多种业务等传输对象进行统一管理和分层调度,在现有的硬件环境下使设备具备内部资源的控制策略,既能够为高级用户提供质量保证,又能够从整体上节约网络建设成本。 交换机的HQoS主要通过流队列和用户队列实现。
136 7
|
8月前
|
SQL JavaScript
js开发:请解释什么是ES6的模板字符串(template string),并给出一个示例。
ES6的模板字符串以反引号包围,支持变量和表达式插入以及多行书写。例如,插入变量值`Hello, ${name}!`,计算表达式`${num1 + num2}`,以及创建多行字符串。模板字符串保留原始空格和缩进,简化了字符串拼接,提高了代码可读性。
70 6
组件配置示例
组件配置示例
59 3
C#简单委托示例——让你一看就会的demo
C#简单委托示例——让你一看就会的demo
|
Cloud Native 架构师 Devops
几个测试示例分享 | 学习笔记
快速学习几个测试示例分享
几个测试示例分享 | 学习笔记
一个完整的 ParentDataWidget 示例
一个完整的 ParentDataWidget 示例
138 0
一个完整的 ParentDataWidget 示例
|
Cloud Native 架构师 机器人
几个测试示例分享|学习笔记
快速学习几个测试示例分享
111 0
几个测试示例分享|学习笔记
|
C++
c++ demo02 类型大小
c++ demo02 类型大小
64 0
|
SQL 缓存 网络协议
【SpringBoot DB 系列】h2databse 集成示例 demo
h2dabase 基于内存的数据库,更常见于嵌入式数据库的使用场景,依赖小,功能齐全;一般来讲,正常的商业项目用到它的场景不多,但是在一些特殊的 case 中,还是比较有用的,比如用于单元测试,业务缓存,一些简单的示例 demo 等;本文将手把手教你创建一个继承 h2dabase 的项目,并支持从 sql 中导入预定好的 schema 和 data
145 0

相关实验场景

更多