第 11 章 用 Netty 自己实现 Dubbo RPC

简介: 第 11 章 用 Netty 自己实现 Dubbo RPC

11.1 RPC 基本介绍

  1. RPC(Remote Procedure Call)—远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
  2. 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样(如图)

  1. 常见的 RPC 框架有:比较知名的如阿里的 DubboGooglegRPCGo 语言的 rpcxApachethriftSpring 旗下的 SpringCloud

11.2 RPC 调用流程图

11.3 PRC 调用流程说明

  1. 服务消费方(client)以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码
  9. 服务消费方(client)得到结果

小结:RPC 的目标就是将 2 - 8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用

11.4 自己实现 Dubbo RPC(基于 Netty)

11.4.1 需求说明

  1. Dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架
  2. 模仿 Dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty 4.1.20

11.4.2 设计说明

  1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
  2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
  3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据
  4. 开发的分析图

11.4.3 代码实现

package com.atguigu.netty.dubborpc.publicinterface;
//这个是接口,是服务提供方和 服务消费方都需要
public interface HelloService {
    String hello(String mes);
}
package com.atguigu.netty.dubborpc.provider;
import com.atguigu.netty.dubborpc.publicinterface.HelloService;
public class HelloServiceImpl implements HelloService {
    private static int count = 0;
    //当有消费方调用该方法时, 就返回一个结果
    @Override
    public String hello(String mes) {
        System.out.println("收到客户端消息=" + mes);
        //根据mes 返回不同的结果
        if (mes != null) {
            return "你好客户端, 我已经收到你的消息 [" + mes + "] 第" + (++count) + " 次";
        } else {
            return "你好客户端, 我已经收到你的消息 ";
        }
    }
}
package com.atguigu.netty.dubborpc.provider;
import com.atguigu.netty.dubborpc.netty.NettyServer;
//ServerBootstrap 会启动一个服务提供者,就是 NettyServer
public class ServerBootstrap {
    public static void main(String[] args) {
        //代码代填..
        NettyServer.startServer("127.0.0.1", 7000);
    }
}
package com.atguigu.netty.dubborpc.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class NettyServer {
    public static void startServer(String hostName, int port) {
        startServer0(hostName, port);
    }
    //编写一个方法,完成对NettyServer的初始化和启动
    private static void startServer0(String hostname, int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                                      @Override
                                      protected void initChannel(SocketChannel ch) throws Exception {
                                          ChannelPipeline pipeline = ch.pipeline();
                                          pipeline.addLast(new StringDecoder());
                                          pipeline.addLast(new StringEncoder());
                                          pipeline.addLast(new NettyServerHandler()); //业务处理器
                                      }
                                  }
                    );
            ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
            System.out.println("服务提供方开始提供服务~~");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.atguigu.netty.dubborpc.netty;
