Netty入门到超神系列-手撸简单版RPC框架(仿Dubbo)

简介: 原理还是比较简单 : 代理 + 线程池 + Netty 下面做一些解释:首先需要定义一个统一的API接口,例:UserApi , 服务端(provider)需要实现这个接口,提供相应的方法UserApiImpl#save,客户端通过远程来调用该接口。然后需要约定一个协议,服务器如何才能识别到客户端要调用哪个接口?:我这里用 “接口权限定名#方法名#参数” ,的方式来,因为是一个简单版本的RPC。服务端解析该内容就能匹配对应的接口的实现类,然后调用该方法。并把方法的返回值通过Netty写回给客户端使用的编解码器都是比价简单的String的编解码器

前言

学过dubbo的应该知道dubbo底层基于Netty实现,为了加强对Netty的理解,这篇文章我们来仿照dubbo手撸一个简易版本的RPC框架

结构理解

先来看一张图

原理还是比较简单 : 代理 + 线程池 + Netty 下面做一些解释:

  • 首先需要定义一个统一的API接口,例:UserApi , 服务端(provider)需要实现这个接口,提供相应的方法UserApiImpl#save,客户端通过远程来调用该接口。
  • 然后需要约定一个协议,服务器如何才能识别到客户端要调用哪个接口?:我这里用 “接口权限定名#方法名#参数” ,的方式来,因为是一个简单版本的RPC。服务端解析该内容就能匹配对应的接口的实现类,然后调用该方法。并把方法的返回值通过Netty写回给客户端
  • 使用的编解码器都是比价简单的String的编解码器
  • 提供者/服务端(provider)正常拉起NettyServer ,启动即可等待客户端的连接。
  • 消费者/客户端(consumer)需要调用UserApi API接口,但是在消费者这边是没有实现类的,消费者要做的事情就是在发起 UserApi#save调用的时候,底层通过Netty向服务端通信。内容就是 “接口权限定名#方法#参数”。
  • 消费者这边最好的是方式就是为接口生成代理,在代理类去约定协议,组装通信的内容,然后这里还用到了线程池,把发送Netty通信的工作交给新开的线程去处理。
  • 请求发送给服务端,服务端返回一个结果,通过Netty拿到结果返回给消费者即可,整个调用过程结束。

统一API

首先定义一个统一的接口,提供一个save方法

publicinterfaceUserApi {
Stringsave(Stringdata);
}

提供者

第一步:提供者编写一个实现类实现该接口,save方法,返回一个“success” , 消费者通过远程来调用该方法

//实现类publicclassUserApiImplimplementsUserApi {
@OverridepublicStringsave(Stringdata) {
System.out.println("调用方法 UserApiImpl#save ,参数: "+data);
return"success";
    }
}

第二步:编写NettyServer,监听的IP和端口通过方法传入,这里没有什么改动

//提供者方启动publicclassNettyServer {
publicvoidstart(Stringaddress,intport ){
NioEventLoopGroupbossGroup=newNioEventLoopGroup();
NioEventLoopGroupworkGroup=newNioEventLoopGroup();
ServerBootstrapbootstrap=newServerBootstrap();
bootstrap.group(bossGroup,workGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(newChannelInitializer<SocketChannel>() {
@OverrideprotectedvoidinitChannel(SocketChannelch) throwsException {
ChannelPipelinepipeline=ch.pipeline();
pipeline.addLast("decoder",newStringDecoder());
pipeline.addLast("encoder",newStringEncoder());
pipeline.addLast(newServerHandler());
            }
        });
try {
ChannelFuturesync=bootstrap.bind(address, port).sync();
sync.channel().closeFuture().sync();
        } catch (InterruptedExceptione) {
e.printStackTrace();
        }finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
        }
    }
}

第三步:编写Provider启动入口

//启动提供者publicclassProviderStart {
publicstaticvoidmain(String[] args) {
newNettyServer().start("127.0.0.1",1000);
    }
}

第四步:编写ServerHandler,在channelRead0读取到数据,需要解析内容,匹配相关的service接口,且调用方法,把结果写回给客户端

