go微服务框架go-micro深度学习(五) stream 调用过程详解

本文涉及的产品
云原生网关 MSE Higress,422元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:     上一篇写了一下rpc调用过程的实现方式,简单来说就是服务端把实现了接口的结构体对象进行反射,抽取方法,签名,保存,客户端调用的时候go-micro封请求数据,服务端接收到请求时,找到需要调用调用的对象和对应的方法,利用反射进行调用,返回数据。

    上一篇写了一下rpc调用过程的实现方式,简单来说就是服务端把实现了接口的结构体对象进行反射,抽取方法,签名,保存,客户端调用的时候go-micro封请求数据,服务端接收到请求时,找到需要调用调用的对象和对应的方法,利用反射进行调用,返回数据。 但是没有说stream的实现方式,感觉单独写一篇帖子来说这个更好一些。上一篇帖子是基础,理解了上一篇,stream实现原理一点即破。先说一下使用方式,再说原理。
当前go-micro对 rpc 调用的方式大概如下:
普通的rpc调用 是这样:

1.连接服务器或者从缓存池得到连接
2.客户端 ->发送数据 -> 服务端接收
3.服务端 ->返回数据 -> 客户端处理数据
4.关闭连接或者把连接返回到缓存池
AI 代码解读

当前 rps stream的实现方式 是这样子:

1. 连接服务器
2. 客户端多次发送请求-> 服务端接收
3. 服务端多次返回数据-> 客户端处理数据
4. 关闭连接
AI 代码解读

    当数据量比较大的时候我们可以用stream方式分批次传输数据。对于客户端还是服务端没有限制,我们可以根据自己的需要使用stream方式,使用方式也非常的简单,在定义接口的时候在参数或者返回值前面加上stream然后就可以多次进行传输了,使用的代码还是之前写的例子,代码都在github上:
    比如我的例子中定义了两个使用stream的接口,一个只在返回值使用stream,另一个是在参数和返回值前都加上了stream,最终的使用方式没有区别

    rpc Stream(model.SRequest) returns (stream model.SResponse) {}
    rpc BidirectionalStream(stream model.SRequest) returns (stream model.SResponse) {}
AI 代码解读

看一下go-micro为我们生成的代码rpcapi.micro.go里,不要被吓到,生成了很多代码,但是没啥理解不了的
Server端

// Server API for Say service
type SayHandler interface {
    // .... others    
    Stream(context.Context, *model.SRequest, Say_StreamStream) error
    BidirectionalStream(context.Context, Say_BidirectionalStreamStream) error
}
type Say_StreamStream interface {
    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error
    Send(*model.SResponse) error
}
type Say_BidirectionalStreamStream interface {
    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error
    Send(*model.SResponse) error
    Recv() (*model.SRequest, error)
}
// .... others 
AI 代码解读

Client端

// Client API for Say service
type SayService interface {    
    //... others
    Stream(ctx context.Context, in *model.SRequest, opts ...client.CallOption) (Say_StreamService, error)
    BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error)
}

type Say_StreamService interface {
    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error
    Recv() (*model.SResponse, error)
}

type Say_BidirectionalStreamService interface {
    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error
    Send(*model.SRequest) error
    Recv() (*model.SResponse, error)
}
AI 代码解读

    你会发现参数前面加了 Stream后,生成的代码会把你的参数变成一个接口,这个接口主要要的方法是

    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error
AI 代码解读

剩下的两个接口方法是根据你是发送还是接收生成的,如果有发送就会有Send(你的参数),如果有接收会生成Rev() (你的参数, error),但这两个方法只是为了让你使用时方便,里面调用的还是SendMsg(interface)和RecvMsg(interface)方法,但是他们是怎么工作的,如何多次发送和接收传输的数据,是不是感觉很神奇。