import com.atguigu.netty.dubborpc.customer.ClientBootstrap;
import com.atguigu.netty.dubborpc.provider.HelloServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
//服务器这边handler比较简单
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送的消息,并调用服务
        System.out.println("msg=" + msg);
        //客户端在调用服务器的api 时,我们需要定义一个协议
        //比如我们要求 每次发消息是都必须以某个字符串开头 "HelloService#hello#你好"
        if (msg.toString().startsWith(ClientBootstrap.providerName)) {
            String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
package com.atguigu.netty.dubborpc.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.Callable;
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    private ChannelHandlerContext context;//上下文
    private String result; //返回的结果
    private String para; //客户端调用方法时,传入的参数
    //与服务器的连接创建后,就会被调用, 这个方法是第一个被调用(1)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" channelActive 被调用  ");
        context = ctx; //因为我们在其它方法会使用到 ctx
    }
    //收到服务器的数据后,调用方法 (4)
    //
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead 被调用  ");
        result = msg.toString();
        notify(); //唤醒等待的线程
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
    //被代理对象调用, 发送数据给服务器,-> wait -> 等待被唤醒(channelRead) -> 返回结果 (3)-》5
    @Override
    public synchronized Object call() throws Exception {
        System.out.println(" call1 被调用  ");
        context.writeAndFlush(para);
        //进行wait
        wait(); //等待channelRead 方法获取到服务器的结果后,唤醒
        System.out.println(" call2 被调用  ");
        return result; //服务方返回的结果
    }
    //(2)
    void setPara(String para) {
        System.out.println(" setPara  ");
        this.para = para;
    }
}
package com.atguigu.netty.dubborpc.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NettyClient {
    //创建线程池
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static NettyClientHandler client;
    private int count = 0;
    //编写方法使用代理模式,获取一个代理对象
    public Object getBean(final Class<?> serivceClass, final String providerName) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serivceClass}, (proxy, method, args) -> {
                    System.out.println("(proxy, method, args) 进入...." + (++count) + " 次");
                    //{}  部分的代码,客户端每调用一次 hello, 就会进入到该代码
                    if (client == null) {
                        initClient();
                    }
                    //设置要发给服务器端的信息
                    //providerName 协议头 args[0] 就是客户端调用api hello(???), 参数
                    client.setPara(providerName + args[0]);
                    //
                    return executor.submit(client).get();
                });
    }
    //初始化客户端
    private static void initClient() {
        client = new NettyClientHandler();
        //创建EventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(client);
                            }
                        }
                );
        try {
            bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.atguigu.netty.dubborpc.customer;
import com.atguigu.netty.dubborpc.netty.NettyClient;
import com.atguigu.netty.dubborpc.publicinterface.HelloService;
public class ClientBootstrap {
    //这里定义协议头
    public static final String providerName = "HelloService#hello#";
    public static void main(String[] args) throws Exception {
        //创建一个消费者
        NettyClient customer = new NettyClient();
        //创建代理对象
        HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
        for (; ; ) {
            Thread.sleep(2 * 1000);
            //通过代理对象调用服务提供者的方法(服务)
            String res = service.hello("你好 dubbo~");
            System.out.println("调用的结果 res= " + res);
        }
    }
}


目录
相关文章
|
8月前
|
Dubbo Java 应用服务中间件
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
|
8月前
|
负载均衡 Dubbo Java
Dubbo 3.x:探索阿里巴巴的开源RPC框架新技术
随着微服务架构的兴起,远程过程调用(RPC)框架成为了关键组件。Dubbo,作为阿里巴巴的开源RPC框架,已经演进到了3.x版本,带来了许多新特性和技术改进。本文将探讨Dubbo 3.x中的一些最新技术,包括服务注册与发现、负载均衡、服务治理等,并通过代码示例展示其使用方式。
429 9
|
8月前
|
Dubbo Cloud Native 网络协议
【Dubbo3技术专题】「服务架构体系」第一章之Dubbo3新特性要点之RPC协议分析介绍
【Dubbo3技术专题】「服务架构体系」第一章之Dubbo3新特性要点之RPC协议分析介绍
109 1
|
2月前
|
Dubbo 安全 应用服务中间件
Apache Dubbo 正式发布 HTTP/3 版本 RPC 协议,弱网效率提升 6 倍
在 Apache Dubbo 3.3.0 版本之后,官方推出了全新升级的 Triple X 协议,全面支持 HTTP/1、HTTP/2 和 HTTP/3 协议。本文将围绕 Triple 协议对 HTTP/3 的支持进行详细阐述,包括其设计目标、实际应用案例、性能测试结果以及源码架构分析等内容。
|
4月前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
8月前
|
负载均衡 Dubbo Java
理解 Netty 和 Dubbo 之间的关系:相互作用与作用域
在现代分布式系统和微服务架构中,高性能、高可用的通信框架是成功的关键要素之一。Netty 和 Dubbo 是两个非常流行的开源项目,它们在分布式系统的构建中发挥着重要作用。
|
8月前
|
Dubbo Java 应用服务中间件
Spring Boot整合Dubbo+Zookeeper实现RPC调用
Spring Boot整合Dubbo+Zookeeper实现RPC调用 技术栈说明 Dubbo:Dubbo作为RPC框架,能在多个服务之间实现远程服务的调用。比如有两个独立的微服务A和B,A服务想要调用B服务时,因为两者不在同个内存空间中,不能直接调用,所以可以通过Dubbo实现这点。 功能和Spring Cloud的Feign相同,两者都是应用于微服务架构的远程调用框架 Zookeeper:作为注册中心去管理Dubbo服务,这点和Eureka、Nacos相同。 概述 通过一个示例说明Dubbo+Zookeeper在Spring Boot中的应用。 现有两个服务provider和con
202 4
|
8月前
|
Java fastjson 数据安全/隐私保护
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
169 0
|
8月前
|
Java Spring
Spring Boot+Netty实现远程过程调用(RPC)
Spring Boot+Netty实现远程过程调用(RPC)
175 0
|
XML Dubbo Java
Dubbo第一讲:从RPC到Dubbo框架详解
Dubbo第一讲:从RPC到Dubbo框架详解
164 0