Netty实战 -- 使用Netty实现分布式框架Dubbo RPC

简介: Netty实战 -- 使用Netty实现分布式框架Dubbo RPC
📢📢📢📣📣📣

哈喽!大家好,我是【 Bug 终结者,【CSDN新星创作者】🏆,阿里云技术博主🏆,51CTO人气博主🏆,INfoQ写作专家🏆 <br/>
一位上进心十足,拥有极强学习力的【 Java领域博主】😜😜😜 <br/>
🏅【Bug 终结者】博客的领域是【面向后端技术】的学习,未来会持续更新更多的【后端技术】以及【学习心得】。 偶尔会分享些前端基础知识,会更新实战项目,面向企业级开发应用
🏅 如果有对【后端技术】、【前端领域】感兴趣的【小可爱】,欢迎关注【Bug 终结者】💞💞💞


❤️❤️❤️ 感谢各位大可爱小可爱! ❤️❤️❤️

@[TOC]

⚡Netty系列文章

Netty实战系列文章

⚡Dubbo系列文章

Spring Boot 整合Dubbo + Zookeeper 实现分布式 消费者与服务者的业务调用

一、什么是RPC?

RPC【Remote Procedure Call】是指远程过程调用,是一种进程间通信方式,他是一种技术的思想,而不是规范。它允许程序调用另一个地址空间(通常是共享网络的另一台机器上)的过程或函数,而不用程序员显式编码这个远程调用的细节。即程序员无论是调用本地的还是远程的函数,本质上编写的调用代码基本相同。

常见的RPC框架有: 比较知名的阿里Dubbo、Google的GRPC、Go的RPCX、Apache的Thrift,SpringCloud
在这里插入图片描述

1.1 RPC基本原理

两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样

示意图

在这里插入图片描述

RPC两个核心模块:序列化和通讯

1.2 RPC执行流程

在RPC中,Client叫做服务消费者,Server叫做服务提供者

RPC调用流程说明

  1. 服务消费方(client),以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务器
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地API
  6. 本地服务 执行并返回结果给 server stub
  7. server stub 将返回结果导入进行编码并发送至消费方
  8. client stub 接收到数据进行解码
  9. 服务消费方得到结果

RPC的目标就是将2~8的步骤封装起来,用户无需关注这些细节,可以像调用本地方法一样即可完成远程服务调用

图解

在这里插入图片描述

二、什么是代理模式?

代理模式的定义:代理模式给某一个对象提供一个代理对象,并由代理对象控制对原对象的引用。通俗的来讲代理模式就是我们生活中常见的中介。

Java中代理模式分为静态代理动态代理模式

2.1 案例实现

中午到了,小明很饿,于是在美团外卖上点了一份烤鸭,过了半个小时,外卖到了,小明下去拿外卖,顺利的吃上了烤鸭~

2.2 静态代理方式

由程序员手动创建代理类或工具对象,从而实现调用服务

Subject类

package com.wanshi.netty.dubborpc.netty;

public interface Subject {

    String buy(String msg);
}

SubjectImpl类

package com.wanshi.netty.dubborpc.netty;

public class SubjectImpl implements Subject{
    @Override
    public String buy(String msg) {
        return "买了" + msg;
    }
}

ProxySubject类

package com.wanshi.netty.dubborpc.netty;


public class ProxySubject {

    private Subject subject;

    {
        subject = new SubjectImpl();
    }

    public void buy(String msg) {
        System.out.println("美团外卖,使命必达,跑腿代买!");
        String buy = subject.buy(msg);
        System.out.println(buy);
    }
}

测试类

public static void main(String[] args) {
    ProxySubject subject = new ProxySubject();
    subject.buy("北京烤鸭");
}

效果

在这里插入图片描述

⛽静态代理的优缺点

缺点: 不利于扩展,调用一次就要创建一次对象,从而造成不必要的内存空间浪费

优点: 可读性好,逻辑简单,清晰明了

2.3 动态代理方式

动态代理又分为:JDK动态代理CGLIB动态代理

在程序运行时,运用反射机制动态创建而成,达到调用服务

使用以上的Subject类和实现类

SubjectInvocationHandler处理器类

