Netty框架入门(二)之基于Netty实现简单的Rpc调用

本文涉及的产品
应用型负载均衡 ALB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
简介: Netty框架入门(二)之基于Netty实现简单的Rpc调用

正文


一、思路原理


111.png


当启动服务提供者(Netty服务器),将服务注册到zookeeper上,也就是在zk上创建/xiaojie-dubbo/com.xiaojie.api.UserService/providers/xiaojie-dubbo://127.0.0.1:8080方法名称等信息。

服务发现,从zookeeper节点上获取到服务列表[xiaojie://127.0.0.1:8080...,xiaojie://127.0.0.1:8081....],然后通过轮询、权重、一致性hash,随机等负载均衡算法获取服务。

当有服务下线或者宕机之后,通知服务调用者(zk事件通知实现,我没有实现)服务下线或者宕机。

通过反射,获取实现类的方法,调用之后,消费者通过代理模式,代理出来客户端的类,进行方法的调用,然后通过netty中ChannelInboundHandlerAdapter的channelRead()将获取的数据写出去。

利用户Jboss Marshalling解码器MarshallingDecoder、MarshallingEnecoder进行解码。


二、代码


代码结构如下


111.png


自定义注解代码


package com.xiaojie.dubbo.annotation;
import java.lang.annotation.*;
/**
 * 自定义rpc注解
 */
@Documented
@Inherited
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcAnnotation {
    Class value();
}


服务的注册


package com.xiaojie.dubbo.server.register;
import org.I0Itec.zkclient.ZkClient;
import java.net.URLEncoder;
/**
 * 将服务注册到zk上
 */
public class ServiceRegisterImpl  implements ServiceRegister{
    //定义zk地址
    private String zkServer="127.0.0.1:2181";
    //超时时间
    private Integer timeOut=5000;
    //zkclient
    private ZkClient zkClient;
    public ServiceRegisterImpl() {
        zkClient = new ZkClient(zkServer,timeOut);
    }
    //定义跟节点
    private  String rootNode="/xiaojie-dubbo";
    @Override
    public void register(String serviceName, String servicePath) {
        System.out.println("..............");
        if (!zkClient.exists(rootNode)){
            zkClient.createPersistent(rootNode); //xiaojie-dubbo
        }
        //rootNode+"/"+serviceName
        String serviceNameNode=rootNode+"/"+serviceName;
        if (!zkClient.exists(serviceNameNode)){
            zkClient.createPersistent(serviceNameNode); //xiaojie-dubbo/com.xiaojie.api.UserService
        }
        String providerNode=serviceNameNode+"/"+"providers";
        if (!zkClient.exists(providerNode)){
            zkClient.createPersistent(providerNode);//xiaojie-dubbo/com.xiaojie.api.UserService/providers
        }
        //创建我们服务地址
        String serviceAddresNodePath = providerNode + "/" + URLEncoder.encode(servicePath);
        System.out.println("serviceAddresNodePath:" + serviceAddresNodePath);
        if (zkClient.exists(serviceAddresNodePath)) {
            zkClient.delete(serviceAddresNodePath);
        }
        zkClient.createEphemeral(serviceAddresNodePath);//xiaojie-dubbo/com.xiaojie.api.UserService/providers/getName....
    }
}


服务绑定


package com.xiaojie.dubbo.server.rpc;
import com.xiaojie.dubbo.annotation.RpcAnnotation;
import com.xiaojie.dubbo.server.handler.ServerHandler;
import com.xiaojie.dubbo.server.marshalling.MarshallingCodeCFactory;
import com.xiaojie.dubbo.server.register.ServiceRegister;
import com.xiaojie.dubbo.server.register.ServiceRegisterImpl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.HashMap;
import java.util.Map;
public class NettyServer {
    /**
     * 核心过程是将服务绑定到zk
     * 启动netty服务实现监听
     */
    private ServiceRegister serviceRegister;
    //host
    private String host;
    //端口号
    private Integer port;
    private Map<String,Object> handlerMap=new HashMap<>();
    public NettyServer( String host, Integer port) {
        this.host = host;
        this.port = port;
        serviceRegister = new ServiceRegisterImpl();
    }
    public  void bind(Object obj){
    //获取类上注解
        RpcAnnotation declaredAnnotation = obj.getClass().getDeclaredAnnotation(RpcAnnotation.class);
        if (declaredAnnotation == null) {
            throw  new RuntimeException("declaredAnnotation is null");
        }
        //获取名称,也就是serviceName
        String serviceName = declaredAnnotation.value().getName();
        String serviceAddr="xiaojie://"+host+":"+port;
        serviceRegister.register(serviceName,serviceAddr);
        handlerMap.put(serviceName,obj);
    }
    /**
     * 创建netty服务端,开启监听
     */
    public void initNetty(){
        //创建线程接收请求
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        //创建线程处理请求
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap=new ServerBootstrap();
        serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        socketChannel.pipeline().addLast(new ServerHandler(handlerMap));
                    }
                });
        //绑定端口号
        try {
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //等待服务器监听端口
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //关闭
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
    //启动
    public  void start(Object obj){
        bind(obj);
        initNetty();
    }
}


