java版gRPC实战之三:服务端流

简介: 服务端流类型的gRPC服务,如何开发又如何调用,一起来尝试一下吧

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos

关于gRPC定义的四种类型

本文是《java版gRPC实战》系列的第三篇,前文咱们实战体验了简单的RPC请求和响应,那种简单的请求响应方式其实只是gRPC定义的四种类型之一,这里给出《gRPC 官方文档中文版》对这四种gRPC类型的描述:

  1. 简单 RPC:客户端使用存根(stub)发送请求到服务器并等待响应返回,就像平常的函数调用一样;
  2. 服务器端流式 RPC:客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息;(即本篇内容)
  3. 客户端流式 RPC:客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦 客户端完成写入消息,它等待服务器完成读取返回它的响应;
  4. 双向流式 RPC:是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留;

本篇概览

本篇是服务端流类型的gRPC服务实战,包括以下内容:

  1. 开发一个gRPC服务,类型是服务端流;
  2. 开发一个客户端,调用前面发布的gRPC服务;
  3. 验证;
  • 不多说了,开始上代码;

源码下载

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,《java版gRPC实战》系列的源码在grpc-tutorials文件夹下,如下图红框所示:

在这里插入图片描述

  • grpc-tutorials文件夹下有多个目录,本篇文章对应的服务端代码在server-stream-server-side目录下,客户端代码在server-stream-client-side目录下,如下图:

在这里插入图片描述

开发一个gRPC服务,类型是服务端流

  • 首先要开发的是gRPC服务端,一共要做下图所示的七件事:

在这里插入图片描述

  • 打开grpc-lib模块,在src/main/proto目录下新增文件mall.proto,里面定一个了一个gRPC方法ListOrders及其入参和返回对象,内容如下,要注意的是返回值要用关键字stream修饰,表示该接口类型是服务端流:
syntax = "proto3";

option java_multiple_files = true;
// 生成java代码的package
option java_package = "com.bolingcavalry.grpctutorials.lib";
// 类名
option java_outer_classname = "MallProto";

// gRPC服务,这是个在线商城的订单查询服务
service OrderQuery {
    // 服务端流式:订单列表接口,入参是买家信息,返回订单列表(用stream修饰返回值)
    rpc ListOrders (Buyer) returns (stream Order) {}
}

// 买家ID
message Buyer {
    int32 buyerId = 1;
}

// 返回结果的数据结构
message Order {
    // 订单ID
    int32 orderId = 1;
    // 商品ID
    int32 productId = 2;
    // 交易时间
    int64 orderTime = 3;
    // 买家备注
    string buyerRemark = 4;
}
  • 双击下图红框位置的generateProto,即可根据proto生成java代码:

在这里插入图片描述

  • 新生成的java代码如下图红框:

在这里插入图片描述

  • 在父工程grpc-turtorials下面新建名为server-stream-server-side的模块,其build.gradle内容如下:
// 使用springboot插件
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    // 作为gRPC服务提供方,需要用到此库
    implementation 'net.devh:grpc-server-spring-boot-starter'
    // 依赖自动生成源码的工程
    implementation project(':grpc-lib')
}
  • 新建配置文件application.yml:
spring:
  application:
    name: server-stream-server-side
# gRPC有关的配置,这里只需要配置服务端口号
grpc:
  server:
    port: 9899
  • 启动类:
package com.bolingcavalry.grpctutorials;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ServerStreamServerSideApplication {
    public static void main(String[] args) {
        SpringApplication.run(ServerStreamServerSideApplication.class, args);
    }
}
  • 接下来是最关键的gRPC服务,代码如下,可见responseObserver.onNext方法被多次调用,用以向客户端持续输出数据,最后通过responseObserver.onCompleted结束输出:
package com.bolingcavalry.grpctutorials;

import com.bolingcavalry.grpctutorials.lib.Buyer;
import com.bolingcavalry.grpctutorials.lib.Order;
import com.bolingcavalry.grpctutorials.lib.OrderQueryGrpc;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
import java.util.ArrayList;
import java.util.List;

@GrpcService
public class GrpcServerService extends OrderQueryGrpc.OrderQueryImplBase {