我就以TsBidirectionalStream 方法为例开始分析,上一篇和再早之前的帖子已经说了服务端启动的时候都做了哪些操作,这里就不再赘述,
服务端的实现,很简单,不断的获取客户端发过来的数据,再给客户端一次一次的返回一些数据。

/*
 模拟数据
 */
func (s *Say) BidirectionalStream(ctx context.Context, stream rpcapi.Say_BidirectionalStreamStream) error {
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        for i := int64(0); i < req.Count; i++ {
            if err := stream.Send(&model.SResponse{Value: []string {lib.RandomStr(lib.Random(3, 6))}}); err != nil {
                return err
            }
        }
    }
    return nil
}
AI 代码解读

启动服务,服务开始监听客户端传过来的数据.....
客户端调用服务端方法:

// 调用 
func TsBidirectionalStream(client rpcapi.SayService) {
    rspStream, err := client.BidirectionalStream(context.Background())
    if err != nil {
        panic(err)
    }
    // send
    go func() {
        rspStream.Send(&model.SRequest{Count: 2})
        rspStream.Send(&model.SRequest{Count: 5})
        // close the stream
        if err := rspStream.Close(); err != nil {
            fmt.Println("stream close err:", err)
        }
    }()
     // recv
    idx := 1
    for  {
        rsp, err := rspStream.Recv()

        if err == io.EOF {
            break
        } else if err != nil {
            panic(err)
        }

        fmt.Printf("test stream get idx %d  data  %v\n", idx, rsp)
        idx++
    }
    fmt.Println("Read Value End")
}
AI 代码解读

当客户端在调用rpc的stream方法是要很得到stream

rspStream, err := client.BidirectionalStream(context.Background())
// 
func (c *sayService) BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error) {
    req := c.c.NewRequest(c.name, "Say.BidirectionalStream", &model.SRequest{})
    stream, err := c.c.Stream(ctx, req, opts...)
    if err != nil {
        return nil, err
    }
    return &sayServiceBidirectionalStream{stream}, nil
}
AI 代码解读

这个调用c.c.Stream(ctx, req, opts...)是关键,他的内部实现就是和服务器进行连接,然后返回一个stream,进行操作。

客户端:和服务端建立连接,返回Stream,进行接收和发送数据
服务端:接收客户端连接请求,利用反射找到相应的方法,组织Strem,传给方法,进行数据的发送和接收
AI 代码解读

建立连接的时候就是一次rpc调用,服务端接受连接,然后客户端发送一次调用,但是传输的是空数据,服务端利用反射找到具体的方法,组织stream,调用具体方法,利用这个连接,客户端和服务端进行多次通信。

