netty案例,netty4.1中级拓展篇十一《Netty基于ChunkedStream数据流切块传输》

简介: 在Netty这种异步NIO框架的结构下,服务端与客户端通信过程中,高效、频繁、大量的写入大块数据时,因网络传输饱和的可能性就会造成数据处理拥堵、GC频繁、用户掉线的可能性。那么由于写操作是非阻塞的,所以即使没有写出所有的数据,写操作也会在完成时返回并通知ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险。所以在写大块数据时,需要对大块数据进行切割发送处理。

27.jpg

前言介绍

在Netty这种异步NIO框架的结构下,服务端与客户端通信过程中,高效、频繁、大量的写入大块数据时,因网络传输饱和的可能性就会造成数据处理拥堵、GC频繁、用户掉线的可能性。那么由于写操作是非阻塞的,所以即使没有写出所有的数据,写操作也会在完成时返回并通知ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险。所以在写大块数据时,需要对大块数据进行切割发送处理。

https://netty.io/4.0/api/io/netty/handler/stream/ChunkedStream.html

ChunkedInput 的实现

ChunkedFile 从文件中逐块获取数据,当你的平台不支持零拷贝或者你需要转换数据时使用

ChunkedNioFile 和ChunkedFile 类似,只是它使用了FileChannel

ChunkedStream 从InputStream 中逐块传输内容

ChunkedNioStream 从ReadableByteChannel 中逐块传输内容

开发环境

1、jdk1.8【jdk1.7以下只能部分支持netty】

2、Netty4.1.36.Final【netty3.x 4.x 5每次的变化较大,接口类名也随着变化】

3、NetAssist 网络调试助手[获取:关注公众号:bugstack虫洞栈 | 回复;NetAssist+邮箱]

代码示例

itstack-demo-netty-2-11
└── src
    ├── main
    │   └── java
    │       └── org.itstack.demo.netty.server
    │           ├── MyChannelInitializer.java
    │           ├── MyServerChunkHandler.java
    │           ├── MyServerHandler.java
    │           └── NettyServer.java
    └── test
         └── java
             └── org.itstack.demo.test
                 └── ApiTest.java

重点代码块讲解,完整代码,关注公众号:bugstack虫洞栈 | 回复Netty源码,获取

MyChannelInitializer.java | 添加流量分块功能

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel channel) {
        // 基于换行符号
        channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
        // 流量分块
        channel.pipeline().addLast(new ChunkedWriteHandler());
        channel.pipeline().addLast(new MyServerChunkHandler());
        // 解码转String,注意调整自己的编码格式GBK、UTF-8
        channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK")));
        // 解码转String,注意调整自己的编码格式GBK、UTF-8
        channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));
        // 在管道中添加我们自己的接收数据实现方法
        channel.pipeline().addLast(new MyServerHandler());
    }
}

MyServerChunkHandler.java | 流量分块实现ChunkedStream(in, 10)

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈 | 欢迎关注并获取专题&源码
 * Create by fuzhengwei on 2019
 */
public class MyServerChunkHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        //内容验证
        if (!(msg instanceof ByteBuf)) {
            super.write(ctx, msg, promise);
            return;
        }
        //获取Byte
        ByteBuf buf = (ByteBuf) msg;
        byte[] data = this.getData(buf);
        //写入流中
        ByteInputStream in = new ByteInputStream();
        in.setBuf(data);
        //消息分块;10个字节,测试过程中可以调整
        ChunkedStream stream = new ChunkedStream(in, 10);
        //管道消息传输承诺
        ChannelProgressivePromise progressivePromise = ctx.channel().newProgressivePromise();
        progressivePromise.addListener(new ChannelProgressiveFutureListener() {
            @Override
            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
            }
            @Override
            public void operationComplete(ChannelProgressiveFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("消息发送成功 success");
                    promise.setSuccess();
                } else {
                    System.out.println("消息发送失败 failure:" + future.cause());
                    promise.setFailure(future.cause());
                }
            }
        });
        ReferenceCountUtil.release(msg);
        ctx.write(stream, progressivePromise);
    }
    //获取Byte
    private byte[] getData(ByteBuf buf) {
        if (buf.hasArray()) {
            return buf.array().clone();
        }
        byte[] data = new byte[buf.readableBytes() - 1];
        buf.readBytes(data);
        return data;
    }
}

