There are many ways to implement and call RPC: gRPC, Spring Cloud, Thrift, and of course Alibaba's well-known Dubbo. Hadoop, a framework from the big-data world, naturally ships its own RPC mechanism as well. Before implementing RPC communication with Hadoop, prepare the proto file and generate the protocol classes; the details are covered in https://developer.aliyun.com/article/1172921?spm=a2c6h.13148508.setting.16.5ce84f0eKKzUID. Copy the generated HelloGrpc.java and HelloGrpcServiceGrpc.java into the project source tree. Now let's see how to build an RPC service with the utility classes Hadoop provides, starting with the Maven dependency:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.5</version>
</dependency>
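For reference, the proto definition behind the generated classes looks roughly like the sketch below. It is reconstructed from the class and field names used later in this article (Java package test.hello, outer class HelloGrpc, a name field on the request and a result field on the response); the field numbers and the service block are assumptions, and the authoritative version is the one in the article linked above.

    syntax = "proto3";

    // inferred from the import test.hello.HelloGrpc used in the Java code below
    option java_package = "test.hello";
    option java_outer_classname = "HelloGrpc";

    message HelloGrpcRequest {
      string name = 1;    // set via builder.setName(...)
    }

    message HelloGrpcResponse {
      string result = 1;  // read via response.getResult()
    }

    // service definition from which HelloGrpcServiceGrpc.java is generated (method name assumed)
    service HelloGrpcService {
      rpc Query (HelloGrpcRequest) returns (HelloGrpcResponse);
    }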
Next, define the service interface and its implementation. They mainly wrap the messages defined in the proto file and implement the query method; the crucial point is that the interface must extend Hadoop's RPC interface VersionedProtocol:
package com.proto.server;

import org.apache.hadoop.ipc.VersionedProtocol;
import test.hello.HelloGrpc;

public interface HelloHadoopRpcService extends VersionedProtocol {

    // protocol version; the client must pass the same value when building its proxy
    public long versionID = 2L;

    // the business method, using the protobuf-generated request/response types
    HelloGrpc.HelloGrpcResponse query(HelloGrpc.HelloGrpcRequest request);
}
package com.proto.server;

import org.apache.hadoop.ipc.ProtocolSignature;
import test.hello.HelloGrpc;

import java.io.IOException;

public class HelloHadoopRpcServiceImpl implements HelloHadoopRpcService {

    @Override
    public HelloGrpc.HelloGrpcResponse query(HelloGrpc.HelloGrpcRequest request) {
        String name = request.getName();
        HelloGrpc.HelloGrpcResponse.Builder builder = HelloGrpc.HelloGrpcResponse.newBuilder();
        builder.setResult("Welcome " + name);
        HelloGrpc.HelloGrpcResponse response = builder.build();
        return response;
    }

    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return versionID;
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
                                                  int clientMethodsHash) throws IOException {
        // placeholder signature; see the more typical delegate version below
        return new ProtocolSignature();
    }
}
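The getProtocolSignature implementation above just returns an empty signature. A slightly more typical pattern is to delegate to the helper Hadoop itself provides, which derives the signature from the protocol class; as a sketch, in the same class:

    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
                                                  int clientMethodsHash) throws IOException {
        // let Hadoop compute the signature (version + method hashes) for this protocol
        return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
    }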
With the interface and its implementation in place, the next step is to expose them as an RPC server:
package com.proto.server;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;

public class HelloHadoopRpcServer {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();

        // build the RPC server: bind the protocol interface to its implementation
        RPC.Server server = new RPC.Builder(conf)
                .setProtocol(HelloHadoopRpcService.class)
                .setInstance(new HelloHadoopRpcServiceImpl())
                .setPort(29998)
                .setNumHandlers(1)
                .build();

        server.start();
    }
}
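If you want to confirm where the server is actually listening, the underlying Server class exposes the bound address; an optional line right after server.start() (not in the original code) could be:

    // optional: log the address and port the RPC server ended up listening on
    System.out.println("Hadoop RPC server listening on " + server.getListenerAddress());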
Finally, call the service from a client:
package com.proto.client;

import com.proto.server.HelloHadoopRpcService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import test.hello.HelloGrpc;

import java.io.IOException;
import java.net.InetSocketAddress;

public class HelloHadoopRpcClient {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        String hostname = "localhost";
        int port = 29998;

        // obtain a proxy; the version must match HelloHadoopRpcService.versionID
        HelloHadoopRpcService protocolProxy = RPC.getProxy(HelloHadoopRpcService.class,
                HelloHadoopRpcService.versionID, new InetSocketAddress(hostname, port), conf);

        // build the request object
        HelloGrpc.HelloGrpcRequest.Builder builder = HelloGrpc.HelloGrpcRequest.newBuilder();
        HelloGrpc.HelloGrpcRequest name = builder.setName("anisbob").build();

        // send the RPC request and print the response
        try {
            HelloGrpc.HelloGrpcResponse response = protocolProxy.query(name);
            System.out.println("Query result: result = " + response.getResult());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
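One detail the demo client skips: when the client is finished, it is good practice to release the proxy so its underlying connection to the server is closed. Hadoop's RPC class provides stopProxy for this; a minimal addition at the end of main (or in a finally block) would be:

    // release the proxy and close its connection to the server
    RPC.stopProxy(protocolProxy);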
Start the server and then the client in that order; the client console prints: Query result: result = Welcome anisbob.
The code above was written purely out of curiosity and interest; for everyday work, the gRPC approach is still the more conventional choice.