publicclassServerHandlerextendsSimpleChannelInboundHandler<String> {
//这里没有Spring的容器,就搞一个MapConcurrentHashMap<String, Object>applicationContext=newConcurrentHashMap<>();
publicServerHandler(){
applicationContext.put("cn.itsource.rpc.api.UserApi",newUserApiImpl());
    }
//消息内容约定   cn.itsource.rpc.api.UserApi#save#数据@OverrideprotectedvoidchannelRead0(ChannelHandlerContextctx, Stringmsg) throwsException {
System.out.println("服务端收到请求 -> "+msg);
String[] msgs=msg.split("#");
StringinterfaceClass=msgs[0];
StringmethodName=msgs[1];
Stringdata=msgs[2];
//调研Bean的方法Objectobj=applicationContext.get(interfaceClass);
Class<?>aClass=obj.getClass();
Methodmethod=aClass.getMethod(methodName,data.getClass());
//调用方法Objectresult=method.invoke(obj,data);
if(resultinstanceofString){
ctx.writeAndFlush((String)result);
        }else{
System.out.println("类型不支持");
        }
    }
}

上面只是简单模拟了一下servie调用,整合Sping可以根据类型去容器中获取Bean。到这服务端编写完成

消费者

消费者要复杂一些,第一步:先编写NettyClient

publicclassNettyClient {
//客户端处理器,是一个 CallableprivateClientHandlerclientHandler=null;
//初始化netty客户端publicvoidinit(Stringaddress, intport){
NioEventLoopGroupworkGroup=newNioEventLoopGroup();
Bootstrapbootstrap=newBootstrap();
bootstrap.group(workGroup);
bootstrap.channel(NioSocketChannel.class);
clientHandler=newClientHandler();
bootstrap.handler(newChannelInitializer<SocketChannel>() {
@OverrideprotectedvoidinitChannel(SocketChannelch) throwsException {
ChannelPipelinepipeline=ch.pipeline();
pipeline.addLast(newStringEncoder());
pipeline.addLast(newStringDecoder());
//一个客户端一个handlerpipeline.addLast(clientHandler);
            }
        });
try {
ChannelFuturesync=bootstrap.connect(address, port).sync();
        } catch (InterruptedExceptione) {
e.printStackTrace();
        }
    }
//执行任务的线程池ExecutorServiceexecutorService=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
publicObjectgetBean(Class<UserApi>userApiClass) {
//创建 userApi 接口的代理ObjectproxyInstance=Proxy.newProxyInstance(userApiClass.getClassLoader(), newClass[]{userApiClass}, (proxy, method, args) -> {
//初始化客户端,连接Netty服务端if(clientHandler==null)init("127.0.0.1",1000);
//把协议头#数据 ,传递给handlerclientHandler.setContent(userApiClass.getName()+"#"+method.getName()+"#"+args[0]);
//把请求交给线程池处理,clientHandler就是一个线程returnexecutorService.submit(clientHandler).get();
        });
returnproxyInstance;
    }
}

NettyClient中提供了两个方法,init和 getBean方法。在init方法中去初始化NettyClient ,ClientHandler提升为了成员变量,因为下getBean面生成代理的时候会用到,然后getBean方法为接口生成了代理。在代理中把需要发送的内容“类权限定名#方法名#数据”交给Handler,把clientHandler交给线程池去执行。

第二步:编写Handler ,handler是一个Callable 如下

publicclassClientHandlerextendsSimpleChannelInboundHandler<String>implementsCallable<Object> {
//协议头:cn.itsource.rpc.api.UserApi#save#dataprivateStringcontent=null;
//上下文privateChannelHandlerContextctx;
//服务器返回的结果privateObjectresult;
publicvoidsetContent(Stringcontent){
this.content=content;
    }
@OverridepublicvoidchannelActive(ChannelHandlerContextctx) throwsException {
//当和服务器建立连接,就需要保存ChannelHandlerContext//在call方法中发请求会用到ChannelHandlerContextthis.ctx=ctx;
    }
@OverrideprotectedsynchronizedvoidchannelRead0(ChannelHandlerContextctx, Stringmsg) throwsException {
System.out.println("客户端收到结果 = "+msg);
//拿到服务器返回的结果this.result=msg;
//唤醒call方法取走结果notify();
    }
@OverridepublicsynchronizedObjectcall() throwsException {
//发送请求if(ctx==null){
System.out.println("RPC连接失败...");
returnnull;
        }
//把内容写给服务端ctx.writeAndFlush(content);
//服务器返回的结果是在 channelRead0 方法中拿到,这里等待wait();
returnresult;
    }
}

