【JAVA】如何基于Netty实现简单的RPC 框架

简介: 【JAVA】如何基于Netty实现简单的RPC 框架

如何基于Netty实现简单的RPC 框架

1. 项目模块与依赖

common 模块依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>common</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!--netty依赖 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <!--json依赖 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.80</version>
        </dependency>
        <!--lombok依赖 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>

rpc-client模块依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-client</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.sgg</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

rpc-server 模块依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-server</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.sgg</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <!--spring相关依赖 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
    </dependencies>

</project>

myRPC

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sgg</groupId>
    <artifactId>myRPC</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>common</module>
        <module>rpc-client</module>
        <module>rpc-server</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
    </parent>


</project>

2. common 通用模块

image-20220507102547079

2.1 RpcRequest

package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author sz
 * @DATE 2022/5/6  21:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest {

    /**
     * 全限定类名
     */
    private String className;


    /**
     * 方法名
     */
    private String methodName;

    /**
     * 参数类型
     */
    private Class<?>[] parameterTypes;


    /**
     * 实参
     */
    private Object[] paramters;
}

2.2 RpcResponse

package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author sz
 * @DATE 2022/5/6  21:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcResponse {

    //返回状态码
    private Integer code;
    //返回结果
    private String result;
    //错误信息
    private String error;

}

2.3 User

package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author sz
 * @DATE 2022/5/6  21:55
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {

    private Integer id;
    private String name;

}

2.4 UserService

package com.sgg.service;

import com.sgg.pojo.User;

public interface UserService {
    User getUserById(Integer id);
}

3. rpc-server 服务端模块

image-20220507102807642

3.1 MyServiceRpc

package com.sgg.anno;


import java.lang.annotation.*;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyServiceRpc {
}

3.2 MyServerHandler

package com.sgg.handler;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.sgg.anno.MyServiceRpc;
import com.sgg.pojo.RpcRequest;
import com.sgg.pojo.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.BeansException;
import org.springframework.cglib.reflect.FastClass;
import org.springframework.cglib.reflect.FastMethod;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.util.*;

/**
 * @author sz
 * @DATE 2022/5/6  22:16
 */
@Component
public class MyServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {

    private static ApplicationContext app;
    private static HashMap<String, Object> cache = new HashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        app = applicationContext;
        //拿到容器中所有标注了@MyServiceRpc 注解的 bean
        Map<String, Object> beansWithAnnotation = app.getBeansWithAnnotation(MyServiceRpc.class);
        //拿到bean实现的接口的全限定类名
        Set<Map.Entry<String, Object>> entries = beansWithAnnotation.entrySet();
        entries.stream().forEach(ent->{
            Class<?>[] interfaces = ent.getValue().getClass().getInterfaces();
            if (null!=interfaces && interfaces.length != 0){
                Arrays.stream(interfaces).forEach(inter->{
                    cache.put(inter.getName(),ent.getValue());
                });
            }
        });
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接 : "+ctx.channel().remoteAddress().toString().substring(1));
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String json) throws Exception {
        //封装结果
        RpcResponse rpcResponse = new RpcResponse();

        Object result = null;
        try {
            //将json字符串转换为RpcRequest 对象
            RpcRequest rpcRequest = JSONObject.parseObject(json, RpcRequest.class);
            //拿到需要调用的类
            String className = rpcRequest.getClassName();
            Object bean = cache.get(className);
            //需要调用的方法名
            String methodName = rpcRequest.getMethodName();
            //方法参数类型
            Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
            //方法实参
            Object[] paramters = rpcRequest.getParamters();

            //反射调用方法
            FastClass fastClass = FastClass.create(bean.getClass());
            FastMethod fastClassMethod = fastClass.getMethod(methodName, parameterTypes);
            result = fastClassMethod.invoke(bean, paramters);
            rpcResponse.setCode(200);
            rpcResponse.setResult((String) result);
        } catch (Exception e) {
            e.printStackTrace();
            rpcResponse.setCode(400);
            rpcResponse.setError(e.getMessage());
        }

        //将结果用json字符串写回去
        channelHandlerContext.writeAndFlush(JSON.toJSONString(rpcResponse));
    }


}

3.3 ServerProvider

package com.sgg.provider;

import com.sgg.handler.MyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Service;

import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;

/**
 * @author sz
 * @DATE 2022/5/6  22:07
 */
@Service
public class ServerProvider implements Closeable {

    private NioEventLoopGroup boss ;
    private NioEventLoopGroup work ;



