Netty手写RPC框架

简介: 创建Request类,继承Message,klass是调用的Class目标,name,parameterType,argument分别是方法名称,参数类型,参数

首发于Enaium的个人博客


协议就用上篇文章的协议

public class Message implements Serializable {
   
    private final long order;

    public Message(long order) {
   
        this.order = order;
    }

    public long getOrder() {
   
        return order;
    }
}

只不过Message加了个Order熟悉,

创建Request类,继承Message,klass是调用的Class目标,name,parameterType,argument分别是方法名称,参数类型,参数

public class Request extends Message {
   
    private final String klass;
    private final String name;
    private final Class<?>[] parameterType;
    private final Object[] argument;

    public Request(long order, String klass, String name, Class<?>[] parameterType, Object[] argument) {
   
        super(order);
        this.klass = klass;
        this.name = name;
        this.parameterType = parameterType;
        this.argument = argument;
    }

    public String getKlass() {
   
        return klass;
    }

    public String getName() {
   
        return name;
    }

    public Class<?>[] getParameterType() {
   
        return parameterType;
    }

    public Object[] getArgument() {
   
        return argument;
    }
}

创建Response类继承Message,result调用的结果,throwable调用的异常

public class Response extends Message {
   
    private final Object result;
    private final Throwable throwable;

    public Response(long order, Object result, Throwable throwable) {
   
        super(order);
        this.result = result;
        this.throwable = throwable;
    }

    public Object getResult() {
   
        return result;
    }

    public Throwable getThrowable() {
   
        return throwable;
    }
}

创建一个PRCHandler类,来处理请求,用反射调用即可

public class PRCHandler extends SimpleChannelInboundHandler<Request> {
   
    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, Request request) {
   
        try {
   
            Class<?> aClass = Class.forName(request.getKlass());
            Object o = aClass.getConstructor().newInstance();
            Object invoke = aClass.getMethod(request.getName(), request.getParameterType()).invoke(o, request.getArgument());
            channelHandlerContext.channel().writeAndFlush(new Response(request.getOrder(), invoke, null));
        } catch (ClassNotFoundException | InvocationTargetException | InstantiationException | IllegalAccessException | NoSuchMethodException e) {
   
            e.printStackTrace();
            channelHandlerContext.channel().writeAndFlush(new Response(request.getOrder(), null, e.getCause()));
        }
    }
}

接着启动服务器,服务器就这样写好了

public class RPCServer {
   

    private static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.INFO);
    private static final MessageCodec MESSAGE_CODEC = new MessageCodec();
    private static final PRCHandler PRC_HANDLER = new PRCHandler();

    public static void main(String[] args) {
   
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
   
            Channel localhost = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
   
                @Override
                public void initChannel(SocketChannel channel) {
   
                    channel.pipeline().addLast(LOGGING_HANDLER);
                    channel.pipeline().addLast(MESSAGE_CODEC);
                    channel.pipeline().addLast(PRC_HANDLER);
                }
            }).bind("localhost", 3828).sync().channel();
            System.out.println("Runnable...");
            localhost.closeFuture().sync();
        } catch (InterruptedException e) {
   
            e.printStackTrace();
        } finally {
   
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

现在在test里测试一下,写好客户端连接,Hanlder先不用太关注

public class Main {
   

    private static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.INFO);
    private static final MessageCodec MESSAGE_CODEC = new MessageCodec();
    private static final Handler HANDLER = new Handler();

    private static Channel channel;

    public static void main(String[] args) {
   
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        try {
   
            channel = new Bootstrap().group(nioEventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
   
                @Override
                public void initChannel(SocketChannel channel) {
   
                    channel.pipeline().addLast(LOGGING_HANDLER);
                    channel.pipeline().addLast(MESSAGE_CODEC);
                    channel.pipeline().addLast(HANDLER);
                }
            }).connect("localhost", 3828).sync().channel();
        } catch (InterruptedException e) {
   
            e.printStackTrace();
        }

        Runtime.getRuntime().addShutdownHook(new Thread(nioEventLoopGroup::shutdownGracefully));
    }
}

创建一个Call注解,klass是目标类,name是目标类的方法

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Call {
   
    String klass();

    String name();
}

现在创建一个目标类

public class Target {
   
    public String render() {
   
        return "RENDER HELLO WORLD!";
    }
}

创建个一个Service接口

public interface Service {
   
    @Call(klass = "cn.enaium.Target", name = "render")
    String render();
}

接着使用动态代理

@SuppressWarnings("unchecked")
private static <T> T getService(Class<T> klass) {
   

}

没有Call注解的返回null