    /**
     * mock一批数据
     * @return
     */
    private static List<Order> mockOrders(){
        List<Order> list = new ArrayList<>();
        Order.Builder builder = Order.newBuilder();

        for (int i = 0; i < 10; i++) {
            list.add(builder
                    .setOrderId(i)
                    .setProductId(1000+i)
                    .setOrderTime(System.currentTimeMillis()/1000)
                    .setBuyerRemark(("remark-" + i))
                    .build());
        }

        return list;
    }

    @Override
    public void listOrders(Buyer request, StreamObserver<Order> responseObserver) {
        // 持续输出到client
        for (Order order : mockOrders()) {
            responseObserver.onNext(order);
        }
        // 结束输出
        responseObserver.onCompleted();
    }
}
  • 至此,服务端开发完成,咱们再开发一个springboot应用作为客户端,看看如何远程调用listOrders接口,得到responseObserver.onNext方法输出的数据;

开发一个客户端,调用前面发布的gRPC服务

  • 客户端模块的基本功能是提供一个web接口,其内部会调用服务端的listOrders接口,将得到的数据返回给前端,如下图:

在这里插入图片描述

  • 在父工程grpc-turtorials下面新建名为server-stream-client-side的模块,其build.gradle内容如下:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'net.devh:grpc-client-spring-boot-starter'
    implementation project(':grpc-lib')
}
  • 应用配置信息application.yml内容如下,可见是端口和gRPC服务端地址的配置:
server:
  port: 8081
spring:
  application:
    name: server-stream-client-side

grpc:
  client:
    # gRPC配置的名字,GrpcClient注解会用到
    server-stream-server-side:
      # gRPC服务端地址
      address: 'static://127.0.0.1:9899'
      enableKeepAlive: true
      keepAliveWithoutCalls: true
      negotiationType: plaintext
  • 服务端的listOrders接口返回的Order对象里面有很多gRPC相关的内容,不适合作为web接口的返回值,因此定义一个DispOrder类作为web接口返回值:
package com.bolingcavalry.grpctutorials;

import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;

@Data
@AllArgsConstructor
public class DispOrder {
    private int orderId;
    private int productId;
    private String orderTime;
    private String buyerRemark;
}
  • 平淡无奇的启动类:
package com.bolingcavalry.grpctutorials;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ServerStreamClientSideApplication {

    public static void main(String[] args) {
        SpringApplication.run(ServerStreamClientSideApplication.class, args);
    }
}
  • 重点来了,GrpcClientService.java,里面展示了如何远程调用gRPC服务的listOrders接口,可见对于服务端流类型的接口,客户端这边通过stub调用会得到Iterator类型的返回值,接下来要做的就是遍历Iterator:
package com.bolingcavalry.grpctutorials;

import com.bolingcavalry.grpctutorials.lib.Buyer;
import com.bolingcavalry.grpctutorials.lib.Order;
import com.bolingcavalry.grpctutorials.lib.OrderQueryGrpc;
import io.grpc.StatusRuntimeException;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

@Service
@Slf4j
public class GrpcClientService {

    @GrpcClient("server-stream-server-side")
    private OrderQueryGrpc.OrderQueryBlockingStub orderQueryBlockingStub;

    public List<DispOrder> listOrders(final String name) {
        // gRPC的请求参数
        Buyer buyer = Buyer.newBuilder().setBuyerId(101).build();

        // gRPC的响应
        Iterator<Order> orderIterator;

        // 当前方法的返回值
        List<DispOrder> orders = new ArrayList<>();

        // 通过stub发起远程gRPC请求
        try {
            orderIterator = orderQueryBlockingStub.listOrders(buyer);
        } catch (final StatusRuntimeException e) {
            log.error("error grpc invoke", e);
            return new ArrayList<>();
        }

        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        log.info("start put order to list");
        while (orderIterator.hasNext()) {
            Order order = orderIterator.next();

            orders.add(new DispOrder(order.getOrderId(),
                                    order.getProductId(),
                                    // 使用DateTimeFormatter将时间戳转为字符串
                                    dtf.format(LocalDateTime.ofEpochSecond(order.getOrderTime(), 0, ZoneOffset.of("+8"))),
                                    order.getBuyerRemark()));
            log.info("");
        }

        log.info("end put order to list");

        return orders;
    }
}
  • 最后做一个controller类,对外提供一个web接口,里面会调用GrpcClientService的方法:
