深入Jetty源码之Buffer

简介:

概述

在Jetty中Buffer是对Java中Stream IO中的buffer和NIO中的buffer的抽象表示,它主要用于缓存连接中读取和写入的数据。在Jetty中,对每个连接使用Buffer从其InputStream中读取字节数据,或将处理后的响应字节写入OutputStream中,从而Jetty其他模块在处理请求和响应数据时直接和Buffer打交道,而不需要关注底层IO流。 

Buffer接口定义

Jetty中Buffer接口定义如下:
public  interface Buffer  extends Cloneable {
     //  基于Buffer主要用于向当前连接读写数据,因而它定义了两个核心的方法:readFrom和writeTo。其中readFrom方法从InputStream中读取字节数据,writeTo方法将响应字节写入OutputStream中,即向Connection中读取和写入数据,写完后清理当前Buffer。
     int readFrom(InputStream in,  int max)  throws IOException;
     void writeTo(OutputStream out)  throws IOException;

     //  在Buffer从InputStream(Connection)中读取数据后,Buffer接口还提供了很多不同的方法用于从Buffer中读取或写入字节数据。

    
//  Jetty中的Buffer是一个FIFO的字节队列,它的设计类似NIO中Buffer的设计:每次get操作都从getIndex开始,并且getIndex会向前移动读取的字节数的长度;每次的peek操作也getIndex开始,但是peek操作不会使getIndex向前移动;每次put操作都从putIndex开始,并且putIndex会向前移动写入的字节数的长度;每次poke操作也会从putIndex开始,但是poke操作不会使putIndex向前移动;mark操作会以getIndex作为基准设置markIndex的值,从而在reset时会将getIndex重置到之前mark过的位置;capacity表示该Buffer最多可存储的字节数,而length表示从getIndex到putIndex还存在的字节数。并且Buffer永远保证以下关系总是成立:markIndex<=getIndex<=putIndex<=capacity

     byte get();
     int get( byte[] b,  int offset,  int length);
    Buffer get( int length);
     int getIndex();

     void mark();
     void mark( int offset);
     int markIndex();

     byte peek();
     byte peek( int index);
    Buffer peek( int index,  int length);
     int peek( int index,  byte[] b,  int offset,  int length);

     int poke( int index, Buffer src);
     void poke( int index,  byte b);
     int poke( int index,  byte b[],  int offset,  int length);

     int put(Buffer src);
     void put( byte b);
     int put( byte[] b, int offset,  int length);
     int put( byte[] b);
     int putIndex();

     int length();
     void clear();
     void reset();
     void setGetIndex( int newStart);
     void setMarkIndex( int newMark);
     void setPutIndex( int newLimit);

     //  一个Buffer还有独立的两种状态:access级别和volatile。
access级别有:IMMUTABLE,表示当前Buffer所有的Index和内容都不能被改变;READONLY,表示当前Buffer是只读的,即getIndex和markIndex可以被改变,而putIndex以及Buffer内容不可以;READWRITE,表示所有的Index以及Buffer的内容可以被改变。
volatile状态表示当前Buffer是否会通过其他路径被修改,默认情况下,ByteArrayBuffer、DirectNIOBuffer等是NON_VOLATILE状态,而View是VOLATILE状态(除非View内部的Buffer是IMMUTABLE)。VOLATILE的状态感觉不是一个比较严谨的概念,比如对一个View它是VOLATILE的,但是在这种情况下,它内部包装的Buffer应该也变成VOLATILE状态,并且在所有的View被回收后,其内部包装的Buffer应该重新变成NON_VOLATILE状态。要实现这种严谨逻辑应该是可以做的到的,只是会比较麻烦,而且貌似也没必要,因而Jetty并没有尝试着去这样做。

     //  返回包含当前Buffer从getIndex到putIndex内容的Buffer,并且返回的Buffer不可以被其他路径修改。如果当前Buffer是NON_VOLATILE,则直接返回当前Buffer(这个实现是不严谨的,因为在这种情况下,其实有两个Buffer实例可以修改同一份数据),否则,克隆一个新的Buffer,这样对新的Buffer的修改不会影响原Buffer的内容。
    Buffer asNonVolatileBuffer();
     //  返回一个只读的Buffer(View),该只读的Buffer的读取不会影响原来Buffer的Index。
    Buffer asReadOnlyBuffer();
     //  拷贝一个不可修改的ByteBuffer。
    Buffer asImmutableBuffer();
     //  返回一个可修改的Buffer,并且对返回的Buffer的内容修改会影响原有的Buffer。
    Buffer asMutableBuffer();

     //  当前Buffer是否不可被修改,即Buffer内容和所有Index都不能被修改。
     boolean isImmutable();
     //  当前Buffer是否是只读的。
     boolean isReadOnly();
     //  是否当前Buffer内容可以通过其他路径被修改,比如View一般情况下是VOLATILE状态(除非View内部的Buffer是IMMUTABLE)。
    boolean isVolatile();
    
