基于Netty,20分钟手撸一个RPC框架

简介: 基于Netty,20分钟手撸一个RPC框架

Netty是一款高性能的网络传输框架,作为基础通信组件被RPC框架广泛使用。例如Dubbo协议中使用它进行节点间通信,Hadoop中的Avro组件使用它进行数据文件共享。那么我们就来尝试使用Netty,实现一个简单的RPC框架。

首先,和使用Dubbo类似,我们先抽象出一个服务的API接口,服务提供者实现这个接口中的方法,服务消费者直接调用接口进行访问:

public interface TestService {
    String test(String message);
}

服务方实现该接口,供消费者调用:

public class TestServiceImpl implements TestService {
    @Override
    public String test(String message) {
        System.out.println("Server has received:"+ message);
        if (message !=null){
            return "hi client, Server has Received:["+ message+"]";
        }else{
            return "empty message";
        }
    }
}

然后我们开始使用Netty创建服务的Server端:

public class NettyServer {
    public static void startServer(String hostname,int port){
        EventLoopGroup bossGroup=new NioEventLoopGroup(1);
        EventLoopGroup workerGroup=new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(hostname, port).sync();
            System.out.println("服务端启动");
            future.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

在创建Server端时,我们在ChannelPipeline中添加了Netty自带的String类型的编码器与解码器,最后添加我们的业务逻辑处理的handler。

类似于Dubbo在调用中使用了自己的Dubbo协议,我们在调用服务之前,也需要自定义我们的协议,如果接收到的消息不是按照我们定义的协议,则不予处理。这里定义一个简单的协议,来规定我们的消息的开头以什么开始:

public class Protocol {
    public static final String HEADER="My#Protolcol#Header#";
}

创建服务端的handler,用于处理业务逻辑。新建一个类继承ChannelInboundHandlerAdapter,通过channelRead方法接收客户端发送的消息,在方法中判断消息是否以我们自定义的协议头开头,如果是则读取消息,并调用本地方法,最后通过writeAndFlush返回调用的结果。

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("msg="+msg);
        if(msg.toString().startsWith(Protocol.HEADER)){
            String result = new TestServiceImpl().test(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

至此,服务端我们就已经写完了,再开始写客户端。因为客户端的代码有一点特殊性,所以我们先写处理业务逻辑的NettyClientHandler,之后再实现client端的Netty初始化方法

在handler中,我们要使用多线程来调用服务端的服务,使用channelRead接收服务端返回的结果,所以除了继承ChannelInboundHandlerAdapter父类外,还要实现Callable接口,并重写其中的call方法。

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    private ChannelHandlerContext context;
    //返回的结果
    private String result;
    //客户端调用方法时,传入的参数
    private String param;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        context = ctx;
    }
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        result = msg.toString();
        //唤醒等待线程
        notify();
    }
    @Override
    public synchronized Object call() throws Exception {
        context.writeAndFlush(param);
        wait();
        return result;
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
    public void setParam(String param) {
        this.param = param;
    }
}

在上面的代码中,创建了变量context用于存储当前handler的ChannelHandlerContext,这是为了在call方法中使用该context发送消息。与服务器的连接创建后,首先会执行channelActive方法,给该context赋值。

需要注意的是,call方法和channelRead方法的synchronized关键字非常重要,在执行wait方法的时候会释放锁,从而使channelRead方法获取锁,在读取到服务端返回的消息后使用notify唤醒call方法的线程,返回结果。

说完了NettyClientHandler ,我们回过头来写Netty客户端的启动类NettyClient。首先,我们创建一个线程池,用来在后面执行访问的请求,线程池的大小定义为我们的cpu可用线程数。

private static ExecutorService executor 
      = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

因为客户端调用的是接口,需要使用代理模式创建代理对象,我们创建一个getProxy方法用来获取代理对象并进行方法增强:

public Object getProxy(final Class<?> serviceClass, final String protocolHead) {
    return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class<?>[]{serviceClass}, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (clientHandler == null) {
                initClient();
            }
            clientHandler.setParam(protocolHead + args[0]);
            return executor.submit(clientHandler).get();
        }
    });
}

