go微服务框架go-micro深度学习(五) stream 调用过程详解

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:     上一篇写了一下rpc调用过程的实现方式,简单来说就是服务端把实现了接口的结构体对象进行反射,抽取方法,签名,保存,客户端调用的时候go-micro封请求数据,服务端接收到请求时,找到需要调用调用的对象和对应的方法,利用反射进行调用,返回数据。

    上一篇写了一下rpc调用过程的实现方式,简单来说就是服务端把实现了接口的结构体对象进行反射,抽取方法,签名,保存,客户端调用的时候go-micro封请求数据,服务端接收到请求时,找到需要调用调用的对象和对应的方法,利用反射进行调用,返回数据。 但是没有说stream的实现方式,感觉单独写一篇帖子来说这个更好一些。上一篇帖子是基础,理解了上一篇,stream实现原理一点即破。先说一下使用方式,再说原理。
当前go-micro对 rpc 调用的方式大概如下:
普通的rpc调用 是这样:

1.连接服务器或者从缓存池得到连接
2.客户端 ->发送数据 -> 服务端接收
3.服务端 ->返回数据 -> 客户端处理数据
4.关闭连接或者把连接返回到缓存池

当前 rps stream的实现方式 是这样子:

1. 连接服务器
2. 客户端多次发送请求-> 服务端接收
3. 服务端多次返回数据-> 客户端处理数据
4. 关闭连接

    当数据量比较大的时候我们可以用stream方式分批次传输数据。对于客户端还是服务端没有限制,我们可以根据自己的需要使用stream方式,使用方式也非常的简单,在定义接口的时候在参数或者返回值前面加上stream然后就可以多次进行传输了,使用的代码还是之前写的例子,代码都在github上:
    比如我的例子中定义了两个使用stream的接口,一个只在返回值使用stream,另一个是在参数和返回值前都加上了stream,最终的使用方式没有区别

    rpc Stream(model.SRequest) returns (stream model.SResponse) {}
    rpc BidirectionalStream(stream model.SRequest) returns (stream model.SResponse) {}

看一下go-micro为我们生成的代码rpcapi.micro.go里,不要被吓到,生成了很多代码,但是没啥理解不了的
Server端

// Server API for Say service
type SayHandler interface {
    // .... others    
    Stream(context.Context, *model.SRequest, Say_StreamStream) error
    BidirectionalStream(context.Context, Say_BidirectionalStreamStream) error
}
type Say_StreamStream interface {
    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error
    Send(*model.SResponse) error
}
type Say_BidirectionalStreamStream interface {
    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error
    Send(*model.SResponse) error
    Recv() (*model.SRequest, error)
}
// .... others 

Client端

// Client API for Say service
type SayService interface {    
    //... others
    Stream(ctx context.Context, in *model.SRequest, opts ...client.CallOption) (Say_StreamService, error)
    BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error)
}

type Say_StreamService interface {
    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error
    Recv() (*model.SResponse, error)
}

type Say_BidirectionalStreamService interface {
    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error
    Send(*model.SRequest) error
    Recv() (*model.SResponse, error)
}

    你会发现参数前面加了 Stream后,生成的代码会把你的参数变成一个接口,这个接口主要要的方法是

    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error

剩下的两个接口方法是根据你是发送还是接收生成的,如果有发送就会有Send(你的参数),如果有接收会生成Rev() (你的参数, error),但这两个方法只是为了让你使用时方便,里面调用的还是SendMsg(interface)和RecvMsg(interface)方法,但是他们是怎么工作的,如何多次发送和接收传输的数据,是不是感觉很神奇。

我就以TsBidirectionalStream 方法为例开始分析,上一篇和再早之前的帖子已经说了服务端启动的时候都做了哪些操作,这里就不再赘述,
服务端的实现,很简单,不断的获取客户端发过来的数据,再给客户端一次一次的返回一些数据。

/*
 模拟数据
 */
func (s *Say) BidirectionalStream(ctx context.Context, stream rpcapi.Say_BidirectionalStreamStream) error {
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        for i := int64(0); i < req.Count; i++ {
            if err := stream.Send(&model.SResponse{Value: []string {lib.RandomStr(lib.Random(3, 6))}}); err != nil {
                return err
            }
        }
    }
    return nil
}

启动服务,服务开始监听客户端传过来的数据.....
客户端调用服务端方法:

// 调用 
func TsBidirectionalStream(client rpcapi.SayService) {
    rspStream, err := client.BidirectionalStream(context.Background())
    if err != nil {
        panic(err)
    }
    // send
    go func() {
        rspStream.Send(&model.SRequest{Count: 2})
        rspStream.Send(&model.SRequest{Count: 5})
        // close the stream
        if err := rspStream.Close(); err != nil {
            fmt.Println("stream close err:", err)
        }
    }()
     // recv
    idx := 1
    for  {
        rsp, err := rspStream.Recv()

        if err == io.EOF {
            break
        } else if err != nil {
            panic(err)
        }

        fmt.Printf("test stream get idx %d  data  %v\n", idx, rsp)
        idx++
    }
    fmt.Println("Read Value End")
}

当客户端在调用rpc的stream方法是要很得到stream

rspStream, err := client.BidirectionalStream(context.Background())
// 
func (c *sayService) BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error) {
    req := c.c.NewRequest(c.name, "Say.BidirectionalStream", &model.SRequest{})
    stream, err := c.c.Stream(ctx, req, opts...)
    if err != nil {
        return nil, err
    }
    return &sayServiceBidirectionalStream{stream}, nil
}