package com.wanshi.netty.dubborpc.netty;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class SubjectInvocationHandler implements InvocationHandler {

    private Object obj;

    public SubjectInvocationHandler(Object obj) {
        this.obj = obj;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object res = method.invoke(obj, args);
        return res;
    }
}

ProxyFactry工厂类

package com.wanshi.netty.dubborpc.netty;

import java.lang.reflect.Proxy;

public class ProxyFactry {

    public static Subject getInstance() {
        SubjectImpl subject = new SubjectImpl();
        System.out.println("美团外卖,使命必达,跑腿代买!");
        SubjectInvocationHandler subjectInvocationHandler = new SubjectInvocationHandler(subject);
        Subject proxy = (Subject) Proxy.newProxyInstance(subject.getClass().getClassLoader(),
                subject.getClass().getInterfaces(), subjectInvocationHandler);
        return proxy;
    }
}

测试类

public static void main(String[] args) {
    Subject subject = ProxyFactry.getInstance();
    String buy = subject.buy("饮料");
    System.out.println(buy);
}

效果
在这里插入图片描述

⛽动态代理的优缺点

两种动态代理对照表

JDK原生动态代理 CGLIB动态代理
核心原理 基于 ”接口实现“方式 基于类集成方式
优点 Java原生支持的,不需要任何依赖 对与代理的目标对象无限制,无需实现接口
不足之处 只能基于接口实现 无法处理final方法
实现方式 Java原生支持 需要引入Jar文件依赖

三、Netty实现DubboRPC

3.1 需求说明

  1. dubbo 底层使用了Netty作为网络通讯框架,要求用Netty实现一个简单的RPC框架
  2. 模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据,底层使用Netty框架

3.2 剖析需求

  1. 创建接口,定义抽象方法,用于服务消费者与服务提供者之间的约定
  2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据
  3. 创建一个消费者,该类需要 透明的调用自己不存在的方法,内部使用Netty 请求提供者返回数据

在这里插入图片描述

3.3 效果图

在这里插入图片描述

3.4 核心源码

♻️共用接口API

HelloService

package com.wanshi.netty.dubborpc.publicinterface;

/**
 * 公共接口,提供服务
 */
public interface HelloService {

    String hello(String msg);
}

♻️服务提供者

ServerBootstrap启动类

package com.wanshi.netty.dubborpc.provider;

import com.wanshi.netty.dubborpc.netty.NettyServer;

/**
 * 服务提供者启动类,监听消费者,并绑定端口8888
 */
public class ServerBootstrap {

    public static void main(String[] args) {
        NettyServer.startServer("127.0.0.1", 8888);
    }
}

NettyServer

package com.wanshi.netty.dubborpc.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Netty服务器类,启动服务
 */
public class NettyServer {


    /**
     * 开启服务方法,调用内部私有启动服务方法,此类写法很常用,进一层的封装了API
     * @param hostName
     * @param port
     */
    public static void startServer(String hostName, int port) {
        startServer0(hostName, port);
    }