服务发现


package com.xiaojie.dubbo.server.discover;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
public class ServiceDiscoverImpl  implements  ServiceDiscover{
    //定义zk地址
    private String zkServer="127.0.0.1:2181";
    //超时时间
    private Integer timeOut=5000;
    //zkclient
    private ZkClient zkClient;
    public ServiceDiscoverImpl() {
        zkClient = new ZkClient(zkServer,timeOut);
    }
    //定义跟节点
    private  String rootNode="/xiaojie-dubbo";
    @Override
    public List<String> discoverList(String serviceName) {
        List<String> children = zkClient.getChildren(rootNode + "/" + serviceName + "/" + "providers");
        return children;
    }
}


传送Request的实体类


package com.xiaojie.dubbo.server.req;
import java.io.Serializable;
public class RpcRequest implements Serializable {
    private static final long SerialVersionUID = 1L;
    /**
     * 类的className
     */
    private String className;
    /**
     * 方法名称
     */
    private String methodName;
    /**
     * 参数类型
     */
    Class<?> parameterTypes[];
    /**
     * 参数value
     */
    Object paramsValue[];
    public RpcRequest(String className, String methodName, Class<?>[] parameterTypes, Object[] paramsValue) {
        this.className = className;
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.paramsValue = paramsValue;
    }
    public String getClassName() {
        return className;
    }
    public String getMethodName() {
        return methodName;
    }
    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }
    public Object[] getParamsValue() {
        return paramsValue;
    }
    public void setClassName(String className) {
        this.className = className;
    }
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }
    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }
    public void setParamsValue(Object[] paramsValue) {
        this.paramsValue = paramsValue;
    }
    @Override
    public String toString() {
        return className + "," + methodName + "," + parameterTypes + paramsValue;
    }
}


通过反射执行方法