package com.bolingcavalry.grpctutorials;

import com.bolingcavalry.grpctutorials.lib.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;

@RestController
public class GrpcClientController {

    @Autowired
    private GrpcClientService grpcClientService;

    @RequestMapping("/")
    public List<DispOrder> printMessage(@RequestParam(defaultValue = "will") String name) {
        return grpcClientService.listOrders(name);
    }
}
  • 至此,编码完成,开始验证

验证

  1. 启动server-stream-server-side,启动成功后会监听9989端口:

在这里插入图片描述

  1. 启动server-stream-client-side,再在浏览器访问:http://localhost:8081/?name=Tom ,得到结果如下(firefox自动格式化json数据),可见成功地获取了gRPC的远程数据:

在这里插入图片描述
至此,服务端流类型的gRPC接口的开发和使用实战就完成了,接下来的章节还会继续学习另外两种类型;

欢迎关注阿里云开发者社区博客:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...
相关文章
|
2月前
|
存储 Java 开发者
Java Map实战:用HashMap和TreeMap轻松解决复杂数据结构问题!
【10月更文挑战第17天】本文深入探讨了Java中HashMap和TreeMap两种Map类型的特性和应用场景。HashMap基于哈希表实现,支持高效的数据操作且允许键值为null;TreeMap基于红黑树实现,支持自然排序或自定义排序,确保元素有序。文章通过具体示例展示了两者的实战应用,帮助开发者根据实际需求选择合适的数据结构,提高开发效率。
72 2
|
1天前
|
Java
Java基础却常被忽略:全面讲解this的实战技巧!
本次分享来自于一道Java基础的面试试题,对this的各种妙用进行了深度讲解,并分析了一些关于this的常见面试陷阱,主要包括以下几方面内容: 1.什么是this 2.this的场景化使用案例 3.关于this的误区 4.总结与练习
|
17天前
|
Java 程序员
Java基础却常被忽略:全面讲解this的实战技巧!
小米,29岁程序员,分享Java中`this`关键字的用法。`this`代表当前对象引用,用于区分成员变量与局部变量、构造方法间调用、支持链式调用及作为参数传递。文章还探讨了`this`在静态方法和匿名内部类中的使用误区,并提供了练习题。
19 1
|
28天前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
52 6
|
27天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
2月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
2月前
|
JSON Java 开发工具
Java服务端集成Google FCM推送的注意事项和实际经验
本文分享了作者在公司APP海外发布过程中,选择Google FCM进行消息推送的集成经验。文章详细解析了Java集成FCM推送的多种实现方式,包括HTTP请求和SDK集成,并指出了通知栏消息和透传消息的区别与应用场景。同时,作者还探讨了Firebase项目的创建、配置和服务端集成的注意事项,帮助读者解决文档混乱和选择困难的问题。
91 1
|
2月前
|
开发框架 Java 程序员
揭开Java反射的神秘面纱:从原理到实战应用!
本文介绍了Java反射的基本概念、原理及应用场景。反射允许程序在运行时动态获取类的信息并操作其属性和方法,广泛应用于开发框架、动态代理和自定义注解等领域。通过反射,可以实现更灵活的代码设计,但也需注意其性能开销。
55 1
|
2月前
|
JSON Java 开发工具
Java服务端集成Google FCM推送的注意事项和实际经验
公司的app要上海外,涉及到推送功能,经过综合考虑,选择Google FCM进行消息推送。 查看一些集成博客和官方文档,看的似懂非懂,迷迷惑惑。本篇文章除了将我实际集成的经验分享出来,也会对看到的博客及其中产生的疑惑、注意事项一一评论。 从官方文档和众多博客中,你会发现Java集成FCM推送有多种实现方式,会让生产生文档很乱,不知作何选择的困惑。
105 0
|
3月前
|
Java 数据中心 微服务
Java高级知识:线程池隔离与信号量隔离的实战应用
在Java并发编程中,线程池隔离与信号量隔离是两种常用的资源隔离技术,它们在提高系统稳定性、防止系统过载方面发挥着重要作用。
68 0