Netty实现简单RPC调用

简介: 我们知道Dubbo是一个RPC框架,那RPC框架需要实现什么?需要实现的是调用远程服务和本地服务一样方便,同时提高调用远程服务的性能。而服务端和客户端之间的关系,其实就是一个生产和消费的关系。

我们知道Dubbo是一个RPC框架,那RPC框架需要实现什么?需要实现的是调用远程服务和本地服务一样方便,同时提高调用远程服务的性能。而服务端和客户端之间的关系,其实就是一个生产和消费的关系。

客户端与服务端交互关系图

0 (11).jpg

1.服务消费方以本地调用方式调用服务2.clientstub接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体3.clientstub将消息进行编码并发送到服务端4.serverstub根据解码结果调用本地的服务5.serverstub将返回导入结果进行编码并发送至消费方6.本地服务执行并将结果返回给serverstub7.serverstub将返回导入结果进行编码并发送至消费方8.clientstub接收到消息并进行解码9.服务消费方(client)得到结果

RPC的目标是将2-8步骤进行封装,用户无需关系这些细节,也即实现远程调用和调用本地方法一样。

Server服务提供方

server
/*** 服务提供方* @author Administrator**/publicinterfaceHelloNetty {
Stringhello();
}
/*** 实现HelloNetty接口* @author Administrator**/publicclassHelloNettyImplimplementsHelloNetty {
@OverridepublicStringhello() {
return"hello,netty";
    }
}
/*** 服务提供方* @author Administrator**/publicinterfaceHelloRPC {
Stringhello(Stringname);
}
/*** HelloRPC接口的实现* @author Administrator**/publicclassHelloRPCImplimplementsHelloRPC {
@OverridepublicStringhello(Stringname) {
return"hello,"+name;
    }
}
Server Stub

封装需要传递的消息实体类

/*** 封装类信息,实体类用来封装消费方发起远程调用时传给服务方的数据* @author Administrator**///封装类信息publicclassClassInfoimplementsSerializable {
privatestaticfinallongserialVersionUID=1L;
privateStringclassName;  //类名privateStringmethodName;//方法名privateClass<?>[] types; //参数类型privateObject[] objects;//参数列表publicStringgetClassName() {
returnclassName;
  }
publicvoidsetClassName(StringclassName) {
this.className=className;
  }
publicStringgetMethodName() {
returnmethodName;
  }
publicvoidsetMethodName(StringmethodName) {
this.methodName=methodName;
  }
publicClass<?>[] getTypes() {
returntypes;
  }
publicvoidsetTypes(Class<?>[] types) {
this.types=types;
  }
publicObject[] getObjects() {
returnobjects;
  }
publicvoidsetObjects(Object[] objects) {
this.objects=objects;
  }
}
服务器端业务处理
/*** 服务器端业务处理类* @author Administrator**/publicclassInvokeHandlerextendsChannelInboundHandlerAdapter{
//得到某接口下某个实现类的名字privateStringgetImplClassName(ClassInfoclassInfo) throwsException{
//服务方接口和实现类所在的包路径StringinterfacePath="com.study.nettyRpc.server";
intlastDot=classInfo.getClassName().lastIndexOf(".");
StringinterfaceName=classInfo.getClassName().substring(lastDot);
ClasssuperClass=Class.forName(interfacePath+interfaceName);
Reflectionsreflections=newReflections(interfacePath);
//得到某接口下的所有实现类Set<Class>ImplClassSet=reflections.getSubTypesOf(superClass);
if(ImplClassSet.size()==0){
System.out.println("未找到实现类");
returnnull;
        }elseif(ImplClassSet.size()>1){
System.out.println("找到多个实现类,未明确使用哪一个");
returnnull;
        }else {
//把集合转换为数组Class[] classes=ImplClassSet.toArray(newClass[0]);
returnclasses[0].getName(); //得到实现类的名字        }
    }
@Override//读取客户端发来的数据并通过反射调用实现类的方法publicvoidchannelRead(ChannelHandlerContextctx, Objectmsg) throwsException {
ClassInfoclassInfo= (ClassInfo) msg;
Objectclazz=Class.forName(getImplClassName(classInfo)).newInstance();
Methodmethod=clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
//通过反射调用实现类的方法Objectresult=method.invoke(clazz, classInfo.getObjects());
ctx.writeAndFlush(result);
    }
}
网络处理服务器Server端
/*** 网络处理服务器* @author Administrator**/publicclassNettyRPCServer {
privateintport;
publicNettyRPCServer(intport) {
this.port=port;
  }
publicvoidstart() {
EventLoopGroupbossGroup=newNioEventLoopGroup();
EventLoopGroupworkerGroup=newNioEventLoopGroup();
try {
ServerBootstrapserverBootstrap=newServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
                  .channel(NioServerSocketChannel.class)
                  .option(ChannelOption.SO_BACKLOG, 128)
                  .childOption(ChannelOption.SO_KEEPALIVE, true)
                  .localAddress(port).childHandler(
newChannelInitializer<SocketChannel>() {
@OverrideprotectedvoidinitChannel(SocketChannelch) throwsException {
ChannelPipelinepipeline=ch.pipeline();
//编码器pipeline.addLast("encoder", newObjectEncoder());
//解码器pipeline.addLast("decoder", newObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
//服务器端业务处理类pipeline.addLast(newInvokeHandler());
                              }
                          });
ChannelFuturefuture=serverBootstrap.bind(port).sync();
System.out.println("......server is ready......");
future.channel().closeFuture().sync();
      } catch (Exceptione) {
//优雅关闭bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
      }
  }
publicstaticvoidmain(String[] args) throwsException {
newNettyRPCServer(9999).start();
  }
}

