【Netty】NIO 网络编程 聊天室案例(二)

简介: 【Netty】NIO 网络编程 聊天室案例(二)

三、 NIO 聊天室 客户端 代码分析


客户端的连接与数据接收 : 客户端的工作是连接服务器 , 得到与服务器通信的 套接字通道 ( SocketChannel ) , 注册该通道到 选择器 ( Selector ) , 监听 SelectionKey.OP_READ 读取数据事件 , 接收到数据后显示即可 ;




1 . 连接服务器 : 连接服务器 , 并设置网络通信非阻塞模式 ;


// 创建并配置 服务器套接字通道 ServerSocketChannel
socketChannel = SocketChannel.open(new InetSocketAddress(SERVER_ADDRESS, PORT));
socketChannel.configureBlocking(false);


2 . 获取选择器并注册通道 : 获取 选择器 ( Selector ) , 并将 套接字通道 ( SocketChannel ) 注册给该选择器 ;


// 获取选择器, 并注册 服务器套接字通道 ServerSocketChannel
selector = Selector.open();
//注册通道 : 将 SocketChannel 通道注册给 选择器 ( Selector )
//关注事件 : 关注事件时读取事件, 服务器端从该通道读取数据
//关联缓冲区 :
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));



3 . 监听服务器端下发的消息 : 阻塞监听, 如果有事件触发, 返回触发的事件个数 ; 被触发的 SelectionKey 事件都存放到了 Set<SelectionKey> selectedKeys 集合中 ;


// 阻塞监听, 如果有事件触发, 返回触发的事件个数
// 被触发的 SelectionKey 事件都存放到了 Set<SelectionKey> selectedKeys 集合中
// 下面开始遍历上述 selectedKeys 集合
try {
    int eventTriggerCount = selector.select();
} catch (IOException e) {
    e.printStackTrace();
}



4 . 处理服务器端发送的数据 : 如果监听到服务器下发数据 , 开始遍历当前触发事件的通道 , 调用该通道读取数据到缓冲区 , 之后显示该数据 ;


// 处理事件集合 :
// 获取当前发生的事件的 SelectionKey 集合, 通过 SelectionKey 可以获取对应的 通道
Set<SelectionKey> keys = selector.selectedKeys();
// 使用迭代器迭代, 涉及到删除操作
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    // 根据 SelectionKey 的事件类型, 处理对应通道的业务逻辑
    // 客户端写出数据到服务器端, 服务器端需要读取数据
    if (key.isReadable()) {
        // 获取 通道 ( Channel ) : 通过 SelectionKey 获取
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // 获取 缓冲区 ( Buffer ) : 获取到 通道 ( Channel ) 关联的 缓冲区 ( Buffer )
        ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
        String message = null;
        try {
            // 读取客户端传输的数据
            int readCount = socketChannel.read(byteBuffer);
            byte[] messageBytes = new byte[readCount];
            byteBuffer.flip();
            byteBuffer.get(messageBytes);
            // 处理读取的消息
            message = new String(messageBytes);
            byteBuffer.flip();
            System.out.println(String.format(message));
        } catch (IOException e) {
            //e.printStackTrace();
            // 客户端连接断开
            key.cancel();
            try {
                socketChannel.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }// try
    }// if (key.isReadable())







四、 NIO 聊天室 服务器端 完整代码


package kim.hsl.nio.chat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
 * 聊天室服务器端
 *
 * @author hsl
 * @date 2020-05-29 17:24
 */
public class Server {
    /**
     * 服务器监听的端口号
     */
    public static final int PORT = 8888;
    /**
     * 监听 ServerSocketChannel 通道和各个客户端对应的 SocketChannel 通道
     */
    private Selector selector;
    /**
     * 服务器端的套接字通道, 相当于 BIO 中的 ServerSocket
     */
    private ServerSocketChannel serverSocketChannel;
    /**
     * 初始化服务器相关操作
     */
    public Server() {
        initServerSocketChannelAndSelector();
    }
    /**
     * 初始化 服务器套接字通道 和
     */
    private void initServerSocketChannelAndSelector() {
        try {
            // 创建并配置 服务器套接字通道 ServerSocketChannel
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
            serverSocketChannel.configureBlocking(false);
            // 获取选择器, 并注册 服务器套接字通道 ServerSocketChannel
            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * Selector 开始执行 监听工作
     */
    private void selectorStartSelectOperation() {
        System.out.println("服务器端启动监听 :");
        while (true) {
            // 阻塞监听, 如果有事件触发, 返回触发的事件个数
            // 被触发的 SelectionKey 事件都存放到了 Set<SelectionKey> selectedKeys 集合中
            // 下面开始遍历上述 selectedKeys 集合
            try {
                int eventTriggerCount = selector.select();
            } catch (IOException e) {
                e.printStackTrace();
            }
            // 当前状态说明 :
            // 如果能执行到该位置, 说明 selector.select() 方法返回值大于 0
            // 当前有 1 个或多个事件触发, 下面就是处理事件的逻辑
            // 处理事件集合 :
            // 获取当前发生的事件的 SelectionKey 集合, 通过 SelectionKey 可以获取对应的 通道
            Set<SelectionKey> keys = selector.selectedKeys();
            // 使用迭代器迭代, 涉及到删除操作
            Iterator<SelectionKey> keyIterator = keys.iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                // 根据 SelectionKey 的事件类型, 处理对应通道的业务逻辑
                // 客户端连接服务器, 服务器端需要执行 accept 操作
                if (key.isAcceptable()) {
                    //创建通道 : 为该客户端创建一个对应的 SocketChannel 通道
                    //不等待 : 当前已经知道有客户端连接服务器, 因此不需要阻塞等待
                    //非阻塞方法 : ServerSocketChannel 的 accept() 是非阻塞的方法
                    SocketChannel socketChannel = null;
                    try {
                        socketChannel = serverSocketChannel.accept();
                        //如果 ServerSocketChannel 是非阻塞的, 这里的 SocketChannel 也要设置成非阻塞的
                        //否则会报 java.nio.channels.IllegalBlockingModeException 异常
                        socketChannel.configureBlocking(false);
                        //注册通道 : 将 SocketChannel 通道注册给 选择器 ( Selector )
                        //关注事件 : 关注事件时读取事件, 服务器端从该通道读取数据
                        //关联缓冲区 :
                        socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                        System.out.println(String.format("用户 %s 进入聊天室", socketChannel.getRemoteAddress()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                // 客户端写出数据到服务器端, 服务器端需要读取数据
                if (key.isReadable()) {
                    // 获取 通道 ( Channel ) : 通过 SelectionKey 获取
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    // 获取 缓冲区 ( Buffer ) : 获取到 通道 ( Channel ) 关联的 缓冲区 ( Buffer )
                    ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
                    String remoteAddress = null;
                    String message = null;
                    try {
                        // 读取客户端传输的数据
                        int readCount = socketChannel.read(byteBuffer);
                        byte[] messageBytes = new byte[readCount];
                        byteBuffer.flip();
                        byteBuffer.get(messageBytes);
                        // 处理读取的消息
                        message = new String(messageBytes);
                        //重置以便下次使用
                        byteBuffer.flip();
                        remoteAddress = socketChannel.getRemoteAddress().toString();
                        System.out.println(String.format("%s : %s", remoteAddress, message));
                    } catch (IOException e) {
                        //e.printStackTrace();
                        // 如果此处出现异常, 说明该客户端离线了, 服务器提示, 取消选择器上的注册信息, 关闭通道
                        try {
                            System.out.println( String.format("%s 用户离线 !", socketChannel.getRemoteAddress()) );
                            key.cancel();
                            socketChannel.close();
                            //继续下一次循环
                            continue;
                        } catch (IOException ex) {
                            ex.printStackTrace();
                        }
                    }
                    // 向其它客户端转发消息, 发送消息的客户端自己就不用再发送该消息了
                    // 遍历所有注册到 选择器 Selector 的 SocketChannel
                    Set<SelectionKey> selectionKeys = selector.keys();
                    for (SelectionKey selectionKey : selectionKeys) {
                        // 获取客户端对应的 套接字通道
                        // 这里不能强转成 SocketChannel, 因为这里可能存在 ServerSocketChannel
                        Channel channel = selectionKey.channel();
                        // 将自己排除在外, 注意这里是地址对比, 就是这两个类不能是同一个地址的类
                        // 这个类的类型必须是 SocketChannel, 排除之前注册的 ServerSocketChannel 干扰
                        if (socketChannel != channel && channel instanceof SocketChannel) {
                            // 将通道转为 SocketChannel, 之后将字符串发送到客户端
                            SocketChannel clientSocketChannel = (SocketChannel) channel;
                            // 写出字符串到其它客户端
                            try {
                                clientSocketChannel.write(ByteBuffer.wrap( ( remoteAddress + " : " + message ).getBytes()));
                            } catch (IOException e) {
                                //e.printStackTrace();
                                // 如果此处出现异常, 说明该客户端离线了, 服务器提示, 取消选择器上的注册信息, 关闭通道
                                try {
                                    System.out.println( String.format("%s 用户离线 !", clientSocketChannel.getRemoteAddress()) );
                                    selectionKey.cancel();
                                    clientSocketChannel.close();
                                } catch (IOException ex) {
                                    ex.printStackTrace();
                                }
                            }
                        }
                    }
                }
                // 处理完毕后, 当前的 SelectionKey 已经处理完毕
                // 从 Set 集合中移除该 SelectionKey
                // 防止重复处理
                keyIterator.remove();
            }
        }
    }
    public static void main(String[] args) {
        Server server = new Server();
        server.selectorStartSelectOperation();
    }
}







五、 NIO 聊天室 客户端 完整代码


package kim.hsl.nio.chat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
public class Client {
    /**
     * 服务器地址
     */
    public final static String SERVER_ADDRESS = "127.0.0.1";
    /**
     * 服务器监听的端口号
     */
    public static final int PORT = 8888;
    /**
     * 监听 SocketChannel 通道的 选择器
     */
    private Selector selector;
    /**
     * 服务器端的套接字通道, 相当于 BIO 中的 ServerSocket
     */
    private SocketChannel socketChannel;
    public Client() {
        initClientSocketChannelAndSelector();
    }
    /**
     * 初始化 服务器套接字通道 和
     */
    private void initClientSocketChannelAndSelector() {
        try {
            // 创建并配置 服务器套接字通道 ServerSocketChannel
            socketChannel = SocketChannel.open(new InetSocketAddress(SERVER_ADDRESS, PORT));
            socketChannel.configureBlocking(false);
            // 获取选择器, 并注册 服务器套接字通道 ServerSocketChannel
            selector = Selector.open();
            //注册通道 : 将 SocketChannel 通道注册给 选择器 ( Selector )
            //关注事件 : 关注事件时读取事件, 服务器端从该通道读取数据
            //关联缓冲区 :
            socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 向服务器端发送消息
     * @param message
     */
    public void sendMessageToServer(String message){
        try {
            socketChannel.write(ByteBuffer.wrap(message.getBytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void readMessageFromServer(){
        // 阻塞监听, 如果有事件触发, 返回触发的事件个数
        // 被触发的 SelectionKey 事件都存放到了 Set<SelectionKey> selectedKeys 集合中
        // 下面开始遍历上述 selectedKeys 集合
        try {
            int eventTriggerCount = selector.select();
        } catch (IOException e) {
            e.printStackTrace();
        }
        // 当前状态说明 :
        // 如果能执行到该位置, 说明 selector.select() 方法返回值大于 0
        // 当前有 1 个或多个事件触发, 下面就是处理事件的逻辑
        // 处理事件集合 :
        // 获取当前发生的事件的 SelectionKey 集合, 通过 SelectionKey 可以获取对应的 通道
        Set<SelectionKey> keys = selector.selectedKeys();
        // 使用迭代器迭代, 涉及到删除操作
        Iterator<SelectionKey> keyIterator = keys.iterator();
        while (keyIterator.hasNext()) {
            SelectionKey key = keyIterator.next();
            // 根据 SelectionKey 的事件类型, 处理对应通道的业务逻辑
            // 客户端写出数据到服务器端, 服务器端需要读取数据
            if (key.isReadable()) {
                // 获取 通道 ( Channel ) : 通过 SelectionKey 获取
                SocketChannel socketChannel = (SocketChannel) key.channel();
                // 获取 缓冲区 ( Buffer ) : 获取到 通道 ( Channel ) 关联的 缓冲区 ( Buffer )
                ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
                String message = null;
                try {
                    // 读取客户端传输的数据
                    int readCount = socketChannel.read(byteBuffer);
                    byte[] messageBytes = new byte[readCount];
                    byteBuffer.flip();
                    byteBuffer.get(messageBytes);
                    // 处理读取的消息
                    message = new String(messageBytes);
                    byteBuffer.flip();
                    System.out.println(String.format(message));
                } catch (IOException e) {
                    //e.printStackTrace();
                    // 客户端连接断开
                    key.cancel();
                    try {
                        socketChannel.close();
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    }
                }// try
            }// if (key.isReadable())
            // 处理完毕后, 当前的 SelectionKey 已经处理完毕
            // 从 Set 集合中移除该 SelectionKey
            // 防止重复处理
            keyIterator.remove();
        }
    }
    public static void main(String[] args) {
        Client client = new Client();
        // 接收服务器端数据线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    //不停地从服务器端读取数据
                    client.readMessageFromServer();
                }
            }
        }).start();
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()){
            String message = scanner.nextLine();
            client.sendMessageToServer(message);
        }
    }
}






目录
相关文章
|
3月前
|
安全 算法 网络安全
网络安全与信息安全:构建数字世界的坚固防线在数字化浪潮席卷全球的今天,网络安全与信息安全已成为维系社会秩序、保障个人隐私和企业机密的关键防线。本文旨在深入探讨网络安全漏洞的本质、加密技术的前沿进展以及提升公众安全意识的重要性,通过一系列生动的案例和实用的建议,为读者揭示如何在日益复杂的网络环境中保护自己的数字资产。
本文聚焦于网络安全与信息安全领域的核心议题,包括网络安全漏洞的识别与防御、加密技术的应用与发展,以及公众安全意识的培养策略。通过分析近年来典型的网络安全事件,文章揭示了漏洞产生的深层原因,阐述了加密技术如何作为守护数据安全的利器,并强调了提高全社会网络安全素养的紧迫性。旨在为读者提供一套全面而实用的网络安全知识体系,助力构建更加安全的数字生活环境。
|
3月前
|
编解码 分布式计算 网络协议
Netty高性能网络框架(一)
Netty高性能网络框架(一)
|
7天前
|
JSON 算法 Java
Nettyの网络聊天室&扩展序列化算法
通过本文的介绍,我们详细讲解了如何使用Netty构建一个简单的网络聊天室,并扩展序列化算法以提高数据传输效率。Netty的高性能和灵活性使其成为实现各种网络应用的理想选择。希望本文能帮助您更好地理解和使用Netty进行网络编程。
25 12
|
25天前
|
存储 缓存 监控
Docker容器性能调优的关键技巧,涵盖CPU、内存、网络及磁盘I/O的优化策略,结合实战案例,旨在帮助读者有效提升Docker容器的性能与稳定性。
本文介绍了Docker容器性能调优的关键技巧,涵盖CPU、内存、网络及磁盘I/O的优化策略,结合实战案例,旨在帮助读者有效提升Docker容器的性能与稳定性。
65 7
|
1月前
|
消息中间件 编解码 网络协议
Netty从入门到精通:高性能网络编程的进阶之路
【11月更文挑战第17天】Netty是一个基于Java NIO(Non-blocking I/O)的高性能、异步事件驱动的网络应用框架。使用Netty,开发者可以快速、高效地开发可扩展的网络服务器和客户端程序。本文将带您从Netty的背景、业务场景、功能点、解决问题的关键、底层原理实现,到编写一个详细的Java示例,全面了解Netty,帮助您从入门到精通。
148 0
|
1月前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
2月前
|
机器学习/深度学习 PyTorch 算法框架/工具
深度学习入门案例:运用神经网络实现价格分类
深度学习入门案例:运用神经网络实现价格分类
|
3月前
|
存储 机器人 Linux
Netty(二)-服务端网络编程常见网络IO模型讲解
Netty(二)-服务端网络编程常见网络IO模型讲解
|
2月前
|
机器学习/深度学习 存储 自然语言处理
深度学习入门:循环神经网络------RNN概述,词嵌入层,循环网络层及案例实践!(万字详解!)
深度学习入门:循环神经网络------RNN概述,词嵌入层,循环网络层及案例实践!(万字详解!)
|
4月前
|
网络协议 Java
一文讲明TCP网络编程、Socket套接字的讲解使用、网络编程案例
这篇文章全面讲解了基于Socket的TCP网络编程,包括Socket基本概念、TCP编程步骤、客户端和服务端的通信过程,并通过具体代码示例展示了客户端与服务端之间的数据通信。同时,还提供了多个案例分析,如客户端发送信息给服务端、客户端发送文件给服务端以及服务端保存文件并返回确认信息给客户端的场景。
一文讲明TCP网络编程、Socket套接字的讲解使用、网络编程案例

热门文章

最新文章