花了一星期,自己写了个简单的RPC框架

简介: 花了一星期,自己写了个简单的RPC框架

想法的来源


      学了netty框架以及看了一下一小部分的netty框架的源码,听说dubbo是基于netty框架的一个优秀的落地实现,所以看了一小部分dubbo的源码,感觉学习netty总要有一个方式证明自己曾经学过,所以写下这一篇小笔记,写给自己看。


源码下载


https://github.com/cbeann/NettyRpcDemooo


前提


Zookeeper知识点


微信截图_20230225232513.png


Netty知识点


Spring的生命周期


自定义starter


动手实践


注意


下面只贴了部分代码和写这个小demo的想法,全部代码在  https://github.com/cbeann/NettyRpcDemooo  中


目标


1)zk做为注册中心


2)服务提供者提供集群方法,服务消费者轮询调用


3)服务提供者上线和下线服务消费者可以感知


4)整合自定义starter


5)没有使用序列化,使用json字符串进行接口调用


RPC程序需要的配置参数


服务提供者


注册中心ZK的IP


注册中心ZK的端口


服务提供者的名称


服务提供者的端口(NettyServer的端口,不是SpringBoot的端口)


服务消费者


注册中心ZK的IP


注册中心ZK的端口


服务消费者的名称


zk中服务提供者的存储目录结构


如果要满足集群版本的服务提供者存储,zk的存储设计也应该好好的想一想,有一点可以明确的创建的节点有临时节点,这样服务消费者才能通过某种机制监听到服务提供者上下线并进行业务逻辑操作。


本文中的存储结构如图所示


/rpc为项目根目录(持久化节点)

/rpc/provider下存储的是服务提供者集群(持久化节点)

/rpc/provider/XXX存储的是服务提供者名称(持久化节点)

/rpc/provider/XXX/IP:端口存储的提供者名称为XXX的实例(临时节点)


1.png


服务提供者思路


服务提供者的思路其实比服务消费者简单多了,其实只需要解决把ioc容器放入到自定义的SimpleChannelInboundHandler中,然后读取json字符串获取class、方法和参数,然后在ioc容器中获取类并反射调用方法返回结果即可。


在配置文件中拿到服务提供者NettyServer中ServerBootstrap的端口,然后启动ServerBootstrap,并向zk暴露自己的服务(添加临时节点信息),服务提供者就完成了。


  • 1)在配置文件中拿到ServerBootstrap的端口,调用下面的构造器

public NettyServer(int port) {
    this.port = port;
  }


  • 2)通过ApplicationContextAware,保存ApplicationContext在ServerHander中

public class NettyServer implements ApplicationContextAware{
private ApplicationContext applicationContext;
@Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
  }
}
/*
 socketChannel.pipeline().addLast(new RcpServerHandler(applicationContext));
*/


  • 3)最后通过Bean的生命周期,创建Bean完毕后调用@PostConstruct的方法

/** 启动Netty服务器 */
  @PostConstruct
  public void postConstruct() {
    // 向外暴露端口,zk添加服务信息
    doExport();
    // 异步开启netty服务器
    new Thread(
            () -> {
              this.bind();//ServerBootstrap.bind(ip)
            })
        .start();
  }


  • 4)此时Netty服务器已经启动并且保存ApplicationContext在ServerHander中

部分代码如下所示:


package com.rpc.server;
import com.rpc.properties.RpcProperties;
import com.rpc.properties.RpcServerProperties;
import com.rpc.zk.ZKServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import javax.annotation.PostConstruct;
import java.net.InetAddress;
/**
 * @author chaird
 * @create 2021-02-07 2:32
 */