    public void start(String ip,Integer port)  {
        //创建两个线程组
        boss = new NioEventLoopGroup(1);
        //默认线程数 = CPU数 * 2
        work = new NioEventLoopGroup();

        //创建启动组手
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //解析字符串
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            //内容处理
                            pipeline.addLast(new MyServerHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
            System.out.println(">>>>>>>服务器启动成功<<<<<<<<");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            if (null!=boss){
                boss.shutdownGracefully();
            }
            if (null!=boss){
                work.shutdownGracefully();
            }
        }
    }


    @Override
    public void close() throws IOException {
        System.out.println("容器关闭我被调用了");
        if (null!=boss){
            boss.shutdownGracefully();
        }
        if (null!=boss){
            work.shutdownGracefully();
        }
    }
}

3.4 UserServiceImpl

package com.sgg.service.impl;

import com.sgg.anno.MyServiceRpc;
import com.sgg.pojo.User;
import com.sgg.service.UserService;
import org.springframework.stereotype.Service;

import java.util.HashMap;

/**
 * @author sz
 * @DATE 2022/5/6  22:18
 */
@MyServiceRpc
@Service
public class UserServiceImpl implements UserService {

    private static HashMap<Integer,User> map = new HashMap();

    static {
        map.put(1,new User(1,"张三"));
        map.put(2,new User(2,"李四"));
    }

    @Override
    public User getUserById(Integer id) {
        return map.get(id);
    }

}

3.5 ServerApp

package com.sgg;

import com.sgg.provider.ServerProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author sz
 * @DATE 2022/5/6  22:04
 */
@SpringBootApplication
public class ServerApp implements CommandLineRunner {

    @Autowired
    private ServerProvider serverProvider;

    public static void main(String[] args) {
        SpringApplication.run(ServerApp.class,args);
    }

    @Override
    public void run(String... args) throws Exception {
        new Thread(()->{
                serverProvider.start("127.0.0.1",9999);
        }).start();
    }
}

4. rpc-client 客户端模块

image-20220507103000153

4.1 RpcClient

package com.sgg.client;

import com.sgg.handler.MyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author sz
 * @DATE 2022/5/6  22:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcClient {

    private String ip;
    private Integer port;

    public RpcClient(String ip, Integer port) {
        this.ip = ip;
        this.port = port;
        init();
    }

    private NioEventLoopGroup eventLoopGroup;
    private Channel channel;
    private MyClientHandler myClientHandler = new MyClientHandler();
    private ExecutorService executorService = Executors.newCachedThreadPool();

    public Object sendMess(String message) throws ExecutionException, InterruptedException {
        myClientHandler.setRequestMsg(message);
        Future submit = executorService.submit(myClientHandler);
        return submit.get();
    }

   public void init() {
        //创建线程组
        eventLoopGroup = new NioEventLoopGroup();
        //创建启动组手
        Bootstrap bootstrap = new Bootstrap();
        //分组
        try {
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            //业务
                            pipeline.addLast(myClientHandler);
                        }
                    });

            channel = bootstrap.connect(ip, port).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
            if (null != channel) {
                channel.close();
            }
            if (null != eventLoopGroup) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

    public void close() {
        if (null != channel) {
            channel.close();
        }
        if (null != eventLoopGroup) {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

4.2 MyClientHandler

package com.sgg.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.concurrent.Callable;

/**
 * @author sz
 * @DATE 2022/5/6  23:04
 */
public class MyClientHandler extends SimpleChannelInboundHandler<String> implements Callable {

    private String requestMsg;
    private String responseMsg;

    private ChannelHandlerContext context;

    public void setRequestMsg(String str) {
        this.requestMsg = str;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.context = ctx;
    }

    @Override
    protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String str) throws Exception {
        this.responseMsg = str;
        //唤醒
        notify();
    }

    @Override
    public synchronized Object call() throws Exception {
        this.context.writeAndFlush(requestMsg);
        //线程等待  拿到响应数据
        wait();
        return responseMsg;
    }
}

4.3 RpcProxy

package com.sgg.proxy;

import com.alibaba.fastjson.JSON;
import com.sgg.client.RpcClient;
import com.sgg.pojo.RpcRequest;
import com.sgg.pojo.RpcResponse;
import com.sgg.pojo.User;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * @author sz
 * @DATE 2022/5/6  22:46
 */
public class RpcProxy {

    public static Object createProxy(Class target) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{target},
                (Object proxy, Method method, Object[] args) -> {

                    RpcRequest rpcRequest = new RpcRequest();
                    //设置类名
                    rpcRequest.setClassName(method.getDeclaringClass().getName());
                    //设置方法名
                    rpcRequest.setMethodName(method.getName());
                    //设置方法参数类型
                    rpcRequest.setParameterTypes(method.getParameterTypes());
                    //设置方法实际参数
                    rpcRequest.setParamters(args);

                    //发送信息,拿到返回值
                    RpcClient rpcClient = new RpcClient("127.0.0.1", 9999);
                    String mess = (String) rpcClient.sendMess(JSON.toJSONString(rpcRequest));
                    //转换为rpcResponse
                    RpcResponse rpcResponse = JSON.parseObject(mess, RpcResponse.class);
                    //拿到返回结果
                    if (200==rpcResponse.getCode()){
                        return JSON.parseObject(rpcResponse.getResult(), User.class);
                    }

                    return null;
                }
        );
    }

}