lpxxn
+关注
目录
打赏
0
0
1
0
32
分享
相关文章
探索Go语言在微服务架构中的优势
在微服务架构的浪潮中,Go语言以其简洁、高效和并发处理能力脱颖而出。本文将深入探讨Go语言在构建微服务时的性能优势,包括其在内存管理、网络编程、并发模型以及工具链支持方面的特点。通过对比其他流行语言,我们将揭示Go语言如何成为微服务架构中的一股清流。
177 53
【环境】Rocky8使用gvm配置Go多版本管理的微服务开发环境(go-zero)
通过本文的介绍,我们详细讲解了如何在Rocky8上使用gvm来管理多个Go版本,并配置go-zero框架的开发环境。通过gvm的灵活管理,开发者可以轻松切换不同的Go版本,以适应不同项目的需求。同时,go-zero框架的使用进一步提升了微服务开发的效率和质量。希望本文能帮助开发者构建高效的Go语言开发环境,提高项目开发的灵活性和稳定性。
105 63
eino — 基于go语言的大模型应用开发框架(二)
本文介绍了如何使用Eino框架实现一个基本的LLM(大语言模型)应用。Eino中的`ChatModel`接口提供了与不同大模型服务(如OpenAI、Ollama等)交互的统一方式,支持生成完整响应、流式响应和绑定工具等功能。`Generate`方法用于生成完整的模型响应,`Stream`方法以流式方式返回结果,`BindTools`方法为模型绑定工具。此外,还介绍了通过`Option`模式配置模型参数及模板功能,支持基于前端和用户自定义的角色及Prompt。目前主要聚焦于`ChatModel`的`Generate`方法,后续将继续深入学习。
276 7
eino — 基于go语言的大模型应用开发框架(一)
Eino 是一个受开源社区优秀LLM应用开发框架(如LangChain和LlamaIndex)启发的Go语言框架,强调简洁性、可扩展性和可靠性。它提供了易于复用的组件、强大的编排框架、简洁明了的API、最佳实践集合及实用的DevOps工具,支持快速构建和部署LLM应用。Eino不仅兼容多种模型库(如OpenAI、Ollama、Ark),还提供详细的官方文档和活跃的社区支持,便于开发者上手使用。
196 8
MNN:阿里开源的轻量级深度学习推理框架,支持在移动端等多种终端上运行,兼容主流的模型格式
MNN 是阿里巴巴开源的轻量级深度学习推理框架,支持多种设备和主流模型格式,具备高性能和易用性,适用于移动端、服务器和嵌入式设备。
689 18
MNN:阿里开源的轻量级深度学习推理框架,支持在移动端等多种终端上运行,兼容主流的模型格式
深度学习工具和框架详细指南:PyTorch、TensorFlow、Keras
在深度学习的世界中,PyTorch、TensorFlow和Keras是最受欢迎的工具和框架,它们为研究者和开发者提供了强大且易于使用的接口。在本文中,我们将深入探索这三个框架,涵盖如何用它们实现经典深度学习模型,并通过代码实例详细讲解这些工具的使用方法。
纯Go语言开发人脸检测、瞳孔/眼睛定位与面部特征检测插件-助力GoFly快速开发框架
开发纯go插件的原因是因为目前 Go 生态系统中几乎所有现有的人脸检测解决方案都是纯粹绑定到一些 C/C++ 库,如 OpenCV 或 dlib,但通过 cgo 调用 C 程序会引入巨大的延迟,并在性能方面产生显著的权衡。此外,在许多情况下,在各种平台上安装 OpenCV 是很麻烦的。使用纯Go开发的插件不仅在开发时方便,在项目部署和项目维护也能省很多时间精力。
Go 语言中常用的 ORM 框架,如 GORM、XORM 和 BeeORM,分析了它们的特点、优势及不足,并从功能特性、性能表现、易用性和社区活跃度等方面进行了比较,旨在帮助开发者根据项目需求选择合适的 ORM 框架。
本文介绍了 Go 语言中常用的 ORM 框架,如 GORM、XORM 和 BeeORM,分析了它们的特点、优势及不足,并从功能特性、性能表现、易用性和社区活跃度等方面进行了比较,旨在帮助开发者根据项目需求选择合适的 ORM 框架。
389 4
Go语言中几种流行的Web框架,如Beego、Gin和Echo,分析了它们的特点、性能及适用场景,并讨论了如何根据项目需求、性能要求、团队经验和社区支持等因素选择最合适的框架
本文概述了Go语言中几种流行的Web框架,如Beego、Gin和Echo,分析了它们的特点、性能及适用场景,并讨论了如何根据项目需求、性能要求、团队经验和社区支持等因素选择最合适的框架。
361 1
Go语言在微服务架构中的应用实践
在微服务架构的浪潮中,Go语言以其简洁、高效和并发处理能力脱颖而出,成为构建微服务的理想选择。本文将探讨Go语言在微服务架构中的应用实践,包括Go语言的特性如何适应微服务架构的需求,以及在实际开发中如何利用Go语言的特性来提高服务的性能和可维护性。我们将通过一个具体的案例分析,展示Go语言在微服务开发中的优势,并讨论在实际应用中可能遇到的挑战和解决方案。
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等