Netty 粘包-阿里云开发者社区

开发者社区> 阿里云MVP> 正文
登录阅读全文

Netty 粘包

简介: tcp是一个“流”的协议,一个完整的包可能会被TCP拆分成多个包进行发送,也可能把小的封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

粘包、拆包问题说明

假设客户端分别发送数据包D1和D2给服务端,由于服务端一次性读取到的字节数是不确定的,所以可能存在以下4种情况。
1.服务端分2次读取到了两个独立的包,分别是D1,D2,没有粘包和拆包;
2.服务端一次性接收了两个包,D1和D2粘在一起了,被成为TCP粘包;
3.服务端分2次读取到了两个数据包,第一次读取到了完整的D1和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为拆包;
4.服务端分2次读取到了两个数据包,第一次读取到了部分D1,第二次读取D1剩余的部分和完整的D2包;

Netty 重现TCP粘包

简单的echo服务实现,client发送一条数据,server显示一条数据

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class EchoClient {

    private ChannelFuture m_future = null;

    public void send(String echo) {
        ChannelFuture future = m_future;
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(1 * 1024); // 1K
        buf.writeBytes(echo.getBytes());
        Channel channel = future.channel();
        channel.writeAndFlush(buf);
    }

    public void closeChannel() {
        try {
            if (m_future != null && m_future.channel().remoteAddress() != null) {
                System.out.println("close channel " + m_future.channel().remoteAddress());
                m_future.channel().closeFuture();
            }
        } catch (Exception ignore) {
        }
    }

    public void connect(int port, String host) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                    }
                });

            m_future = b.connect(host, port).sync();

        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        EchoClient echo = new EchoClient();
        echo.connect(port, "127.0.0.1");
        for (int i = 0; i < 100; i++)
            echo.send("Hello World!");
        Thread.sleep(100);
        echo.closeChannel();
    }
}

server端实现


import java.io.UnsupportedEncodingException;
import java.util.List;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;

public class EchoServer {

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline().addLast(new EchoServerHandler());
                    }
                });
            b.bind(port).sync();

        }  catch (Throwable t) {
                t.printStackTrace();
       }
    }

    public class EchoServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println("echo  : " + body);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.close();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        new EchoServer().bind(port);
    }
}

client一共发送了100次”hello world!”,我们希望是server端同样显示100次”hello world!”,但是,只显示了2次(每次执行大概率不会相同):

echo  : Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hell
echo  : o World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!Hello World!

Netty Decoder

所谓TCP粘包,就是一条message,比如10K大小的,需要拆分成多个TCP包发送到服务端,Netty必须收齐所有的包,把它们粘在一起,才能开始解码,但是Netty不知道什么时候算收齐一个message。

所以说最重要的是你要告诉netty,我的一个message已经收完了,可以进行下一步的处理了

LineBasedFrameDecoder

这是一个一换行符为界限的解码器,如果有换行我们就把他标记为结束位置,这样就组成一个独立的包。我们可以设置最大长度。如果连续读取到最大长度后没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

这个Decoder明确告诉Netty,读到换行符,我的一个message就收完了,可以进行下一步的处理了

所以如果要解决刚才的粘包问题,只要在EchoServer的initChannel中最前面加入下面代码,并且在EchoClient发送”Hello World!”,变为”Hello World!n”就可以了

channel.pipeline().addLast(new LineBasedFrameDecoder(1024)); 
channel.pipeline().addLast(new EchoServerHandler());

还有很多其他很有用的Decoder,
DelimiterBasedFrameDecoder 允许我们设置特定的分隔符用来分隔;
FixedLengthFrameDecoder 固定长度的解码器;
StringDecoder 直接将流转化为String;

自定义的Decoder

其实上面的那些Decoder只在特殊情况下才能用到,一般工程项目中,传输的都是自定义的对象,那就需要自定义编码解码规则了。

我现在使用的方法:每个对象自定义一个Decoder,进行encode,decode,在encode时,前4个字节定义消息的长度。客户端每次发送encode对象后的数据。服务端解码,首先判断传输过来的数据是否超过4字节,如果没有,返回,等待下一次decode。如果超过4字节,取出4字节的值,获取消息的总长度,如果此时收到的消息还不够(消息的总长度+4),那么返回,等待下一次的decode。如果此时收到的消息已经超过(消息的总长度+4),那么就获取(消息的总长度+4),获取到我需要的消息,执行后面的业务处理。

作者:glowd

原文:https://blog.csdn.net/zengqiang1/article/details/70160407

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
+ 订阅

阿里云最有价值专家,是专注于帮助他人充分了解和使用阿里云技术的意见领袖。

官方博客
官网链接
精彩专题