4.4 ClientApp

package com.sgg;

import com.sgg.pojo.User;
import com.sgg.proxy.RpcProxy;
import com.sgg.service.UserService;

/**
 * @author sz
 * @DATE 2022/5/6  22:44
 */

public class ClientApp {

    public static void main(String[] args) {

        UserService proxy = (UserService) RpcProxy.createProxy(UserService.class);
        User userById = proxy.getUserById(2);
        System.out.println("userById = " + userById);
    }

}
相关文章
|
4天前
|
NoSQL 前端开发 Java
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
|
5天前
|
Java 容器
java集合框架复习----(1)
这篇文章提供了Java集合框架的复习资料,包括集合的概念、Collection接口的使用,以及如何通过代码示例演示集合的操作,如增加、删除元素,以及遍历集合元素。
java集合框架复习----(1)
|
5天前
|
存储 安全 Java
java集合框架复习----(2)List
这篇文章是关于Java集合框架中List集合的详细复习,包括List的特点、常用方法、迭代器的使用,以及ArrayList、Vector和LinkedList三种实现类的比较和泛型在Java中的使用示例。
java集合框架复习----(2)List
|
5天前
|
存储 安全 Java
java集合框架复习----(4)Map、List、set
这篇文章是Java集合框架的复习总结,重点介绍了Map集合的特点和HashMap的使用,以及Collections工具类的使用示例,同时回顾了List、Set和Map集合的概念和特点,以及Collection工具类的作用。
java集合框架复习----(4)Map、List、set
|
11天前
|
存储 算法 Java
14 Java集合(集合框架+泛型+ArrayList类+LinkedList类+Vector类+HashSet类等)
14 Java集合(集合框架+泛型+ArrayList类+LinkedList类+Vector类+HashSet类等)
30 2
14 Java集合(集合框架+泛型+ArrayList类+LinkedList类+Vector类+HashSet类等)
|
5天前
|
存储 Java
java集合框架复习----(3)Set
这篇文章详细介绍了Java集合框架中的Set集合,包括HashSet和TreeSet的特点、实现原理和使用示例,展示了Set集合的无序性、元素唯一性以及如何通过自定义比较器实现元素的排序。
|
6天前
|
前端开发 Java Spring
springboot 整合 netty框架, 实现 心跳检测,自动重连
springboot 整合 netty框架, 实现 心跳检测,自动重连
|
6天前
|
安全 前端开发 Java
Web端系统开发解决跨域问题——以Java SpringBoot框架配置Cors为例
在Web安全上下文中,源(Origin)是指一个URL的协议、域名和端口号的组合。这三个部分共同定义了资源的来源,浏览器会根据这些信息来判断两个资源是否属于同一源。例如,https://www.example.com:443和http://www.example.com虽然域名相同,但由于协议和端口号不同,它们被视为不同的源。同源(Same-Origin)是指两个URL的协议、域名和端口号完全相同。只有当这些条件都满足时,浏览器才认为这两个资源来自同一源,从而允许它们之间的交互操作。
Web端系统开发解决跨域问题——以Java SpringBoot框架配置Cors为例
|
6天前
|
JavaScript 前端开发 网络协议
WebSocket在Java Spring Boot+Vue框架中实现消息推送功能
在现代Web应用中,实时消息提醒是一项非常重要的功能,能够极大地提升用户体验。WebSocket作为一种在单个TCP连接上进行全双工通信的协议,为实现实时消息提醒提供了高效且低延迟的解决方案。本文将详细介绍如何在Java Spring Boot后端和Vue前端框架中利用WebSocket实现消息提醒功能。
20 0
|
17天前
|
Java 测试技术 API
深入理解单元测试:JUnit框架在Java中的应用
【8月更文挑战第3天】本文将引导读者通过JUnit框架的镜头,探索单元测试的奥秘。我们将一起揭开单元测试的神秘面纱,了解其在软件开发中的关键作用,并具体学习如何在Java项目中应用JUnit进行有效的单元测试。文章不仅会涉及理论概念,还将通过具体的代码示例,展示如何编写和运行单元测试,以确保软件质量。让我们开始吧,一起踏上这段提升代码质量和开发效率的旅程。
14 0