【JAVA】如何基于Netty实现简单的RPC 框架

简介: 【JAVA】如何基于Netty实现简单的RPC 框架

如何基于Netty实现简单的RPC 框架

1. 项目模块与依赖

common 模块依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>common</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!--netty依赖 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <!--json依赖 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.80</version>
        </dependency>
        <!--lombok依赖 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>

rpc-client模块依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-client</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.sgg</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

rpc-server 模块依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-server</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.sgg</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <!--spring相关依赖 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
    </dependencies>

</project>

myRPC

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sgg</groupId>
    <artifactId>myRPC</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>common</module>
        <module>rpc-client</module>
        <module>rpc-server</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
    </parent>


</project>

2. common 通用模块

image-20220507102547079

2.1 RpcRequest

package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author sz
 * @DATE 2022/5/6  21:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest {

    /**
     * 全限定类名
     */
    private String className;


    /**
     * 方法名
     */
    private String methodName;

    /**
     * 参数类型
     */
    private Class<?>[] parameterTypes;


    /**
     * 实参
     */
    private Object[] paramters;
}

2.2 RpcResponse

package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author sz
 * @DATE 2022/5/6  21:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcResponse {

    //返回状态码
    private Integer code;
    //返回结果
    private String result;
    //错误信息
    private String error;

}

2.3 User

package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author sz
 * @DATE 2022/5/6  21:55
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {

    private Integer id;
    private String name;

}

2.4 UserService

package com.sgg.service;

import com.sgg.pojo.User;

public interface UserService {
    User getUserById(Integer id);
}

3. rpc-server 服务端模块

image-20220507102807642

3.1 MyServiceRpc

package com.sgg.anno;


import java.lang.annotation.*;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyServiceRpc {
}

3.2 MyServerHandler

package com.sgg.handler;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.sgg.anno.MyServiceRpc;
import com.sgg.pojo.RpcRequest;
import com.sgg.pojo.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.BeansException;
import org.springframework.cglib.reflect.FastClass;
import org.springframework.cglib.reflect.FastMethod;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.util.*;

/**
 * @author sz
 * @DATE 2022/5/6  22:16
 */
@Component
public class MyServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {

    private static ApplicationContext app;
    private static HashMap<String, Object> cache = new HashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        app = applicationContext;
        //拿到容器中所有标注了@MyServiceRpc 注解的 bean
        Map<String, Object> beansWithAnnotation = app.getBeansWithAnnotation(MyServiceRpc.class);
        //拿到bean实现的接口的全限定类名
        Set<Map.Entry<String, Object>> entries = beansWithAnnotation.entrySet();
        entries.stream().forEach(ent->{
            Class<?>[] interfaces = ent.getValue().getClass().getInterfaces();
            if (null!=interfaces && interfaces.length != 0){
                Arrays.stream(interfaces).forEach(inter->{
                    cache.put(inter.getName(),ent.getValue());
                });
            }
        });
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接 : "+ctx.channel().remoteAddress().toString().substring(1));
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String json) throws Exception {
        //封装结果
        RpcResponse rpcResponse = new RpcResponse();

        Object result = null;
        try {
            //将json字符串转换为RpcRequest 对象
            RpcRequest rpcRequest = JSONObject.parseObject(json, RpcRequest.class);
            //拿到需要调用的类
            String className = rpcRequest.getClassName();
            Object bean = cache.get(className);
            //需要调用的方法名
            String methodName = rpcRequest.getMethodName();
            //方法参数类型
            Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
            //方法实参
            Object[] paramters = rpcRequest.getParamters();

            //反射调用方法
            FastClass fastClass = FastClass.create(bean.getClass());
            FastMethod fastClassMethod = fastClass.getMethod(methodName, parameterTypes);
            result = fastClassMethod.invoke(bean, paramters);
            rpcResponse.setCode(200);
            rpcResponse.setResult((String) result);
        } catch (Exception e) {
            e.printStackTrace();
            rpcResponse.setCode(400);
            rpcResponse.setError(e.getMessage());
        }

        //将结果用json字符串写回去
        channelHandlerContext.writeAndFlush(JSON.toJSONString(rpcResponse));
    }


}

3.3 ServerProvider

package com.sgg.provider;

import com.sgg.handler.MyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Service;

import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;

/**
 * @author sz
 * @DATE 2022/5/6  22:07
 */
@Service
public class ServerProvider implements Closeable {

    private NioEventLoopGroup boss ;
    private NioEventLoopGroup work ;



    public void start(String ip,Integer port)  {
        //创建两个线程组
        boss = new NioEventLoopGroup(1);
        //默认线程数 = CPU数 * 2
        work = new NioEventLoopGroup();

        //创建启动组手
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //解析字符串
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            //内容处理
                            pipeline.addLast(new MyServerHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
            System.out.println(">>>>>>>服务器启动成功<<<<<<<<");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            if (null!=boss){
                boss.shutdownGracefully();
            }
            if (null!=boss){
                work.shutdownGracefully();
            }
        }
    }


    @Override
    public void close() throws IOException {
        System.out.println("容器关闭我被调用了");
        if (null!=boss){
            boss.shutdownGracefully();
        }
        if (null!=boss){
            work.shutdownGracefully();
        }
    }
}

3.4 UserServiceImpl

package com.sgg.service.impl;

import com.sgg.anno.MyServiceRpc;
import com.sgg.pojo.User;
import com.sgg.service.UserService;
import org.springframework.stereotype.Service;

import java.util.HashMap;

/**
 * @author sz
 * @DATE 2022/5/6  22:18
 */
@MyServiceRpc
@Service
public class UserServiceImpl implements UserService {

    private static HashMap<Integer,User> map = new HashMap();

    static {
        map.put(1,new User(1,"张三"));
        map.put(2,new User(2,"李四"));
    }

    @Override
    public User getUserById(Integer id) {
        return map.get(id);
    }

}

3.5 ServerApp

package com.sgg;

import com.sgg.provider.ServerProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author sz
 * @DATE 2022/5/6  22:04
 */
@SpringBootApplication
public class ServerApp implements CommandLineRunner {

    @Autowired
    private ServerProvider serverProvider;

    public static void main(String[] args) {
        SpringApplication.run(ServerApp.class,args);
    }

    @Override
    public void run(String... args) throws Exception {
        new Thread(()->{
                serverProvider.start("127.0.0.1",9999);
        }).start();
    }
}

4. rpc-client 客户端模块

image-20220507103000153

4.1 RpcClient

package com.sgg.client;

import com.sgg.handler.MyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author sz
 * @DATE 2022/5/6  22:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcClient {

    private String ip;
    private Integer port;

    public RpcClient(String ip, Integer port) {
        this.ip = ip;
        this.port = port;
        init();
    }

    private NioEventLoopGroup eventLoopGroup;
    private Channel channel;
    private MyClientHandler myClientHandler = new MyClientHandler();
    private ExecutorService executorService = Executors.newCachedThreadPool();

    public Object sendMess(String message) throws ExecutionException, InterruptedException {
        myClientHandler.setRequestMsg(message);
        Future submit = executorService.submit(myClientHandler);
        return submit.get();
    }

   public void init() {
        //创建线程组
        eventLoopGroup = new NioEventLoopGroup();
        //创建启动组手
        Bootstrap bootstrap = new Bootstrap();
        //分组
        try {
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            //业务
                            pipeline.addLast(myClientHandler);
                        }
                    });

            channel = bootstrap.connect(ip, port).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
            if (null != channel) {
                channel.close();
            }
            if (null != eventLoopGroup) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

    public void close() {
        if (null != channel) {
            channel.close();
        }
        if (null != eventLoopGroup) {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

4.2 MyClientHandler

package com.sgg.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.concurrent.Callable;

/**
 * @author sz
 * @DATE 2022/5/6  23:04
 */
public class MyClientHandler extends SimpleChannelInboundHandler<String> implements Callable {

    private String requestMsg;
    private String responseMsg;

    private ChannelHandlerContext context;

    public void setRequestMsg(String str) {
        this.requestMsg = str;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.context = ctx;
    }

    @Override
    protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String str) throws Exception {
        this.responseMsg = str;
        //唤醒
        notify();
    }

    @Override
    public synchronized Object call() throws Exception {
        this.context.writeAndFlush(requestMsg);
        //线程等待  拿到响应数据
        wait();
        return responseMsg;
    }
}

4.3 RpcProxy

package com.sgg.proxy;

import com.alibaba.fastjson.JSON;
import com.sgg.client.RpcClient;
import com.sgg.pojo.RpcRequest;
import com.sgg.pojo.RpcResponse;
import com.sgg.pojo.User;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * @author sz
 * @DATE 2022/5/6  22:46
 */
public class RpcProxy {

    public static Object createProxy(Class target) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{target},
                (Object proxy, Method method, Object[] args) -> {

                    RpcRequest rpcRequest = new RpcRequest();
                    //设置类名
                    rpcRequest.setClassName(method.getDeclaringClass().getName());
                    //设置方法名
                    rpcRequest.setMethodName(method.getName());
                    //设置方法参数类型
                    rpcRequest.setParameterTypes(method.getParameterTypes());
                    //设置方法实际参数
                    rpcRequest.setParamters(args);

                    //发送信息,拿到返回值
                    RpcClient rpcClient = new RpcClient("127.0.0.1", 9999);
                    String mess = (String) rpcClient.sendMess(JSON.toJSONString(rpcRequest));
                    //转换为rpcResponse
                    RpcResponse rpcResponse = JSON.parseObject(mess, RpcResponse.class);
                    //拿到返回结果
                    if (200==rpcResponse.getCode()){
                        return JSON.parseObject(rpcResponse.getResult(), User.class);
                    }

                    return null;
                }
        );
    }

}

4.4 ClientApp

package com.sgg;

import com.sgg.pojo.User;
import com.sgg.proxy.RpcProxy;
import com.sgg.service.UserService;

/**
 * @author sz
 * @DATE 2022/5/6  22:44
 */

public class ClientApp {

    public static void main(String[] args) {

        UserService proxy = (UserService) RpcProxy.createProxy(UserService.class);
        User userById = proxy.getUserById(2);
        System.out.println("userById = " + userById);
    }

}
相关文章
|
10天前
|
编解码 分布式计算 网络协议
Netty高性能网络框架(一)
Netty高性能网络框架(一)
|
10天前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
540 8
|
9天前
|
算法 Java
Java项目不使用框架如何实现限流?
Java项目不使用框架如何实现限流?
19 2
|
14天前
|
Kubernetes Java Android开发
用 Quarkus 框架优化 Java 微服务架构的设计与实现
Quarkus 是专为 GraalVM 和 OpenJDK HotSpot 设计的 Kubernetes Native Java 框架,提供快速启动、低内存占用及高效开发体验,显著优化了 Java 在微服务架构中的表现。它采用提前编译和懒加载技术实现毫秒级启动,通过优化类加载机制降低内存消耗,并支持多种技术和框架集成,如 Kubernetes、Docker 及 Eclipse MicroProfile,助力开发者轻松构建强大微服务应用。例如,在电商场景中,可利用 Quarkus 快速搭建商品管理和订单管理等微服务,提升系统响应速度与稳定性。
31 5
|
15天前
|
机器学习/深度学习 数据采集 JavaScript
ADR智能监测系统源码,系统采用Java开发,基于SpringBoot框架,前端使用Vue,可自动预警药品不良反应
ADR药品不良反应监测系统是一款智能化工具,用于监测和分析药品不良反应。该系统通过收集和分析病历、处方及实验室数据,快速识别潜在不良反应,提升用药安全性。系统采用Java开发,基于SpringBoot框架,前端使用Vue,具备数据采集、清洗、分析等功能模块,并能生成监测报告辅助医务人员决策。通过集成多种数据源并运用机器学习算法,系统可自动预警药品不良反应,有效减少药害事故,保障公众健康。
ADR智能监测系统源码,系统采用Java开发,基于SpringBoot框架,前端使用Vue,可自动预警药品不良反应
|
9天前
|
设计模式 缓存 算法
Netty框架的重要性
Netty框架的重要性
|
10天前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
1月前
|
Java 数据库连接 Apache
Java进阶-主流框架总结与详解
这些仅仅是 Java 众多框架中的一部分。每个框架都有其特定的用途和优势,了解并熟练运用这些框架,对于每一位 Java 开发者来说都至关重要。同时,选择合适框架的关键在于理解框架的设计哲学、核心功能及其在项目中的应用场景。随着技术的不断进步,这些框架也在不断更新和迭代以适应新的开发者需求。
39 1
|
2月前
|
存储 Java 程序员
Java中的集合框架:从入门到精通
【8月更文挑战第30天】在Java的世界里,集合框架是一块基石,它不仅承载着数据的存储和操作,还体现了面向对象编程的精髓。本篇文章将带你遨游Java集合框架的海洋,从基础概念到高级应用,一步步揭示它的奥秘。你将学会如何选择合适的集合类型,掌握集合的遍历技巧,以及理解集合框架背后的设计哲学。让我们一起探索这个强大工具,解锁数据结构的新视角。
|
2月前
|
存储 算法 Java
Java中的集合框架深度解析云上守护:云计算与网络安全的协同进化
【8月更文挑战第29天】在Java的世界中,集合框架是数据结构的代言人。它不仅让数据存储变得优雅而高效,还为程序员提供了一套丰富的工具箱。本文将带你深入理解集合框架的设计哲学,探索其背后的原理,并分享一些实用的使用技巧。无论你是初学者还是资深开发者,这篇文章都将为你打开一扇通往高效编程的大门。
下一篇
无影云桌面