【JAVA】如何基于Netty实现简单的RPC 框架

简介: 【JAVA】如何基于Netty实现简单的RPC 框架

如何基于Netty实现简单的RPC 框架

1. 项目模块与依赖

common 模块依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>common</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!--netty依赖 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <!--json依赖 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.80</version>
        </dependency>
        <!--lombok依赖 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>

rpc-client模块依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-client</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.sgg</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

rpc-server 模块依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-server</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.sgg</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <!--spring相关依赖 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
    </dependencies>

</project>

myRPC

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sgg</groupId>
    <artifactId>myRPC</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>common</module>
        <module>rpc-client</module>
        <module>rpc-server</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
    </parent>


</project>

2. common 通用模块

image-20220507102547079

2.1 RpcRequest

package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author sz
 * @DATE 2022/5/6  21:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest {

    /**
     * 全限定类名
     */
    private String className;


    /**
     * 方法名
     */
    private String methodName;

    /**
     * 参数类型
     */
    private Class<?>[] parameterTypes;


    /**
     * 实参
     */
    private Object[] paramters;
}

2.2 RpcResponse

package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author sz
 * @DATE 2022/5/6  21:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcResponse {

    //返回状态码
    private Integer code;
    //返回结果
    private String result;
    //错误信息
    private String error;

}

2.3 User

package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author sz
 * @DATE 2022/5/6  21:55
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {

    private Integer id;
    private String name;

}

2.4 UserService

package com.sgg.service;

import com.sgg.pojo.User;

public interface UserService {
    User getUserById(Integer id);
}

3. rpc-server 服务端模块

image-20220507102807642

3.1 MyServiceRpc

package com.sgg.anno;


import java.lang.annotation.*;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyServiceRpc {
}

3.2 MyServerHandler

package com.sgg.handler;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.sgg.anno.MyServiceRpc;
import com.sgg.pojo.RpcRequest;
import com.sgg.pojo.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.BeansException;
import org.springframework.cglib.reflect.FastClass;
import org.springframework.cglib.reflect.FastMethod;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.util.*;

/**
 * @author sz
 * @DATE 2022/5/6  22:16
 */
@Component
public class MyServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {

    private static ApplicationContext app;
    private static HashMap<String, Object> cache = new HashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        app = applicationContext;
        //拿到容器中所有标注了@MyServiceRpc 注解的 bean
        Map<String, Object> beansWithAnnotation = app.getBeansWithAnnotation(MyServiceRpc.class);
        //拿到bean实现的接口的全限定类名
        Set<Map.Entry<String, Object>> entries = beansWithAnnotation.entrySet();
        entries.stream().forEach(ent->{
            Class<?>[] interfaces = ent.getValue().getClass().getInterfaces();
            if (null!=interfaces && interfaces.length != 0){
                Arrays.stream(interfaces).forEach(inter->{
                    cache.put(inter.getName(),ent.getValue());
                });
            }
        });
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接 : "+ctx.channel().remoteAddress().toString().substring(1));
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String json) throws Exception {
        //封装结果
        RpcResponse rpcResponse = new RpcResponse();

        Object result = null;
        try {
            //将json字符串转换为RpcRequest 对象
            RpcRequest rpcRequest = JSONObject.parseObject(json, RpcRequest.class);
            //拿到需要调用的类
            String className = rpcRequest.getClassName();
            Object bean = cache.get(className);
            //需要调用的方法名
            String methodName = rpcRequest.getMethodName();
            //方法参数类型
            Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
            //方法实参
            Object[] paramters = rpcRequest.getParamters();

            //反射调用方法
            FastClass fastClass = FastClass.create(bean.getClass());
            FastMethod fastClassMethod = fastClass.getMethod(methodName, parameterTypes);
            result = fastClassMethod.invoke(bean, paramters);
            rpcResponse.setCode(200);
            rpcResponse.setResult((String) result);
        } catch (Exception e) {
            e.printStackTrace();
            rpcResponse.setCode(400);
            rpcResponse.setError(e.getMessage());
        }

        //将结果用json字符串写回去
        channelHandlerContext.writeAndFlush(JSON.toJSONString(rpcResponse));
    }


}

3.3 ServerProvider

package com.sgg.provider;

import com.sgg.handler.MyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Service;

import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;

/**
 * @author sz
 * @DATE 2022/5/6  22:07
 */
@Service
public class ServerProvider implements Closeable {

    private NioEventLoopGroup boss ;
    private NioEventLoopGroup work ;



    public void start(String ip,Integer port)  {
        //创建两个线程组
        boss = new NioEventLoopGroup(1);
        //默认线程数 = CPU数 * 2
        work = new NioEventLoopGroup();

        //创建启动组手
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //解析字符串
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            //内容处理
                            pipeline.addLast(new MyServerHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
            System.out.println(">>>>>>>服务器启动成功<<<<<<<<");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            if (null!=boss){
                boss.shutdownGracefully();
            }
            if (null!=boss){
                work.shutdownGracefully();
            }
        }
    }


    @Override
    public void close() throws IOException {
        System.out.println("容器关闭我被调用了");
        if (null!=boss){
            boss.shutdownGracefully();
        }
        if (null!=boss){
            work.shutdownGracefully();
        }
    }
}

3.4 UserServiceImpl

package com.sgg.service.impl;

import com.sgg.anno.MyServiceRpc;
import com.sgg.pojo.User;
import com.sgg.service.UserService;
import org.springframework.stereotype.Service;

import java.util.HashMap;

/**
 * @author sz
 * @DATE 2022/5/6  22:18
 */
@MyServiceRpc
@Service
public class UserServiceImpl implements UserService {

    private static HashMap<Integer,User> map = new HashMap();

    static {
        map.put(1,new User(1,"张三"));
        map.put(2,new User(2,"李四"));
    }

    @Override
    public User getUserById(Integer id) {
        return map.get(id);
    }

}

3.5 ServerApp

package com.sgg;

import com.sgg.provider.ServerProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author sz
 * @DATE 2022/5/6  22:04
 */
@SpringBootApplication
public class ServerApp implements CommandLineRunner {

    @Autowired
    private ServerProvider serverProvider;

    public static void main(String[] args) {
        SpringApplication.run(ServerApp.class,args);
    }

    @Override
    public void run(String... args) throws Exception {
        new Thread(()->{
                serverProvider.start("127.0.0.1",9999);
        }).start();
    }
}

4. rpc-client 客户端模块

image-20220507103000153

4.1 RpcClient

package com.sgg.client;

import com.sgg.handler.MyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author sz
 * @DATE 2022/5/6  22:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcClient {

    private String ip;
    private Integer port;

    public RpcClient(String ip, Integer port) {
        this.ip = ip;
        this.port = port;
        init();
    }

    private NioEventLoopGroup eventLoopGroup;
    private Channel channel;
    private MyClientHandler myClientHandler = new MyClientHandler();
    private ExecutorService executorService = Executors.newCachedThreadPool();

    public Object sendMess(String message) throws ExecutionException, InterruptedException {
        myClientHandler.setRequestMsg(message);
        Future submit = executorService.submit(myClientHandler);
        return submit.get();
    }

   public void init() {
        //创建线程组
        eventLoopGroup = new NioEventLoopGroup();
        //创建启动组手
        Bootstrap bootstrap = new Bootstrap();
        //分组
        try {
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            //业务
                            pipeline.addLast(myClientHandler);
                        }
                    });

            channel = bootstrap.connect(ip, port).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
            if (null != channel) {
                channel.close();
            }
            if (null != eventLoopGroup) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

    public void close() {
        if (null != channel) {
            channel.close();
        }
        if (null != eventLoopGroup) {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

4.2 MyClientHandler

package com.sgg.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.concurrent.Callable;

/**
 * @author sz
 * @DATE 2022/5/6  23:04
 */
public class MyClientHandler extends SimpleChannelInboundHandler<String> implements Callable {

    private String requestMsg;
    private String responseMsg;

    private ChannelHandlerContext context;

    public void setRequestMsg(String str) {
        this.requestMsg = str;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.context = ctx;
    }

    @Override
    protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String str) throws Exception {
        this.responseMsg = str;
        //唤醒
        notify();
    }

    @Override
    public synchronized Object call() throws Exception {
        this.context.writeAndFlush(requestMsg);
        //线程等待  拿到响应数据
        wait();
        return responseMsg;
    }
}

4.3 RpcProxy

package com.sgg.proxy;

import com.alibaba.fastjson.JSON;
import com.sgg.client.RpcClient;
import com.sgg.pojo.RpcRequest;
import com.sgg.pojo.RpcResponse;
import com.sgg.pojo.User;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * @author sz
 * @DATE 2022/5/6  22:46
 */
public class RpcProxy {

    public static Object createProxy(Class target) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{target},
                (Object proxy, Method method, Object[] args) -> {

                    RpcRequest rpcRequest = new RpcRequest();
                    //设置类名
                    rpcRequest.setClassName(method.getDeclaringClass().getName());
                    //设置方法名
                    rpcRequest.setMethodName(method.getName());
                    //设置方法参数类型
                    rpcRequest.setParameterTypes(method.getParameterTypes());
                    //设置方法实际参数
                    rpcRequest.setParamters(args);

                    //发送信息,拿到返回值
                    RpcClient rpcClient = new RpcClient("127.0.0.1", 9999);
                    String mess = (String) rpcClient.sendMess(JSON.toJSONString(rpcRequest));
                    //转换为rpcResponse
                    RpcResponse rpcResponse = JSON.parseObject(mess, RpcResponse.class);
                    //拿到返回结果
                    if (200==rpcResponse.getCode()){
                        return JSON.parseObject(rpcResponse.getResult(), User.class);
                    }

                    return null;
                }
        );
    }

}

4.4 ClientApp

package com.sgg;

import com.sgg.pojo.User;
import com.sgg.proxy.RpcProxy;
import com.sgg.service.UserService;

/**
 * @author sz
 * @DATE 2022/5/6  22:44
 */

public class ClientApp {

    public static void main(String[] args) {

        UserService proxy = (UserService) RpcProxy.createProxy(UserService.class);
        User userById = proxy.getUserById(2);
        System.out.println("userById = " + userById);
    }

}
相关文章
|
2月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
21天前
|
存储 安全 Java
Java 集合框架中的老炮与新秀:HashTable 和 HashMap 谁更胜一筹?
嗨,大家好,我是技术伙伴小米。今天通过讲故事的方式,详细介绍 Java 中 HashMap 和 HashTable 的区别。从版本、线程安全、null 值支持、性能及迭代器行为等方面对比,帮助你轻松应对面试中的经典问题。HashMap 更高效灵活,适合单线程或需手动处理线程安全的场景;HashTable 较古老,线程安全但性能不佳。现代项目推荐使用 ConcurrentHashMap。关注我的公众号“软件求生”,获取更多技术干货!
39 3
|
2月前
|
消息中间件 Java Kafka
在Java中实现分布式事务的常用框架和方法
总之,选择合适的分布式事务框架和方法需要综合考虑业务需求、性能、复杂度等因素。不同的框架和方法都有其特点和适用场景,需要根据具体情况进行评估和选择。同时,随着技术的不断发展,分布式事务的解决方案也在不断更新和完善,以更好地满足业务的需求。你还可以进一步深入研究和了解这些框架和方法,以便在实际应用中更好地实现分布式事务管理。
|
2月前
|
JSON Java Apache
非常实用的Http应用框架,杜绝Java Http 接口对接繁琐编程
UniHttp 是一个声明式的 HTTP 接口对接框架,帮助开发者快速对接第三方 HTTP 接口。通过 @HttpApi 注解定义接口,使用 @GetHttpInterface 和 @PostHttpInterface 等注解配置请求方法和参数。支持自定义代理逻辑、全局请求参数、错误处理和连接池配置,提高代码的内聚性和可读性。
213 3
|
5天前
|
并行计算 算法 Java
Java中的Fork/Join框架详解
Fork/Join框架是Java并行计算的强大工具,尤其适用于需要将任务分解为子任务的场景。通过正确使用Fork/Join框架,可以显著提升应用程序的性能和响应速度。在实际应用中,应结合具体需求选择合适的任务拆分策略,以最大化并行计算的效率。
36 23
|
2月前
|
人工智能 前端开发 Java
基于开源框架Spring AI Alibaba快速构建Java应用
本文旨在帮助开发者快速掌握并应用 Spring AI Alibaba,提升基于 Java 的大模型应用开发效率和安全性。
269 12
基于开源框架Spring AI Alibaba快速构建Java应用
|
2月前
|
存储 缓存 安全
Java 集合框架优化:从基础到高级应用
《Java集合框架优化:从基础到高级应用》深入解析Java集合框架的核心原理与优化技巧,涵盖列表、集合、映射等常用数据结构,结合实际案例,指导开发者高效使用和优化Java集合。
56 4
|
2月前
|
消息中间件 Java 数据库连接
Java 反射最全详解 ,框架设计必掌握!
本文详细解析Java反射机制,包括反射的概念、用途、实现原理及应用场景。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
Java 反射最全详解 ,框架设计必掌握!
|
3月前
|
前端开发 Java 数据库连接
Spring 框架:Java 开发者的春天
Spring 框架是一个功能强大的开源框架,主要用于简化 Java 企业级应用的开发,由被称为“Spring 之父”的 Rod Johnson 于 2002 年提出并创立,并由Pivotal团队维护。
116 1
Spring 框架:Java 开发者的春天
|
2月前
|
开发框架 Java 关系型数据库
Java哪个框架适合开发API接口?
在快速发展的软件开发领域,API接口连接了不同的系统和服务。Java作为成熟的编程语言,其生态系统中出现了许多API开发框架。Magic-API因其独特优势和强大功能,成为Java开发者优选的API开发框架。本文将从核心优势、实际应用价值及未来展望等方面,深入探讨Magic-API为何值得选择。
91 2