构建异步高并发服务器:Netty与Spring Boot的完美结合

简介: 构建异步高并发服务器:Netty与Spring Boot的完美结合

ChatGPT体验地址

IO

在Java基础中,IO流是一个重要操作,先上八股


BIO:传统的IO,同步阻塞,一个连接一个线程。一般不怎么使用

AIO:JDK7引入的,异步非阻塞IO

NIO:JDK1.4之后新的API,是多路复用,允许你一次性处理多个连接,而不需要等待每个连接的完成。(NIO 多路复用的核心概念是 Selector(选择器)和 Channel(通道)通过Channel、Buffer和Selector来进行数据传输和事件处理)

Netty

Netty是建立在NIO之上的一个框架,提供了更高级的抽象,如ChannelHandler和EventLoop,简化了事件处理和网络编程。

执行流程如下图


具有高性能,高可靠性,高可扩展性,还支持多种协议

我们以聊天流程为例

  1. 服务端启动
  2. 客户端启动
  3. 客户端启动连接上的时候,告知服务端
  4. 服务端读取到客户端的信息后立即发送信息给客户端
  5. 客户端收到信息后也发送给服务端


1. 引入依赖

      <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.76.Final</version>
        </dependency>

2. 服务端

@Slf4j
public class NettyServer {
    private final static int PORT = 9012;
    public static void main(String[] args) throws InterruptedException {
        /**
         * 包含childGroup,childHandler,config,继承的父类AbstractBootstrap包括了parentGroup
         * */
        ServerBootstrap bootstrap = new ServerBootstrap();
        /**
         * EventLoopGroup用于处理所有ServerChannel和Channel的所有事件和IO
         * */
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            /**
             * 绑定两个事件组
             * */
            bootstrap.group(parentGroup, childGroup)
                    /**
                     * 初始化socket,定义tcp连接的实例
                     * 内部调用ReflectiveChannelFactory实现对NioServerSocketChannel实例化
                     * channelFactory是在AbstractBootstrap,也就是bootstrap的父类
                     * */
                    .channel(NioServerSocketChannel.class)
                    /**
                     * 添加处理器
                     * ChannelInitializer包括了Set<ChannelHandlerContext> initMap
                     *
                     * 这里比较有趣的事情就是使用被注册的channel去初始化其他的channel,
                     * 等初始化结束后移除该channel
                     * 所以SocketChannel是一个工具,
                     *
                     * 在bind绑定端口的时候,进行初始化和注册initAndRegister,
                     * 通过channel = channelFactory.newChannel()得到初始化channel
                     * init(channel)真正开始初始化,
                     * p = channel.pipeline()得到ChannelPipeline,
                     * p.addLast开始添加
                     * ch.eventLoop().execute将childHandler赋值并开启一个任务setAutoRead
                     * 所以最后在监听读取的时候将会按照下面添加的channel进行读取
                     *
                     * ChannelInitializer继承了ChannelInboundHandlerAdapter
                     * 间接继承ChannelHandlerAdapter,ChannelInboundHandler,
                     * */
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            /**
                             * ByteBuf和String之间的转换
                             *
                             *  Decoders解密
                             *  pipeline.addLast("frameDecoder", new {@link LineBasedFrameDecoder}(80))
                             *  pipeline.addLast("stringDecoder", new {@link StringDecoder}(CharsetUtil.UTF_8))
                             *
                             *  Encoder加密
                             *  pipeline.addLast("stringEncoder", new {@link StringEncoder}(CharsetUtil.UTF_8))
                             *
                             *  使用上面的加密解密后就可以直接读取字符串
                             *   void channelRead({@link ChannelHandlerContext} ctx, String msg) {
                             *       ch.write("Did you say '" + msg + "'?\n")
                             *  }
                             *
                             * */
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            //自定义处理器
                            pipeline.addLast(new ServerHandler1());
                        }
                    });
            ChannelFuture future = bootstrap.bind(PORT).sync();
            log.info("服务器已启动");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}


这段代码实现了一个使用Netty框架的服务器端,它监听指定的端口并处理客户端的连接请求。


创建一个ServerBootstrap实例,用于启动服务器。

创建两个EventLoopGroup实例,parentGroup用于处理服务器的连接请求,childGroup用于处理客户端的数据通信。

绑定事件组到ServerBootstrap实例。

指定使用的NioServerSocketChannel作为服务器套接字通道的实现类。

添加处理器到ChannelInitializer中,该处理器负责初始化和配置新连接的SocketChannel。

在处理器中,通过ChannelPipeline添加了如下处理器:

StringDecoder:处理传入的字节数据,并将其解码为字符串。

StringEncoder:处理传出的字符串数据,并将其编码为字节。

ServerHandler1:自定义的处理器,用于处理客户端发送的消息。