在 channelActive方法中,当和服务端建立连接就把ChannelHandlerContext上下文提升为成员变量,方便在call方法中使用。 当call方法被调用,使用ChannelHandlerContext把内容写给服务端,这个时候call方法需要等待服务端返回结果,使用了wait().

当服务端返回结果给客户端,客户端通过channelRead0方法拿到结果,并赋值给result成员变量,然后notify();唤醒wait的call方法,在call方法中就可以拿到结果返回。

第三步:就是通过NettyClient拿到UserApi代理,然后调用save方法

//消费者启动publicclassConsumerStart {
publicstaticvoidmain(String[] args) {
NettyClientnettyClient=newNettyClient();
UserApiuserApi= (UserApi)nettyClient.getBean(UserApi.class);
for(inti=0 ; i<10 ; i++){
Stringcontent="data"+i;
System.out.println("客户端发送数据,内容:"+content);
Stringresult=userApi.save(content);
System.out.println("客户端收到返回的数据,内容:"+result);
        }
    }
}

第四步 : 依次启动服务端,客户端,观看效果

客户端

服务端能拿到客户端传过去的数据,客户端也能收到服务端返回的结果。

目录
相关文章
|
1月前
|
Dubbo Java 应用服务中间件
Dubbo学习圣经:从入门到精通 Dubbo3.0 + SpringCloud Alibaba 微服务基础框架
尼恩团队的15大技术圣经,旨在帮助开发者系统化、体系化地掌握核心技术,提升技术实力,从而在面试和工作中脱颖而出。本文介绍了如何使用Dubbo3.0与Spring Cloud Gateway进行整合,解决传统Dubbo架构缺乏HTTP入口的问题,实现高性能的微服务网关。
|
2月前
|
Dubbo Java 应用服务中间件
微服务框架Dubbo环境部署实战
微服务框架Dubbo环境部署的实战指南,涵盖了Dubbo的概述、服务部署、以及Dubbo web管理页面的部署,旨在指导读者如何搭建和使用Dubbo框架。
223 17
微服务框架Dubbo环境部署实战
|
2月前
|
负载均衡 Dubbo NoSQL
Dubbo框架的1个核心设计点
Java领域要说让我最服气的RPC框架当属Dubbo,原因有许多,但是最吸引我的还是它把远程调用这个事情设计得很有艺术。
Dubbo框架的1个核心设计点
|
2月前
|
负载均衡 监控 Dubbo
分布式框架-dubbo
分布式框架-dubbo
|
2月前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
3月前
|
开发框架 Dubbo 应用服务中间件
微服务开发框架-----Apache Dubbo
这篇文章介绍了Apache Dubbo微服务开发框架,它提供RPC通信和微服务治理能力,支持服务发现、负载均衡和流量治理等功能,并强调了Dubbo在微服务规模化实践和企业级治理方面的优势。
微服务开发框架-----Apache Dubbo
|
3月前
|
缓存 负载均衡 监控
Dubbo框架整体认知
该文章主要介绍了Dubbo框架的整体认知,包括Dubbo的概念、产生的背景、解决的问题、架构以及功能特性等。
Dubbo框架整体认知
|
6月前
|
设计模式 负载均衡 网络协议
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
267 0
|
17天前
|
自然语言处理 负载均衡 API
gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架
gRPC 是一种现代开源高性能远程过程调用(RPC)框架,支持多种编程语言,可在任何环境中运行。它通过高效的连接方式,支持负载平衡、跟踪、健康检查和身份验证,适用于微服务架构、移动设备和浏览器客户端连接后端服务等场景。gRPC 使用 Protocol Buffers 作为接口定义语言,支持四种服务方法:一元 RPC、服务器流式处理、客户端流式处理和双向流式处理。
|
3月前
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC