Netty异步NIO框架(一)java服务端与客户端实现聊天 websocket通道

简介: Netty异步NIO框架(一)java服务端与客户端实现聊天 websocket通道

介绍

Netty 是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。

“快速和简单”并不意味着生成的应用程序会受到可维护性或性能问题的影响。Netty 是根据从实现许多协议(如 FTP、SMTP、HTTP 以及各种二进制和基于文本的旧协议)中获得的经验精心设计的。因此,Netty 成功地找到了一种方法,可以在不妥协的情况下实现易于开发、性能、稳定性和灵活性。


设计

各种传输类型的统一 API - 阻塞和非阻塞套接字

基于灵活且可扩展的事件模型,允许明确分离关注点

高度可定制的线程模型——单线程、一个或多个线程池,例如 SEDA

真正的无连接数据报套接字支持(自 3.1 起)


Performance

更高的吞吐量,更低的延迟

更少的资源消耗

最小化不必要的内存拷贝

话不多说,上代码


java代码

maven依赖

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.73.Final</version>
</dependency>

服务端创建

自定义服务端处理类

package com.netty.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.UUID;
/**
 * @author wuzhenyong
 * ClassName:NettySocketServerHandler.java
 * date:2022-04-29 10:24
 * Description: 服务端自定义处理器
 */
