转载请注明出处:
1.定义消息格式
在 src/main/proto 目录下创建 person.proto 文件,并定义消息格式,例如:
syntax = "proto3"; package example; message Person { string name = 1; int32 age = 2; repeated string interests = 3; }
这个文件定义了一个名为 Person 的消息类型,包括三个字段:name、age 和 interests
2.生成代码
使用 protoc 工具来生成 Java 代码,需要安装相应的插件和工具,可以通过 Maven 或 Gradle 等构建工具自动下载和配置。这里演示手动下载和安装的方式。
首先下载 protoc 工具及其插件,例如从官方网站下载对应版本的 protoc-3.x.x-linux-x86_64.zip,以及 protoc-gen-grpc-java 插件,例如从 Maven 中央仓库下载最新版的 protoc-gen-grpc-java-1.42.0-linux-x86_64.exe。
然后解压 protoc 工具,将 protoc 命令所在的路径添加到环境变量 PATH 中,例如:
export PATH="/path/to/protoc/bin:$PATH"
接下来安装 protobuf-java 和 grpc-java 两个依赖,例如通过 Maven 引入以下依赖:
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.17.3</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty-shaded</artifactId> <version>1.42.0</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>1.42.0</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> <version>1.42.0</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-testing</artifactId> <version>1.42.0</version> <scope>test</scope> </dependency>
接着使用 protoc 生成 Java 代码,例如:
protoc --java_out=src/main/java --grpc-java_out=src/main/java src/main/proto/person.proto
这个命令会在 src/main/java/example 目录下生成 Person.java、PersonGrpc.java 和 PersonGrpc$PersonStub.java 等文件。
3.实现服务端
package example; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; public class PersonServer { private final int port; private final Server server; public PersonServer(int port) throws IOException { this.port = port; this.server = ServerBuilder.forPort(port) .addService(new PersonServiceImpl()) .build(); } public void start() throws IOException { server.start(); System.out.println("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread(() -> { System.err.println("*** shutting down gRPC server since JVM is shutting down"); PersonServer.this.stop(); System.err.println("*** server shut down"); })); } public void stop() { if (server != null) { server.shutdown(); } } private static class PersonServiceImpl extends PersonGrpc.PersonImplBase { @Override public void getMessage(PersonRequest request, StreamObserver<PersonResponse> responseObserver) { String name = request.getName(); int age = request.getAge(); String interests = String.join(", ", request.getInterestsList()); // 将请求的内容响应回去 PersonResponse response = PersonResponse.newBuilder() .setMessage("Received person info: name=" + name + ", age=" + age + ", interests=[" + interests + "]") .build(); responseObserver.onNext(response); responseObserver.onCompleted(); } } public static void main(String[] args) throws IOException, InterruptedException { PersonServer server = new PersonServer(8989); server.start(); server.blockUntilShutdown(); } }
4.实现客户端
package example; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class PersonClient { private final ManagedChannel channel; private final PersonGrpc.PersonStub stub; public PersonClient(String host, int port) { this.channel = ManagedChannelBuilder.forAddress(host, port) .usePlaintext() .build(); this.stub = PersonGrpc.newStub(channel); } public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } public void sendMessage(String name, int age, String... interests) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); StreamObserver<PersonResponse> responseObserver = new StreamObserver<PersonResponse>() { @Override public void onNext(PersonResponse response) { System.out.println("Received response: " + response.getMessage()); } @Override public void onError(Throwable t) { System.err.println("Received error: " + t.getMessage()); latch.countDown(); } @Override public void onCompleted() { System.out.println("Request completed"); latch.countDown(); } }; List<String> interestList = Arrays.asList(interests); PersonRequest request = PersonRequest.newBuilder() .setName(name) .setAge(age) .addAllInterests(interestList) .build(); stub.getMessage(request, responseObserver); latch.await(); } public static void main(String[] args) throws InterruptedException { PersonClient client = new PersonClient("localhost", 8989); // 向服务器发送请求 client.sendMessage("Alice", 20, "reading", "swimming"); // 关闭连接 client.shutdown(); } }
这个客户端会向服务器发送一个包含 name、age 和 interests 字段的 PersonRequest 消息,并等待接收服务器的响应信息。
标签: 网络与传输协议