08、Netty学习笔记—(基于聊天业务:RPC实现)

简介: 08、Netty学习笔记—(基于聊天业务:RPC实现)

前言


源码地址:netty-learn,目录下的netty-聊天室项目,该rpc调用实现是基于聊天室的协议及编解码器处理设定之上。



踩坑点


1、使用gson进行json序列化时,无法序列化反序列化 Class 类型,报错!

问题复现


System.out.println(new Gson().toJson(String.class));



解决方案


针对于Class类型进行自定义适配器:


class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {
    @Override
    public Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
        try {
            String str = json.getAsString();
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new JsonParseException(e);
        }
    }
    @Override             //   String.class
    public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {
        // class -> json
        return new JsonPrimitive(src.getName());
    }
}



在gson实例化时将其添加入其中:


public enum SerializerAlgorithm implements Serializer{
    Json{
        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new SerializerAlgorithm.ClassCodec()).create();//注册!
            String json = new String(bytes, StandardCharsets.UTF_8);
            return gson.fromJson(json, clazz);
        }
        @Override
        public <T> byte[] serialize(T object) {
            Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new SerializerAlgorithm.ClassCodec()).create();
            String json = gson.toJson(object);
            return json.getBytes(StandardCharsets.UTF_8);
        }
    };
}



一、RPC实现细节简述(直击要点)


若是有网络通讯的需求,那么就可以使用netty。对于RPC框架、消息队列的底层就是使用的netty。


简化RPC框架实现部分技术点说明(netty应用)


1、请求、响应对象封装来进行RPC调用,最终返回结果、异常信息。


①处理半包、黏包问题(LTC解码器,参数根据自定义的协议);②实现编解码器(根据指定的协议进行编解码操作,MessageToMessageCodec<ByteBuf, ?>);③编写能够处理指定特定感兴趣的某个请求、响应对象handler进行单独处理(SimpleChannelInboundHandler<?>)

2、针对于所需要远程调用的方法想要像调用本地方法一样,需要实现一个代理类来取到指定要调用的类、方法、请求参数…封装成请求对象通过netty的channel(单例)发送给服务器!


①JDK动态代理;②单例模式(双重检测锁)获取channel。

3、客户端接收:


①调用方法(间接动态代理执行)时肯定是主线程来进行执行的,而响应接收到数据是Eventloop中的线程来使用handler进行处理。那么调用方法线程如何取到进行响应对象处理线程拿到的结果值呢?这里的话就涉及到线程通信了。


解:通过使用netty中的promise来进行处理线程通信问题(promise提供了阻塞等待,多个线程中取到容器值的api)。

②若是进行多次远程调用方法,如何使调用方法的线程来准确的取到其他线程处理响应请求中的值?


解:通过一个分布式id(使用一个并发计数器,确保唯一),对于promise保存可以使用一个map(key为分布式id,value为promise),请求对象中包含该id,服务器响应时,将id再次封装到响应对象里,接着客户端在接收响应时可以拿到id,由此再根据该id取到map中的指定primise对象(取promise时为空间最大化可以直接调用remove取)。

③远程调用的方法怎么才能知道何时得到响应?


解:使用promise的await()来阻塞等待promise取到结果,一旦在响应对象接收到时来对promise进行setSuccess或setFailure就可以调用线程阻塞结束并继续向下执行,通过isSuccess()…API来间接根据结果进行相应操作!

4、服务端异常调用处理情况:远程调用方法若是出现异常应该将完整的错误信息可以记录到日志中,而不是直接一股脑的发给客户端,传回客户端的应该是异常的简略信息,使用一个Exception来封装一下简略异常描述返回即可。


扩展:


企业里一般都是把API接口单独作为一个二方包,其他人要调用你的方法时,只需要引用你的二方包即可。

泛型使用通配符?只能取,不能存,这是语法上的要求(所以若是要存就设置Object),但是有个特例就是null可以放!


二、具体代码实现


Message

说明:某个业务的请求、响应对象继承该Message抽象类。


import com.changlu.message.rpc.RpcRequestMessage;
import com.changlu.message.rpc.RpcResponseMessage;
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@Data
public abstract class Message implements Serializable {
    public static Class<?> getMessageClass(int messageType) {
        return messageClasses.get(messageType);
    }
    private int sequenceId;//序列号id(唯一)
    private int messageType;
    public abstract int getMessageType();
    //...
    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
    private static final Map<Integer, Class<?>> messageClasses = new HashMap<>();
    static {
  //...
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }
}


客户端


RpcRequestMessage

说明:进行远程调用时传输的请求对象封装。


import com.changlu.message.Message;
import lombok.Data;
import lombok.ToString;
/**
 * @ClassName RpcRequestMessage
 * @Author ChangLu
 * @Date 2022/1/16 21:38
 * @Description RPC消息请求对象
 */