绑定服务器的端口号,启动服务器。

等待服务器的关闭事件。


  1. 处理器
@Slf4j
public class ServerHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("Client Address ====== {},读取的信息:{}", ctx.channel().remoteAddress(),msg);
        ctx.channel().writeAndFlush("服务端writeAndFlush:我是服务端");
        ctx.fireChannelActive();
        //睡眠
        TimeUnit.MILLISECONDS.sleep(500);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //打印异常
        cause.printStackTrace();
        //关闭Channel连接,并通知ChannelFuture,通常是出现异常或者是完成了操作
        ctx.close();
    }
}

4. 客户端

@Slf4j
public class NettyClient {
    private final static int PORT = 9012;
    private final static String IP = "localhost";
    public static void main(String[] args) throws InterruptedException {
        /**
         * 服务端是ServerBootstrap,客户端是Bootstrap
         * Bootstrap引导channel连接,UDP连接用bind方法,TCP连接用connect方法
         * */
        Bootstrap bootstrap = new Bootstrap();
        /**
         * 服务端是EventLoopGroup,客户端是NioEventLoopGroup
         * 这里创建默认0个线程,一个线程工厂,一个选择器提供者
         * */
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            bootstrap.group(eventLoopGroup)
                    /**
                     * 初始化socket,定义tcp连接的实例
                     * */
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            /**
                             * 进行字符串的转换
                             * */
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                            /**
                             * 自定义处理器
                             * */
                            pipeline.addLast(new ClientHandler1());
                        }
                    });
            ChannelFuture future = bootstrap.connect(IP, PORT).sync();
            log.info("客户端访问");
            future.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

这段代码实现了一个使用Netty框架的客户端,它连接到指定的服务器端并与服务器进行通信。


创建一个Bootstrap实例,用于启动客户端。

创建一个NioEventLoopGroup实例,用于处理客户端的事件和IO操作。

绑定事件组到Bootstrap实例。

指定使用的NioSocketChannel作为客户端套接字通道的实现类。

添加处理器到ChannelInitializer中,该处理器负责初始化和配置客户端连接的SocketChannel。

在处理器中,通过ChannelPipeline添加了如下处理器:

StringDecoder:处理传入的字节数据,并将其解码为字符串。

StringEncoder:处理传出的字符串数据,并将其编码为字节。

ClientHandler1:自定义的处理器,用于处理与服务器之间的通信。

使用Bootstrap的connect()方法连接到指定的服务器IP和端口。

等待连接完成。

在连接成功后,打印日志信息。

等待客户端的关闭事件。

  1. 处理器