测试结果

启动服务端NettyServer

itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈,获取源码}
链接报告开始
链接报告信息:有一客户端链接到本服务端
链接报告IP:127.0.0.1
链接报告Port:7397
链接报告完毕
消息发送成功 success
2019-09-15 16:36:04 接收到消息:hi 微信公众号:bugstack虫洞栈 | 欢迎关注并获取专题文章和源码
消息发送成功 success
2019-09-15 16:36:04 接收到消息:
消息发送成功 success
Process finished with exit code -1

启动NetAssist网络调试助手 | 发送测试消息[结尾加换行]

hi 微信公众号:bugstack虫洞栈 | 欢迎关注并获取专题文章和源码

28.jpg

目录
相关文章
|
Java
Netty传输object并解决粘包拆包问题
Netty传输object并解决粘包拆包问题
210 0
|
前端开发 网络协议 Java
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
1161 1
|
网络协议
由浅入深Netty聊天室案例
由浅入深Netty聊天室案例
172 0
|
消息中间件 分布式计算 NoSQL
由浅入深Netty入门案例
由浅入深Netty入门案例
215 0
Netty入门到超神系列-聊天室案例
对于服务端而言需要做如下事情 selector监听客户端的链接 如果有“读”事件,就从通道读取数据 把数据转发给其他所有的客户端,要过滤掉发消息过来的客户端不用转发 对于客户端而言需要做如下事情 selector监听服务端的“读”事件 如果有数据从通道中读取数据,打印到控制台 监听键盘输入,向服务端发送消息
224 0
|
机器学习/深度学习 存储 Java
【Netty】Netty 入门案例分析 ( Netty 模型解析 | Netty 服务器端代码 | Netty 客户端代码 )
【Netty】Netty 入门案例分析 ( Netty 模型解析 | Netty 服务器端代码 | Netty 客户端代码 )
359 0
【Netty】Netty 入门案例分析 ( Netty 模型解析 | Netty 服务器端代码 | Netty 客户端代码 )
|
Java Maven
【Netty】Netty 入门案例分析 ( Netty 线程模型 | Netty 案例需求 | IntelliJ IDEA 项目导入 Netty 开发库 )(二)
【Netty】Netty 入门案例分析 ( Netty 线程模型 | Netty 案例需求 | IntelliJ IDEA 项目导入 Netty 开发库 )(二)
639 0
【Netty】Netty 入门案例分析 ( Netty 线程模型 | Netty 案例需求 | IntelliJ IDEA 项目导入 Netty 开发库 )(二)
|
Java
【Netty】Netty 入门案例分析 ( Netty 线程模型 | Netty 案例需求 | IntelliJ IDEA 项目导入 Netty 开发库 )(一)
【Netty】Netty 入门案例分析 ( Netty 线程模型 | Netty 案例需求 | IntelliJ IDEA 项目导入 Netty 开发库 )(一)
559 0
【Netty】Netty 入门案例分析 ( Netty 线程模型 | Netty 案例需求 | IntelliJ IDEA 项目导入 Netty 开发库 )(一)
|
Java Maven
Netty之入门案例
前面给大家介绍了NIO,我们会发现用NIO实现异步非阻塞的网络通信代码量非常大,而且并不是很好理解,在实际的开发中一般我们也都是会实现基于NIO的框架来操作的,比如Netty,这样开发效率有高而且Bug也少。
Netty之入门案例
|
9月前
|
算法 Java 容器
Netty源码—4.客户端接入流程
本文主要介绍了关于Netty客户端连接接入问题整理、Reactor线程模型和服务端启动流程、Netty新连接接入的整体处理逻辑、新连接接入之检测新连接、新连接接入之创建NioSocketChannel、新连接接入之绑定NioEventLoop线程、新连接接入之注册Selector和注册读事件、注册Reactor线程总结、新连接接入总结