@Data
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {
    /**
     * 接口全限定名
     */
    private String interfaceName;
    /**
     * 调用接口的方法名
     */
    private String methodName;
    /**
     * 方法返回类型
     */
    private Class<?> returnType;
    /**
     * 方法参数类型数组
     */
    private Class[] parameterTypes;
    /**
     * 方法调用参数值
     */
    private Object[] parameterValue;
    public RpcRequestMessage(int sequenceId,String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
        super.setSequenceId(sequenceId);
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameterValue = parameterValue;
    }
    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_REQUEST;
    }
}


RpcResponseMessageHandler

说明:针对于服务器响应时封装的RpcResponseMessage消息处理handler。


import com.changlu.message.rpc.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @ClassName RpcResponseMessageHandler
 * @Author ChangLu
 * @Date 2022/1/17 19:22
 * @Description 针对于处理RpcResponseMessage的handler
 */
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    public static Map<Integer, Promise> promiseMap = new ConcurrentHashMap<>();
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
        final Object returnValue = msg.getReturnValue();
        final Exception ex = msg.getExceptionValue();
        //根据id取出指定的promise
        final Promise<Object> promise = promiseMap.remove(msg.getSequenceId());
        if (ex == null){
            promise.setSuccess(returnValue);
        }else{
            promise.setFailure(ex);
        }
    }
}


Client

说明:建立连接及初始化channel,JDK动态代理实现。


import com.changlu.message.rpc.RpcRequestMessage;
import com.changlu.protocol.MessageCodecSharable;
import com.changlu.protocol.ProcotolFrameDecoder;
import com.changlu.server.handler.rpc.RpcResponseMessageHandler;
import com.changlu.server.service.rpc.RpcService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @ClassName RpcClient
 * @Author ChangLu
 * @Date 2022/1/16 21:45
 * @Description RPC远程调用客户端
 */
@Slf4j
public class RpcClient {
    private static volatile Channel channel = null;
    private static Object lock = new Object();
    //并发计数器
    private static AtomicInteger seqId = new AtomicInteger(0);
    private static <T> T getProxyService(Class<T> clazz) {
        ClassLoader classLoader = clazz.getClassLoader();
        Class[] classes = {clazz};
        return (T)Proxy.newProxyInstance(classLoader, classes, (proxy, method, args)->{
            RpcRequestMessage message = new RpcRequestMessage(
                    seqId.getAndIncrement(),
                    clazz.getName(),  //示例:com.changlu.server.service.rpc.RpcService
                    method.getName(), //sayHello
                    method.getReturnType(),  //String.class
                    method.getParameterTypes(), //new Class[]{String.class}
                    args  //new Object[]{"changlu"}
            );
            //用于其他线程与当前主线程进行通信
            final DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
            RpcResponseMessageHandler.promiseMap.put(message.getSequenceId(), promise);
            //远程调用(获取channel方式是通过单例模式来获取)
            getChannel().writeAndFlush(message);
            promise.await();//阻塞等待
            if (promise.isSuccess()) {
                return promise.getNow();
            }else{
                throw new RuntimeException((Exception)promise.getNow());
            }
        });
    }
    /**
     * 单例(懒汉,双重检测锁):获取channel
     * @return
     */
    public static Channel getChannel(){
        if (channel != null) {
            return channel;
        }
        synchronized (lock) {
            if (channel != null){
                return channel;
            }
            initChannel();//初始化channel
            return channel;
        }
    }
    /**
     * 初始化channel
     */
    private static void initChannel() {
        MessageCodecSharable messageCodec = new MessageCodecSharable();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        RpcResponseMessageHandler responseMessageHandler = new RpcResponseMessageHandler();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap()
                    .group(worker)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
//                            ch.pipeline().addLast(new StringEncoder());
                            ch.pipeline().addLast(new ProcotolFrameDecoder());
                            ch.pipeline().addLast(loggingHandler);
                            ch.pipeline().addLast(messageCodec);
                            ch.pipeline().addLast(responseMessageHandler);
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    //初始测试
//                                    final RpcRequestMessage message = new RpcRequestMessage(
//                                            1,
//                                            "com.changlu.server.service.rpc.RpcService",
//                                            "sayHello",
//                                            String.class,
//                                            new Class[]{String.class},
//                                            new Object[]{"changlu"}
//                                    );
//                                    ctx.channel().writeAndFlush(message);
                                }
                            });
                        }
                    });
//        Channel channel = null;
        try {
            channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();
            log.debug("客户端连接成功!");
            channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    log.debug("客户端关闭连接!");
                    worker.shutdownGracefully();
                }
            });
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }



服务端

RpcResponseMessage

说明:服务器端方法调用结束时响应给客户端的对象封装。


