java版gRPC实战之四:客户端流

简介: 客户端流式RPC,适用于客户端提交大量数据到服务端的场景,一起来体验一下

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos

本篇概览

  • 本文是《java版gRPC实战》系列的第四篇,前文掌握了服务端流,适合从服务端获取大量数据的场景,今天的目标是掌握客户端流类型的服务,包括服务提供方和使用方两侧的开发;
  • 先来看看官方资料对客户端流式RPC的介绍:客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应;
  • 本文由以下几部分组成:
  1. 提前小结几个重要的知识点,稍后开发过程中要重点关注这几个地方;
  2. 在proto文件中定义客户端流类型的gRPC接口,再通过proto生成java代码;
  3. 开发服务端应用;
  4. 开发客户端应用;
  5. 验证;

提前小结

为了突出重点,这里将几个关键的知识点提前给出:

  1. 客户端流的特点,是请求方以流的形式提交数据到响应方;
  2. 一次RPC请求中,请求方可以通过流的方式源源不断的提交数据,直到调用了StreamObserver的onCompleted方法,才算提交数据完成;
  3. 平时咱们调用方法时,方法内部用到的数据是通过入参传进来的,但这里不一样,客户端要传给服务端的数据和gRPC方法的入参没有关系,而是和方法的返回对象有关(执行返回对象的onNext方法可以将数据传给服务端);
  4. 客户端在A线程上传完数据后,服务端的响应是在另一个线程B执行的,因此,如果A线程拿到服务端响应,就要B线程的异步响应方法执行完毕,等待的方法有多种,我用的是CountDownLatch;
  5. 在服务端,开发者要编写的代码和以往web开发不同,不是将数据处理好返回,而是返回一个StreamObserver实例给上层框架,由框架负责处理的逻辑,开发者专注开发StreamObserver的实现即可,例如重写onNext方法,客户端通过流每上传一笔数据,onNext方法都会被外层框架执行一次;
  6. 如果您用的是IDEA,记得勾选下图红框中的选框,否则运行应用的时候可能遇到lombok相关的问题:

在这里插入图片描述

  • 上面提到的这些,会在接下来的开发过程中充分体现出来;

源码下载

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,《java版gRPC实战》系列的源码在grpc-tutorials文件夹下,如下图红框所示:

在这里插入图片描述

  • grpc-tutorials文件夹下有多个目录,本篇文章对应的服务端代码在client-stream-server-side目录下,客户端代码在client-stream-client-side目录下,如下图:

在这里插入图片描述

在proto文件中定义客户端流类型的gRPC接口

  • 首先要做的就是定义gRPC接口,打开mall.proto,在里面新增方法和相关的数据结构,需要重点关注的是AddToCart方法的入参ProductOrder前面添加了stream修饰,代表该方法是客户端流类型:
// gRPC服务,这是个在线商城的购物车服务
service CartService {
    // 客户端流式:添加多个商品到购物车
    rpc AddToCart (stream ProductOrder) returns (AddCartReply) {}
}

// 提交购物车时的产品信息
message ProductOrder {
    // 商品ID
    int32 productId = 1;
    // 商品数量
    int32 number = 2;
}

// 提交购物车返回结果的数据结构
message AddCartReply {
    // 返回码
    int32 code = 1;
    // 描述信息
    string message = 2;
}
  • 双击下图红框中的task即可生成java代码:

在这里插入图片描述

  • 生成下图红框中的文件:

在这里插入图片描述

  • 接下来开发服务端;

开发服务端应用

  • 在父工程grpc-turtorials下面新建名为client-stream-server-side的模块,其build.gradle内容如下:
// 使用springboot插件
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    // 作为gRPC服务提供方,需要用到此库
    implementation 'net.devh:grpc-server-spring-boot-starter'
    // 依赖自动生成源码的工程
    implementation project(':grpc-lib')
    // annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessor
    annotationProcessor 'org.projectlombok:lombok'
}
  • 配置文件application.yml:
spring:
  application:
    name: client-stream-server-side
# gRPC有关的配置,这里只需要配置服务端口号
grpc:
  server:
    port: 9900
  • 启动类ClientStreamServerSideApplication.java的代码就不贴了,普通的springboot启动类而已;
  • 重点是提供grpc服务的GrpcServerService.java,请结合前面小结的第五点来阅读代码,咱们要做的就是给上层框架返回一个匿名类,至于里面的onNext、onCompleted方法何时被调用是上层框架决定的,另外还准备了成员变量totalCount,这样就可以记录总数了:
package com.bolingcavalry.grpctutorials;

import com.bolingcavalry.grpctutorials.lib.AddCartReply;
import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

@GrpcService
@Slf4j
public class GrpcServerService extends CartServiceGrpc.CartServiceImplBase {

