Netty异步NIO框架(一)java服务端与客户端实现聊天 websocket通道

简介: Netty异步NIO框架(一)java服务端与客户端实现聊天 websocket通道

介绍

Netty 是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。

“快速和简单”并不意味着生成的应用程序会受到可维护性或性能问题的影响。Netty 是根据从实现许多协议(如 FTP、SMTP、HTTP 以及各种二进制和基于文本的旧协议)中获得的经验精心设计的。因此,Netty 成功地找到了一种方法,可以在不妥协的情况下实现易于开发、性能、稳定性和灵活性。


设计

各种传输类型的统一 API - 阻塞和非阻塞套接字

基于灵活且可扩展的事件模型,允许明确分离关注点

高度可定制的线程模型——单线程、一个或多个线程池,例如 SEDA

真正的无连接数据报套接字支持(自 3.1 起)


Performance

更高的吞吐量,更低的延迟

更少的资源消耗

最小化不必要的内存拷贝

话不多说,上代码


java代码

maven依赖

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.73.Final</version>
</dependency>

服务端创建

自定义服务端处理类

package com.netty.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.UUID;
/**
 * @author wuzhenyong
 * ClassName:NettySocketServerHandler.java
 * date:2022-04-29 10:24
 * Description: 服务端自定义处理器
 */
