ByteBuffer 大小分配

简介: ByteBuffer 大小分配
  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
  • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 tutorials.jenkov.com/java-perfor…
  • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

4.5 处理 write 事件

一次无法写完例子

  • 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
  • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略
  • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
  • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
  • 如果不取消,会每次可写均会触发 write 事件 public class WriteServer {
  • public static void main(String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
    selector.select();
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
    while (iter.hasNext()) {
        SelectionKey key = iter.next();
        iter.remove();
        if (key.isAcceptable()) {
            SocketChannel sc = ssc.accept();
            sc.configureBlocking(false);
            SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
            // 1. 向客户端发送内容
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 3000000; i++) {
                sb.append("a");
            }
            ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
            int write = sc.write(buffer);
            // 3. write 表示实际写了多少字节
            System.out.println("实际写入字节:" + write);
            // 4. 如果有剩余未读字节,才需要关注写事件
            if (buffer.hasRemaining()) {
                // read 1  write 4
                // 在原有关注事件的基础上,多关注 写事件
                sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
                // 把 buffer 作为附件加入 sckey
                sckey.attach(buffer);
            }
        } else if (key.isWritable()) {
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            SocketChannel sc = (SocketChannel) key.channel();
            int write = sc.write(buffer);
            System.out.println("实际写入字节:" + write);
            if (!buffer.hasRemaining()) { // 写完了
                key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                key.attach(null);
            }
        }
    }
}
  • } }

客户端 public class WriteClient { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); SocketChannel sc = SocketChannel.open(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ); sc.connect(new InetSocketAddress("localhost", 8080)); int count = 0; while (true) { selector.select(); Iterator iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isConnectable()) { System.out.println(sc.finishConnect()); } else if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); count += sc.read(buffer); buffer.clear(); System.out.println(count); } } } } }


目录
相关文章
2. ByteBuffer
2. ByteBuffer
39 0
|
4月前
|
存储
victoriaMetrics之byteBuffer
victoriaMetrics之byteBuffer
50 2
|
4月前
|
缓存 Java 索引
ByteBuffer 字节缓冲区
ByteBuffer 字节缓冲区
45 0
|
4月前
|
Java
Buffer 缓冲区操作
Buffer 缓冲区操作
37 0
2.1 ByteBuffer 正确使用姿势
2.1 ByteBuffer 正确使用姿势
70 0
|
索引
2.3 ByteBuffer 常见方法
2.3 ByteBuffer 常见方法
69 0
|
算法 Java 索引
ByteBuffer
ByteBuffer
85 0
|
JSON 数据格式
Buffer 对象
Buffer 对象
111 0
|
存储 消息中间件 缓存
ByteBuffer总结
ByteBuffer总结
130 0
|
Java API 索引
ByteBuf 和 ByteBuffer 的区别, ByteBuf 动态扩容源码分析
ByteBuf 和 ByteBuffer 的区别, ByteBuf 动态扩容源码分析
587 0