import com.changlu.message.Message;
import lombok.Data;
import lombok.ToString;
/**
 * @ClassName RpcResponseMessage
 * @Author ChangLu
 * @Date 2022/1/16 21:38
 * @Description RPC消息请求对象
 */
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
    /**
     * 返回值
     */
    private Object returnValue;
    /**
     * 异常值
     */
    private Exception exceptionValue;
    public RpcResponseMessage() {
    }
    public RpcResponseMessage(Object returnValue, Exception exceptionValue) {
        this.returnValue = returnValue;
        this.exceptionValue = exceptionValue;
    }
    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_RESPONSE;
    }
}


service、serviceImpl

说明:用于进行rpc调用测试的接口及实现类。


/**
 * @ClassName RpcService
 * @Author ChangLu
 * @Date 2022/1/16 22:02
 * @Description service接口
 */
public interface RpcService {
    /**
     *
     * @param name 用户名
     * @return hello,用户名!
     */
    String sayHello(String name);
}
/**
 * @ClassName RpcSerivceImpl
 * @Author ChangLu
 * @Date 2022/1/16 22:03
 * @Description RpcService实现类
 */
public class RpcServiceImpl implements RpcService{
    @Override
    public String sayHello(String name) {
//        int i = 1/0;  //测试抛出异常
        return "hello," + name + "!";
    }
}



接口类匹配实现类实例机制(配置文件)

说明:对于指定接口的具体实现类,我们通过读取配置文件来进行处理。


application.properties:模拟直接从Spring中根据接口来取出指定的实例(或代理类)


# 接口=实现类
com.changlu.server.service.rpc.RpcService=com.changlu.server.service.rpc.RpcServiceImpl


ServicesFactory.java:之后服务器接收到远程接口调用传来的请求对象即可根据传来的接口取到指定的实现类


import com.sun.deploy.config.Config;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @ClassName ServicesFactory
 * @Author ChangLu
 * @Date 2022/1/16 22:34
 * @Description 获取service实现类工厂
 */
public class ServicesFactory {
    private static Properties properties;
    private static Map<Class<?>, Object> servicesMap = new ConcurrentHashMap<>();
    static {
        try (InputStream is = Config.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(is);
            final Set<String> names = properties.stringPropertyNames();
            for (String name : names) {
                //匹配所有的Service结尾键值对
                if (name.endsWith("Service")){
                    final Class<?> interfaceClass = Class.forName(name);
                    final Class<?> instanceClass = Class.forName(properties.getProperty(name));
                    //map以【接口:实现类实例】存储
                    servicesMap.put(interfaceClass, instanceClass.newInstance());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //输入:接口class  输出:对应接口实现类实例(properties配置)
    public static  <T> T getServiceImpl(Class<T> clazz){
        return (T) servicesMap.get(clazz);
    }
}


RpcRequestMessageHandler

说明:针对于解码得到的RpcRequestMessage对象来进行方法调用,根据响应结果来设置返回值或异常。


import com.changlu.message.rpc.RpcRequestMessage;
import com.changlu.message.rpc.RpcResponseMessage;
import com.changlu.server.service.rpc.ServicesFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Method;
/**
 * @ClassName RpcRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/16 22:05
 * @Description TODO
 */
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message){
        RpcResponseMessage responseMessage = new RpcResponseMessage();
        responseMessage.setSequenceId(message.getSequenceId());//将请求序列号存储到响应对象中
        Class<?> clazz = null;
        Method method = null;
        Object returnVal = null;
        try {
            clazz = Class.forName(message.getInterfaceName());
            method = clazz.getMethod(message.getMethodName(), message.getParameterTypes());
            returnVal = method.invoke(ServicesFactory.getServiceImpl(clazz), message.getParameterValue());
            //运行成功设置值
            responseMessage.setReturnValue(returnVal);
        } catch (Exception e) {
            e.printStackTrace();
            //出现异常封装好异常信息后传出
            responseMessage.setExceptionValue(new Exception(e.getCause().getMessage()));
        }
        ctx.channel().writeAndFlush(responseMessage);
    }
    public static void main(String[] args) throws Exception{
        final RpcRequestMessage message = new RpcRequestMessage(
                1,
                "com.changlu.server.service.rpc.RpcService",
                "sayHello",
                String.class,
                new Class[]{String.class},
                new Object[]{"changlu"}
        );
        final Class<?> clazz = Class.forName(message.getInterfaceName());
        final Method method = clazz.getMethod(message.getMethodName(), message.getParameterTypes());
        final Object returnVal = method.invoke(ServicesFactory.getServiceImpl(clazz), message.getParameterValue());
        System.out.println(returnVal);
        //ServicesFactory.getServiceImpl测试:根据配置文件来获取指定实现子类的实例!
//        final Object service = ServicesFactory.getServiceImpl(Class.forName(message.getInterfaceName()));
//        RpcServiceImpl rpcService = (RpcServiceImpl)service;
//        System.out.println(rpcService.sayHello("changlu"));
    }
}


Server

说明:服务器实现。


import com.changlu.protocol.MessageCodecSharable;
import com.changlu.protocol.ProcotolFrameDecoder;
import com.changlu.server.handler.rpc.RpcRequestMessageHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
 * @ClassName RpcServer
 * @Author ChangLu
 * @Date 2022/1/16 21:45
 * @Description RPC服务器
 */
@Slf4j
public class RpcServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup(2);
        //handler
        MessageCodecSharable messageCodec = new MessageCodecSharable();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        RpcRequestMessageHandler rpcRequestMessageHandler = new RpcRequestMessageHandler();
        try {
            final ChannelFuture future = new ServerBootstrap()
                    .group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ProcotolFrameDecoder());
                            ch.pipeline().addLast(loggingHandler);
                            ch.pipeline().addLast(messageCodec);
                            ch.pipeline().addLast(rpcRequestMessageHandler);
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    log.debug("get => {}", msg);
                                }
                            });
                        }
                    }).bind(new InetSocketAddress(8080)).sync();
            final Channel channel = future.channel();
            log.debug("服务器启动成功!");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}