public class NettySocketServerHandler extends SimpleChannelInboundHandler<Object> {
    /**
     * 收到消息请求调用此方法
     *
     * @param ctx ctx
     * @param msg 味精
     * @throws Exception 异常
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("接收到客户端发来消息:" + ctx.channel().remoteAddress() + ":" + msg);
        //返回数据
        ctx.channel().writeAndFlush("服务端发送数据:" + UUID.randomUUID());
    }
    /**
     * 出现异常调用此方法
     *
     * @param ctx   ctx
     * @param cause 导致
     * @throws Exception 异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 关闭通道
        ctx.close();
    }
    /**
     * 客户端创建连接请求调用此方法
     *
     * @param ctx ctx
     * @throws Exception 异常
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress()  + "已上线");
    }
}

自定义服务端初始化信息

package com.netty.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.UUID;
/**
 * @author wuzhenyong
 * ClassName:NettySocketServerHandler.java
 * date:2022-04-29 10:24
 * Description: 服务端自定义处理器
 */
public class NettySocketServerHandler extends SimpleChannelInboundHandler<Object> {
    /**
     * 收到消息请求调用此方法
     *
     * @param ctx ctx
     * @param msg 味精
     * @throws Exception 异常
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("接收到客户端发来消息:" + ctx.channel().remoteAddress() + ":" + msg);
        //返回数据
        ctx.channel().writeAndFlush("服务端发送数据:" + UUID.randomUUID());
    }
    /**
     * 出现异常调用此方法
     *
     * @param ctx   ctx
     * @param cause 导致
     * @throws Exception 异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 关闭通道
        ctx.close();
    }
    /**
     * 客户端创建连接请求调用此方法
     *
     * @param ctx ctx
     * @throws Exception 异常
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress()  + "已上线");
    }
}

服务端启动类

package com.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.SneakyThrows;
/**
 * @author wuzhenyong
 * ClassName:NettySocketServer.java
 * date:2022-04-29 10:22
 * Description: 服务端
 */
public class NettySocketServer {
    public static void main(String[] args) throws  Exception {
        //定义线程组 EventLoopGroup为死循环
        //boss线程组一直在接收客户端发起的请求,但是不对请求做处理,boss会将接收到的请i交给worker线程组来处理
        //实际可以用一个线程组来做客户端的请求接收和处理两件事,但是不推荐
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //启动类定义
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    //指定在服务端启动过程中的一些逻辑,通常情况下我们用不着这个方法
                    .handler(new ChannelInitializer<NioServerSocketChannel>() {
                        @Override
                        protected void initChannel(NioServerSocketChannel nioServerSocketChannel) throws Exception {
                            System.out.println("服务端启动中...");
                        }
                    })
                    //子处理器,自定义处理器
                    .childHandler(new NettySocketServerInitializer());
            //绑定端口
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            System.out.println("服务器启动成功");
            channelFuture.channel().closeFuture().sync();
        } finally {
            //Netty提供的优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

启动服务端

E:\java\jdk1.8\bin\java.exe -javaagent:D:\idea\IntelliJIDEA2021.2.3\lib\idea_rt.jar=6526:D:\idea\IntelliJIDEA2021.2.3\bin -Dfile.encoding=UTF-8 -classpath E:\java\jdk1.8\jre\lib\charsets.jar;E:\java\jdk1.8\jre\lib\deploy.jar;E:\java\jdk1.8\jre\lib\ext\access-bridge-64.jar;E:\java\jdk1.8\jre\lib\ext\cldrdata.jar;E:\java\jdk1.8\jre\lib\ext\dnsns.jar;E:\java\jdk1.8\jre\lib\ext\jaccess.jar;E:\java\jdk1.8\jre\lib\ext\jfxrt.jar;E:\java\jdk1.8\jre\lib\ext\localedata.jar;E:\java\jdk1.8\jre\lib\ext\nashorn.jar;E:\java\jdk1.8\jre\lib\ext\sunec.jar;E:\java\jdk1.8\jre\lib\ext\sunjce_provider.jar;E:\java\jdk1.8\jre\lib\ext\sunmscapi.jar;E:\java\jdk1.8\jre\lib\ext\sunpkcs11.jar;E:\java\jdk1.8\jre\lib\ext\zipfs.jar;E:\java\jdk1.8\jre\lib\javaws.jar;E:\java\jdk1.8\jre\lib\jce.jar;E:\java\jdk1.8\jre\lib\jfr.jar;E:\java\jdk1.8\jre\lib\jfxswt.jar;E:\java\jdk1.8\jre\lib\jsse.jar;E:\java\jdk1.8\jre\lib\management-agent.jar;E:\java\jdk1.8\jre\lib\plugin.jar;E:\java\jdk1.8\jre\lib\resources.jar;E:\java\jdk1.8\jre\lib\rt.jar;F:\WorkTools\ws\parent\netty-websocket\target\classes;F:\Install\repository\org\projectlombok\lombok\1.18.12\lombok-1.18.12.jar;F:\Install\repository\com\alibaba\fastjson\1.2.69\fastjson-1.2.69.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter-web\2.2.5.RELEASE\spring-boot-starter-web-2.2.5.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter\2.2.5.RELEASE\spring-boot-starter-2.2.5.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot\2.2.5.RELEASE\spring-boot-2.2.5.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot-autoconfigure\2.2.5.RELEASE\spring-boot-autoconfigure-2.2.5.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter-logging\2.2.5.RELEASE\spring-boot-starter-logging-2.2.5.RELEASE.jar;F:\Install\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;F:\Install\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;F:\Install\repository\org\apache\logging\log4j\log4j-to-slf4j\2.12.1\log4j-to-slf4j-2.12.1.jar;F:\Install\repository\org\apache\logging\log4j\log4j-api\2.12.1\log4j-api-2.12.1.jar;F:\Install\repository\org\slf4j\jul-to-slf4j\1.7.30\jul-to-slf4j-1.7.30.jar;F:\Install\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;F:\Install\repository\org\yaml\snakeyaml\1.25\snakeyaml-1.25.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter-json\2.2.5.RELEASE\spring-boot-starter-json-2.2.5.RELEASE.jar;F:\Install\repository\com\fasterxml\jackson\core\jackson-databind\2.10.2\jackson-databind-2.10.2.jar;F:\Install\repository\com\fasterxml\jackson\core\jackson-annotations\2.10.2\jackson-annotations-2.10.2.jar;F:\Install\repository\com\fasterxml\jackson\core\jackson-core\2.10.2\jackson-core-2.10.2.jar;F:\Install\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.10.2\jackson-datatype-jdk8-2.10.2.jar;F:\Install\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.10.2\jackson-datatype-jsr310-2.10.2.jar;F:\Install\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.10.2\jackson-module-parameter-names-2.10.2.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter-tomcat\2.2.5.RELEASE\spring-boot-starter-tomcat-2.2.5.RELEASE.jar;F:\Install\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.31\tomcat-embed-core-9.0.31.jar;F:\Install\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.31\tomcat-embed-el-9.0.31.jar;F:\Install\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.31\tomcat-embed-websocket-9.0.31.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter-validation\2.2.5.RELEASE\spring-boot-starter-validation-2.2.5.RELEASE.jar;F:\Install\repository\jakarta\validation\jakarta.validation-api\2.0.2\jakarta.validation-api-2.0.2.jar;F:\Install\repository\org\hibernate\validator\hibernate-validator\6.0.18.Final\hibernate-validator-6.0.18.Final.jar;F:\Install\repository\org\jboss\logging\jboss-logging\3.4.1.Final\jboss-logging-3.4.1.Final.jar;F:\Install\repository\com\fasterxml\classmate\1.5.1\classmate-1.5.1.jar;F:\Install\repository\org\springframework\spring-web\5.2.4.RELEASE\spring-web-5.2.4.RELEASE.jar;F:\Install\repository\org\springframework\spring-beans\5.2.4.RELEASE\spring-beans-5.2.4.RELEASE.jar;F:\Install\repository\org\springframework\spring-webmvc\5.2.4.RELEASE\spring-webmvc-5.2.4.RELEASE.jar;F:\Install\repository\org\springframework\spring-aop\5.2.4.RELEASE\spring-aop-5.2.4.RELEASE.jar;F:\Install\repository\org\springframework\spring-context\5.2.4.RELEASE\spring-context-5.2.4.RELEASE.jar;F:\Install\repository\org\springframework\spring-expression\5.2.4.RELEASE\spring-expression-5.2.4.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot-starter-test\2.2.5.RELEASE\spring-boot-starter-test-2.2.5.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot-test\2.2.5.RELEASE\spring-boot-test-2.2.5.RELEASE.jar;F:\Install\repository\org\springframework\boot\spring-boot-test-autoconfigure\2.2.5.RELEASE\spring-boot-test-autoconfigure-2.2.5.RELEASE.jar;F:\Install\repository\com\jayway\jsonpath\json-path\2.4.0\json-path-2.4.0.jar;
............省略...........
服务端启动中...
服务器启动成功

客户端创建

自定义客户端处理器

package com.netty.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.time.LocalDateTime;
/**
 * @author wuzhenyong
 * ClassName:NettyClientHandler.java
 * date:2022-04-29 10:29
 * Description:
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<Object> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("接收到客户端:" + ctx.channel().remoteAddress() + "发送消息:" + msg);
        ctx.writeAndFlush("客户端发送消息:" + "一个小浪吴啊 >>" + LocalDateTime.now());
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 关闭请求
        ctx.close();
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("客户端连接初始化");
    }
}

自定义客户端初始化信息

package com.netty.client;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
 * @author wuzhenyong
 * ClassName:NettyClientInitializer.java
 * date:2022-04-29 10:28
 * Description:
 */
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //声明管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        //绑定自带的解码器,就是对二进制数据的解析工具,至于解码器构造方法的参数之后详细分析
        pipeline.addLast("lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        //编码器
        pipeline.addLast("lengthFieldPrepender", new LengthFieldPrepender(4));
        //由于涉及到服务端和客户端的字符串数据,需要绑定字符串的编解码
        pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
        //自定义处理器
        pipeline.addLast("myClientHandler", new NettyClientHandler());
    }
}

客户端启动类

package com.netty.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.SneakyThrows;
import java.util.Scanner;
/**
 * @author wuzhenyong
 * ClassName:NettySocketClient.java
 * date:2022-04-29 10:26
 * Description:
 */
public class NettySocketClient {
    public static void main(String[] args) throws  Exception  {
        //客户端只需要一个线程组
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            //声明客户端启动类
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new NettyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            // 三秒后关闭连接请求
            Thread.sleep(1000 * 3);
            channelFuture.channel().close().sync();
        } finally {
            //优雅关闭
            eventLoopGroup.shutdownGracefully();
        }
    }
}

启动服务端与客户端

服务端控制台打印

客户端启动后建立连接请求并发送一条信息,服务端打印日志

03f5cf8907ca4bf18ca87d47a67422b3.png


以下是客户端发送消息,服务端打印接收到消息日志

2fe6febdf4db45e2bd08bce436ea026e.png

客户端控制台打印

客户端接收到服务端的信息打印

c32938cba9e041438a1204c0499295d0.png

为什么服务端和客户端会一直打印消息?

1.服务端自定义处理器重新read0方法,此方法代码是接收到消息请求后会对发送消息的通道(也就是创建连接的客户端初始化时会发送一个连接请求,及发送一条“客户端连接初始化”信息)

2.然后服务端执行此方法,控制台输出接收到的消息后,又对发送消息的通道进行回复消息,所以客户端也会收到服务端的消息

a9fd3708b134440fb4aa6bbe859df15b.png

我们来看客户端的自定义处理器

1.①代码是建立连接请求执行,然后发送一条消息

2.服务端接收到消息后进行打印,然后也会写一条数据发送给客户端

3.客户端接收到消息进行打印,回再写一条数据进行回复

所以就会造成了死循环,而我在客户端启动类线程等待三秒结束连接请求

eb6bd0deb2684ca2a5e2a773f2c298df.png








以上服务端与客户端的聊天就结束了。

后端使用netty websocket通道,前端使用websocket进行实现私聊或者群发消息敬请期待下一篇。

感谢浏览。


相关文章
|
9月前
|
人工智能 Java API
MCP客户端调用看这一篇就够了(Java版)
本文详细介绍了MCP(Model Context Protocol)客户端的开发方法,包括在没有MCP时的痛点、MCP的作用以及如何通过Spring-AI框架和原生SDK调用MCP服务。文章首先分析了MCP协议的必要性,接着分别讲解了Spring-AI框架和自研SDK的使用方式,涵盖配置LLM接口、工具注入、动态封装工具等步骤,并提供了代码示例。此外,还记录了开发过程中遇到的问题及解决办法,如版本冲突、服务连接超时等。最后,文章探讨了框架与原生SDK的选择,认为框架适合快速构建应用,而原生SDK更适合平台级开发,强调了两者结合使用的价值。
12376 33
MCP客户端调用看这一篇就够了(Java版)
|
9月前
|
存储 网络协议 Java
Java获取客户端IP问题:返回127.0.0.1
总结:要解决Java获取客户端IP返回127.0.0.1的问题,首先要找出原因,再采取合适的解决方案。请参考上述方案来改进代码,确保在各种网络环境下都能正确获取客户端IP地址。希望本文对您有所帮助。
603 25
|
网络协议 前端开发
netty的TCP服务端和客户端实现
本文介绍了使用Netty框架实现TCP服务端和客户端的步骤,包括添加Netty依赖、编写服务端和客户端的代码,涉及NioEventLoopGroup、ServerBootstrap、Bootstrap、ChannelInitializer等核心组件,以及如何启动服务端监听和客户端连接。
1064 4
|
Java
探索Java新境界!异步+事件驱动,打造响应式编程热潮,未来已来!
【8月更文挑战第30天】在现代软件开发中,系统响应性和可扩展性至关重要。Java作为主流编程语言,提供了多种机制如Future、CompletableFuture及事件驱动编程,有效提升应用性能。本文探讨Java异步编程模型与事件驱动编程,并介绍响应式模式,助您构建高效、灵活的应用程序。
204 3
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
461 17
|
JSON NoSQL Java
redis的java客户端的使用(Jedis、SpringDataRedis、SpringBoot整合redis、redisTemplate序列化及stringRedisTemplate序列化)
这篇文章介绍了在Java中使用Redis客户端的几种方法,包括Jedis、SpringDataRedis和SpringBoot整合Redis的操作。文章详细解释了Jedis的基本使用步骤,Jedis连接池的创建和使用,以及在SpringBoot项目中如何配置和使用RedisTemplate和StringRedisTemplate。此外,还探讨了RedisTemplate序列化的两种实践方案,包括默认的JDK序列化和自定义的JSON序列化,以及StringRedisTemplate的使用,它要求键和值都必须是String类型。
redis的java客户端的使用(Jedis、SpringDataRedis、SpringBoot整合redis、redisTemplate序列化及stringRedisTemplate序列化)
|
存储 Java API
Java实现导出多个excel表打包到zip文件中,供客户端另存为窗口下载
Java实现导出多个excel表打包到zip文件中,供客户端另存为窗口下载
1018 4
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
285 1
|
JavaScript Java 中间件
Java CompletableFuture 异步超时实现探索
本文探讨了在JDK 8中`CompletableFuture`缺乏超时中断任务能力的问题,提出了一种异步超时实现方案,通过自定义工具类模拟JDK 9中`orTimeout`方法的功能,解决了任务超时无法精确控制的问题,适用于多线程并行执行优化场景。
419 0
JAVA并发编程系列(13)Future、FutureTask异步小王子
本文详细解析了Future及其相关类FutureTask的工作原理与应用场景。首先介绍了Future的基本概念和接口方法,强调其异步计算特性。接着通过FutureTask实现了一个模拟外卖订单处理的示例,展示了如何并发查询外卖信息并汇总结果。最后深入分析了FutureTask的源码,包括其内部状态转换机制及关键方法的实现原理。通过本文,读者可以全面理解Future在并发编程中的作用及其实现细节。