这个调用c.c.Stream(ctx, req, opts...)是关键,他的内部实现就是和服务器进行连接,然后返回一个stream,进行操作。

客户端:和服务端建立连接,返回Stream,进行接收和发送数据
服务端:接收客户端连接请求,利用反射找到相应的方法,组织Strem,传给方法,进行数据的发送和接收

建立连接的时候就是一次rpc调用,服务端接受连接,然后客户端发送一次调用,但是传输的是空数据,服务端利用反射找到具体的方法,组织stream,调用具体方法,利用这个连接,客户端和服务端进行多次通信。

目录
相关文章
|
8天前
|
Dubbo Java 应用服务中间件
微服务框架Dubbo环境部署实战
微服务框架Dubbo环境部署的实战指南,涵盖了Dubbo的概述、服务部署、以及Dubbo web管理页面的部署,旨在指导读者如何搭建和使用Dubbo框架。
55 17
微服务框架Dubbo环境部署实战
|
6天前
|
存储 Java Maven
从零到微服务专家:用Micronaut框架轻松构建未来架构
【9月更文挑战第5天】在现代软件开发中,微服务架构因提升应用的可伸缩性和灵活性而广受欢迎。Micronaut 是一个轻量级的 Java 框架,适合构建微服务。本文介绍如何从零开始使用 Micronaut 搭建微服务架构,包括设置开发环境、创建 Maven 项目并添加 Micronaut 依赖,编写主类启动应用,以及添加控制器处理 HTTP 请求。通过示例代码展示如何实现简单的 “Hello, World!” 功能,并介绍如何通过添加更多依赖来扩展应用功能,如数据访问、验证和安全性等。Micronaut 的强大和灵活性使你能够快速构建复杂的微服务系统。
26 5
|
6天前
|
缓存 Java 应用服务中间件
随着微服务架构的兴起,Spring Boot凭借其快速开发和易部署的特点,成为构建RESTful API的首选框架
【9月更文挑战第6天】随着微服务架构的兴起,Spring Boot凭借其快速开发和易部署的特点,成为构建RESTful API的首选框架。Nginx作为高性能的HTTP反向代理服务器,常用于前端负载均衡,提升应用的可用性和响应速度。本文详细介绍如何通过合理配置实现Spring Boot与Nginx的高效协同工作,包括负载均衡策略、静态资源缓存、数据压缩传输及Spring Boot内部优化(如线程池配置、缓存策略等)。通过这些方法,开发者可以显著提升系统的整体性能,打造高性能、高可用的Web应用。
27 2
|
6天前
|
Cloud Native 安全 Java
Micronaut对决Spring Boot:谁是微服务领域的王者?揭秘两者优劣,选对框架至关重要!
【9月更文挑战第5天】近年来,微服务架构备受关注,Micronaut和Spring Boot成为热门选择。Micronaut由OCI开发,基于注解的依赖注入,内置多种特性,轻量级且启动迅速;Spring Boot则简化了Spring应用开发,拥有丰富的生态支持。选择框架需考虑项目需求、团队经验、性能要求及社区支持等因素。希望本文能帮助您选择合适的微服务框架,助力您的软件开发项目取得成功!
35 2
|
16天前
|
Cloud Native JavaScript API
一文读懂云原生 go-zero 微服务框架
一文读懂云原生 go-zero 微服务框架
|
机器学习/深度学习 Go 微服务
go微服务框架go-micro深度学习(四) rpc方法调用过程详解
上一篇帖子go微服务框架go-micro深度学习(三) Registry服务的注册和发现详细解释了go-micro是如何做服务注册和发现在,服务端注册server信息,client获取server的地址信息,就可以和服务建立连接,然后就可以进行通信了。
2410 0
|
1天前
|
程序员 Go PHP
为什么大部分的 PHP 程序员转不了 Go 语言?
【9月更文挑战第8天】大部分 PHP 程序员难以转向 Go 语言,主要因为:一、编程习惯与思维方式差异,如语法风格和编程范式;二、学习成本高,需掌握新知识体系且面临项目压力;三、职业发展考量,现有技能价值及市场需求不确定性。学习新语言虽有挑战,但对拓宽职业道路至关重要。
22 10
|
2天前
|
算法 程序员 Go
PHP 程序员学会了 Go 语言就能唬住面试官吗?
【9月更文挑战第8天】学会Go语言可提升PHP程序员的面试印象,但不足以 solely “唬住” 面试官。学习新语言能展现学习能力、拓宽技术视野,并增加就业机会。然而,实际项目经验、深入理解语言特性和综合能力更为关键。全面展示这些方面才能真正提升面试成功率。
20 10
|
1天前
|
编译器 Go
go语言学习记录(关于一些奇怪的疑问)有别于其他编程语言
本文探讨了Go语言中的常量概念,特别是特殊常量iota的使用方法及其自动递增特性。同时,文中还提到了在声明常量时,后续常量可沿用前一个值的特点,以及在遍历map时可能遇到的非顺序打印问题。
|
6天前
|
安全 大数据 Go
深入探索Go语言并发编程:Goroutines与Channels的实战应用
在当今高性能、高并发的应用需求下,Go语言以其独特的并发模型——Goroutines和Channels,成为了众多开发者眼中的璀璨明星。本文不仅阐述了Goroutines作为轻量级线程的优势,还深入剖析了Channels作为Goroutines间通信的桥梁,如何优雅地解决并发编程中的复杂问题。通过实战案例,我们将展示如何利用这些特性构建高效、可扩展的并发系统,同时探讨并发编程中常见的陷阱与最佳实践,为读者打开Go语言并发编程的广阔视野。