Netty框架入门(二)之基于Netty实现简单的Rpc调用

简介: Netty框架入门(二)之基于Netty实现简单的Rpc调用

正文


一、思路原理


111.png


当启动服务提供者(Netty服务器),将服务注册到zookeeper上,也就是在zk上创建/xiaojie-dubbo/com.xiaojie.api.UserService/providers/xiaojie-dubbo://127.0.0.1:8080方法名称等信息。

服务发现,从zookeeper节点上获取到服务列表[xiaojie://127.0.0.1:8080...,xiaojie://127.0.0.1:8081....],然后通过轮询、权重、一致性hash,随机等负载均衡算法获取服务。

当有服务下线或者宕机之后,通知服务调用者(zk事件通知实现,我没有实现)服务下线或者宕机。

通过反射,获取实现类的方法,调用之后,消费者通过代理模式,代理出来客户端的类,进行方法的调用,然后通过netty中ChannelInboundHandlerAdapter的channelRead()将获取的数据写出去。

利用户Jboss Marshalling解码器MarshallingDecoder、MarshallingEnecoder进行解码。


二、代码


代码结构如下


111.png


自定义注解代码


package com.xiaojie.dubbo.annotation;
import java.lang.annotation.*;
/**
 * 自定义rpc注解
 */
@Documented
@Inherited
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcAnnotation {
    Class value();
}


服务的注册


package com.xiaojie.dubbo.server.register;
import org.I0Itec.zkclient.ZkClient;
import java.net.URLEncoder;
/**
 * 将服务注册到zk上
 */
public class ServiceRegisterImpl  implements ServiceRegister{
    //定义zk地址
    private String zkServer="127.0.0.1:2181";
    //超时时间
    private Integer timeOut=5000;
    //zkclient
    private ZkClient zkClient;
    public ServiceRegisterImpl() {
        zkClient = new ZkClient(zkServer,timeOut);
    }
    //定义跟节点
    private  String rootNode="/xiaojie-dubbo";
    @Override
    public void register(String serviceName, String servicePath) {
        System.out.println("..............");
        if (!zkClient.exists(rootNode)){
            zkClient.createPersistent(rootNode); //xiaojie-dubbo
        }
        //rootNode+"/"+serviceName
        String serviceNameNode=rootNode+"/"+serviceName;
        if (!zkClient.exists(serviceNameNode)){
            zkClient.createPersistent(serviceNameNode); //xiaojie-dubbo/com.xiaojie.api.UserService
        }
        String providerNode=serviceNameNode+"/"+"providers";
        if (!zkClient.exists(providerNode)){
            zkClient.createPersistent(providerNode);//xiaojie-dubbo/com.xiaojie.api.UserService/providers
        }
        //创建我们服务地址
        String serviceAddresNodePath = providerNode + "/" + URLEncoder.encode(servicePath);
        System.out.println("serviceAddresNodePath:" + serviceAddresNodePath);
        if (zkClient.exists(serviceAddresNodePath)) {
            zkClient.delete(serviceAddresNodePath);
        }
        zkClient.createEphemeral(serviceAddresNodePath);//xiaojie-dubbo/com.xiaojie.api.UserService/providers/getName....
    }
}


服务绑定


package com.xiaojie.dubbo.server.rpc;
import com.xiaojie.dubbo.annotation.RpcAnnotation;
import com.xiaojie.dubbo.server.handler.ServerHandler;
import com.xiaojie.dubbo.server.marshalling.MarshallingCodeCFactory;
import com.xiaojie.dubbo.server.register.ServiceRegister;
import com.xiaojie.dubbo.server.register.ServiceRegisterImpl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.HashMap;
import java.util.Map;
public class NettyServer {
    /**
     * 核心过程是将服务绑定到zk
     * 启动netty服务实现监听
     */
    private ServiceRegister serviceRegister;
    //host
    private String host;
    //端口号
    private Integer port;
    private Map<String,Object> handlerMap=new HashMap<>();
    public NettyServer( String host, Integer port) {
        this.host = host;
        this.port = port;
        serviceRegister = new ServiceRegisterImpl();
    }
    public  void bind(Object obj){
    //获取类上注解
        RpcAnnotation declaredAnnotation = obj.getClass().getDeclaredAnnotation(RpcAnnotation.class);
        if (declaredAnnotation == null) {
            throw  new RuntimeException("declaredAnnotation is null");
        }
        //获取名称,也就是serviceName
        String serviceName = declaredAnnotation.value().getName();
        String serviceAddr="xiaojie://"+host+":"+port;
        serviceRegister.register(serviceName,serviceAddr);
        handlerMap.put(serviceName,obj);
    }
    /**
     * 创建netty服务端,开启监听
     */
    public void initNetty(){
        //创建线程接收请求
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        //创建线程处理请求
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap=new ServerBootstrap();
        serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        socketChannel.pipeline().addLast(new ServerHandler(handlerMap));
                    }
                });
        //绑定端口号
        try {
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //等待服务器监听端口
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //关闭
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
    //启动
    public  void start(Object obj){
        bind(obj);
        initNetty();
    }
}


