Netty各组件基本用法、入站和出站详情、群聊系统的实现、粘包和拆包

简介: Netty各组件基本用法、入站和出站详情、群聊系统的实现、粘包和拆包


Bootstrap和ServerBootstrap

Bootstrap是Netty的启动程序,一个Netty应用通常由一个Bootstrap开始。Bootstrap的主要作用是配置Netty程序,串联Netty的各个组件。




Future和ChannelFuture


这个方法是异步的(交给别的线程去执行该任务),当执行到这之后,netty不一定启动了

ChannelFuture channelFuture = bootstrap.bind(9090);
channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture funture) throws Exception {
        if(funture.isSuccess()){
            System.out.println("监听9090成功");
        }else{
            System.out.println("监听9090失败");
        }
    }
});

该方法可以知晓有没有启动成功,或者改为同步的方式
ChannelFuture channelFuture = bootstrap.bind(9090).sync();


ChannelFuture和Future的子类,提供了针对于Channel的异步监听操作


Channel

NioSocketChannel:异步的客户端才CP Socket连接通道
NioServerSocketChannel:异步的服务端TCP Socket连接通道
NioDatagramChannel:异步的UDP连接通道
NioSctpChannel:异步的客户端Sctp连接通道
NioSctpServerChannel:异步的服务端Sctp连接通道


Selector

通过Selector多路复用器实现IO的多路复用。Selector可以监听多个连接的Channel事件,同时可以不断的查询已注册Channel是否处于就绪状态,实现一个线程可以高效的管理多个Channel。


NioEventLoop和NioEventLoopGroup

NioEventLoop本质就是一个线程,一个NioEventLoop就对应一个线程,但可以达到异步的处理事务,因为NioEventLoop内部维护了一个异步任务队列,用于存储需要在事件循环中执行的任务。通过将任务添加到队列中,NioEventLoop可以在空闲时间执行这些任务,从而实现了异步提交事务的能力。
NioEventLoopGroup管理NioEventLoop的生命周期,可以理解为是线程池,内部维护了一组线程。每个线程(即NioEventLoop)负责处理多个Channel上的事件。注意,一个Channel只对应一个线程,NioEventLoop和Channel它们是一对多的关系。
一个线程可以管理多个channel,但一个channel只能被一个线程执行


ByteBuf

初始情况


写入数据


读取数据


已读的区域:[0,readerIndex]

可读的区域:[readIndex,writeIndex)

可写的区域:[writeIndex,capacity)

示例代码

public class ByteBufDemo {
    public static void main(String[] args) {
        //创建一个有10个容量数据的ByteBuf对象
        ByteBuf buf = Unpooled.buffer(10);
        System.out.println("init buf:"+buf);
        //添加数据
        for(int i = 0;i<5;i++){
            buf.writeByte(i);
        }
        System.out.println("after write:"+buf);
        //按照索引读取数据
        for(int i = 0;i<3;i++){
            System.out.println(buf.getByte(i));
        }
        System.out.println("after get:"+buf);
        //读取数据
        for(int i = 0;i<3;i++){
            System.out.println(buf.readByte());
        }
        System.out.println("after read:"+buf);
    }
}

Channel相关组件


ChannelHandler

ChannelHandler用于处理拦截IO事件,往往在ChannelHandler中可以加入业务处理逻辑。ChannelHandler执行完后会将io事件转发到ChannelPipeline中的下一个处理程序。

ChannelHandlerContext

保存Channel相关的上下文信息,并关联一个ChannelHandler对象。

ChannelPipeline

ChannelPipeline是一个双向链表,其中保存着多个ChannelHandler。ChannelPipeline实现了一种高级形式的过滤器模式,在IO操作时发生的入站和出站事件,会导致ChannelPipeline中的多个ChannelHandler被依次调用。




入站详情



现在我们的客户端和服务端之间就有三个拦截器
我们在NettyServerHandler里面收到信息就不用解码了,为什么,因为解码器的拦截器已经帮我们做好了


当我们服务端读数据的时候,会从客户端读数据==入站,因为解码的handler和业务处理的handler是入站拦截器,所以会对数据产生作用,但编码的handler不会,因为它是一个出站handler


出站详情

站在服务端的立场

在Netty中,客户端和服务端的addLast方法有一些不同之处。具体来说,它们的区别如下:

1. 顺序:当调用addLast方法添加处理器时,它们的顺序略有不同。对于客户端来说,添加的处理器是按照添加的顺序进行顺序执行的,即先添加的处理器先执行。而对于服务端来说,添加的处理器是按照逆序执行的,即先添加的处理器后执行。
2. 作用对象:客户端的addLast方法主要作用于Outbound事件,用于处理从客户端发送到服务端的请求。而服务端的addLast方法主要作用于Inbound事件,用于处理从服务端接收到的请求。

3. 处理逻辑:客户端和服务端的addLast方法所添加的处理器,通常具有不同的处理逻辑。客户端的处理器通常用于编码请求、发送请求等操作。服务端的处理器通常用于解码请求、处理请求、编码响应等操作。


对象编解码

对象编码器


对象解码器




ProtoBuf和ProtoStuff

为了编解码提升性能,可以使用Protobuf域者Protpstuff对数据进行序列话和反序列化,效率更高。

第一步:导入依赖

<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-api</artifactId>
    <version>1.0.10</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.0.10</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.0.11</version>
</dependency>

第二步:引入工具类