public class NettySocketServerHandler extends SimpleChannelInboundHandler<Object> {
    /**
     * 收到消息请求调用此方法
     *
     * @param ctx ctx
     * @param msg 味精
     * @throws Exception 异常
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("接收到客户端发来消息:" + ctx.channel().remoteAddress() + ":" + msg);
        //返回数据
        ctx.channel().writeAndFlush("服务端发送数据:" + UUID.randomUUID());
    }
    /**
     * 出现异常调用此方法
     *
     * @param ctx   ctx
     * @param cause 导致
     * @throws Exception 异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 关闭通道
        ctx.close();
    }
    /**
     * 客户端创建连接请求调用此方法
     *
     * @param ctx ctx
     * @throws Exception 异常
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress()  + "已上线");
    }
}

自定义服务端初始化信息

package com.netty.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.UUID;
/**
 * @author wuzhenyong
 * ClassName:NettySocketServerHandler.java
 * date:2022-04-29 10:24
 * Description: 服务端自定义处理器
 */
public class NettySocketServerHandler extends SimpleChannelInboundHandler<Object> {
    /**
     * 收到消息请求调用此方法
     *
     * @param ctx ctx
     * @param msg 味精
     * @throws Exception 异常
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("接收到客户端发来消息:" + ctx.channel().remoteAddress() + ":" + msg);
        //返回数据
        ctx.channel().writeAndFlush("服务端发送数据:" + UUID.randomUUID());
    }
    /**
     * 出现异常调用此方法
     *
     * @param ctx   ctx
     * @param cause 导致
     * @throws Exception 异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 关闭通道
        ctx.close();
    }
    /**
     * 客户端创建连接请求调用此方法
     *
     * @param ctx ctx
     * @throws Exception 异常
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress()  + "已上线");
    }
}

服务端启动类

package com.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.SneakyThrows;
/**
 * @author wuzhenyong
 * ClassName:NettySocketServer.java
 * date:2022-04-29 10:22
 * Description: 服务端
 */
public class NettySocketServer {
    public static void main(String[] args) throws  Exception {
        //定义线程组 EventLoopGroup为死循环
        //boss线程组一直在接收客户端发起的请求,但是不对请求做处理,boss会将接收到的请i交给worker线程组来处理
        //实际可以用一个线程组来做客户端的请求接收和处理两件事,但是不推荐
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //启动类定义
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    //指定在服务端启动过程中的一些逻辑,通常情况下我们用不着这个方法
                    .handler(new ChannelInitializer<NioServerSocketChannel>() {
                        @Override
                        protected void initChannel(NioServerSocketChannel nioServerSocketChannel) throws Exception {
                            System.out.println("服务端启动中...");
                        }
                    })
                    //子处理器,自定义处理器
                    .childHandler(new NettySocketServerInitializer());
            //绑定端口
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            System.out.println("服务器启动成功");
            channelFuture.channel().closeFuture().sync();
        } finally {
            //Netty提供的优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

启动服务端

E:\java\jdk1.8\bin\java.exe -javaagent:D:\idea\IntelliJIDEA2021.2.3\lib\idea_rt.jar=6526:D:\idea\IntelliJIDEA2021.2.3\bin -Dfile.encoding=UTF-8 -classpath E:\java\jdk1.8\jre\lib\charsets.jar;E:\java\jdk1.8\jre\lib\deploy.jar;E:\java\jdk1.8\jre\lib\ext\access-bridge-64.jar;E:\java\jdk1.8\jre\lib\ext\cldrdata.jar;E:\java\jdk1.8\jre\lib\ext\dnsns.jar;E:\java\jdk1.8\jre\lib\ext\jaccess.jar;E:\java\jdk1.8\jre\lib\ext\jfxrt.jar;E:\java\jdk1.8\jre\lib\ext\localedata.jar;E:\java\jdk1.8\jre\lib\ext\nashorn.jar;E:\java\jdk1.8\jre\lib\ext\sunec.jar;E:\java\jdk1.8\jre\lib\ext\sunjce_provider.jar;E:\java\jdk1.8\jre\lib\ext\sunmscapi.jar;E:\java\jdk1.8\jre\lib\ext\sunpkcs11.jar;E:\java\jdk1.8\jre\lib\ext\zipfs.jar;E:\java\jdk1.8\jre\lib\javaws.jar;E:\java\jdk1.8\jre\lib\jce.jar;E:\java\jdk1.8\jre\lib\jfr.jar;E:\java\jdk1.8\jre\lib\jfxswt.jar;E:\java\jdk1.8\jre\lib\jsse.jar;E:\java\jdk1.8\jre\lib\management-agent.jar;E:\java\jdk1.8\jre\lib\plugin.jar;E:\java\jdk1.8\jre\lib\resources.jar;E:\java\jdk1.8\jre\lib\rt.jar;F:\WorkTools\ws\parent\netty-websocket\target\classes;F:\Install\repository\org\projectlombok\lombok\1.18.12\lombok-1.18.12.jar;F:\Install\repository\com\alibaba\fastjson\1.2.69\fastjson-1.2.69.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter-web\2.2.5.RELEASE\spring-boot-starter-web-2.2.5.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter\2.2.5.RELEASE\spring-boot-starter-2.2.5.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot\2.2.5.RELEASE\spring-boot-2.2.5.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot-autoconfigure\2.2.5.RELEASE\spring-boot-autoconfigure-2.2.5.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter-logging\2.2.5.RELEASE\spring-boot-starter-logging-2.2.5.RELEASE.jar;F:\Install\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;F:\Install\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;F:\Install\repository\org\apache\logging\log4j\log4j-to-slf4j\2.12.1\log4j-to-slf4j-2.12.1.jar;F:\Install\repository\org\apache\logging\log4j\log4j-api\2.12.1\log4j-api-2.12.1.jar;F:\Install\repository\org\slf4j\jul-to-slf4j\1.7.30\jul-to-slf4j-1.7.30.jar;F:\Install\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;F:\Install\repository\org\yaml\snakeyaml\1.25\snakeyaml-1.25.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter-json\2.2.5.RELEASE\spring-boot-starter-json-2.2.5.RELEASE.jar;F:\Install\repository\com\fasterxml\jackson\core\jackson-databind\2.10.2\jackson-databind-2.10.2.jar;F:\Install\repository\com\fasterxml\jackson\core\jackson-annotations\2.10.2\jackson-annotations-2.10.2.jar;F:\Install\repository\com\fasterxml\jackson\core\jackson-core\2.10.2\jackson-core-2.10.2.jar;F:\Install\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.10.2\jackson-datatype-jdk8-2.10.2.jar;F:\Install\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.10.2\jackson-datatype-jsr310-2.10.2.jar;F:\Install\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.10.2\jackson-module-parameter-names-2.10.2.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter-tomcat\2.2.5.RELEASE\spring-boot-starter-tomcat-2.2.5.RELEASE.jar;F:\Install\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.31\tomcat-embed-core-9.0.31.jar;F:\Install\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.31\tomcat-embed-el-9.0.31.jar;F:\Install\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.31\tomcat-embed-websocket-9.0.31.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter-validation\2.2.5.RELEASE\spring-boot-starter-validation-2.2.5.RELEASE.jar;F:\Install\repository\jakarta\validation\jakarta.validation-api\2.0.2\jakarta.validation-api-2.0.2.jar;F:\Install\repository\org\hibernate\validator\hibernate-validator\6.0.18.Final\hibernate-validator-6.0.18.Final.jar;F:\Install\repository\org\jboss\logging\jboss-logging\3.4.1.Final\jboss-logging-3.4.1.Final.jar;F:\Install\repository\com\fasterxml\classmate\1.5.1\classmate-1.5.1.jar;F:\Install\repository\org\springframework\spring-web\5.2.4.RELEASE\spring-web-5.2.4.RELEASE.jar;F:\Install\repository\org\springframework\spring-beans\5.2.4.RELEASE\spring-beans-5.2.4.RELEASE.jar;F:\Install\repository\org\springframework\spring-webmvc\5.2.4.RELEASE\spring-webmvc-5.2.4.RELEASE.jar;F:\Install\repository\org\springframework\spring-aop\5.2.4.RELEASE\spring-aop-5.2.4.RELEASE.jar;F:\Install\repository\org\springframework\spring-context\5.2.4.RELEASE\spring-context-5.2.4.RELEASE.jar;F:\Install\repository\org\springframework\spring-expression\5.2.4.RELEASE\spring-expression-5.2.4.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter-test\2.2.5.RELEASE\spring-boot-starter-test-2.2.5.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot-test\2.2.5.RELEASE\spring-boot-test-2.2.5.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot-test-autoconfigure\2.2.5.RELEASE\spring-boot-test-autoconfigure-2.2.5.RELEASE.jar;F:\Install\repository\com\jayway\jsonpath\json-path\2.4.0\json-path-2.4.0.jar;
............省略...........
服务端启动中...
服务器启动成功

客户端创建

自定义客户端处理器

package com.netty.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.time.LocalDateTime;
/**
 * @author wuzhenyong
 * ClassName:NettyClientHandler.java
 * date:2022-04-29 10:29
 * Description:
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<Object> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("接收到客户端:" + ctx.channel().remoteAddress() + "发送消息:" + msg);
        ctx.writeAndFlush("客户端发送消息:" + "一个小浪吴啊 >>" + LocalDateTime.now());
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 关闭请求
        ctx.close();
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("客户端连接初始化");
    }
}

自定义客户端初始化信息

package com.netty.client;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
 * @author wuzhenyong
 * ClassName:NettyClientInitializer.java
 * date:2022-04-29 10:28
 * Description:
 */
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //声明管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        //绑定自带的解码器,就是对二进制数据的解析工具,至于解码器构造方法的参数之后详细分析
        pipeline.addLast("lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        //编码器
        pipeline.addLast("lengthFieldPrepender", new LengthFieldPrepender(4));
        //由于涉及到服务端和客户端的字符串数据,需要绑定字符串的编解码
        pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
        //自定义处理器
        pipeline.addLast("myClientHandler", new NettyClientHandler());
    }
}

客户端启动类

package com.netty.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.SneakyThrows;
import java.util.Scanner;
/**
 * @author wuzhenyong
 * ClassName:NettySocketClient.java
 * date:2022-04-29 10:26
 * Description:
 */
public class NettySocketClient {
    public static void main(String[] args) throws  Exception  {
        //客户端只需要一个线程组
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            //声明客户端启动类
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new NettyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            // 三秒后关闭连接请求
            Thread.sleep(1000 * 3);
            channelFuture.channel().close().sync();
        } finally {
            //优雅关闭
            eventLoopGroup.shutdownGracefully();
        }
    }
}

启动服务端与客户端

服务端控制台打印

客户端启动后建立连接请求并发送一条信息,服务端打印日志

03f5cf8907ca4bf18ca87d47a67422b3.png


以下是客户端发送消息,服务端打印接收到消息日志

2fe6febdf4db45e2bd08bce436ea026e.png

客户端控制台打印

客户端接收到服务端的信息打印

c32938cba9e041438a1204c0499295d0.png

为什么服务端和客户端会一直打印消息?

1.服务端自定义处理器重新read0方法,此方法代码是接收到消息请求后会对发送消息的通道(也就是创建连接的客户端初始化时会发送一个连接请求,及发送一条“客户端连接初始化”信息)

2.然后服务端执行此方法,控制台输出接收到的消息后,又对发送消息的通道进行回复消息,所以客户端也会收到服务端的消息

a9fd3708b134440fb4aa6bbe859df15b.png

我们来看客户端的自定义处理器

1.①代码是建立连接请求执行,然后发送一条消息

2.服务端接收到消息后进行打印,然后也会写一条数据发送给客户端

3.客户端接收到消息进行打印,回再写一条数据进行回复

所以就会造成了死循环,而我在客户端启动类线程等待三秒结束连接请求

eb6bd0deb2684ca2a5e2a773f2c298df.png








以上服务端与客户端的聊天就结束了。

后端使用netty websocket通道,前端使用websocket进行实现私聊或者群发消息敬请期待下一篇。

感谢浏览。


相关文章
|
4月前
|
安全 前端开发 Java
《深入理解Spring》:现代Java开发的核心框架
Spring自2003年诞生以来,已成为Java企业级开发的基石,凭借IoC、AOP、声明式编程等核心特性,极大简化了开发复杂度。本系列将深入解析Spring框架核心原理及Spring Boot、Cloud、Security等生态组件,助力开发者构建高效、可扩展的应用体系。(238字)
|
5月前
|
人工智能 Java 开发者
阿里出手!Java 开发者狂喜!开源 AI Agent 框架 JManus 来了,初次见面就心动~
JManus是阿里开源的Java版OpenManus,基于Spring AI Alibaba框架,助力Java开发者便捷应用AI技术。支持多Agent框架、网页配置、MCP协议及PLAN-ACT模式,可集成多模型,适配阿里云百炼平台与本地ollama。提供Docker与源码部署方式,具备无限上下文处理能力,适用于复杂AI场景。当前仍在完善模型配置等功能,欢迎参与开源共建。
2371 58
阿里出手!Java 开发者狂喜!开源 AI Agent 框架 JManus 来了,初次见面就心动~
|
4月前
|
消息中间件 缓存 Java
Spring框架优化:提高Java应用的性能与适应性
以上方法均旨在综合考虑Java Spring 应该程序设计原则, 数据库交互, 编码实践和系统架构布局等多角度因素, 旨在达到高效稳定运转目标同时也易于未来扩展.
290 8
|
4月前
|
存储 安全 Java
《数据之美》:Java集合框架全景解析
Java集合框架是数据管理的核心工具,涵盖List、Set、Map等体系,提供丰富接口与实现类,支持高效的数据操作与算法处理。
|
4月前
|
存储 算法 安全
Java集合框架:理解类型多样性与限制
总之,在 Java 题材中正确地应对多样化与约束条件要求开发人员深入理解面向对象原则、范式编程思想以及JVM工作机理等核心知识点。通过精心设计与周密规划能够有效地利用 Java 高级特征打造出既健壮又灵活易维护系统软件产品。
155 7
|
5月前
|
SQL Java 数据库连接
区分iBatis与MyBatis:两个Java数据库框架的比较
总结起来:虽然从技术角度看,iBATIS已经停止更新但仍然可用;然而考虑到长期项目健康度及未来可能需求变化情况下MYBATISS无疑会是一个更佳选择因其具备良好生命周期管理机制同时也因为社区力量背书确保问题修复新特征添加速度快捷有效.
367 12
|
6月前
|
存储 缓存 安全
Java集合框架(三):Map体系与ConcurrentHashMap
本文深入解析Java中Map接口体系及其实现类,包括HashMap、ConcurrentHashMap等的工作原理与线程安全机制。内容涵盖哈希冲突解决、扩容策略、并发优化,以及不同Map实现的适用场景,助你掌握高并发编程核心技巧。
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13940 1
|
9月前
|
算法 Java 容器
Netty源码—4.客户端接入流程
本文主要介绍了关于Netty客户端连接接入问题整理、Reactor线程模型和服务端启动流程、Netty新连接接入的整体处理逻辑、新连接接入之检测新连接、新连接接入之创建NioSocketChannel、新连接接入之绑定NioEventLoop线程、新连接接入之注册Selector和注册读事件、注册Reactor线程总结、新连接接入总结
|
9月前
|
安全 Java 调度
Netty源码—3.Reactor线程模型二
本文主要介绍了NioEventLoop的执行总体框架、Reactor线程执行一次事件轮询、Reactor线程处理产生IO事件的Channel、Reactor线程处理任务队列之添加任务、Reactor线程处理任务队列之执行任务、NioEventLoop总结。