服务发现


package com.xiaojie.dubbo.server.discover;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
public class ServiceDiscoverImpl  implements  ServiceDiscover{
    //定义zk地址
    private String zkServer="127.0.0.1:2181";
    //超时时间
    private Integer timeOut=5000;
    //zkclient
    private ZkClient zkClient;
    public ServiceDiscoverImpl() {
        zkClient = new ZkClient(zkServer,timeOut);
    }
    //定义跟节点
    private  String rootNode="/xiaojie-dubbo";
    @Override
    public List<String> discoverList(String serviceName) {
        List<String> children = zkClient.getChildren(rootNode + "/" + serviceName + "/" + "providers");
        return children;
    }
}


传送Request的实体类


package com.xiaojie.dubbo.server.req;
import java.io.Serializable;
public class RpcRequest implements Serializable {
    private static final long SerialVersionUID = 1L;
    /**
     * 类的className
     */
    private String className;
    /**
     * 方法名称
     */
    private String methodName;
    /**
     * 参数类型
     */
    Class<?> parameterTypes[];
    /**
     * 参数value
     */
    Object paramsValue[];
    public RpcRequest(String className, String methodName, Class<?>[] parameterTypes, Object[] paramsValue) {
        this.className = className;
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.paramsValue = paramsValue;
    }
    public String getClassName() {
        return className;
    }
    public String getMethodName() {
        return methodName;
    }
    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }
    public Object[] getParamsValue() {
        return paramsValue;
    }
    public void setClassName(String className) {
        this.className = className;
    }
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }
    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }
    public void setParamsValue(Object[] paramsValue) {
        this.paramsValue = paramsValue;
    }
    @Override
    public String toString() {
        return className + "," + methodName + "," + parameterTypes + paramsValue;
    }
}


通过反射执行方法