public class ProtostuffUtils {
    /**
     * 避免每次序列化都重新申请Buffer空间
     */
    private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    /**
     * 缓存Schema
     */
    private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();
    /**
     * 序列化方法,把指定对象序列化成字节数组
     *
     * @param obj
     * @param <T>
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> clazz = (Class<T>) obj.getClass();
        Schema<T> schema = getSchema(clazz);
        byte[] data;
        try {
            data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } finally {
            buffer.clear();
        }
        return data;
    }
    /**
     * 反序列化方法,将字节数组反序列化成指定Class类型
     *
     * @param data
     * @param clazz
     * @param <T>
     * @return
     */
    public static <T> T deserialize(byte[] data, Class<T> clazz) {
        Schema<T> schema = getSchema(clazz);
        T obj = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(data, obj, schema);
        return obj;
    }
    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> clazz) {
        Schema<T> schema = (Schema<T>) schemaCache.get(clazz);
        if (Objects.isNull(schema)) {
            //这个schema通过RuntimeSchema进行懒创建并缓存
            //所以可以一直调用RuntimeSchema.getSchema(),这个方法是线程安全的
            schema = RuntimeSchema.getSchema(clazz);
            if (Objects.nonNull(schema)) {
                schemaCache.put(clazz, schema);
            }
        }
        return schema;
    }
}

netty实现群聊系统

服务端基本代码

//群聊系统的服务器
public class ChatServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup boosGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //配置参数
        serverBootstrap.group(boosGroup,workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //获得pipleline
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加handler
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        //添加业务处理handler
                        pipeline.addLast(new ChatServerHandler());
                    }
                });
        System.out.println("聊天室启动了...");
        ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
        channelFuture.channel().closeFuture().sync();
        boosGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

服务端业务代码

public class ChatServerHandler extends SimpleChannelInboundHandler<String>{
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    //存放Channel的容器,而且还可以执行对每个channel执行的任务
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    //有客户端上线了
    //有新的客户端连接了,将该客户端的上线信息广播给其它所有客户端
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //得到客户端的channel
        Channel channel = ctx.channel();
        String message = "客户端-"+channel.remoteAddress()+"于"+sdf.format(new Date())+"上线了\n";
        //得到其它客户端的channel向其它客户端发送该客户端的channel
        channelGroup.writeAndFlush(message);
        //加入到channelGroup中
        channelGroup.add(channel);
    }
    /*
    * 客户端下线则广播给其它客户端*/
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //生成一个下线的信息
        String message = "客户端-"+channel.remoteAddress()+"于"+sdf.format(new Date())+"下线了\n";
        //广播给其它客户端
        channelGroup.writeAndFlush(message);
    }
    /*
    *具体读数据的业务 */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        //获得当前发消息的客户端channel
        Channel channel =   ctx.channel();
        //遍历所有的channel
        channelGroup.forEach(ch->{
            if(channel!=ch){
                ch.writeAndFlush("客户端-"+channel.remoteAddress()+"于"+sdf.format(new Date())+"说:"+
                        msg+"\n");
            }else{
                ch.writeAndFlush("我于"+sdf.format(new Date())+"说:"+msg+"\n");
            }
        });
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

客户端基本代码

public class ChatClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new ChatClientHandler());
                    }
                });
        //发送消息
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
        Channel channel = channelFuture.channel();
        System.out.println("欢迎进入Yc聊天室");
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()){
            String message = scanner.nextLine();
            channel.writeAndFlush(message);
        }
        eventLoopGroup.shutdownGracefully();
    }
}

客户端业务代码

public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
    //打印在控制台
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s);
    }
}

粘包和拆包

TCP协议特点

作为一个流式传输协议,数据在TCP中传输是没有边界的。就是说,客户端发送的多条数据,有可能会被认为是一条数据。或者,客户端发送的一条数据,有可能会被分成多条数据。这是由于TCP协议并不了解上层业务数据的具体含义,在使用TCP协议传输数据时,是根据TCP缓冲区的实际情况进行数据包的划分。

举个例子

我们要发两句话
我是杨 他是李
可能别人收到的信息就是我是杨他是李一条数据,也可能收到我是 杨他是李这两句话
假设我们这有个客户端


发送两百次消息


就可能得到这样的结果
粘包:缓冲区还可以放的下 拆包:缓冲区不可以放的下(乱码发生的原因是因为一个字的字节放在不同缓冲区内发送)

目录
相关文章
|
3月前
|
编解码 网络协议 开发者
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
|
4月前
|
消息中间件 缓存 算法
基于Netty的自研流系统缓存实现挑战: 内存碎片与OOM困境
基于Netty的自研流系统缓存实现挑战: 内存碎片与OOM困境
56 1
基于Netty的自研流系统缓存实现挑战: 内存碎片与OOM困境
|
3月前
|
前端开发 网络协议
Netty实战巅峰:从零构建高性能IM即时通讯系统,解锁并发通信新境界
【8月更文挑战第3天】Netty是一款高性能、异步事件驱动的网络框架,适用于开发高并发网络应用,如即时通讯(IM)系统。本文将指导你利用Netty从零构建高性能IM程序,介绍Netty基础及服务器/客户端设计。服务器端使用`ServerBootstrap`启动,客户端通过`Bootstrap`连接服务器。示例展示了简单的服务器启动过程。通过深入学习,可进一步实现用户认证等功能,打造出更完善的IM系统。
157 1
|
3月前
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
198 2
|
5月前
|
网络协议
netty粘包问题分析
netty粘包问题分析
42 0
|
5月前
|
Java
Netty传输object并解决粘包拆包问题
Netty传输object并解决粘包拆包问题
49 0
|
5月前
|
Java
Netty中粘包拆包问题解决探讨
Netty中粘包拆包问题解决探讨
36 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13501 1
|
6月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
131 1
|
11月前
|
NoSQL Java Redis
跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。 原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
162 1