由浅入深RPC通信原理实战2

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
简介: 由浅入深RPC通信原理实战2

3.6 超时重试机制

异常重试需要关注的点:

  • 保证被重试的业务服务是具有幂等性的;
  • 超时重试前重置计时;
  • 针对业务返回的异常,设置重试是异常名单;
  • 重试时负载均衡选取节点时要剔除前一次访问的节点

3.7 时间轮算法

概念

在时钟轮机制中,有时间槽和时钟轮的概念,时间槽就相当于时钟的刻度;而时钟轮就相当于指针跳动的一个周期,我们可以将每个任务放到对应的时间槽位上。

1、每个任务会按要求只扫描执行一次,能很好的解决CPU 浪费的问题

2、秒级轮,分钟轮,小时轮除了用于检测rpc调用是否超时,也可以将定时心跳的任务添加到时间轮中,当前时间的心跳执行完后再将下一秒的心跳任务添加到时间轮中,这样就能做到每秒的定时心跳

3.8 负载均衡策略

用途

RPC Server为了高可用,可用选择做集群,因此在RPC Client端调用时要使用相应的均衡策略,这属于客户端负载均衡。

dubbo的负载均衡方法:

  • 基于权重随机算法:将请求按照权重进行分配,权重越大,分配的越多。
  • 基于最小活跃调用数算法:活跃少越少,表明该服务提供者效率越高,单位时间内可处理更多的请求。
  • 基于hash一致性:适用于服务有状态的场景。
  • 基于加权轮询算法:权重越大,节点选中的更多。

3.9 熔断限流

熔断作用

熔断器如同电力过载保护器。它可以实现快速失败,如果它在一段时间内侦测到许多类似的错误,会强迫其以后的多个调用快速失败,不再访问远程服务器,从而防止应用程序不断地尝试执行可能会失败的操作,使得应用程序继续执行而不用等待修正错误,或者浪费CPU时间去等到长时间的超时产生。熔断器也可以使应用程序能够诊断错误是否已经修正,如果已经修正,应用程序会再次尝试恢复调用操作

限流

作用

实际生产环境中,每个服务节点都可能由于访问量过大而引起一系列问题,就需要业务提供方能够进行自我保护,从而保证在高访问量、高并发的场景下,系统依然能够稳定,高效运行。

限流器的作用是用来限制其请求的速率,保护后台响应服务,以免服务过载导致服务不可用现象出现。

3.10 滑动窗口算法

3.11 限流组件

4 RPC框架简易实现

4.1 服务端

服务端提供客户端所期待的服务,一般包括三个部分:服务接口,服务实现以及服务的注册暴露三部分,如下:服务接口

public interface HelloService {
    String hello(String name);
    String hi(String msg);
}

服务实现

public class HelloServiceImpl implements HelloService{
    @Override
    public String hello(String name) {
        return "Hello " + name;
    }
    @Override
    public String hi(String msg) {
        return "Hi, " + msg;
    }
}

服务暴露:只有把服务暴露出来,才能让客户端进行调用,这是RPC框架功能之一。

public class RpcProvider {
    public static void main(String[] args) throws Exception {
        HelloService service = new HelloServiceImpl();
        // RPC框架将服务暴露出来,供客户端消费
        RpcFramework.export(service, 1234);
    }
}

4.2 客户端

客户端消费服务端所提供的服务,一般包括两个部分:服务接口和服务引用两个部分,如下:服务接口:与服务端共享同一个服务接口

public interface HelloService {
  String hello(String name);
  String hi(String msg);
}

服务引用:消费端通过RPC框架进行远程调用,这也是RPC框架功能之一

public class RpcConsumer {
    public static void main(String[] args) throws Exception {
        // 由RpcFramework生成的HelloService的代理
        HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
        String hello = service.hello("World");
        System.out.println("客户端收到远程调用的结果 : " + hello);
    }
}