    @Override
    public StreamObserver<ProductOrder> addToCart(StreamObserver<AddCartReply> responseObserver) {
        // 返回匿名类,给上层框架使用
        return new StreamObserver<ProductOrder>() {

            // 记录处理产品的总量
            private int totalCount = 0;

            @Override
            public void onNext(ProductOrder value) {
                log.info("正在处理商品[{}],数量为[{}]",
                        value.getProductId(),
                        value.getNumber());

                // 增加总量
                totalCount += value.getNumber();
            }

            @Override
            public void onError(Throwable t) {
                log.error("添加购物车异常", t);
            }

            @Override
            public void onCompleted() {
                log.info("添加购物车完成,共计[{}]件商品", totalCount);
                responseObserver.onNext(AddCartReply.newBuilder()
                                                    .setCode(10000)
                                                    .setMessage(String.format("添加购物车完成,共计[%d]件商品", totalCount))
                                                    .build());
                responseObserver.onCompleted();
            }
        };
    }
}

开发客户端应用

  • 在父工程grpc-turtorials下面新建名为client-stream-server-side的模块,其build.gradle内容如下:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'net.devh:grpc-client-spring-boot-starter'
    implementation project(':grpc-lib')
}
  • 配置文件application.yml,设置自己的web端口号和服务端地址:
server:
  port: 8082
spring:
  application:
    name: client-stream-client-side

grpc:
  client:
    # gRPC配置的名字,GrpcClient注解会用到
    client-stream-server-side:
      # gRPC服务端地址
      address: 'static://127.0.0.1:9900'
      enableKeepAlive: true
      keepAliveWithoutCalls: true
      negotiationType: plaintext
  • 启动类ClientStreamClientSideApplication.java的代码就不贴了,普通的springboot启动类而已;
  • 正常情况下我们都是用StreamObserver处理服务端响应,这里由于是异步响应,需要额外的方法从StreamObserver中取出业务数据,于是定一个新接口,继承自StreamObserver,新增getExtra方法可以返回String对象,详细的用法稍后会看到:
package com.bolingcavalry.grpctutorials;

import io.grpc.stub.StreamObserver;

public interface ExtendResponseObserver<T> extends StreamObserver<T> {
    String getExtra();
}
  • 重头戏来了,看看如何远程调用客户端流类型的gRPC接口,前面小结提到的2、3、4点都会涉及到,代码中已经添加详细注释:
package com.bolingcavalry.grpctutorials;

import com.bolingcavalry.grpctutorials.lib.AddCartReply;
import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Service
@Slf4j
public class GrpcClientService {

    @GrpcClient("client-stream-server-side")
    private CartServiceGrpc.CartServiceStub cartServiceStub;

    public String addToCart(int count) {
        
        CountDownLatch countDownLatch = new CountDownLatch(1);
        
        // responseObserver的onNext和onCompleted会在另一个线程中被执行,
        // ExtendResponseObserver继承自StreamObserver
        ExtendResponseObserver<AddCartReply> responseObserver = new ExtendResponseObserver<AddCartReply>() {

            String extraStr;

            @Override
            public String getExtra() {
                return extraStr;
            }

            private int code;

            private String message;

            @Override
            public void onNext(AddCartReply value) {
                log.info("on next");
                code = value.getCode();
                message = value.getMessage();
            }

            @Override
            public void onError(Throwable t) {
                log.error("gRPC request error", t);
                extraStr = "gRPC error, " + t.getMessage();
                countDownLatch.countDown();
            }

            @Override
            public void onCompleted() {
                log.info("on complete");
                extraStr = String.format("返回码[%d],返回信息:%s" , code, message);
                countDownLatch.countDown();
            }
        };
        
        // 远程调用,此时数据还没有给到服务端
        StreamObserver<ProductOrder> requestObserver = cartServiceStub.addToCart(responseObserver);
        
        for(int i=0; i<count; i++) {
            // 发送一笔数据到服务端
            requestObserver.onNext(build(101 + i, 1 + i));
        }

        // 客户端告诉服务端:数据已经发完了
        requestObserver.onCompleted();

        try {
            // 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,
            // 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,
            // await的超时时间设置为2秒
            countDownLatch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("countDownLatch await error", e);
        }

        log.info("service finish");
        // 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得
        return responseObserver.getExtra();
    }

    /**
     * 创建ProductOrder对象
     * @param productId
     * @param num
     * @return
     */
    private static ProductOrder build(int productId, int num) {
        return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
    }
}
  • 最后做个web接口,可以通过web请求验证远程调用:
package com.bolingcavalry.grpctutorials;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;

@RestController
public class GrpcClientController {

    @Autowired
    private GrpcClientService grpcClientService;

    @RequestMapping("/")
    public String printMessage(@RequestParam(defaultValue = "1") int count) {
        return grpcClientService.addToCart(count);
    }
}
  • 编码完成,开始验证;

验证

  • 启动服务端ClientStreamServerSideApplication:

在这里插入图片描述

  • 启动客户端ClientStreamClientSideApplication:

在这里插入图片描述

  • 浏览器输入http://localhost:8082/?count=100,响应如下,可见远程调用gRPC服务成功:

在这里插入图片描述

  • 下面是服务端日志,可见逐一处理了客户端的每一笔数据:

在这里插入图片描述

  • 下面是客户端日志,可见由于CountDownLatch的作用,发起gRPC请求的线程一直等待responseObserver.onCompleted在另一个线程被执行完后,才会继续执行:

在这里插入图片描述

  • 至此,客户端流类型的gRPC服务及其客户端开发就完成了,这种异步操作与咱们平时开发同步类型的web接口还是有差别的,希望本文能给您带来一些参考,下一篇咱们实战最后一种类型:双向流式;

欢迎关注阿里云开发者社区博客:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...
相关文章
|
9天前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
31 8
|
11天前
|
jenkins Java 持续交付
【一键搞定!】Jenkins 自动发布 Java 代码的神奇之旅 —— 从零到英雄的持续集成/部署实战秘籍!
【8月更文挑战第9天】随着软件开发自动化的发展,持续集成(CI)与持续部署(CD)已成为现代流程的核心。Jenkins 作为一款灵活且功能丰富的开源 CI/CD 工具,在业界应用广泛。以一家电商公司的 Java 后端服务为例,通过搭建 Jenkins 自动化发布流程,包括创建 Jenkins 项目、配置 Git 仓库、设置构建触发器以及编写构建脚本等步骤,可以实现代码的快速可靠部署。
33 2
|
23天前
|
消息中间件 Java Kafka
Java 客户端访问kafka
Java 客户端访问kafka
29 9
|
23天前
|
Java Android开发 C++
🚀Android NDK开发实战!Java与C++混合编程,打造极致性能体验!📊
【7月更文挑战第28天】在 Android 开发中, NDK 让 Java 与 C++ 混合编程成为可能, 从而提升应用性能。**为何选 NDK?** C++ 在执行效率与内存管理上优于 Java, 特别适合高性能需求场景。**环境搭建** 需 Android Studio 和 NDK, 工具如 CMake。**JNI** 构建 Java-C++ 交互, 通过声明 `native` 方法并在 C++ 中实现。**实战** 示例: 使用 C++ 计算斐波那契数列以提高效率。**总结** 混合编程增强性能, 但增加复杂性, 使用前需谨慎评估。
58 4
|
3天前
|
设计模式 存储 Java
掌握Java设计模式的23种武器(全):深入解析与实战示例
掌握Java设计模式的23种武器(全):深入解析与实战示例
|
5天前
|
Java
Java模拟文件发送给服务器,服务器将文件转发给其他用户,并保存到服务器本地,其他用户可以接收,并保存到本地磁盘,支持各种文件格式,并解决通信中服务器怎么区分客户端发来的文件类型
Java模拟文件发送给服务器,服务器将文件转发给其他用户,并保存到服务器本地,其他用户可以接收,并保存到本地磁盘,支持各种文件格式,并解决通信中服务器怎么区分客户端发来的文件类型
|
1月前
|
缓存 监控 Java
Java虚拟机(JVM)性能调优实战指南
在追求软件开发卓越的征途中,Java虚拟机(JVM)性能调优是一个不可或缺的环节。本文将通过具体的数据和案例,深入探讨JVM性能调优的理论基础与实践技巧,旨在为广大Java开发者提供一套系统化的性能优化方案。文章首先剖析了JVM内存管理机制的工作原理,然后通过对比分析不同垃圾收集器的适用场景及性能表现,为读者揭示了选择合适垃圾回收策略的数据支持。接下来,结合线程管理和JIT编译优化等高级话题,文章详细阐述了如何利用现代JVM提供的丰富工具进行问题诊断和性能监控。最后,通过实际案例分析,展示了性能调优过程中可能遇到的挑战及应对策略,确保读者能够将理论运用于实践,有效提升Java应用的性能。 【
125 10
|
1月前
|
前端开发 Java 关系型数据库
Java中的电子商务网站开发实战
Java中的电子商务网站开发实战
|
1月前
|
并行计算 Java API
Java中的函数式编程实战与Lambda表达式应用
Java中的函数式编程实战与Lambda表达式应用
|
1月前
|
安全 Java 调度
Java并发编程:从基础到实战
【7月更文挑战第3天】在Java的世界中,并发编程是一块充满挑战与机遇的领域。本文将带领读者从理解并发编程的基本概念开始,逐步深入到Java并发工具的使用和高级技巧的应用。我们将一起探索如何在多线程环境下保证数据的一致性和程序的正确性,以及如何通过高效的并发策略来提升应用性能。准备好,让我们开启Java并发编程的旅程,掌握让应用飞一般运行的秘密。
32 1