Object o = Proxy.newProxyInstance(klass.getClassLoader(), new Class<?>[]{
   klass}, (proxy, method, args) -> {
   
    if (!method.isAnnotationPresent(Call.class)) {
   
        return null;
    }
}

使用Promise来获取结果

Object o = Proxy.newProxyInstance(klass.getClassLoader(), new Class<?>[]{
   klass}, (proxy, method, args) -> {
   
    if (!method.isAnnotationPresent(Call.class)) {
   
        return null;
    }
    Promise<Object> promise = new DefaultPromise<>(channel.eventLoop());
    Call annotation = method.getAnnotation(Call.class);
    long increment = Util.increment();
    channel.writeAndFlush(new Request(increment, annotation.klass(), annotation.name(), method.getParameterTypes(), args));
    Main.HANDLER.getPromiseMap().put(increment, promise);
    promise.await();
    if (promise.cause() != null) {
   
        return new RuntimeException(promise.cause());
    }
    return promise.getNow();
});
@SuppressWarnings("unchecked")
private static <T> T getService(Class<T> klass) {
   
    Object o = Proxy.newProxyInstance(klass.getClassLoader(), new Class<?>[]{
   klass}, (proxy, method, args) -> {
   
        if (!method.isAnnotationPresent(Call.class)) {
   
            return null;
        }
        Promise<Object> promise = new DefaultPromise<>(channel.eventLoop());
        Call annotation = method.getAnnotation(Call.class);
        long increment = Util.increment();
        channel.writeAndFlush(new Request(increment, annotation.klass(), annotation.name(), method.getParameterTypes(), args));
        Main.HANDLER.getPromiseMap().put(increment, promise);
        promise.await();
        if (promise.cause() != null) {
   
            return new RuntimeException(promise.cause());
        }
        return promise.getNow();
    });
    return (T) o;
}

序号自增

public class Util {
   
    private static final AtomicLong atomicLong = new AtomicLong();

    public static long increment() {
   
        return atomicLong.incrementAndGet();
    }
}

Handler来处理响应,根据请求的order获取返回值

public class Handler extends SimpleChannelInboundHandler<Response> {
   

    private final Map<Long, Promise<Object>> promiseMap = new HashMap<>();

    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
   

        if (null == promiseMap.get(response.getOrder())) {
   
            return;
        }

        Promise<Object> promise = promiseMap.remove(response.getOrder());

        if (response.getResult() != null) {
   
            promise.setSuccess(response.getResult());
        } else {
   
            promise.setFailure(response.getThrowable());
        }
    }

    public Map<Long, Promise<Object>> getPromiseMap() {
   
        return promiseMap;
    }
}

现在来运行服务器和客户端测试一下

源码

视频

目录
相关文章
|
2月前
|
负载均衡 Dubbo Java
Dubbo 3.x:探索阿里巴巴的开源RPC框架新技术
随着微服务架构的兴起,远程过程调用(RPC)框架成为了关键组件。Dubbo,作为阿里巴巴的开源RPC框架,已经演进到了3.x版本,带来了许多新特性和技术改进。本文将探讨Dubbo 3.x中的一些最新技术,包括服务注册与发现、负载均衡、服务治理等,并通过代码示例展示其使用方式。
76 9
|
6月前
|
缓存 网络协议 Dubbo
异步编程 - 12 异步、基于事件驱动的网络编程框架 Netty
异步编程 - 12 异步、基于事件驱动的网络编程框架 Netty
46 0
|
4月前
|
Dubbo Java 应用服务中间件
Rpc编程系列文章第三篇:Hessian RPC一个老的RPC框架
Rpc编程系列文章第三篇:Hessian RPC一个老的RPC框架
|
存储 设计模式 网络协议
Netty网络框架(一)
Netty网络框架
34 1
|
1月前
|
前端开发 Java 数据库连接
探索Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty
探索Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty
|
2月前
|
XML JSON Java
RPC框架之Thrift—实现Go和Java远程过程调用
RPC框架之Thrift—实现Go和Java远程过程调用
46 1
|
2月前
|
Java Spring
Spring Boot+Netty实现远程过程调用(RPC)
Spring Boot+Netty实现远程过程调用(RPC)
70 0
|
2月前
|
前端开发 Java 数据库连接
认识Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty
Spring框架 Spring是一个轻量级的开源框架,用于构建企业级应用。它提供了广泛的功能,包括依赖注入、面向切面编程、事务管理、消息传递等。Spring的核心思想是控制反转(IoC)和面向切面编程(AOP)。
78 3
|
3月前
|
负载均衡 Java 调度
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)
经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。
41 0
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)
|
3月前
|
Dubbo Java 应用服务中间件
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)
今天,我要向大家实现一个基于Netty实现的高性能远程通信框架!这个框架利用了 Netty 的强大功能,提供了快速、可靠的远程通信能力。 无论是构建大规模微服务架构还是实现分布式计算,这个分布式通信框架都是一个不可或缺的利器。
64 2
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)