     //  除了以上的操作,Buffer还提供了一些其他用于操作Buffer内部字节的方法:

    
// 如果内部使用字节数组表示,返回该字节数组,否则,返回null。
     byte[] array();
     //  获取从getIndex到putIndex的字节数组,其长度为length。
     byte[] asArray();
     //  如果当前Buffer是对另一个Buffer的包装,则返回内部被包装的Buffer实例,否则返回当前Buffer本身。
    Buffer buffer();

     int capacity();
     //  返回当前Buffer剩余的空间,即capacity-putIndex。
     int space();
     //  清除Buffer内容,即设置getIndex和putIndex为0,以及markIndex为-1。
    
//  整理Buffer内容,即将markIndex >= 0 ? min(getIndex, markIndex) : getIndex到putIndex的内容移动到Buffer的起始位置,同时修改相应的getIndex、markIndex、putIndex。
     void compact();
     //  当前Buffer是否有可用字节,即是否putIndex>getIndex。
     boolean hasContent();

     //  跳过n个字节,即getIndex增加min(remaining(), n)的值。
     int skip( int n);

     //  切割出当前Buffer从getIndex到putIndex的View,一般来说它是volatile的(除非它是immutable类型的Buffer)。
    Buffer slice();
     //  切割出当前Buffer从markIndex到putIndex的View,一般来说它是volatile的(除非它是immutable类型的Buffer)。
    Buffer sliceFromMark();
    Buffer sliceFromMark( int length);

     //  返回包含当前Buffer状态和内容的字符串。
    String toDetailString();
     boolean equalsIgnoreCase(Buffer buffer);
 }

类图


主要实现类

AbstractBuffer:
所有Buffer的基类,是对Buffer接口的基本实现。
ByteBuffer:
它继承自AbstractBuffer主要的非NIO的Buffer实现,内部使用字节数组做缓存,直接读InputStream和写OutputStream。
DirectNIOBuffer:
它实现了NIOBuffer接口,继承自AbstractBuffer,内部使用Direct的ByteBuffer做缓存,使用ReadableByteChannel和WritableByteChannel分别对InputStream(readFrom传入)和OutputStream(writeTo传入)包装,并在这两个方法中使用包装后的Channel读写数据。
IndirectNIOBuffer:
它继承自ByteBuffer,内部使用非direct的ByteBuffer做缓存,并且它也直接对InputStream和OutputStream读写。
RandomAccessFileBuffer:
它继承自AbstractBuffer,内部使用RandomAccessFile做缓存。
View:
它继承自AbstractBuffer,内部使用另一个Buffer作为缓存,并且对非IMMUTABLE的Buffer,很多时候,它是VOLATILE。View如其名,它是对内部Buffer的视图,对View内容以及Index的修改会影响内部Buffer中相应的值。

Buffers

Buffers是Buffer的抽象工厂,它用于创建Header的Buffer和Body的Buffer,并且可以根据给定的size获得相应的Buffer实例,在Buffer使用完成后,还可以通过returnBuffer方法将它归还个Buffers以复用。在创建Buffers子类时,可以将指定Header和Body各自Buffer的类型,从而在内部创建相应Buffer时会创建相应类型的Buffer,支持的Buffer类型有:BYTE_ARRAY、DIRECT、INDIRECT。

Jetty中有两个Buffers的实现: PooledBuffers和ThreadLocalBuffers。

PooledBuffers 使用ConcurrentLinkedQueue构建各自的Header、Body、Other类型的Buffer池,它有一个maxSize值用于控制该Buffers中包含的所有类型的Buffer的总数。  ThreadLocalBuffers 将Header、Body、Other类型的Buffer保存在ThreadLocal中。

Jetty还提供了 BuffersFactory 用于创建不同类型的Buffers:通过在参数中maxSize是否大于等于0以决定是使用PooledBuffers还是ThreadLocalBuffers。 

BufferCache/BufferDateCache

Jetty还为Buffer提供了两个特殊的类:BufferCache和BufferDateCache。

BufferCache
用于存储一个可以使用存储的String值、索引值等获取相应的Buffer实例,主要用于HttpHeaders、HttpMethods等一些预定义的值。
BufferDateCache
继承自DateCache,它存储了上一次使用一个long类型的date值格式化出的Buffer实例,从而实现部分复用(复用在同一秒得到的Request请求时创建的Buffer,因为时间也只能在这种情况下被复用,因而才会有这样的实现),在Request类中使用。

相关文章
|
缓存 分布式计算 API
Spark Netty与Jetty (源码阅读十一)
  spark呢,对Netty API又做了一层封装,那么Netty是什么呢~是个鬼。它基于NIO的服务端客户端框架,具体不再说了,下面开始。   创建了一个线程工厂,生成的线程都给定一个前缀名。      像一般的netty框架一样,创建Netty的EventLoopGroup:      在常用...
1094 0