public class NettyServer implements ApplicationContextAware {
  private Integer port;
  private ApplicationContext applicationContext;
  EventLoopGroup bossGroup = new NioEventLoopGroup();
  EventLoopGroup workerGroup = new NioEventLoopGroup();
  ServerBootstrap b = new ServerBootstrap();
  public NettyServer(int port) {
    this.port = port;
  }
  /** * 开启NettyServer的方法 */
  public void bind() {
    try {
      b.group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 1024)
          .childHandler(
              new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                  // TimeClientHandler是自己定义的方法
                  socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                  socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                  socketChannel.pipeline().addLast(new RcpServerHandler(applicationContext));
                }
              });
      System.out.println("服务端    Netty服务器启动成功:" + port);
      // 绑定端口
      ChannelFuture f = b.bind(port).sync();
      // 等待服务端监听端口关闭
      f.channel().closeFuture().sync();
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      // 优雅关闭,释放线程池资源
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
  /** 启动Netty服务器 */
  @PostConstruct
  public void postConstruct() {
    // 向外暴露端口
    doExport();
    // 异步开启netty服务器
    new Thread(
            () -> {
              this.bind();
            })
        .start();
  }
  /** 服务暴露(其实就是把服务信息保存到Zookeeper上) */
  private void doExport() {
    ZKServer zkServer = applicationContext.getBean(ZKServer.class);
    RpcServerProperties rpcServerProperties = applicationContext.getBean(RpcServerProperties.class);
    RpcProperties rpcProperties = applicationContext.getBean(RpcProperties.class);
    //   providerGroupDir = /rpc/provider/myProviderName
    String providerGroupDir = rpcProperties.getPath() + rpcProperties.getProviderPath();
    providerGroupDir = providerGroupDir + "/" + rpcServerProperties.getProviderName();
    try {
      // 创建服务名目录(用于集群)
      zkServer.createPathPermanent(providerGroupDir, "");
    } catch (Exception e) {
      e.printStackTrace();
    }
    try {
      String providerAddress = InetAddress.getLocalHost().getHostAddress();
      String providerInstance = providerAddress + ":" + rpcServerProperties.getProviderPort();
      // key(path) = /rpc/provider/myProviderName/127.0.0.1:8080   value:127.0.0.1:8080
      zkServer.createPathTemp(providerGroupDir + "/" + providerInstance, providerInstance);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
  }
}


服务消费者思路和难点


服务消费者则从构思遇到的难点进行拆解,服务消费者比服务提供者难多了!!!


服务消费者中怎么存储与多个服务提供者的关系?


2.png


客户端比如果你看过部分Netty的源码,那你比较容易下面的设计思路。没看过也没关系,反正我写的也是个demo,也很容易理解。


 首先从细粒度开始,客户端要和每一个服务提供者建立连接,即和每一个ServerBootstrap建立连接,所以会有如下的自定义实体类(ip,port)


//Netty连接服务提供者的自定义客户端
public class NettyClient {
  public NettyClient(String ip, Integer port) {
    this.ip = ip;
    this.port = port;
  }
  private String ip;
  private Integer port;
  EventLoopGroup group = new NioEventLoopGroup();
  Bootstrap b = new Bootstrap();
  //...
}


  • 如果我们设计的是一个服务提供者,那上面就没有问题了,如果要实现服务提供者的集群方案,那么就需要在上面的基础上再包装一层,定义为服务提供者组(集群),如下所示。

//服务提供者组(集群)
public class NettyClientGroup {
  /** 下一个下标 */
  private AtomicInteger index = new AtomicInteger(0);
  /** 服务提供者名称 */
  private String providerName;
  /** 服务提供者列表 */
  List<NettyClient> providerList = new ArrayList<>();
  /** key:服务提供者ip:端口    value:NettyClient */
  Map<String, NettyClient> providerMap = new HashMap<>();
}


  服务提供者的集群实现了,我可以从NettyClientGroup中选择NettyClient,但是服务提供者不是一种类型的,比如有订单服务提供者、用户服务提供者等,所以外面还要包装一层,NettyClientBootStarp,用于保存服务提供者组名称和服务提供者组的关系,如下所示


//服务消费者启动器
public class NettyClientBootStarp implements ApplicationContextAware {
  /**key:服务提供者组名称,服务提供者组*/
  Map<String, NettyClientGroup> providers = new HashMap<>();
  List<NettyClient> providerList = new ArrayList<>();
  public NettyClientBootStarp() {}
}


  • 此时,存储关系已经很明显了,下面用实例展示


3.png


