我们知道Dubbo是一个RPC框架,那RPC框架需要实现什么?需要实现的是调用远程服务和本地服务一样方便,同时提高调用远程服务的性能。而服务端和客户端之间的关系,其实就是一个生产和消费的关系。
客户端与服务端交互关系图
1.服务消费方以本地调用方式调用服务2.clientstub接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体3.clientstub将消息进行编码并发送到服务端4.serverstub根据解码结果调用本地的服务5.serverstub将返回导入结果进行编码并发送至消费方6.本地服务执行并将结果返回给serverstub7.serverstub将返回导入结果进行编码并发送至消费方8.clientstub接收到消息并进行解码9.服务消费方(client)得到结果
RPC的目标是将2-8步骤进行封装,用户无需关系这些细节,也即实现远程调用和调用本地方法一样。
Server服务提供方
server
/*** 服务提供方* @author Administrator**/publicinterfaceHelloNetty { Stringhello(); } /*** 实现HelloNetty接口* @author Administrator**/publicclassHelloNettyImplimplementsHelloNetty { publicStringhello() { return"hello,netty"; } } /*** 服务提供方* @author Administrator**/publicinterfaceHelloRPC { Stringhello(Stringname); } /*** HelloRPC接口的实现* @author Administrator**/publicclassHelloRPCImplimplementsHelloRPC { publicStringhello(Stringname) { return"hello,"+name; } }
Server Stub
封装需要传递的消息实体类
/*** 封装类信息,实体类用来封装消费方发起远程调用时传给服务方的数据* @author Administrator**///封装类信息publicclassClassInfoimplementsSerializable { privatestaticfinallongserialVersionUID=1L; privateStringclassName; //类名privateStringmethodName;//方法名privateClass<?>[] types; //参数类型privateObject[] objects;//参数列表publicStringgetClassName() { returnclassName; } publicvoidsetClassName(StringclassName) { this.className=className; } publicStringgetMethodName() { returnmethodName; } publicvoidsetMethodName(StringmethodName) { this.methodName=methodName; } publicClass<?>[] getTypes() { returntypes; } publicvoidsetTypes(Class<?>[] types) { this.types=types; } publicObject[] getObjects() { returnobjects; } publicvoidsetObjects(Object[] objects) { this.objects=objects; } }
服务器端业务处理
/*** 服务器端业务处理类* @author Administrator**/publicclassInvokeHandlerextendsChannelInboundHandlerAdapter{ //得到某接口下某个实现类的名字privateStringgetImplClassName(ClassInfoclassInfo) throwsException{ //服务方接口和实现类所在的包路径StringinterfacePath="com.study.nettyRpc.server"; intlastDot=classInfo.getClassName().lastIndexOf("."); StringinterfaceName=classInfo.getClassName().substring(lastDot); ClasssuperClass=Class.forName(interfacePath+interfaceName); Reflectionsreflections=newReflections(interfacePath); //得到某接口下的所有实现类Set<Class>ImplClassSet=reflections.getSubTypesOf(superClass); if(ImplClassSet.size()==0){ System.out.println("未找到实现类"); returnnull; }elseif(ImplClassSet.size()>1){ System.out.println("找到多个实现类,未明确使用哪一个"); returnnull; }else { //把集合转换为数组Class[] classes=ImplClassSet.toArray(newClass[0]); returnclasses[0].getName(); //得到实现类的名字 } } //读取客户端发来的数据并通过反射调用实现类的方法publicvoidchannelRead(ChannelHandlerContextctx, Objectmsg) throwsException { ClassInfoclassInfo= (ClassInfo) msg; Objectclazz=Class.forName(getImplClassName(classInfo)).newInstance(); Methodmethod=clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes()); //通过反射调用实现类的方法Objectresult=method.invoke(clazz, classInfo.getObjects()); ctx.writeAndFlush(result); } }
网络处理服务器Server端
/*** 网络处理服务器* @author Administrator**/publicclassNettyRPCServer { privateintport; publicNettyRPCServer(intport) { this.port=port; } publicvoidstart() { EventLoopGroupbossGroup=newNioEventLoopGroup(); EventLoopGroupworkerGroup=newNioEventLoopGroup(); try { ServerBootstrapserverBootstrap=newServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .localAddress(port).childHandler( newChannelInitializer<SocketChannel>() { protectedvoidinitChannel(SocketChannelch) throwsException { ChannelPipelinepipeline=ch.pipeline(); //编码器pipeline.addLast("encoder", newObjectEncoder()); //解码器pipeline.addLast("decoder", newObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); //服务器端业务处理类pipeline.addLast(newInvokeHandler()); } }); ChannelFuturefuture=serverBootstrap.bind(port).sync(); System.out.println("......server is ready......"); future.channel().closeFuture().sync(); } catch (Exceptione) { //优雅关闭bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } publicstaticvoidmain(String[] args) throwsException { newNettyRPCServer(9999).start(); } }
Client客户端
client stub
客户端代理类
/*** 客户端代理类* @author Administrator**/publicclassNettyRPCProxy { //根据接口创建代理对象publicstaticObjectcreate(Classtarget){ returnProxy.newProxyInstance(target.getClassLoader(), newClass[]{target}, newInvocationHandler(){ publicObjectinvoke(Objectproxy, Methodmethod, Object[] args) throwsThrowable { //封装ClassInfoClassInfoclassInfo=newClassInfo(); classInfo.setClassName(target.getName()); classInfo.setMethodName(method.getName()); classInfo.setObjects(args); classInfo.setTypes(method.getParameterTypes()); //开始用Netty发送数据EventLoopGroupgroup=newNioEventLoopGroup(); ResultHandlerresultHandler=newResultHandler(); try{ Bootstrapb=newBootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(newChannelInitializer<SocketChannel>(){ protectedvoidinitChannel(SocketChannelch) throwsException { ChannelPipelinepipeline=ch.pipeline(); //编码器pipeline.addLast("encoder",newObjectEncoder()); //解码器pipeline.addLast("decoder",newObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null))); //客户端业务处理类pipeline.addLast("handler",resultHandler); } }); ChannelFuturefuture=b.connect("127.0.0.1",9999).sync(); future.channel().writeAndFlush(classInfo).sync(); future.channel().closeFuture().sync(); }finally{ group.shutdownGracefully(); } returnresultHandler.getResponse(); } }); } }
客户端业务处理类
/*** 客户端业务处理类* @author Administrator**/publicclassResultHandlerextendsChannelInboundHandlerAdapter{ privateObjectresponse; publicObjectgetResponse(){ returnresponse; } //读取服务器端返回的数据(远程调用的结果)publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg){ response=msg; ctx.close(); } }
客户端调用
/*** 服务调用方* * @author Administrator**/publicclassTestNettyRPC { publicstaticvoidmain(String[] args) { //第1次远程调用HelloNettyhelloNetty=(HelloNetty) NettyRPCProxy.create(HelloNetty.class); System.out.println(helloNetty.hello()); //第2次远程调用HelloRPChelloRPC= (HelloRPC) NettyRPCProxy.create(HelloRPC.class); System.out.println(helloRPC.hello("RPC")); } }
启动之后运行结果