三、测试


该案例针对于远程调用成功、出现异常进行测试。


成功


client:
public class RpcClient {
    public static void main(String[] args) {
        final RpcService proxyService = getProxyService(RpcService.class);
        //测试
        System.out.println(proxyService.sayHello("changlu"));
        System.out.println(proxyService.sayHello("liner"));
        System.out.println(proxyService.sayHello("world!"));
    }
}


实际调用实现类:


/**
 * @ClassName RpcSerivceImpl
 * @Author ChangLu
 * @Date 2022/1/16 22:03
 * @Description RpcService实现类
 */
public class RpcServiceImpl implements RpcService{
    @Override
    public String sayHello(String name) {
        return "hello," + name + "!";
    }
}


结果:





异常


client:调用一次方法即可


public class RpcClient {
    public static void main(String[] args) {
        final RpcService proxyService = getProxyService(RpcService.class);
        //测试
        System.out.println(proxyService.sayHello("changlu"));
    }
}


/**
 * @ClassName RpcSerivceImpl
 * @Author ChangLu
 * @Date 2022/1/16 22:03
 * @Description RpcService实现类
 */
public class RpcServiceImpl implements RpcService{
    @Override
    public String sayHello(String name) {
        int i = 1/0;  //测试抛出异常
        return "hello," + name + "!";
    }
}


结果:



相关文章
|
Dubbo Java 应用服务中间件
Netty入门到超神系列-手撸简单版RPC框架(仿Dubbo)
原理还是比较简单 : 代理 + 线程池 + Netty 下面做一些解释: 首先需要定义一个统一的API接口,例:UserApi , 服务端(provider)需要实现这个接口,提供相应的方法UserApiImpl#save,客户端通过远程来调用该接口。 然后需要约定一个协议,服务器如何才能识别到客户端要调用哪个接口?:我这里用 “接口权限定名#方法名#参数” ,的方式来,因为是一个简单版本的RPC。服务端解析该内容就能匹配对应的接口的实现类,然后调用该方法。并把方法的返回值通过Netty写回给客户端 使用的编解码器都是比价简单的String的编解码器
180 0
|
前端开发
Netty手写RPC框架
创建Request类,继承Message,klass是调用的Class目标,name,parameterType,argument分别是方法名称,参数类型,参数
96 0
|
7月前
|
Java Spring
Spring Boot+Netty实现远程过程调用(RPC)
Spring Boot+Netty实现远程过程调用(RPC)
173 0
|
7月前
|
JSON 算法 Dubbo
Netty入门实践-模拟IM聊天
本文以入门实践为主,通过原理+代码的方式,实现一个简易IM聊天功能。分为2个部分:Netty的核心概念、IM聊天简易实现。
|
Java 中间件 大数据
Netty快速入门RPC项目
Netty快速入门RPC项目
78 0
Netty快速入门RPC项目
|
负载均衡
06RPC - netty实现RPC以及Zookeeper
06RPC - netty实现RPC以及Zookeeper
66 0
|
开发框架 JavaScript 前端开发
如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能?
如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能?
324 0
|
前端开发 JavaScript
Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊
Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊
|
网络协议 前端开发 Java
Netty异步NIO框架(一)java服务端与客户端实现聊天 websocket通道
Netty异步NIO框架(一)java服务端与客户端实现聊天 websocket通道
|
前端开发 JavaScript Java
Seata 高性能RPC通信的实现基石-Netty篇
Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。
189 0