服务消费者中怎么感知服务提供者上下线以及感知到如何处理?


  • 首先要监听到zk服务提供者节点的上下和下线,如下所示,其实我们是可以监听到的


//注册默认的watcher
zk = new ZooKeeper(url, 5000, watcher);
String listenProviderPath = path + providerPath;
// 给某节点添加watcher
zk.getChildren(listenProviderPath, true);


  • 监听到需要做什么业务逻辑呢?

就是把某个上线或者下线的服务提供者从NettyClientBootStarp 中移除,但是此处我用了一个比较极端的操作,即重新链接所有的服务提供者,这样就可以添加或者删除掉变动的提供者


public class RpcServiceChangeWatcher implements Watcher, ApplicationContextAware {
  private RpcProperties rpcProperties;
  private NettyClientBootStarp nettyClientBootStarp;
  @Override
  public void process(WatchedEvent event) {
    System.out.println(event);
    // 实际业务
    try {
      nettyClientBootStarp.refreshProviders();
    } catch (Exception e) {
      e.printStackTrace();
    }
    // 重新监听
    String providersPath = rpcProperties.getPath() + rpcProperties.getProviderPath();
    try {
      zkServer.getZk().getChildren(providersPath, true);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}


怎么实现服务消费者调用正常的controller就能通过NettyClient发送数据到NettyServer?


如下所示,我在SpringBoot的客户端中的HelloController中放入StudentService(接口,非实现类),怎么就能发送数据呢????


@RestController
public class HelloController {
  @RpcService("provider01") // 自定义注解,其中value为服务提供者名称,类似OpenFeign的使用
  private StudentService studentService;
  @GetMapping("/index/{id}")
  public Object hello(@PathVariable Integer id) {
    String res = studentService.getId(id);
    return res;
  }


答案:代理、反射、自定义注解


  • 首先自定义注解,其中value为服务提供者名称

/**
 * 该注解用于注入远程服务
 */
@Target(ElementType.FIELD) // 方法注解
@Retention(RetentionPolicy.RUNTIME) // 运行时注解
public @interface RpcService {
  String value();//服务提供者名称
}


然后自定义BeanPostProcessor(Bean的生命周期知识点),目的是包装Bean,大体的逻辑为判断类里是否有自定义注解@RpcService,并获取注解的value,即需要调用的远程服务的名称;遍历获取标注该注解的属性,生成包含属性为(NettyClientBootStarp )的代理对象,然后注入到对象的该属性中,这样从逻辑上解决了 空指针异常


public class RcpServiceInjectBeanPostProcessor
    implements InstantiationAwareBeanPostProcessor, ApplicationContextAware {
  private ApplicationContext context;
  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName)
      throws BeansException {
    // 判断类里是否有@RpcService注解
    Class<?> clazz = context.getType(beanName);
    if (Objects.isNull(clazz)) {
      return bean;
    }
    Field[] declaredFields = clazz.getDeclaredFields();
    for (Field field : declaredFields) {
      // 找出标记了InjectService注解的属性
      RpcService injectService = field.getAnnotation(RpcService.class);
      if (injectService == null) {
        continue;
      }
      // 获取服务名称
      String providerName = injectService.value();
      // 获取接口Class
      Class<?> fieldClass = field.getType();
      // 获取nettyClient
      NettyClientBootStarp nettyClientBootStarp = context.getBean(NettyClientBootStarp.class);
      RpcFactoryProxy rpcFactoryProxy =
          new RpcFactoryProxy(fieldClass, providerName, nettyClientBootStarp);
      Object proxy = rpcFactoryProxy.getProxy();
      Object object = bean;
      field.setAccessible(true);
      try {
        // 请开始你的表演
        field.set(object, proxy);
      } catch (IllegalAccessException e) {
        e.printStackTrace();
      }
    }
    return bean;
  }
  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    return bean;
  }
  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.context = applicationContext;
  }
  public RcpServiceInjectBeanPostProcessor() {
    System.out.println("-----RcpServiceInjectBeanPostProcessor-----------");
  }
}


通过反射发送数据。


