服务网格GRPC协议多种编程语言实践-2.GRPC协议示例的实现

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 4种编程语言实现grpc的4种通信模型

1 Generated-code

首先本地开发环境需要安装grpc和protobuf,这里以macos为例:brew install grpc protobuf

无论使用什么编程语言实现GRPC协议的服务,都需要将protobuf定义转换为该语言的代码。

  • Java的构建工具Maven提供了自动转换插件protobuf-maven-plugin,执行mvn package会自动使用protoc-gen-grpc-java创建grpc的模板代码。详见hello-grpc-java/pom.xml
  • Go需要执行go get github.com/golang/protobuf/protoc-gen-go安装,然后使用protoc命令生成grpc代码。详见hello-grpc-go/proto2go.sh
  • NodeJs需要执行npm install -g grpc-tools安装grpc_tools_node_protoc,然后使用protoc命令生成grpc代码。详见hello-grpc-nodejs/proto2js.sh
  • Python需要执行pip install grpcio-tools安装grpcio-tools ,然后使用protoc命令生成grpc代码。详见hello-grpc-python/proto2py.sh

在示例工程中,每种语言的代码目录中都有一个proto目录,其中的landing.proto文件是示例根目录下proto/landing.proto文件的软连接,这样有利于统一更新protobuf的定义。

2 通信实现

hello数组

private final List<String> HELLO_LIST = Arrays.asList("Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요");

kv.put("data", HELLO_LIST.get(index));
var helloList = []string{"Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"}

kv["data"] = helloList[index]
let hellos = ["Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"]

kv.set("data", hellos[index])
hellos = ["Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"]

result.kv["data"] = hellos[index]

talk

Unary RPC类型的实现是最经典的,阻塞式一发一收。

// 使用blockingStub与服务端通信
public TalkResponse talk(TalkRequest talkRequest) {
    return blockingStub.talk(talkRequest);
}
//服务端处理请求后触发StreamObserver实例的两个事件onNext和onCompleted
public void talk(TalkRequest request, StreamObserver<TalkResponse> responseObserver) {
    ...
    responseObserver.onNext(response);
    responseObserver.onCompleted();
}
func talk(client pb.LandingServiceClient, request *pb.TalkRequest) {
    r, err := client.Talk(context.Background(), request)
}

func (s *ProtoServer) Talk(ctx context.Context, request *pb.TalkRequest) (*pb.TalkResponse, error) {
    return &pb.TalkResponse{
        Status:  200,
        Results: []*pb.TalkResult{s.buildResult(request.Data)},
    }, nil
}
function talk(client, request) {
    client.talk(request, function (err, response) {
            ...
    })
}
                
function talk(call, callback) {
    const talkResult = buildResult(call.request.getData())
    ...
    callback(null, response)
}                
def talk(stub):
    response = stub.talk(request)
    
def talk(self, request, context):
    result = build_result(request.data)
    ...
    return response

talkOneAnswerMore

Server streaming RPC类型的实现重点是客户端在发送请求后如何处理流式响应数据,以及服务端的流式返回响应。

public List<TalkResponse> talkOneAnswerMore(TalkRequest request) {
    Iterator<TalkResponse> talkResponses = blockingStub.talkOneAnswerMore(request);
    talkResponses.forEachRemaining(talkResponseList::add);
    return talkResponseList;
}

public void talkOneAnswerMore(TalkRequest request, StreamObserver<TalkResponse> responseObserver) {
    String[] datas = request.getData().split(",");
    for (String data : datas) {...}
    talkResponses.forEach(responseObserver::onNext);
    responseObserver.onCompleted();
}
func talkOneAnswerMore(client pb.LandingServiceClient, request *pb.TalkRequest) {
    stream, err := client.TalkOneAnswerMore(context.Background(), request)
    for {
        r, err := stream.Recv()
        if err == io.EOF {
            break
        }
    ...
    }
}