@Slf4j
public class ClientHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("客户端读取的信息:{}", msg);
        ctx.channel().writeAndFlush("客户端writeAndFlush:我是客户端");
        TimeUnit.MILLISECONDS.sleep(5000);
    }
    /**
     * 当事件到达pipeline时候触发
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().writeAndFlush("客户端:开始聊天");
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        //关闭Channel连接
        ctx.close();
    }
}

结果

服务端日志

Client Address ====== /127.0.0.1:63740,读取的信息:客户端:开始聊天
Client Address ====== /127.0.0.1:63740,读取的信息:客户端writeAndFlush:我是客户端
Client Address ====== /127.0.0.1:63740,读取的信息:客户端writeAndFlush:我是客户端

客户端日志

客户端读取的信息:服务端writeAndFlush:我是服务端
客户端读取的信息:服务端writeAndFlush:我是服务端

总结

引导类-Bootstarp和ServerBootstrap


Bootstarp和ServerBootstrap被称为引导类,使你的应用程序和网络层相隔离。类似java项目的启动类。

连接-NioSocketChannel

客户端和服务端的启动都是采用配置的channel去连接处理器,这里服务端和客户端是用NioSocketChannel

事件组-EventLoopGroup和NioEventLoopGroup

客户端使用的是NioEventLoopGroup,服务端使用的是EventLoopGroup。 服务端和客户端的引导类启动后实现了配置的运行,客户端和服务端的连接都是采用NioSocketChannel。 连接的流程:


客户端创建一个channel

channel对应一个EventLoop,EventLoop存放到NioEventLoopGroup中

服务端监听到后,创建一个channel连接,channel对应一个EventLoop,EventLoop存放到子的EventLoopGroup,父的事件组负责监听,一个事件对应一个子事件组。

在这里可以认为父是boss监听组,子是工作组。

当客户端发送信息的时候,先被父监听,然后将异步调用工作组。

消息会经过事件组的所有处理器。

实际上服务端的事件组也可以使用NioEventLoopGroup。

![在这里插入图片描述](https://ucc.alicdn.com/images/user-upload-01/direct/5e676670b49e40dd83155e76094e9017.png


相关文章
|
1月前
|
安全 前端开发 Java
随着企业应用复杂度提升,Java Spring框架以其强大与灵活特性简化开发流程,成为构建高效、可维护应用的理想选择
随着企业应用复杂度提升,Java Spring框架以其强大与灵活特性简化开发流程,成为构建高效、可维护应用的理想选择。依赖注入使对象管理交由Spring容器处理,实现低耦合高内聚;AOP则分离横切关注点如事务管理,增强代码模块化。Spring还提供MVC、Data、Security等模块满足多样需求,并通过Spring Boot简化配置与部署,加速微服务架构构建。掌握这些核心概念与工具,开发者能更从容应对挑战,打造卓越应用。
34 1
|
25天前
|
前端开发 安全 Java
技术进阶:使用Spring MVC构建适应未来的响应式Web应用
【9月更文挑战第2天】随着移动设备的普及,响应式设计至关重要。Spring MVC作为强大的Java Web框架,助力开发者创建适应多屏的应用。本文推荐使用Thymeleaf整合视图,通过简洁的HTML代码提高前端灵活性;采用`@ResponseBody`与`Callable`实现异步处理,优化应用响应速度;运用`@ControllerAdvice`统一异常管理,保持代码整洁;借助Jackson简化JSON处理;利用Spring Security增强安全性;并强调测试的重要性。遵循这些实践,将大幅提升开发效率和应用质量。
53 7
|
21天前
|
缓存 Java 应用服务中间件
随着微服务架构的兴起,Spring Boot凭借其快速开发和易部署的特点,成为构建RESTful API的首选框架
【9月更文挑战第6天】随着微服务架构的兴起,Spring Boot凭借其快速开发和易部署的特点,成为构建RESTful API的首选框架。Nginx作为高性能的HTTP反向代理服务器,常用于前端负载均衡,提升应用的可用性和响应速度。本文详细介绍如何通过合理配置实现Spring Boot与Nginx的高效协同工作,包括负载均衡策略、静态资源缓存、数据压缩传输及Spring Boot内部优化(如线程池配置、缓存策略等)。通过这些方法,开发者可以显著提升系统的整体性能,打造高性能、高可用的Web应用。
51 2
|
1月前
|
网络协议 测试技术 Apache
测试Netty高并发工具
测试Netty高并发工具
63 3
|
1月前
|
前端开发 Java Spring
springboot 整合 netty框架, 实现 心跳检测,自动重连
springboot 整合 netty框架, 实现 心跳检测,自动重连
|
26天前
|
Java 微服务 Spring
驾驭复杂性:Spring Cloud在微服务构建中的决胜法则
【8月更文挑战第31天】Spring Cloud是在Spring Framework基础上打造的微服务解决方案,提供服务发现、配置管理、消息路由等功能,适用于构建复杂的微服务架构。本文介绍如何利用Spring Cloud搭建微服务,包括Eureka服务发现、Config Server配置管理和Zuul API网关等组件的配置与使用。通过Spring Cloud,可实现快速开发、自动化配置,并提升系统的伸缩性和容错性,尽管仍需面对分布式事务等挑战,但其强大的社区支持有助于解决问题。
36 0
|
29天前
|
监控 Java API
Spring Boot中的异步革命:构建高性能的现代Web应用
【8月更文挑战第29天】Spring Boot 是一个简化 Spring 应用开发与部署的框架。异步任务处理通过后台线程执行耗时操作,提升用户体验和系统并发能力。要在 Spring Boot 中启用异步任务,需在配置类上添加 `@EnableAsync` 注解,并定义一个自定义的 `ThreadPoolTaskExecutor` 或使用默认线程池。通过 `@Async` 注解的方法将在异步线程中执行。异步任务适用于发送电子邮件、数据处理、外部 API 调用和定时任务等场景。最佳实践中应注意正确配置线程池、处理返回值和异常、以及监控任务状态,确保系统的稳定性和健壮性。
31 0
|
2月前
|
Java 持续交付 Maven
Spring Boot程序的打包与运行:构建高效部署流程
构建高效的Spring Boot部署流程对于保障应用的快速、稳定上线至关重要。通过采用上述策略,您可以确保部署过程的自动化、可靠性和高效性,从而将专注点放在开发上面。无论是通过Maven的生命周期命令进行打包,还是通过容器技术对部署过程进行优化,选择正确的工具与实践是成功实现这一目标的关键。
108 2
|
2月前
|
消息中间件 Java 开发者
Spring Cloud微服务框架:构建高可用、分布式系统的现代架构
Spring Cloud是一个开源的微服务框架,旨在帮助开发者快速构建在分布式系统环境中运行的服务。它提供了一系列工具,用于在分布式系统中配置、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态等领域的支持。
148 5
|
2月前
|
存储 Java 数据挖掘
构建基于Spring Boot的数据分析平台
构建基于Spring Boot的数据分析平台