(1)invoke方法可以获取到调用的接口名称、方法名称和参数


(2)代理对象构造的时候传入了需要调用的服务名称和NettyClientBootStarp


(3)在invoke中把接口名称、方法名称和参数通过Netty发送给服务提供者是可以实现的


public class RpcFactoryProxy<T> implements InvocationHandler {
  private Class<T> proxyInterface;
  // 这里可以维护一个缓存,存这个接口的方法抽象的对象
  private NettyClientBootStarp nettyClientBootStarp;
  private String serviceName;
  public RpcFactoryProxy(
      Class<T> proxyInterface, String serviceName, NettyClientBootStarp nettyClient) {
    this.serviceName = serviceName;
    this.proxyInterface = proxyInterface;
    this.nettyClientBootStarp = nettyClient;
  }
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    System.out.println("invoke");
    Map<String, NettyClientGroup> providers = nettyClientBootStarp.getProviders();
    NettyClientGroup nettyClientGroup = providers.get(serviceName);
    if (null == nettyClientGroup) {
      RpcResponse response = RpcResponse.NO_SERVICE();
      return response.getReturnValue();
    }
    NettyClient nettyClient = nettyClientGroup.next();
    if (null == nettyClient) {
      RpcResponse response = RpcResponse.NO_SERVICE();
      return response.getReturnValue();
    }
    RpcRequest rpcRequest = new RpcRequest();
    rpcRequest.setRequestId(UUID.randomUUID().toString().substring(0, 8));
    // 设置服务名称
    rpcRequest.setServiceName(serviceName);
    // 设置是哪个类
    rpcRequest.setClazzName(proxyInterface.getName());
    // 设置哪个方法
    rpcRequest.setMethodName(method.getName());
    // 设置参数类型
    Class<?>[] parameterTypes = method.getParameterTypes();
    String[] parameterTypeString = Class2String.class2String(parameterTypes);
    rpcRequest.setParameterTypeStrings(parameterTypeString);
    // 设置参数
    rpcRequest.setParameters(args);
    // 发送消息
    RpcResponse response = nettyClient.sendMessage(rpcRequest);
    if (response == null) {
      response = RpcResponse.TIME_OUT(rpcRequest.getRequestId());
    }
    return response.getReturnValue();
  }
  public T getProxy() {
    return (T)
        Proxy.newProxyInstance(proxyInterface.getClassLoader(), new Class[] {proxyInterface}, this);
  }
}


Netty客户端的SimpleChannelInboundHandler都是打印数据,我怎么实现要调用后返回数据而不是打印数据呢?


首先自定义实现Future的实现类RpcFuture,注意这里面有一个CountDownLatch(1)