    /**
     * 真正启动服务的方法
     * @param hostName
     * @param port
     */
    private static void startServer0(String hostName, int port) {
        //创建2个线程,一个为主线程仅创建1个,另外创建工作线程CPU核数*2个
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //服务器启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            //初始化参数
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });

            System.out.println("服务端提供服务准备就绪...");

            //绑定端口,启动服务,异步执行方法
            ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync();
            channelFuture.channel().closeFuture().sync();
        }  catch (Exception e) {
            e.printStackTrace();
        } finally {
            //优雅的关闭线程
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

♻️服务消费者

ClientBootstrap

package com.wanshi.netty.dubborpc.consumer;

import com.wanshi.netty.dubborpc.netty.NettyClient;
import com.wanshi.netty.dubborpc.publicinterface.HelloService;

public class ClientBootstrap {

    public static final String providerName = "hello#";

    public static void main(String[] args) throws InterruptedException {
        NettyClient client = new NettyClient();

        HelloService service = (HelloService) client.getBean(HelloService.class, providerName);

        for (;;) {
            Thread.sleep(2000);
            String hello = service.hello("你好鸭 dubbo~");
            System.out.println("服务端返回的结果:" + hello + "\n\n");
        }
    }
}

NettyClient

package com.wanshi.netty.dubborpc.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NettyClient {

    //创建线程池,大小为CPU核数*2
    private ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    //将客户端处理器提升至全局变量
    private NettyClientHandler clientHandler;

    //记录调用服务的次数
    private int count;

    /**
     * 代理对象,执行方法,这里用到了代理模式
     * @param serviceClass
     * @param providerName
     * @return
     */
    public Object getBean(final Class<?> serviceClass, String providerName) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass}, (proxy, method, args) -> {

                    System.out.println("(proxy, method, args) 进入,第" + (++count) + "次调用远程服务");
                    if (clientHandler == null) {
                        initClient();
                    }
                    clientHandler.setParam(providerName + args[0]);
                    return executorService.submit(clientHandler).get();
                });
    }

    /**
     * 初始化客户端
     */
    public void initClient() {
        clientHandler = new NettyClientHandler();
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();

        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(clientHandler);
                    }
                });

        try {
            bootstrap.connect("127.0.0.1", 8888).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

四、源码下载

本教程已上传至GitHub代码托管平台,希望点个Star鸭~

NettyDubboRPC

使用Git爬取GitHub文件

♨️往期精彩热文回顾

✈️ **[Netty进阶 -- WebSocket长连接开发
](https://blog.csdn.net/weixin_45526437/article/details/123351605?spm=1001.2014.3001.5502)**
✈️ **[Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
](https://blog.csdn.net/weixin_45526437/article/details/123197281?spm=1001.2014.3001.5502)**

✈️ **[一分钟教你快速 搭建Vue脚手架(Vue-Cli)项目并整合ElementUI
](https://blog.csdn.net/weixin_45526437/article/details/123247055?spm=1001.2014.3001.5502)**

⛵小结

以上就是【Bug 终结者】对基于Netty实现Dubbo RPC 简单的概述,手写Dubbo RPC,代码有点难度,但坚持的啃下来,多敲上几遍,熟能生巧,加油,相信你会对RPC有一个新的理解,加油,编程路上,你我都是追梦人~

如果这篇【文章】有帮助到你,希望可以给【 Bug 终结者】点个赞👍,创作不易,如果有对【 后端技术】、【 前端领域】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【 Bug 终结者】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】💝💝💝!
相关文章
|
9月前
|
数据采集 缓存 NoSQL
分布式新闻数据采集系统的同步效率优化实战
本文介绍了一个针对高频新闻站点的分布式爬虫系统优化方案。通过引入异步任务机制、本地缓存池、Redis pipeline 批量写入及身份池策略,系统采集效率提升近两倍,数据同步延迟显著降低,实现了分钟级热点追踪能力,为实时舆情监控与分析提供了高效、稳定的数据支持。
368 1
分布式新闻数据采集系统的同步效率优化实战
|
9月前
|
人工智能 Kubernetes 数据可视化
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
本文回顾了一次关键词监测任务在容器集群中失效的全过程,分析了中转IP复用、调度节奏和异常处理等隐性风险,并提出通过解耦架构、动态IP分发和行为模拟优化采集策略,最终实现稳定高效的数据抓取与分析。
162 2
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
|
10月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
2729 7
|
11月前
|
监控 Java 调度
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
968 4
|
11月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
313 11
|
11月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
666 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
1518 0
分布式爬虫框架Scrapy-Redis实战指南
|
消息中间件 分布式计算 并行计算
Python 高级编程与实战:构建分布式系统
本文深入探讨了 Python 中的分布式系统,介绍了 ZeroMQ、Celery 和 Dask 等工具的使用方法,并通过实战项目帮助读者掌握这些技术。ZeroMQ 是高性能异步消息库,支持多种通信模式;Celery 是分布式任务队列,支持异步任务执行;Dask 是并行计算库,适用于大规模数据处理。文章结合具体代码示例,帮助读者理解如何使用这些工具构建分布式系统。
|
机器学习/深度学习 分布式计算 API
Python 高级编程与实战:深入理解并发编程与分布式系统
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧、数据科学、机器学习、Web 开发、API 设计、网络编程和异步IO。本文将深入探讨 Python 在并发编程和分布式系统中的应用,并通过实战项目帮助你掌握这些技术。