这里调用了线程池的submit方法提交任务,调用handler中的call方法发送请求。上面的args[0]是调用时的参数,initClient方法用于初始化Netty的client端,代码如下:

private static void initClient() {
    clientHandler = new NettyClientHandler();
    NioEventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(clientHandler);
                            }
                        }
                );
        bootstrap.connect("127.0.0.1", 7000).sync();
        System.out.println("客户端启动");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

NettyClient端的ChannelPipeline中同样添加了编码解码器,与我们自己实现的业务逻辑handler。

至此,客户端与服务端的功能就完成了,我们创建启动类,先启动服务端:

public class ProviderBootstrap {
    public static void main(String[] args) {
        NettyServer.startServer("127.0.0.1",7000);
    }
}

再启动客户端:

public class ConsumerBootstrap {
    public static void main(String[] args) {
        NettyClient consumer = new NettyClient();
        TestService proxy =(TestService) consumer.getProxy(TestService.class, Protocol.HEADER);
        String result = proxy.test("hi,i am client");
        System.out.println("result: "+result);
    }
}

最后看一下运行结果,先看服务提供者:

image.png

收到的消息以我们的协议开头,将协议头剔除后获得消息正文,作为RPC调用方法的参数,传递给请求的方法。再看服务消费者端:

image.png

接收到了服务提供端返回的信息。这样,一个简单的RPC框架就已经实现了。

相关文章
|
NoSQL 前端开发 Java
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
213 0
|
21天前
|
监控 前端开发 安全
Netty 高性能网络编程框架技术详解与实践指南
本文档全面介绍 Netty 高性能网络编程框架的核心概念、架构设计和实践应用。作为 Java 领域最优秀的 NIO 框架之一,Netty 提供了异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。本文将深入探讨其 Reactor 模型、ChannelPipeline、编解码器、内存管理等核心机制,帮助开发者构建高性能的网络应用系统。
152 0
|
5月前
|
Java 开发者 索引
Netty基础—6.Netty实现RPC服务
本文详细介绍了RPC(远程过程调用)的相关概念及其实现细节,涵盖动态代理、Netty客户端和服务端处理、编码解码器以及超时功能的实现。
|
编解码 分布式计算 网络协议
Netty高性能网络框架(一)
Netty高性能网络框架(一)
|
存储 缓存 Linux
【实战指南】嵌入式RPC框架设计实践:六大核心类构建高效RPC框架
在先前的文章基础上,本文讨论如何通过分层封装提升一个针对嵌入式Linux的RPC框架的易用性。设计包括自动服务注册、高性能通信、泛型序列化和简洁API。框架分为6个关键类:BindingHub、SharedRingBuffer、Parcel、Binder、IBinder和BindInterface。BindingHub负责服务注册,SharedRingBuffer实现高效数据传输,Parcel处理序列化,而Binder和IBinder分别用于服务端和客户端交互。BindInterface提供简单的初始化接口,简化应用集成。测试案例展示了客户端和服务端的交互,验证了RPC功能的有效性。
679 81
|
12月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
248 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
11月前
|
自然语言处理 负载均衡 API
gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架
gRPC 是一种现代开源高性能远程过程调用(RPC)框架,支持多种编程语言,可在任何环境中运行。它通过高效的连接方式,支持负载平衡、跟踪、健康检查和身份验证,适用于微服务架构、移动设备和浏览器客户端连接后端服务等场景。gRPC 使用 Protocol Buffers 作为接口定义语言,支持四种服务方法:一元 RPC、服务器流式处理、客户端流式处理和双向流式处理。
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
设计模式 缓存 算法
Netty框架的重要性
Netty框架的重要性