package com.xiaojie.dubbo.server.handler;
import com.xiaojie.dubbo.server.req.RpcRequest;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;
import java.util.Map;
public class ServerHandler  extends ChannelInboundHandlerAdapter {
    private Map<String,Object> handlerMap;
    public ServerHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }
    @Override
    /**
     * 服务器端监听客户端的消息
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcRequest req=(RpcRequest) msg;
        if (req==null){
            throw  new RuntimeException("req is null");
        }
        //获取serviceName 接口路径
        String serviceName = req.getClassName();
        //实现类
        Object objectImpl = handlerMap.get(serviceName);
        if (objectImpl == null) {
            throw  new RuntimeException("service is not exist");
        }
        //反射获取方法
        Method method = objectImpl.getClass().getMethod(req.getMethodName(), req.getParameterTypes());
        //执行实现类方法
        Object result = method.invoke(objectImpl,req.getParamsValue());
        //返回结果给客户端
        ctx.writeAndFlush(result);
    }
}


客户端通过代理模式执行方法


package com.xiaojie.dubbo.server.proxy;
import com.xiaojie.dubbo.server.discover.ServiceDiscover;
import com.xiaojie.dubbo.server.discover.ServiceDiscoverImpl;
import com.xiaojie.dubbo.server.handler.ClientHandler;
import com.xiaojie.dubbo.server.loadBalance.LoadBalance;
import com.xiaojie.dubbo.server.loadBalance.LoopBalance;
import com.xiaojie.dubbo.server.loadBalance.RandomBalance;
import com.xiaojie.dubbo.server.loadBalance.WeightBalance;
import com.xiaojie.dubbo.server.marshalling.MarshallingCodeCFactory;
import com.xiaojie.dubbo.server.req.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.util.List;
public class RpcClientProxy {
    public  static  <T> T create(Class<T> interfaceClass){
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                //使用代理拼接地址
                ServiceDiscover serviceDiscover=new ServiceDiscoverImpl();
                List<String> strings = serviceDiscover.discoverList(interfaceClass.getName());
                LoadBalance loadBalance=new RandomBalance();
                String servicePath = URLDecoder.decode((String) loadBalance.select(strings));//[xiaojie-dubbo://192.168.1.110:8080,xiaojie-dubbo://192.168.1.110:8081]
                String[] split = servicePath.split(":");
                String host=split[1].replace("//","");
                Integer port= Integer.valueOf(split[2]);
                //封装具体参数
                RpcRequest rpcRequest = new RpcRequest(interfaceClass.getName(), method.getName(), method.getParameterTypes(), args);
                //启动客户端,发送结果
                return  sendMsg(host,port,rpcRequest);
            }
        });
    }
    /**
     * 客户端发送消息
     * @return
     */
    public static Object sendMsg(String host, Integer port, RpcRequest rpcRequest){
        final ClientHandler clientHandler = new ClientHandler();
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap=new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class)
                .remoteAddress( new InetSocketAddress( host,port))
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            socketChannel.pipeline().addLast(clientHandler);
                    }
                });
        //发起同步连接
        try {
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //客户端发送
            channelFuture.channel().writeAndFlush(rpcRequest);
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
        return  clientHandler.getResponse();
    }
}


完整代码https://gitee.com/whisperofjune/netty-dubbo-rpc.git

参考:蚂蚁课堂的netty部分

相关实践学习
小试牛刀,一键部署电商商城
SAE 仅需一键,极速部署一个微服务电商商城,体验 Serverless 带给您的全托管体验,一起来部署吧!
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
7月前
|
NoSQL 前端开发 Java
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
125 0
|
6月前
|
编解码 分布式计算 网络协议
Netty高性能网络框架(一)
Netty高性能网络框架(一)
|
4月前
|
消息中间件 编解码 网络协议
Netty从入门到精通:高性能网络编程的进阶之路
【11月更文挑战第17天】Netty是一个基于Java NIO(Non-blocking I/O)的高性能、异步事件驱动的网络应用框架。使用Netty,开发者可以快速、高效地开发可扩展的网络服务器和客户端程序。本文将带您从Netty的背景、业务场景、功能点、解决问题的关键、底层原理实现,到编写一个详细的Java示例,全面了解Netty,帮助您从入门到精通。
426 0
|
4月前
|
自然语言处理 负载均衡 API
gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架
gRPC 是一种现代开源高性能远程过程调用(RPC)框架,支持多种编程语言,可在任何环境中运行。它通过高效的连接方式,支持负载平衡、跟踪、健康检查和身份验证,适用于微服务架构、移动设备和浏览器客户端连接后端服务等场景。gRPC 使用 Protocol Buffers 作为接口定义语言,支持四种服务方法:一元 RPC、服务器流式处理、客户端流式处理和双向流式处理。
|
5月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
125 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
7月前
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC
|
6月前
|
设计模式 缓存 算法
Netty框架的重要性
Netty框架的重要性
|
6月前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
7月前
|
前端开发 Java Spring
springboot 整合 netty框架, 实现 心跳检测,自动重连
springboot 整合 netty框架, 实现 心跳检测,自动重连
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13594 1