public class RpcFuture<T> implements Future<T> {
    private T response;
    /**
     * 因为请求和响应是一一对应的,所以这里是1
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);
    /**
     * 获取响应,直到有结果才返回
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    @Override
    public T get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        return response;
    }
    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (countDownLatch.await(timeout,unit)){
            return response;
        }
        return null;
    }
    public void setResponse(T response) {
        this.response = response;
        countDownLatch.countDown();
    }
}


当发送数据的时候创建一个RcpFuture,然后把该RcpFuture存在一个地方FuturePool,此时调用RcpFuture.get方法是阻塞的(阻塞原因CountDownLatch ),如下所示


public RpcResponse sendMessage(RpcRequest msg) {
    // 存起来
    RpcFuture<RpcResponse> future = new RpcFuture<>();
    FuturePool.put(msg.getRequestId(), future);
    RpcResponse rpcResponse = null;
    try {
      String s = JSONUtil.toJsonStr(msg);
      f.channel().writeAndFlush(s);
      rpcResponse = future.get(2000, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      FuturePool.remove(msg.getRequestId());
    }
    return rpcResponse;
  }


那么在打印的地方通过key获取RpcFuture ,然后把结果通过RpcFuture.setResponse方法设计进去,如下所示,上面就能返回结果了。


public class RpcClientHandler extends SimpleChannelInboundHandler<String> {
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    String resp = msg;
    System.out.println("resp:"+resp);
    RpcResponse rpcResponse = JSONUtil.toBean(resp, RpcResponse.class);
    RpcFuture future = FuturePool.get(rpcResponse.getRequestId());
    future.setResponse(rpcResponse);
  }


总结


1)一个小案例的实现要整合多个知识点,并且写了这篇晦涩难懂的文章,还是花了点时间的。


2)dubbo中使用的FactoryBean实现的远程调用,可以看看人家的思路。


3)学习到了很多东西,比如我现在才知道zk的监听机制只监听一次。


4)需要用到的知识点太多了,自定义注解,BeanPostProcessor,反射,Future等,是个学习Netty不错的案例。


5)新的一年,希望大家步步高升,一年比一年好。


目录
相关文章
|
10月前
|
负载均衡 Dubbo Java
Dubbo 3.x:探索阿里巴巴的开源RPC框架新技术
随着微服务架构的兴起,远程过程调用(RPC)框架成为了关键组件。Dubbo,作为阿里巴巴的开源RPC框架,已经演进到了3.x版本,带来了许多新特性和技术改进。本文将探讨Dubbo 3.x中的一些最新技术,包括服务注册与发现、负载均衡、服务治理等,并通过代码示例展示其使用方式。
519 9
|
10月前
|
JSON 负载均衡 网络协议
Rpc编程系列文章第二篇:RPC框架设计目标
Rpc编程系列文章第二篇:RPC框架设计目标
|
10月前
|
设计模式 负载均衡 网络协议
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
399 0
|
10月前
|
Dubbo Java 应用服务中间件
Rpc编程系列文章第三篇:Hessian RPC一个老的RPC框架
Rpc编程系列文章第三篇:Hessian RPC一个老的RPC框架
|
4月前
|
自然语言处理 负载均衡 API
gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架
gRPC 是一种现代开源高性能远程过程调用(RPC)框架,支持多种编程语言,可在任何环境中运行。它通过高效的连接方式,支持负载平衡、跟踪、健康检查和身份验证,适用于微服务架构、移动设备和浏览器客户端连接后端服务等场景。gRPC 使用 Protocol Buffers 作为接口定义语言,支持四种服务方法:一元 RPC、服务器流式处理、客户端流式处理和双向流式处理。
|
7月前
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC
|
9月前
|
存储 缓存 Linux
【实战指南】嵌入式RPC框架设计实践:六大核心类构建高效RPC框架
在先前的文章基础上,本文讨论如何通过分层封装提升一个针对嵌入式Linux的RPC框架的易用性。设计包括自动服务注册、高性能通信、泛型序列化和简洁API。框架分为6个关键类:BindingHub、SharedRingBuffer、Parcel、Binder、IBinder和BindInterface。BindingHub负责服务注册,SharedRingBuffer实现高效数据传输,Parcel处理序列化,而Binder和IBinder分别用于服务端和客户端交互。BindInterface提供简单的初始化接口,简化应用集成。测试案例展示了客户端和服务端的交互,验证了RPC功能的有效性。
525 12
|
6月前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
7月前
|
XML 存储 JSON
(十二)探索高性能通信与RPC框架基石:Json、ProtoBuf、Hessian序列化详解
如今这个分布式风靡的时代,网络通信技术,是每位技术人员必须掌握的技能,因为无论是哪种分布式技术,都离不开心跳、选举、节点感知、数据同步……等机制,而究其根本,这些技术的本质都是网络间的数据交互。正因如此,想要构建一个高性能的分布式组件/系统,不得不思考一个问题:怎么才能让数据传输的速度更快?
167 1
|
8月前
|
分布式计算 负载均衡 数据安全/隐私保护
什么是RPC?有哪些RPC框架?
RPC(Remote Procedure Call,远程过程调用)是一种允许运行在一台计算机上的程序调用另一台计算机上子程序的技术。这种技术屏蔽了底层的网络通信细节,使得程序间的远程通信如同本地调用一样简单。RPC机制使得开发者能够构建分布式计算系统,其中不同的组件可以分布在不同的计算机上,但它们之间可以像在同一台机器上一样相互调用。
200 8