4.3 RPC框架原型实现

RPC框架主要包括两大功能:一个用于服务端暴露服务,一个用于客户端引用服务。服务端暴露服务

    /**
     * 暴露服务
     *
     * @param service 服务实现
     * @param port    服务端口
     * @throws Exception
     */
    public static void export(final Object service, int port) throws Exception {
        if (service == null) {
            throw new IllegalArgumentException("service instance == null");
        }
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println("Export service " + service.getClass().getName() + " on port " + port);
        // 建立Socket服务端
        ServerSocket server = new ServerSocket(port);
        for (; ; ) {
            try {
                // 监听Socket请求
                final Socket socket = server.accept();
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            try {
                                /* 获取请求流,Server解析并获取请求*/
                                // 构建对象输入流,从源中读取对象到程序中
                                ObjectInputStream input = new ObjectInputStream(
                                    socket.getInputStream());
                                try {
                                    System.out.println("\nServer解析请求 : ");
                                    String methodName = input.readUTF();
                                    System.out.println("methodName : " + methodName);
                                    // 泛型与数组是不兼容的,除了通配符作泛型参数以外
                                    Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                    System.out.println(
                                        "parameterTypes : " + Arrays.toString(parameterTypes));
                                    Object[] arguments = (Object[])input.readObject();
                                    System.out.println("arguments : " + Arrays.toString(arguments));
                                    /* Server 处理请求,进行响应*/
                                    ObjectOutputStream output = new ObjectOutputStream(
                                        socket.getOutputStream());
                                    try {
                                        // service类型为Object的(可以发布任何服务),故只能通过反射调用处理请求
                                        // 反射调用,处理请求
                                        Method method = service.getClass().getMethod(methodName,
                                            parameterTypes);
                                        Object result = method.invoke(service, arguments);
                                        System.out.println("\nServer 处理并生成响应 :");
                                        System.out.println("result : " + result);
                                        output.writeObject(result);
                                    } catch (Throwable t) {
                                        output.writeObject(t);
                                    } finally {
                                        output.close();
                                    }
                                } finally {
                                    input.close();
                                }
                            } finally {
                                socket.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

从该RPC框架的简易实现来看,RPC服务端逻辑是:首先创建ServerSocket负责监听特定端口并接收客户连接请求,然后使用Java原生的序列化/反序列化机制来解析得到请求,包括所调用方法的名称、参数列表和实参,最后反射调用服务端对服务接口的具体实现并将得到的结果回传至客户端。至此,一次简单PRC调用的服务端流程执行完毕。

客户端引用服务

    /**
     * 引用服务
     *
     * @param <T>            接口泛型
     * @param interfaceClass 接口类型
     * @param host           服务器主机名
     * @param port           服务器端口
     * @return 远程服务,返回代理对象
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
        throws Exception {
        if (interfaceClass == null) {
            throw new IllegalArgumentException("Interface class == null");
        }
        // JDK 动态代理的约束,只能实现对接口的代理
        if (!interfaceClass.isInterface()) {
            throw new IllegalArgumentException(
                "The " + interfaceClass.getName() + " must be interface class!");
        }
        if (host == null || host.length() == 0) {
            throw new IllegalArgumentException("Host == null!");
        }
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println(
            "Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
        // JDK 动态代理
        T proxy = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] {interfaceClass}, new InvocationHandler() {
                // invoke方法本意是对目标方法的增强,在这里用于发送RPC请求和接收响应
                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments)
                    throws Throwable {
                    // 创建Socket客户端,并与服务端建立链接
                    Socket socket = new Socket(host, port);
                    try {
                        /* 客户端像服务端进行请求,并将请求参数写入流中*/
                        // 将对象写入到对象输出流,并将其发送到Socket流中去
                        ObjectOutputStream output = new ObjectOutputStream(
                            socket.getOutputStream());
                        try {
                            // 发送请求
                            System.out.println("\nClient发送请求 : ");
                            output.writeUTF(method.getName());
                            System.out.println("methodName : " + method.getName());
                            output.writeObject(method.getParameterTypes());
                            System.out.println("parameterTypes : " + Arrays.toString(method
                                .getParameterTypes()));
                            output.writeObject(arguments);
                            System.out.println("arguments : " + Arrays.toString(arguments));
                            /* 客户端读取并返回服务端的响应*/
                            ObjectInputStream input = new ObjectInputStream(
                                socket.getInputStream());
                            try {
                                Object result = input.readObject();
                                if (result instanceof Throwable) {
                                    throw (Throwable)result;
                                }
                                System.out.println("\nClient收到响应 : ");
                                System.out.println("result : " + result);
                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
            });
        return proxy;
    }

从该RPC框架的简易实现来看,RPC客户端逻辑是:首先创建Socket客户端并与服务端建立链接,然后使用Java原生的序列化/反序列化机制将调用请求发送给客户端,包括所调用方法的名称、参数列表将服务端的响应返回给用户即可。至此,一次简单PRC调用的客户端流程执行完毕。特别地,从代码实现来看,实现透明的PRC调用的关键就是 动态代理,这是RPC框架实现的灵魂所在。RPC原型实现

public class RpcFramework {
    /**
     * 暴露服务
     *
     * @param service 服务实现
     * @param port    服务端口
     * @throws Exception
     */
    public static void export(final Object service, int port) throws Exception {
        if (service == null) {
            throw new IllegalArgumentException("service instance == null");
        }
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println("Export service " + service.getClass().getName() + " on port " + port);
        // 建立Socket服务端
        ServerSocket server = new ServerSocket(port);
        for (; ; ) {
            try {
                // 监听Socket请求
                final Socket socket = server.accept();
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            try {
                                /* 获取请求流,Server解析并获取请求*/
                                // 构建对象输入流,从源中读取对象到程序中
                                ObjectInputStream input = new ObjectInputStream(
                                    socket.getInputStream());
                                try {
                                    System.out.println("\nServer解析请求 : ");
                                    String methodName = input.readUTF();
                                    System.out.println("methodName : " + methodName);
                                    // 泛型与数组是不兼容的,除了通配符作泛型参数以外
                                    Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                    System.out.println(
                                        "parameterTypes : " + Arrays.toString(parameterTypes));
                                    Object[] arguments = (Object[])input.readObject();
                                    System.out.println("arguments : " + Arrays.toString(arguments));
                                    /* Server 处理请求,进行响应*/
                                    ObjectOutputStream output = new ObjectOutputStream(
                                        socket.getOutputStream());
                                    try {
                                        // service类型为Object的(可以发布任何服务),故只能通过反射调用处理请求
                                        // 反射调用,处理请求
                                        Method method = service.getClass().getMethod(methodName,
                                            parameterTypes);
                                        Object result = method.invoke(service, arguments);
                                        System.out.println("\nServer 处理并生成响应 :");
                                        System.out.println("result : " + result);
                                        output.writeObject(result);
                                    } catch (Throwable t) {
                                        output.writeObject(t);
                                    } finally {
                                        output.close();
                                    }
                                } finally {
                                    input.close();
                                }
                            } finally {
                                socket.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 引用服务
     *
     * @param <T>            接口泛型
     * @param interfaceClass 接口类型
     * @param host           服务器主机名
     * @param port           服务器端口
     * @return 远程服务,返回代理对象
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
        throws Exception {
        if (interfaceClass == null) {
            throw new IllegalArgumentException("Interface class == null");
        }
        // JDK 动态代理的约束,只能实现对接口的代理
        if (!interfaceClass.isInterface()) {
            throw new IllegalArgumentException(
                "The " + interfaceClass.getName() + " must be interface class!");
        }
        if (host == null || host.length() == 0) {
            throw new IllegalArgumentException("Host == null!");
        }
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println(
            "Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
        // JDK 动态代理
        T proxy = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] {interfaceClass}, new InvocationHandler() {
                // invoke方法本意是对目标方法的增强,在这里用于发送RPC请求和接收响应
                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments)
                    throws Throwable {
                    // 创建Socket客户端,并与服务端建立链接
                    Socket socket = new Socket(host, port);
                    try {
                        /* 客户端像服务端进行请求,并将请求参数写入流中*/
                        // 将对象写入到对象输出流,并将其发送到Socket流中去
                        ObjectOutputStream output = new ObjectOutputStream(
                            socket.getOutputStream());
                        try {
                            // 发送请求
                            System.out.println("\nClient发送请求 : ");
                            output.writeUTF(method.getName());
                            System.out.println("methodName : " + method.getName());
                            output.writeObject(method.getParameterTypes());
                            System.out.println("parameterTypes : " + Arrays.toString(method
                                .getParameterTypes()));
                            output.writeObject(arguments);
                            System.out.println("arguments : " + Arrays.toString(arguments));
                            /* 客户端读取并返回服务端的响应*/
                            ObjectInputStream input = new ObjectInputStream(
                                socket.getInputStream());
                            try {
                                Object result = input.readObject();
                                if (result instanceof Throwable) {
                                    throw (Throwable)result;
                                }
                                System.out.println("\nClient收到响应 : ");
                                System.out.println("result : " + result);
                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
            });
        return proxy;
    }
}


相关实践学习
部署高可用架构
本场景主要介绍如何使用云服务器ECS、负载均衡SLB、云数据库RDS和数据传输服务产品来部署多可用区高可用架构。
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
1月前
|
缓存 Java 中间件
jvm性能调优实战 -55RPC调用引发的OOM故障
jvm性能调优实战 -55RPC调用引发的OOM故障
67 0
|
9月前
|
Dubbo Java 应用服务中间件
由浅入深RPC通信原理实战1
由浅入深RPC通信原理实战1
43 0
|
11天前
|
消息中间件 Linux Android开发
实战高效RPC方案在嵌入式环境中的应用与揭秘
该文介绍了在嵌入式环境中应用和设计高效RPC方案的过程。作者参考了Android的Binder机制,采用共享环形缓冲区来解决进程间同步返回值的问题。选择共享内存是因为其零拷贝、低延迟和灵活访问模式的优势,而环形缓冲区则提供了FIFO特性,便于数据有序传输并优化内存管理。文中提到了关键接口`write`和`read`的实现,以及一个简单的`CalculateSum`接口调用示例,展示了RPC方案的实际效果。该方案旨在提供一种轻量级、高性能的嵌入式RPC通信方法。
|
10天前
|
网络协议 网络架构
RPC原理解析
RPC原理解析
15 0
|
1月前
|
Java fastjson 数据安全/隐私保护
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
61 0
|
1月前
|
消息中间件 缓存 API
|
1月前
|
Go
Go语言RPC实战:打造自己的远程调用服务
Go语言RPC实战:打造自己的远程调用服务
54 0
|
1月前
|
消息中间件 Dubbo Java
Simple RPC - 01 框架原理及总体架构初探
Simple RPC - 01 框架原理及总体架构初探
58 0
|
8月前
|
缓存 Java API
02RPC - socket nio原理
02RPC - socket nio原理
26 0
|
1月前
|
负载均衡 Dubbo Java
Dubbo 3.x:探索阿里巴巴的开源RPC框架新技术
随着微服务架构的兴起,远程过程调用(RPC)框架成为了关键组件。Dubbo,作为阿里巴巴的开源RPC框架,已经演进到了3.x版本,带来了许多新特性和技术改进。本文将探讨Dubbo 3.x中的一些最新技术,包括服务注册与发现、负载均衡、服务治理等,并通过代码示例展示其使用方式。
127 9