服务提供类
/** *@author Darkking * * *类说明:短信息发送接口 */ public interface SendSms { boolean sendMail(UserInfo user); } /** *@author Darkking * *类说明:短信息发送服务的实现 */ public class SendSmsImpl implements SendSms { @Override public boolean sendMail(UserInfo user) { try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("已发送短信息给:"+user.getName()+"到【"+user.getPhone()+"】"); return true; } }
服务注册类
** *@author Darkking * *类说明:rpc框架的服务端部分 */ public class RpcServerFrameReg { private static ExecutorService executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors()); //服务在本地的注册中心,主要是接口名和实现类的对照 private static final Map<String,Class> serviceHolder = new HashMap<>(); //服务的端口号 private int port; public RpcServerFrameReg(int port) { this.port = port; } //服务注册 public void registerSerive(Class<?> serviceInterface,Class impl) throws IOException { Socket socket = null; ObjectOutputStream output = null; ObjectInputStream input = null; /*向注册中心注册服务*/ try{ socket = new Socket(); socket.connect(new InetSocketAddress("127.0.0.1",9999)); output = new ObjectOutputStream(socket.getOutputStream()); output.writeBoolean(false); output.writeUTF(serviceInterface.getName()); output.writeUTF("127.0.0.1"); output.writeInt(port); output.flush(); input = new ObjectInputStream(socket.getInputStream()); if(input.readBoolean()){ serviceHolder.put(serviceInterface.getName(),impl); System.out.println(serviceInterface.getName()+"服务注册成功"); }else{ System.out.println(serviceInterface.getName()+"服务注册失败"); }; }finally { if (socket!=null) socket.close(); if (output!=null) output.close(); if (input!=null) input.close(); } } //处理服务请求任务 private static class ServerTask implements Runnable{ private Socket client = null; public ServerTask(Socket client){ this.client = client; } public void run() { try(ObjectInputStream inputStream = new ObjectInputStream(client.getInputStream()); ObjectOutputStream outputStream = new ObjectOutputStream(client.getOutputStream())){ //方法所在类名接口名 String serviceName = inputStream.readUTF(); //方法的名字 String methodName = inputStream.readUTF(); //方法的入参类型 Class<?>[] parmTypes = (Class<?>[]) inputStream.readObject(); //方法入参的值 Object[] args = (Object[]) inputStream.readObject(); Class serviceClass = serviceHolder.get(serviceName); if (serviceClass == null){ throw new ClassNotFoundException(serviceName+" Not Found"); } Method method = serviceClass.getMethod(methodName,parmTypes); Object result = method.invoke(serviceClass.newInstance(),args); outputStream.writeObject(result); outputStream.flush(); }catch(Exception e){ e.printStackTrace(); }finally { try { client.close(); } catch (IOException e) { e.printStackTrace(); } } } } //启动RPC服务 public void startService() throws IOException{ ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(port)); System.out.println("RPC server on:"+port+":运行"); try{ while(true){ executorService.execute(new ServerTask(serverSocket.accept())); } }finally { serverSocket.close(); } }
服务注册启动类
/** *@author Darkking * *类说明:rpc的服务端,提供服务 */ public class SmsRpcServerReg { public static void main(String[] args) { new Thread(new Runnable() { public void run() { try{ RpcServerFrameReg serviceServer = new RpcServerFrameReg(9189); serviceServer.registerSerive(SendSms.class, SendSmsImpl.class); serviceServer.startService(); }catch(Exception e){ e.printStackTrace(); } } }).start(); } }
3、客户端代理类
/** *@author Darkking * *类说明:rpc框架的客户端代理部分 */ public class RpcClientFrameReg { //远程代理对象 public static <T> T getRemoteProxyObj(final Class<?> serviceInterface){ final InetSocketAddress addr = new InetSocketAddress("127.0.0.1",9999); return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface} ,new DynProxy(serviceInterface,addr)); } //动态代理类 private static class DynProxy implements InvocationHandler { private final Class<?> serviceInterface; private final InetSocketAddress addr; private RegisterServiceVo[] serviceArray;/*远程服务在本地的缓存列表*/ public DynProxy(Class<?> serviceInterface, InetSocketAddress addr) { this.serviceInterface = serviceInterface; this.addr = addr; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectOutputStream output = null; ObjectInputStream input = null; /*检索远程服务并填充本地的缓存列表*/ if(serviceArray==null){ try{ socket = new Socket(); socket.connect(addr); output = new ObjectOutputStream(socket.getOutputStream()); output.writeBoolean(true); output.writeUTF(serviceInterface.getName()); output.flush(); input = new ObjectInputStream(socket.getInputStream()); Set<RegisterServiceVo> result = (Set<RegisterServiceVo>)input.readObject(); serviceArray = new RegisterServiceVo[result.size()]; result.toArray(serviceArray); }finally { if (socket!=null) socket.close(); if (output!=null) output.close(); if (input!=null) input.close(); } } /*本地的缓存列表取得一个远端服务器的地址端口 * 可以考虑使用更复杂的算法,以实现服务器的负载均衡 * 这里简单化处理,用随机数挑选*/ Random r = new Random(); int index = r.nextInt(serviceArray.length); InetSocketAddress serviceAddr = new InetSocketAddress(serviceArray[index].getHost(),serviceArray[index].getPort()); try{ socket = new Socket(); socket.connect(serviceAddr); output = new ObjectOutputStream(socket.getOutputStream()); output.writeUTF(serviceInterface.getName());//方法所在的类 output.writeUTF(method.getName());//方法的名 output.writeObject(method.getParameterTypes());//方法的入参类型 output.writeObject(args); output.flush(); input = new ObjectInputStream(socket.getInputStream()); return input.readObject(); }finally{ if (socket!=null) socket.close(); if (output!=null) output.close(); if (input!=null) input.close(); } } } }
客户端远程调用类
/** *@author Darkking *类说明:rpc的客户端,调用远端服务 */ public class RpcClientReg { public static void main(String[] args) { UserInfo userInfo = new UserInfo("你好,Drakking","88888888"); SendSms sendSms = RpcClientFrameReg.getRemoteProxyObj(SendSms.class); System.out.println("Send mail: "+ sendSms.sendMail(userInfo)); } }
执行过程
1、启动服务注册中心
2、服务注册
服务端打印服务注册成功,注册中心打印服务已注册
3、客户端进行服务调用,可以正常调用服务端接口,服务端打印发送信息。
代码资源可以见附件,或者去我的资源里下载。
三、主流的RPC框架
Dubbo
Dubbo作为阿里开源的RPC框架,现在已经成为apache的顶级项目,官网地址如下:http://dubbo.apache.org/en-us/
这里简单介绍下,后续会有专题进行框架搭建以及源码解析。
在Dubbo里,底层采用的是Netty框架,下节会将java中的NIO。支持BIO,BIO等多种线程模型,dubbo的运行机制如下
1、服务容器负责启动,加载,运行服务提供者。
2、服务提供者在启动时,向注册中心注册自己提供的服务。
3、服务消费者在启动时,向注册中心订阅自己所需的服务。
4、注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
5、服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
6、服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。
Dubbo除了支持RPC,还支持微服务化的服务治理。
SpringCloud
Spring Cloud是一系列框架的集合。它利用SpringBoot的开发便利性巧妙地简化了分布式系统基础设施的开发,如服务发现注册、配置中心、消息总线、负载均衡、断路器、数据监控等,都可以用Spring Boot的开发风格做到一键启动和部署。SpringCloud的RPC实现主要是基于HTTP的RESTful接口。有着跨语言,轻量,易用等特点。并且所属Spring全家桶,包含很多组件,不需要重复造轮子。
gRPC
gRPC是Google开源的通用高性能RPC框架,它支持的是使用Protocol Buffers来编写Service定义,支持较多语言扩平台并且拥有强大的二进制序列化工具集。是一个纯粹的RPC框架
微服务化Dubbo和SpringCloud选型
协议上比较:http相对更规范,更标准,更通用,无论哪种语言都支持http协议。如果你是对外开放API,例如开放平台,外部的编程语言多种多样,你无法拒绝对每种语言的支持,相应的,如果采用http,无疑在你实现SDK之前,支持了所有语言,所以,现在开源中间件,基本最先支持的几个协议都包含RESTful。
RPC协议性能要高的多,例如Protobuf、Thrift、Kyro等,(如果算上序列化)吞吐量大概能达到http的二倍。响应时间也更为出色。千万不要小看这点性能损耗,公认的,微服务做的比较好的,例如,netflix、阿里,曾经都传出过为了提升性能而合并服务。
服务全面上比较:当然是springloud更胜一筹,但也就意味着在使用springloud上其实更重量级一点,dubbo目前版本专注于服务治理,使用上更轻量一点。
就国内的热度来说,如果我们看百度指数的查询结果,springloud和dubbo几乎是半斤八两,dubbo相比起来还略胜一筹
总的来说对外开放的服务推荐采用RESTful,内部调用推荐采用RPC方式。当然不能一概而论,还要看具体的业务场景。