func (s *ProtoServer) TalkOneAnswerMore(request *pb.TalkRequest, stream pb.Landing..Server) error {
    datas := strings.Split(request.Data, ",")
    for _, d := range datas {
        stream.Send(&pb.TalkResponse{...})
}
function talkOneAnswerMore(client, request) {
    let call = client.talkOneAnswerMore(request)
    call.on('data', function (response) {
        ...
    })
}

function talkOneAnswerMore(call) {
    let datas = call.request.getData().split(",")
    for (const data in datas) {
        ...
        call.write(response)
    }
    call.end()
}
def talk_one_answer_more(stub):
    responses = stub.talkOneAnswerMore(request)
    for response in responses:
        logger.info(response)

def talkOneAnswerMore(self, request, context):
    datas = request.data.split(",")
    for data in datas:
        yield response

talkMoreAnswerOne

Client streaming RPC类型的实现重点是客户端以流式发送请求后,告诉服务端请求结束,服务端会将多次请求分别处理,在收到结束时一次返回给客户端。

public void talkMoreAnswerOne(List<TalkRequest> requests) throws InterruptedException {
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<TalkResponse> responseObserver = new StreamObserver<TalkResponse>() {
        @Override
        public void onNext(TalkResponse talkResponse) {
            log.info("Response=\n{}", talkResponse);
        }
        @Override
        public void onCompleted() {
            finishLatch.countDown();
        }
    };
    final StreamObserver<TalkRequest> requestObserver = asyncStub.talkMoreAnswerOne(responseObserver);
    try {
        requests.forEach(request -> {
            if (finishLatch.getCount() > 0) {
                requestObserver.onNext(request);
        });
    requestObserver.onCompleted();
}
                         
public StreamObserver<TalkRequest> talkMoreAnswerOne(StreamObserver<TalkResponse> responseObserver) {
    return new StreamObserver<TalkRequest>() {
        @Override
        public void onNext(TalkRequest request) {
            talkRequests.add(request);
        }
        @Override
        public void onCompleted() {
            responseObserver.onNext(buildResponse(talkRequests));
            responseObserver.onCompleted();
        }
    };
}
func talkMoreAnswerOne(client pb.LandingServiceClient, requests []*pb.TalkRequest) {
    stream, err := client.TalkMoreAnswerOne(context.Background())
    for _, request := range requests {
        stream.Send(request)
    }
    r, err := stream.CloseAndRecv()
}

func (s *ProtoServer) TalkMoreAnswerOne(stream pb.LandingService_TalkMoreAnswerOneServer) error {
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            talkResponse := &pb.TalkResponse{
                Status:  200,
                Results: rs,
            }
            stream.SendAndClose(talkResponse)
            return nil
        }
        rs = append(rs, s.buildResult(in.Data))
    }
}
function talkMoreAnswerOne(client, requests) {
    let call = client.talkMoreAnswerOne(function (err, response) {
        ...
    })
    requests.forEach(request => {
        call.write(request)
    })
    call.end()
}

function talkMoreAnswerOne(call, callback) {
    let talkResults = []
    call.on('data', function (request) {
        talkResults.push(buildResult(request.getData()))
    })
    call.on('end', function () {
        let response = new messages.TalkResponse()
        response.setStatus(200)
        response.setResultsList(talkResults)
        callback(null, response)
    })
}
def talk_more_answer_one(stub):
    response_summary = stub.talkMoreAnswerOne(request_iterator)

def generate_request():
    for _ in range(0, 3):
        yield request
        
def talkMoreAnswerOne(self, request_iterator, context):
    for request in request_iterator:
        response.results.append(build_result(request.data))
    return response  

talkBidirectional

Bidirectional streaming RPC类型的实现重点是客户端以流式发送请求后,告诉服务端请求结束,服务端会在每次请求后将结果返回,并在收到结束时,告诉客户端结束。

public void talkBidirectional(List<TalkRequest> requests) throws InterruptedException {
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<TalkResponse> responseObserver = new StreamObserver<TalkResponse>() {
        @Override
        public void onNext(TalkResponse talkResponse) {
            log.info("Response=\n{}", talkResponse);
        }
        @Override
        public void onCompleted() {
            finishLatch.countDown();
        }
    };
    final StreamObserver<TalkRequest> requestObserver = asyncStub.talkBidirectional(responseObserver);
    try {
        requests.forEach(request -> {
            if (finishLatch.getCount() > 0) {
                requestObserver.onNext(request);
    ...
    requestObserver.onCompleted();
}

public StreamObserver<TalkRequest> talkBidirectional(StreamObserver<TalkResponse> responseObserver) {
    return new StreamObserver<TalkRequest>() {
        @Override
        public void onNext(TalkRequest request) {
            responseObserver.onNext(TalkResponse.newBuilder()
                    .setStatus(200)
                    .addResults(buildResult(request.getData())).build());
        }
        @Override
        public void onCompleted() {
            responseObserver.onCompleted();
        }
    };
}
func talkBidirectional(client pb.LandingServiceClient, requests []*pb.TalkRequest) {
    stream, err := client.TalkBidirectional(context.Background())
    waitc := make(chan struct{})
    go func() {
        for {
            r, err := stream.Recv()
            if err == io.EOF {
                // read done.
                close(waitc)
                return
            }
        }
    }()
    for _, request := range requests {
        stream.Send(request)
    }
    stream.CloseSend()
    <-waitc
}

func (s *ProtoServer) TalkBidirectional(stream pb.LandingService_TalkBidirectionalServer) error {
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        stream.Send(talkResponse)
    }
}
function talkBidirectional(client, requests) {
    let call = client.talkBidirectional()
    call.on('data', function (response) {
        ...
    })
    requests.forEach(request => {
        call.write(request)
    })
    call.end()
}

function talkBidirectional(call) {
    call.on('data', function (request) {
        call.write(response)
    })
    call.on('end', function () {
        call.end()
    })
}
def talk_bidirectional(stub):
    responses = stub.talkBidirectional(request_iterator)
    for response in responses:
        logger.info(response)

def talkBidirectional(self, request_iterator, context):
    for request in request_iterator:
        yield response

2 要点实现

环境变量

private static String getGrcServer() {
    String server = System.getenv("GRPC_SERVER");
    if (server == null) {
        return "localhost";
    }
    return server;
}
func grpcServer() string {
    server := os.Getenv("GRPC_SERVER")
    if len(server) == 0 {
        return "localhost"
    } else {
        return server
    }
}
function grpcServer() {
    let server = process.env.GRPC_SERVER;
    if (typeof server !== 'undefined' && server !== null) {
        return server
    } else {
        return "localhost"
    }
}
def grpc_server():
    server = os.getenv("GRPC_SERVER")
    if server:
        return server
    else:
        return "localhost"

随机数

public static String getRandomId() {
    return String.valueOf(random.nextInt(5));
}
func randomId(max int) string {
    return strconv.Itoa(rand.Intn(max))
}
function randomId(max) {
    return Math.floor(Math.random() * Math.floor(max)).toString()
}
def random_id(end):
    return str(random.randint(0, end))

时间戳

TalkResult.newBuilder().setId(System.nanoTime())
result.Id = time.Now().UnixNano()
result.setId(Math.round(Date.now() / 1000))
result.id = int((time.time()))

UUID

kv.put("id", UUID.randomUUID().toString());
import (
    "github.com/google/uuid"
)
kv["id"] = uuid.New().String()
kv.set("id", uuid.v1())
result.kv["id"] = str(uuid.uuid1())

Sleep

TimeUnit.SECONDS.sleep(1);
time.Sleep(2 * time.Millisecond)
let sleep = require('sleep')

sleep.msleep(2)
time.sleep(random.uniform(0.5, 1.5))

3 验证

功能验证

完成功能开发后,我们在一个终端启动GRPC服务,在另一个终端启动客户端。客户端分别对4个通信接口进行请求。

java
mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.server.ProtoServer"
mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.client.ProtoClient"
go
go run server.go
go run client/proto_client.go
nodejs
node proto_server.js
node proto_client.js
python
python server/protoServer.py
python client/protoClient.py

交叉通信

在功能验证基础上,我们启动任意一种编程语言实现的Server端,然后使用其他4种客户端进行验证。以确保不同编程语言实现的GRPC通信行为一致。这步验证是后续容器化和网格化的基础,因为每种编程语言的Server端都会作为同一个Kubernetes Service的一个版本的Deployment发布,它们的行为必须一致,以保证路由到不同版本,结果是一致的。

4 构建和分发

构建

通信功能验证完毕后,接下来是将4个工程编译、构建、打包。这一步的输出是制作镜像的输入。

java

分别构建服务端和客户端的jar,将其拷贝到docker目录备用。

mvn clean install -DskipTests -f server_pom
cp target/hello-grpc-java.jar ../docker/

mvn clean install -DskipTests -f client_pom
cp target/hello-grpc-java.jar ../docker/
go

go编译的二进制是平台相关的,因为我们最终要部署到linux上,因此构建命令如下。然后将二进制拷贝到docker目录备用。

env GOOS=linux GOARCH=amd64 go build -o proto_server server.go
mv proto_server ../docker/

env GOOS=linux GOARCH=amd64 go build -o proto_client client/proto_client.go
mv proto_client ../docker/
nodejs

node需要在docker镜像中进行构建,才能支持运行时所需的各种c++依赖。因此这一步主要是拷贝备用。

cp ../hello-grpc-nodejs/proto_server.js node
cp ../hello-grpc-nodejs/package.json node
cp -R ../hello-grpc-nodejs/common node
cp -R ../proto node
cp ../hello-grpc-nodejs/*_client.js node
python

python无需编译,直接拷贝备用即可。

cp -R ../hello-grpc-python/server py
cp ../hello-grpc-python/start_server.sh py
cp -R ../proto py
cp ../hello-grpc-python/proto2py.sh py
cp -R ../hello-grpc-python/client py
cp ../hello-grpc-python/start_client.sh py

dockerfile

构建完毕后,docker路径下存储了dockerfile所需的全部文件。这里将dockerfile中重点信息说明如下。

  • 基础镜像我们尽量选择alpine,因为尺寸最小。python的基础镜像注意选择2.7版本的python:2,因为示例使用的是python2。如果对python3有强需求,请基于示例代码酌情修改。
  • nodejs需要安装c++及编译器make,npm包需要安装grpc-tools。

这里以nodejs server的镜像作为示例,说明构建镜像的过程。

grpc-server-node.dockerfile
FROM node:14.11-alpine
RUN apk add --update \
      python \
      make \
      g++ \
  && rm -rf /var/cache/apk/*
RUN npm config set registry https://registry.npm.taobao.org && npm install -g node-pre-gyp grpc-tools --unsafe-perm
COPY node/package.json .
RUN npm install --unsafe-perm
COPY node .
ENTRYPOINT ["node","proto_server.js"]
docker build
docker build -f grpc-server-node.dockerfile -t registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_node:1.0.0 .

镜像列表

最终我们会构建出8个镜像,使用push命令将镜像分发到到阿里云ACR服务,作为下一篇kube的基础。

  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_java:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_java:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_go:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_go:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_node:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_node:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_python:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_python:1.0.0
目录
相关文章
|
3月前
|
运维 负载均衡 监控
探索微服务架构下的服务网格(Service Mesh)实践之路
【8月更文挑战第30天】 在当今日益复杂的分布式系统中,微服务架构已成为众多企业解决系统扩展与维护难题的利器。然而,随着服务的不断增多和网络交互的复杂性提升,传统的微服务管理方式开始显得力不从心。服务网格(Service Mesh)作为一种新兴的解决方案,旨在通过提供应用层的网络基础设施来简化服务间通讯,并增强系统的可观察性和安全性。本文将分享我在采用服务网格技术过程中的经验与思考,探讨如何在现代云原生环境中有效地实施服务网格,以及它给开发和运维带来的变革。
|
5月前
|
负载均衡 测试技术 网络安全
阿里云服务网格ASM多集群实践(一)多集群管理概述
服务网格多集群管理网络打通和部署模式的多种最佳实践
|
4月前
|
Cloud Native 测试技术 开发者
阿里云服务网格ASM多集群实践(二):高效按需的应用多环境部署与全链路灰度发布
介绍服务网格ASM提出的一种多集群部署下的多环境部署与全链路灰度发布解决方案。
|
6月前
|
监控 负载均衡 数据安全/隐私保护
探索微服务架构下的服务网格(Service Mesh)实践
【5月更文挑战第6天】 在现代软件工程的复杂多变的开发环境中,微服务架构已成为构建、部署和扩展应用的一种流行方式。随着微服务架构的普及,服务网格(Service Mesh)作为一种新兴技术范式,旨在提供一种透明且高效的方式来管理微服务间的通讯。本文将深入探讨服务网格的核心概念、它在微服务架构中的作用以及如何在实际项目中落地实施服务网格。通过剖析服务网格的关键组件及其与现有系统的协同工作方式,我们揭示了服务网格提高系统可观察性、安全性和可操作性的内在机制。此外,文章还将分享一些实践中的挑战和应对策略,为开发者和企业决策者提供实用的参考。
|
6月前
|
运维 监控 负载均衡
探索微服务架构下的服务网格(Service Mesh)实践之路
【4月更文挑战第30天】 在现代云计算的大背景下,微服务架构以其灵活性和可扩展性成为众多企业转型的首选。然而,随着服务的激增和网络交互的复杂化,传统的服务通信模式已无法满足需求,服务网格(Service Mesh)应运而生。本文通过分析服务网格的核心组件、运作机制以及在企业中的实际应用案例,探讨了服务网格在微服务架构中的关键作用及其带来的变革,同时提出了实施过程中面临的挑战和解决策略。
|
6月前
|
运维 监控 Cloud Native
云原生架构下的服务网格演进与实践
【5月更文挑战第23天】 随着云计算技术的不断成熟,云原生架构已成为推动企业数字化转型的关键动力。本文将深入探讨服务网格在云原生环境中的重要性,分析其在微服务管理、流量控制和安全性方面的创新应用。通过对服务网格的技术和实践案例的剖析,揭示其如何优化云原生应用的部署、运行和管理,为企业构建更加动态、可靠和高效的分布式系统提供策略指导。
|
6月前
|
运维 监控 负载均衡
探索微服务架构下的服务网格(Service Mesh)实践
【4月更文挑战第28天】 在现代云原生应用的后端开发领域,微服务架构已成为一种广泛采用的设计模式。随着分布式系统的复杂性增加,服务之间的通信变得愈加关键。本文将深入探讨服务网格这一创新技术,它旨在提供一种透明且高效的方式来管理、监控和保护微服务间的交互。我们将从服务网格的基本概念出发,分析其在实际应用中的优势与挑战,并通过一个案例研究来展示如何在现有的后端系统中集成服务网格。
|
6月前
|
Oracle 关系型数据库
oracle asm 磁盘显示offline
oracle asm 磁盘显示offline
328 2
|
29天前
|
存储 Oracle 关系型数据库
数据库数据恢复—Oracle ASM磁盘组故障数据恢复案例
Oracle数据库数据恢复环境&故障: Oracle ASM磁盘组由4块磁盘组成。Oracle ASM磁盘组掉线 ,ASM实例不能mount。 Oracle数据库故障分析&恢复方案: 数据库数据恢复工程师对组成ASM磁盘组的磁盘进行分析。对ASM元数据进行分析发现ASM存储元数据损坏,导致磁盘组无法挂载。
|
6月前
|
存储 Oracle 关系型数据库
【数据库数据恢复】Oracle数据库ASM磁盘组掉线的数据恢复案例
oracle数据库ASM磁盘组掉线,ASM实例不能挂载。数据库管理员尝试修复数据库,但是没有成功。
【数据库数据恢复】Oracle数据库ASM磁盘组掉线的数据恢复案例
下一篇
无影云桌面