package com.xiaojie.dubbo.server.handler;
import com.xiaojie.dubbo.server.req.RpcRequest;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;
import java.util.Map;
public class ServerHandler  extends ChannelInboundHandlerAdapter {
    private Map<String,Object> handlerMap;
    public ServerHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }
    @Override
    /**
     * 服务器端监听客户端的消息
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcRequest req=(RpcRequest) msg;
        if (req==null){
            throw  new RuntimeException("req is null");
        }
        //获取serviceName 接口路径
        String serviceName = req.getClassName();
        //实现类
        Object objectImpl = handlerMap.get(serviceName);
        if (objectImpl == null) {
            throw  new RuntimeException("service is not exist");
        }
        //反射获取方法
        Method method = objectImpl.getClass().getMethod(req.getMethodName(), req.getParameterTypes());
        //执行实现类方法
        Object result = method.invoke(objectImpl,req.getParamsValue());
        //返回结果给客户端
        ctx.writeAndFlush(result);
    }
}


客户端通过代理模式执行方法


package com.xiaojie.dubbo.server.proxy;
import com.xiaojie.dubbo.server.discover.ServiceDiscover;
import com.xiaojie.dubbo.server.discover.ServiceDiscoverImpl;
import com.xiaojie.dubbo.server.handler.ClientHandler;
import com.xiaojie.dubbo.server.loadBalance.LoadBalance;
import com.xiaojie.dubbo.server.loadBalance.LoopBalance;
import com.xiaojie.dubbo.server.loadBalance.RandomBalance;
import com.xiaojie.dubbo.server.loadBalance.WeightBalance;
import com.xiaojie.dubbo.server.marshalling.MarshallingCodeCFactory;
import com.xiaojie.dubbo.server.req.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.util.List;
public class RpcClientProxy {
    public  static  <T> T create(Class<T> interfaceClass){
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                //使用代理拼接地址
                ServiceDiscover serviceDiscover=new ServiceDiscoverImpl();
                List<String> strings = serviceDiscover.discoverList(interfaceClass.getName());
                LoadBalance loadBalance=new RandomBalance();
                String servicePath = URLDecoder.decode((String) loadBalance.select(strings));//[xiaojie-dubbo://192.168.1.110:8080,xiaojie-dubbo://192.168.1.110:8081]
                String[] split = servicePath.split(":");
                String host=split[1].replace("//","");
                Integer port= Integer.valueOf(split[2]);
                //封装具体参数
                RpcRequest rpcRequest = new RpcRequest(interfaceClass.getName(), method.getName(), method.getParameterTypes(), args);
                //启动客户端,发送结果
                return  sendMsg(host,port,rpcRequest);
            }
        });
    }
    /**
     * 客户端发送消息
     * @return
     */
    public static Object sendMsg(String host, Integer port, RpcRequest rpcRequest){
        final ClientHandler clientHandler = new ClientHandler();
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap=new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class)
                .remoteAddress( new InetSocketAddress( host,port))
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            socketChannel.pipeline().addLast(clientHandler);
                    }
                });
        //发起同步连接
        try {
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //客户端发送
            channelFuture.channel().writeAndFlush(rpcRequest);
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
        return  clientHandler.getResponse();
    }
}


完整代码https://gitee.com/whisperofjune/netty-dubbo-rpc.git

参考:蚂蚁课堂的netty部分

相关实践学习
部署高可用架构
本场景主要介绍如何使用云服务器ECS、负载均衡SLB、云数据库RDS和数据传输服务产品来部署多可用区高可用架构。
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
1月前
|
缓存 网络协议 算法
Netty的基础入门(上)
Netty的基础入门(上)
87 0
|
3月前
|
负载均衡 Dubbo Java
Dubbo 3.x:探索阿里巴巴的开源RPC框架新技术
随着微服务架构的兴起,远程过程调用(RPC)框架成为了关键组件。Dubbo,作为阿里巴巴的开源RPC框架,已经演进到了3.x版本,带来了许多新特性和技术改进。本文将探讨Dubbo 3.x中的一些最新技术,包括服务注册与发现、负载均衡、服务治理等,并通过代码示例展示其使用方式。
85 9
|
3月前
|
缓存 网络协议 算法
《跟闪电侠学Netty》阅读笔记 - Netty入门程序解析
《跟闪电侠学Netty》阅读笔记 - Netty入门程序解析
143 0
|
存储 设计模式 网络协议
Netty网络框架(一)
Netty网络框架
40 1
|
2月前
|
前端开发 Java 数据库连接
探索Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty
探索Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty
|
3月前
|
XML JSON Java
RPC框架之Thrift—实现Go和Java远程过程调用
RPC框架之Thrift—实现Go和Java远程过程调用
50 1
|
3月前
|
Java Spring
Spring Boot+Netty实现远程过程调用(RPC)
Spring Boot+Netty实现远程过程调用(RPC)
73 0
|
3月前
|
前端开发 Java 数据库连接
认识Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty
Spring框架 Spring是一个轻量级的开源框架,用于构建企业级应用。它提供了广泛的功能,包括依赖注入、面向切面编程、事务管理、消息传递等。Spring的核心思想是控制反转(IoC)和面向切面编程(AOP)。
85 3
|
3月前
|
消息中间件 缓存 Java
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty
99 0
|
4月前
|
负载均衡 Java 调度
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)
经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。
43 0
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)