Client客户端

client stub

客户端代理类
/*** 客户端代理类* @author Administrator**/publicclassNettyRPCProxy {
//根据接口创建代理对象publicstaticObjectcreate(Classtarget){
returnProxy.newProxyInstance(target.getClassLoader(), newClass[]{target}, newInvocationHandler(){
@OverridepublicObjectinvoke(Objectproxy, Methodmethod, Object[] args) throwsThrowable {
//封装ClassInfoClassInfoclassInfo=newClassInfo();
classInfo.setClassName(target.getName());
classInfo.setMethodName(method.getName());
classInfo.setObjects(args);
classInfo.setTypes(method.getParameterTypes());
//开始用Netty发送数据EventLoopGroupgroup=newNioEventLoopGroup();
ResultHandlerresultHandler=newResultHandler();
try{
Bootstrapb=newBootstrap();
b.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(newChannelInitializer<SocketChannel>(){
@OverrideprotectedvoidinitChannel(SocketChannelch) throwsException {
ChannelPipelinepipeline=ch.pipeline();
//编码器pipeline.addLast("encoder",newObjectEncoder());
//解码器pipeline.addLast("decoder",newObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));
//客户端业务处理类pipeline.addLast("handler",resultHandler);
                    }
                 });
ChannelFuturefuture=b.connect("127.0.0.1",9999).sync();
future.channel().writeAndFlush(classInfo).sync();
future.channel().closeFuture().sync();
            }finally{
group.shutdownGracefully();
            }
returnresultHandler.getResponse();
        }             
      });
  }
}
客户端业务处理类
/*** 客户端业务处理类* @author Administrator**/publicclassResultHandlerextendsChannelInboundHandlerAdapter{
privateObjectresponse;
publicObjectgetResponse(){
returnresponse;
    }
//读取服务器端返回的数据(远程调用的结果)@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg){
response=msg;
ctx.close();
    }
}
客户端调用
/*** 服务调用方* * @author Administrator**/publicclassTestNettyRPC {
publicstaticvoidmain(String[] args) {
//第1次远程调用HelloNettyhelloNetty=(HelloNetty) NettyRPCProxy.create(HelloNetty.class);
System.out.println(helloNetty.hello());
//第2次远程调用HelloRPChelloRPC=  (HelloRPC) NettyRPCProxy.create(HelloRPC.class);
System.out.println(helloRPC.hello("RPC"));
    }
}

启动之后运行结果

0 (12).jpg

0 (13).jpg

目录
相关文章
|
Dubbo Java 应用服务中间件
Netty入门到超神系列-手撸简单版RPC框架(仿Dubbo)
原理还是比较简单 : 代理 + 线程池 + Netty 下面做一些解释: 首先需要定义一个统一的API接口,例:UserApi , 服务端(provider)需要实现这个接口,提供相应的方法UserApiImpl#save,客户端通过远程来调用该接口。 然后需要约定一个协议,服务器如何才能识别到客户端要调用哪个接口?:我这里用 “接口权限定名#方法名#参数” ,的方式来,因为是一个简单版本的RPC。服务端解析该内容就能匹配对应的接口的实现类,然后调用该方法。并把方法的返回值通过Netty写回给客户端 使用的编解码器都是比价简单的String的编解码器
162 0
|
前端开发
Netty手写RPC框架
创建Request类,继承Message,klass是调用的Class目标,name,parameterType,argument分别是方法名称,参数类型,参数
87 0
|
5月前
|
Java Spring
Spring Boot+Netty实现远程过程调用(RPC)
Spring Boot+Netty实现远程过程调用(RPC)
140 0
|
Java 中间件 大数据
Netty快速入门RPC项目
Netty快速入门RPC项目
72 0
Netty快速入门RPC项目
|
12月前
|
负载均衡
06RPC - netty实现RPC以及Zookeeper
06RPC - netty实现RPC以及Zookeeper
51 0
|
前端开发 JavaScript Java
Seata 高性能RPC通信的实现基石-Netty篇
Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。
169 0
|
XML 存储 JSON
JAVA面试——Netty 与 RPC(二)
JAVA面试——Netty 与 RPC
130 0
JAVA面试——Netty 与 RPC(二)
|
XML 编解码 弹性计算
JAVA面试——Netty 与 RPC(一)
JAVA面试——Netty 与 RPC
211 0
JAVA面试——Netty 与 RPC(一)
|
Dubbo Java 应用服务中间件
第 11 章 用 Netty 自己实现 Dubbo RPC
第 11 章 用 Netty 自己实现 Dubbo RPC
172 0
|
Java Spring
Spring Boot+Netty实现远程过程调用(RPC)
Spring Boot+Netty实现远程过程调用(RPC)