在学习NIO时,ByteBuffer、Channel、Selector三个组件是必须了解的。前面我们说到ByteBuffer是作为缓冲区进行数据的存放或者获取,在读写切换时通常需要进行flip翻转操作;而在Netty中,有一个更为强大的类ByteBuf可以替代ByteBuffer,它不需要进行翻转,也可以进行读写的双向操作。要把缓冲区中的数据发送出去,或者把数据读入缓冲区,通常需要使用通道。通道是传输数据的载体,它可以使数据从一端传输到另一端,因此必须对它有所了解。
Channel接口有很多子接口和实现类,通常都是用于读写操作的。其中ByteChannel同时支持读和写,也即可以进行双向操作。
操作过程:首先创建流对象,再通过流对象获取通道,然后准备好要写入通道或者用于接收数据的ByteBuffer,最后使用通道进行写入或者读取。读写完成之后,不要忘记关闭通道和流。这里只介绍简单的读写操作,更为详细的内容可以参考JDK的API文档。
1.进行写操作
/**
 * Demonstrates {@code int FileChannel.write(ByteBuffer src)}.
 *
 * <p>NIO is built from three components: buffers hold the data, channels move
 * the data, and selectors multiplex channels. This class only looks at
 * {@link FileChannel}, which reads, writes and maps files.
 *
 * <p>Three scenarios are shown:
 * <ol>
 *   <li>a write starts at the channel's current position;</li>
 *   <li>only the buffer's remaining bytes (position..limit) are written;</li>
 *   <li>concurrent writes through one channel do not interleave.</li>
 * </ol>
 *
 * <p>Every stream/channel is opened with try-with-resources so it is closed
 * even when a write fails.
 */
public class ChannelWriteTest {

    /** Charset used to encode the demo text, independent of the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** JDK logger used to report failed writes (the snippet declares no other logger). */
    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(ChannelWriteTest.class.getName());

    /** Number of rounds in the concurrency demo; each round starts two writer threads. */
    private static final int ROUNDS = 10;

    /**
     * Runs the three write demos in order.
     *
     * @param args ignored
     * @throws Exception if a file cannot be opened/closed or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Write starting at the channel's current position.
        fileChannelTest1();
        System.out.println("================");
        // Write only the remaining bytes of the buffer.
        fileChannelTest2();
        System.out.println("================");
        // Concurrent writes through a shared channel.
        fileChannelTest3();
    }

    /**
     * Starts {@code 2 * ROUNDS} threads that each write one line to {@code test2.txt}
     * through the same channel, then waits for all of them before closing the channel.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IOException if the file cannot be opened or closed
     */
    private static void fileChannelTest3() throws InterruptedException, IOException {
        try (FileOutputStream fos = new FileOutputStream(new File("test2.txt"));
             FileChannel fileChannel = fos.getChannel()) {
            Thread[] writers = new Thread[2 * ROUNDS];
            for (int round = 0; round < ROUNDS; round++) {
                Thread first = newWriter(fileChannel, "学无止境\r\n");
                Thread second = newWriter(fileChannel, "山高人为峰\r\n");
                writers[2 * round] = first;
                writers[2 * round + 1] = second;
                first.start();
                second.start();
            }
            // Join instead of sleeping a fixed time: the channel must not be
            // closed while a writer is still running.
            for (Thread writer : writers) {
                writer.join();
            }
        }
    }

    /**
     * Creates (but does not start) a thread that writes {@code line} to the channel.
     *
     * @param fileChannel shared channel to write to
     * @param line text to write, encoded as UTF-8
     * @return the unstarted writer thread
     */
    private static Thread newWriter(FileChannel fileChannel, String line) {
        return new Thread(() -> {
            ByteBuffer buffer = ByteBuffer.wrap(line.getBytes(UTF_8));
            try {
                fileChannel.write(buffer);
            } catch (IOException e) {
                logWriteFailure(e);
            }
        });
    }

    /**
     * Writes "abcde" to {@code test1.txt}, then overwrites file offsets 2..3 with the
     * two remaining bytes ("23") of a second buffer, leaving "ab23e" in the file.
     *
     * @throws IOException if the file cannot be opened or closed
     */
    private static void fileChannelTest2() throws IOException {
        try (FileOutputStream fos = new FileOutputStream(new File("test1.txt"));
             FileChannel fileChannel = fos.getChannel()) {
            try {
                ByteBuffer letters = ByteBuffer.wrap("abcde".getBytes(UTF_8));
                ByteBuffer digits = ByteBuffer.wrap("12345".getBytes(UTF_8));
                fileChannel.write(letters);
                // Restrict the second buffer to indices [1, 3), i.e. the bytes "23".
                digits.position(1);
                digits.limit(3);
                // Move the channel back to offset 2 so "23" replaces "cd".
                fileChannel.position(2);
                fileChannel.write(digits);
                // The channel position is now 2 + 2 = 4.
                System.out.println(fileChannel.position());
            } catch (IOException e) {
                logWriteFailure(e);
            }
        }
    }

    /**
     * Writes "abcde" to {@code test.txt}, repositions the channel to offset 2, rewinds
     * the buffer and writes it again, printing the channel position after each step.
     * The file ends up containing "ababcde".
     *
     * @throws Exception if the file cannot be opened or closed
     */
    private static void fileChannelTest1() throws Exception {
        try (FileOutputStream fos = new FileOutputStream(new File("test.txt"));
             FileChannel fileChannel = fos.getChannel()) {
            try {
                ByteBuffer buffer = ByteBuffer.wrap("abcde".getBytes(UTF_8));
                // Position before any write is 0.
                System.out.println("A fileChannel.postion()=" + fileChannel.position());
                System.out.println("write() 1 返回值:" + fileChannel.write(buffer));
                // Position after writing five bytes is 5.
                System.out.println("B fileChannel.postion()=" + fileChannel.position());
                fileChannel.position(2);
                // Reset the buffer position to 0 so all five bytes are written again.
                buffer.rewind();
                System.out.println("write() 2 返回值:" + fileChannel.write(buffer));
                // Position is now 2 + 5 = 7.
                System.out.println("C fileChannel.postion()=" + fileChannel.position());
            } catch (IOException e) {
                logWriteFailure(e);
            }
        }
    }

    /** Logs a failed write together with its cause. */
    private static void logWriteFailure(IOException e) {
        LOGGER.log(java.util.logging.Level.SEVERE, "写入数据失败:" + e.getMessage(), e);
    }
}
2.进行读操作
/**
 * Demonstrates {@code int FileChannel.read(ByteBuffer dst)}.
 *
 * <p>The call reads bytes from the channel's current position into the buffer,
 * starting at the buffer's current position. The return value is:
 * <ul>
 *   <li>a positive number: how many bytes were read into the buffer;</li>
 *   <li>0: nothing was read (for example the buffer has no remaining space);</li>
 *   <li>-1: the end of the stream has been reached.</li>
 * </ul>
 *
 * <p>Every stream/channel is opened with try-with-resources so it is closed
 * even when a read fails. The input files ({@code test.txt}, {@code test3.txt},
 * {@code test4.txt}) must already exist in the working directory.
 */
public class ChannelReadTest {

    /** JDK logger used to report failed reads (the snippet declares no other logger). */
    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(ChannelReadTest.class.getName());

    /**
     * Runs the six read demos in order, separated by a divider line.
     *
     * @param args ignored
     * @throws Exception if a file cannot be read or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Return values of read().
        fileChannelTest1();
        System.out.println("=======================");
        // Reading starts at the channel's current position.
        fileChannelTest2();
        System.out.println("=======================");
        // Bytes are stored at the buffer's current position.
        fileChannelTest3();
        System.out.println("=======================");
        // Concurrent reads through a shared channel.
        fileChannelTest4();
        System.out.println("=======================");
        // The file holds more data than the buffer can take.
        fileChannelTest5();
        System.out.println("=======================");
        // Bytes only go into the buffer's remaining window.
        fileChannelTest6();
    }

    /**
     * Reads {@code test.txt} into a 100-byte buffer whose window is limited to
     * indices [1, 3), then prints indices 0..2; a zero byte is printed as "空格".
     *
     * @throws IOException if the file cannot be read
     */
    private static void fileChannelTest6() throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(100);
        byteBuffer.position(1);
        byteBuffer.limit(3);
        try (FileInputStream fis = new FileInputStream(new File("test.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            fileChannel.read(byteBuffer);
        }
        // rewind() resets the position to 0 but keeps the limit of 3.
        byteBuffer.rewind();
        while (byteBuffer.hasRemaining()) {
            byte eachByte = byteBuffer.get();
            if (eachByte == 0) {
                System.out.println("空格");
            } else {
                System.out.println((char) eachByte);
            }
        }
    }

    /**
     * Reads {@code test.txt} into a 3-byte buffer, printing the channel position
     * before ("A") and after ("B") the read, then prints the bytes that were read.
     *
     * @throws IOException if the file cannot be read
     */
    private static void fileChannelTest5() throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(3);
        try (FileInputStream fis = new FileInputStream(new File("test.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            System.out.println("A" + fileChannel.position());
            fileChannel.read(byteBuffer);
            System.out.println("B" + fileChannel.position());
        }
        byteBuffer.rewind();
        while (byteBuffer.hasRemaining()) {
            System.out.println((char) byteBuffer.get());
        }
    }

    /**
     * Starts two threads that drain {@code test4.txt} through the same channel in
     * 5-byte chunks, and waits for both before closing the channel.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IOException if the file cannot be opened or closed
     */
    private static void fileChannelTest4() throws InterruptedException, IOException {
        try (FileInputStream fis = new FileInputStream(new File("test4.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            Thread first = newReader(fileChannel);
            Thread second = newReader(fileChannel);
            first.start();
            second.start();
            // Join instead of sleeping a fixed time: the channel must not be
            // closed while a reader is still running.
            first.join();
            second.join();
        }
    }

    /**
     * Creates (but does not start) a thread that reads the channel to its end in
     * 5-byte chunks and prints each chunk on its own line.
     *
     * @param fileChannel shared channel to read from
     * @return the unstarted reader thread
     */
    private static Thread newReader(FileChannel fileChannel) {
        return new Thread(() -> {
            try {
                ByteBuffer chunk = ByteBuffer.allocate(5);
                int readLength = fileChannel.read(chunk);
                while (readLength != -1) {
                    // NOTE(review): decodes with the platform charset because the
                    // encoding of test4.txt is not known here; a multi-byte
                    // character split across two chunks will not decode correctly.
                    System.out.println(new String(chunk.array(), 0, readLength));
                    chunk.clear();
                    readLength = fileChannel.read(chunk);
                }
            } catch (IOException e) {
                LOGGER.log(java.util.logging.Level.SEVERE, "读取数据失败:" + e.getMessage(), e);
            }
        });
    }

    /**
     * Reads {@code test3.txt} from file offset 2 into a 5-byte buffer, moves the
     * buffer position to 3 and reads again, then prints the whole backing array;
     * a zero byte is printed as " 空格 ".
     *
     * @throws IOException if the file cannot be read
     */
    private static void fileChannelTest3() throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(5);
        try (FileInputStream fis = new FileInputStream(new File("test3.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            fileChannel.position(2);
            fileChannel.read(byteBuffer);
            // The second read stores its bytes starting at buffer index 3.
            byteBuffer.position(3);
            fileChannel.read(byteBuffer);
        }
        for (byte value : byteBuffer.array()) {
            if (value == 0) {
                System.out.println(" 空格 ");
            } else {
                System.out.println((char) value);
            }
        }
    }

    /**
     * Reads {@code test3.txt} from file offset 2 into a 5-byte buffer and prints
     * every byte of the backing array as a char (unfilled slots print as NUL).
     *
     * @throws IOException if the file cannot be read
     */
    private static void fileChannelTest2() throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(5);
        try (FileInputStream fis = new FileInputStream(new File("test3.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            fileChannel.position(2);
            fileChannel.read(byteBuffer);
        }
        for (byte value : byteBuffer.array()) {
            System.out.println((char) value);
        }
    }

    /**
     * Reads {@code test3.txt} three times with a 5-byte buffer and prints each
     * return value: first the bytes read, then the result of reading into a
     * buffer that was not cleared, then the result after clearing the buffer.
     *
     * @throws Exception if the file cannot be read
     */
    private static void fileChannelTest1() throws Exception {
        try (FileInputStream fis = new FileInputStream(new File("test3.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(5);
            int readLength = fileChannel.read(byteBuffer);
            System.out.println(readLength);
            // The buffer was not cleared; if it is full, read() returns 0.
            readLength = fileChannel.read(byteBuffer);
            System.out.println(readLength);
            byteBuffer.clear();
            // Once the channel is at end-of-file, read() returns -1.
            readLength = fileChannel.read(byteBuffer);
            System.out.println(readLength);
            byteBuffer.clear();
        }
    }
}
3.进行批量写操作
/*** 进行批量写操作 long write(ByteBuffer[] src)* 将每个缓冲区的remaining字节序列写入此通道的当前位置*/publicclassChannelBatchWriteTest { publicstaticvoidmain(String[] args) throwsException { //进行批量写操作fileChannelBatchWriteTest(); fileChannelBatchWriteTest2(); fileChannelBatchWriteTest3(); } privatestaticvoidfileChannelBatchWriteTest() throwsIOException { FileOutputStreamfos=newFileOutputStream(newFile("a.txt")); FileChannelfileChannel=fos.getChannel(); fileChannel.write(ByteBuffer.wrap("123456".getBytes())); fileChannel.position(3); ByteBufferbyteBuffer1=ByteBuffer.wrap("000001".getBytes()); ByteBufferbyteBuffer2=ByteBuffer.wrap("000002".getBytes()); //将多个单的bytebuffer放入byteBuffers中,再写入ByteBuffer[] byteBuffers=newByteBuffer[]{byteBuffer1,byteBuffer2}; fileChannel.write(byteBuffers); fileChannel.close(); fos.close(); } privatestaticvoidfileChannelBatchWriteTest2() throwsIOException { FileOutputStreamfos=newFileOutputStream(newFile("b.txt")); FileChannelfileChannel=fos.getChannel(); fileChannel.write(ByteBuffer.wrap("123456".getBytes())); fileChannel.position(3); ByteBufferbyteBuffer1=ByteBuffer.wrap("abcde1".getBytes()); ByteBufferbyteBuffer2=ByteBuffer.wrap("uxdax2".getBytes()); //将多个单的bytebuffer放入byteBuffers中,再写入ByteBuffer[] byteBuffers=newByteBuffer[]{byteBuffer1,byteBuffer2}; byteBuffer1.position(1); byteBuffer1.limit(3); byteBuffer2.position(2); byteBuffer2.limit(4); fileChannel.write(byteBuffers); fileChannel.close(); fos.close(); } //write方法具有同步性privatestaticvoidfileChannelBatchWriteTest3() throwsInterruptedException, IOException { //fileChannel中的write方法的同步性FileOutputStreamfos=newFileOutputStream(newFile("c.txt")); FileChannelfileChannel=fos.getChannel(); //启动两个线程for (inti=0; i<10; i++) { Threadthread1=newThread() { publicvoidrun() { ByteBufferbuffer1=ByteBuffer.wrap("学无止境\r\n".getBytes()); ByteBufferbuffer2=ByteBuffer.wrap("吾生有崖亦无涯\r\n".getBytes()); try { ByteBuffer[] byteBuffers=newByteBuffer[]{buffer1,buffer2}; fileChannel.write(byteBuffers); } catch (IOExceptione) { 
log.error("写入数据失败:{}"+e.getMessage()); } } }; Threadthread2=newThread() { publicvoidrun() { ByteBufferbuffer1=ByteBuffer.wrap("山高人为峰\r\n".getBytes()); ByteBufferbuffer2=ByteBuffer.wrap("前行有路\r\n".getBytes()); ByteBuffer[] byteBuffers=newByteBuffer[]{buffer1,buffer2}; try { fileChannel.write(byteBuffers); } catch (IOExceptione) { log.error("写入数据失败:{}"+e.getMessage()); } } }; thread1.start(); thread2.start(); } Thread.sleep(3000); fileChannel.close(); fos.close(); } }
4.进行批量读操作
/**
 * Demonstrates the scattering read {@code long FileChannel.read(ByteBuffer[] dsts)}.
 *
 * <p>Bytes are read from the channel's current position and stored into the
 * remaining space of each buffer, in array order.
 *
 * <p>Every stream/channel is opened with try-with-resources so it is closed
 * even when a read fails. The input files ({@code test1.txt}, {@code d.txt})
 * must already exist in the working directory.
 */
public class ChannelBatchReadTest {

    /** JDK logger used to report failed reads (the snippet declares no other logger). */
    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(ChannelBatchReadTest.class.getName());

    /** Number of rounds in the concurrency demo; each round starts two reader threads. */
    private static final int ROUNDS = 10;

    /**
     * Runs the six scattering-read demos in order.
     *
     * @param args ignored
     * @throws Exception if a file cannot be read or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Return values of the scattering read.
        fileChannelBatchReadTest1();
        System.out.println("==========");
        // Reading starts at the channel's current position.
        fileChannelBatchReadTest2();
        System.out.println("==========");
        // Bytes are stored at each buffer's current position.
        fileChannelReadBatchTest3();
        System.out.println("==========");
        // Concurrent scattering reads through a shared channel.
        fileChannelReadBatchTest4();
        // The file holds more data than the buffers can take.
        fileChannelBatchReadTest5();
        // Bytes only go into each buffer's remaining window.
        fileChannelBatchReadTest6();
    }

    /**
     * Reads {@code test1.txt} into two 7-byte buffers restricted to the windows
     * [1, 3) and [2, 4) respectively, then prints both backing arrays.
     *
     * @throws IOException if the file cannot be read
     */
    private static void fileChannelBatchReadTest6() throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(7);
        byteBuffer.position(1);
        byteBuffer.limit(3);
        ByteBuffer byteBuffer1 = ByteBuffer.allocate(7);
        // The window of the SECOND buffer is configured here; applying these two
        // calls to the first buffer would leave the second one unrestricted.
        byteBuffer1.position(2);
        byteBuffer1.limit(4);
        ByteBuffer[] byteBuffers = {byteBuffer, byteBuffer1};
        try (FileInputStream fis = new FileInputStream(new File("test1.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            fileChannel.read(byteBuffers);
        }
        printWithPlaceholders(byteBuffers);
    }

    /**
     * Reads {@code test1.txt} into two 2-byte buffers, printing the channel
     * position before ("A") and after ("B") the read together with the number
     * of bytes read, then prints each backing array on its own line.
     *
     * @throws IOException if the file cannot be read
     */
    private static void fileChannelBatchReadTest5() throws IOException {
        ByteBuffer[] byteBuffers = {ByteBuffer.allocate(2), ByteBuffer.allocate(2)};
        try (FileInputStream fis = new FileInputStream(new File("test1.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            System.out.println("A " + fileChannel.position());
            long readLength = fileChannel.read(byteBuffers);
            System.out.println("B " + fileChannel.position() + "readLength=" + readLength);
        }
        // array() exposes the whole backing array, so the buffer positions do not
        // need to be reset before printing.
        printArrays(byteBuffers);
    }

    /**
     * Starts {@code 2 * ROUNDS} threads that drain {@code d.txt} through the same
     * channel with scattering reads, and waits for all of them before closing.
     *
     * @throws Exception if the file cannot be opened/closed or the thread is interrupted
     */
    private static void fileChannelReadBatchTest4() throws Exception {
        try (FileInputStream fis = new FileInputStream(new File("d.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            Thread[] readers = new Thread[2 * ROUNDS];
            for (int i = 0; i < readers.length; i++) {
                readers[i] = newBatchReader(fileChannel);
                readers[i].start();
            }
            // Join instead of sleeping a fixed time: the channel must not be
            // closed while a reader is still running.
            for (Thread reader : readers) {
                reader.join();
            }
        }
    }

    /**
     * Creates (but does not start) a thread that reads the channel to its end
     * into two 8-byte buffers and, after each read, prints every byte of both
     * backing arrays on its own line.
     *
     * @param fileChannel shared channel to read from
     * @return the unstarted reader thread
     */
    private static Thread newBatchReader(FileChannel fileChannel) {
        return new Thread(() -> {
            try {
                ByteBuffer byteBuffer = ByteBuffer.allocate(8);
                ByteBuffer byteBuffer1 = ByteBuffer.allocate(8);
                ByteBuffer[] byteBuffers = {byteBuffer, byteBuffer1};
                long readLength = fileChannel.read(byteBuffers);
                while (readLength != -1) {
                    // Hold one lock while printing so the output of one read is
                    // not interleaved with another thread's output.
                    synchronized (ChannelBatchReadTest.class) {
                        for (ByteBuffer each : byteBuffers) {
                            for (byte value : each.array()) {
                                System.out.println((char) value);
                            }
                        }
                    }
                    byteBuffer.clear();
                    byteBuffer1.clear();
                    readLength = fileChannel.read(byteBuffers);
                }
            } catch (IOException e) {
                LOGGER.log(java.util.logging.Level.SEVERE, "读取数据失败:" + e.getMessage(), e);
            }
        });
    }

    /**
     * Reads {@code test1.txt} into two 2-byte buffers after moving the first
     * buffer's position to 1, then prints both backing arrays.
     *
     * @throws IOException if the file cannot be read
     */
    private static void fileChannelReadBatchTest3() throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(2);
        ByteBuffer byteBuffer1 = ByteBuffer.allocate(2);
        ByteBuffer[] byteBuffers = {byteBuffer, byteBuffer1};
        byteBuffer.position(1);
        // The channel is closed once, after the read, rather than inside the
        // loop that prints the buffers.
        try (FileInputStream fis = new FileInputStream(new File("test1.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            fileChannel.read(byteBuffers);
        }
        printWithPlaceholders(byteBuffers);
    }

    /**
     * Reads {@code test1.txt} into two 2-byte buffers and prints every byte of
     * each backing array on its own line, with a blank line after each buffer.
     *
     * @throws IOException if the file cannot be read
     */
    private static void fileChannelBatchReadTest2() throws IOException {
        ByteBuffer[] byteBuffers = {ByteBuffer.allocate(2), ByteBuffer.allocate(2)};
        try (FileInputStream fis = new FileInputStream(new File("test1.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            fileChannel.read(byteBuffers);
        }
        for (ByteBuffer each : byteBuffers) {
            for (byte value : each.array()) {
                System.out.println((char) value);
            }
            System.out.println();
        }
    }

    /**
     * Performs four scattering reads of {@code test1.txt} into two 2-byte
     * buffers, clearing the buffers in between, and prints each return value.
     *
     * @throws IOException if the file cannot be read
     */
    private static void fileChannelBatchReadTest1() throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(2);
        ByteBuffer byteBuffer1 = ByteBuffer.allocate(2);
        ByteBuffer[] byteBuffers = {byteBuffer, byteBuffer1};
        try (FileInputStream fis = new FileInputStream(new File("test1.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            for (int attempt = 0; attempt < 4; attempt++) {
                long readLength = fileChannel.read(byteBuffers);
                System.out.println(readLength);
                byteBuffer.clear();
                byteBuffer1.clear();
            }
        }
    }

    /**
     * Prints every byte of each backing array followed by a blank line; a zero
     * byte is printed as "空格".
     */
    private static void printWithPlaceholders(ByteBuffer[] byteBuffers) {
        for (ByteBuffer each : byteBuffers) {
            for (byte value : each.array()) {
                if (value == 0) {
                    System.out.println("空格");
                } else {
                    System.out.println((char) value);
                }
                System.out.println();
            }
        }
    }

    /** Prints each backing array as chars on a single line. */
    private static void printArrays(ByteBuffer[] byteBuffers) {
        for (ByteBuffer each : byteBuffers) {
            for (byte value : each.array()) {
                System.out.print((char) value);
            }
            System.out.println();
        }
    }
}
5.进行部分批量写操作
/**
 * Demonstrates the partial gathering write
 * {@code long FileChannel.write(ByteBuffer[] srcs, int offset, int length)}.
 *
 * <p>Starting at array index {@code offset}, {@code length} buffers are used;
 * the remaining bytes of each of those buffers are written, in order, at the
 * channel's current position.
 *
 * <p>Every stream/channel is opened with try-with-resources so it is closed
 * even when a write fails.
 */
public class ChannelPartBatchWriteTest {

    /**
     * Runs both demos; the second one truncates and rewrites the same file.
     *
     * @param args ignored
     * @throws Exception if the file cannot be written
     */
    public static void main(String[] args) throws Exception {
        fileChannelPartBatchWriteTest1();
        fileChannelPartBatchWriteTest2();
    }

    /**
     * Writes "QSDXXX" to {@code test.txt}, moves the channel to offset 2 and
     * writes both buffers of the array, leaving "QSabcde12345" in the file.
     *
     * @throws IOException if the file cannot be written
     */
    private static void fileChannelPartBatchWriteTest1() throws IOException {
        ByteBuffer[] byteBuffers = {
            ByteBuffer.wrap("abcde".getBytes()),
            ByteBuffer.wrap("12345".getBytes())
        };
        try (FileOutputStream fos = new FileOutputStream(new File("test.txt"));
             FileChannel fileChannel = fos.getChannel()) {
            fileChannel.write(ByteBuffer.wrap("QSDXXX".getBytes()));
            fileChannel.position(2);
            fileChannel.write(byteBuffers, 0, 2);
        }
    }

    /**
     * Writes the windows of buffers 1 and 2 ("23" and "sr") to {@code test.txt},
     * moves the channel to offset 2 and then writes buffers 0 and 1, leaving
     * "23abcde" in the file.
     *
     * @throws IOException if the file cannot be written
     */
    private static void fileChannelPartBatchWriteTest2() throws IOException {
        ByteBuffer letters = ByteBuffer.wrap("abcde".getBytes());
        ByteBuffer digits = ByteBuffer.wrap("12345".getBytes());
        // Window [1, 3) of "12345" is "23".
        digits.position(1);
        digits.limit(3);
        ByteBuffer tail = ByteBuffer.wrap("dwsrdf".getBytes());
        // Window [2, 4) of "dwsrdf" is "sr".
        tail.position(2);
        tail.limit(4);
        ByteBuffer[] byteBuffers = {letters, digits, tail};
        try (FileOutputStream fos = new FileOutputStream(new File("test.txt"));
             FileChannel fileChannel = fos.getChannel()) {
            fileChannel.write(byteBuffers, 1, 2);
            fileChannel.position(2);
            // The digits buffer was drained by the previous write, so only the
            // five bytes of the letters buffer are written here.
            fileChannel.write(byteBuffers, 0, 2);
        }
    }
}
6.进行部分批量读操作
/**
 * Demonstrates the partial scattering read
 * {@code long FileChannel.read(ByteBuffer[] dsts, int offset, int length)}.
 *
 * <p>Starting at array index {@code offset}, {@code length} buffers are used;
 * bytes read from the channel's current position are stored into the
 * remaining space of each of those buffers, in order.
 *
 * <p>Every stream/channel is opened with try-with-resources so it is closed
 * even when a read fails. The input file {@code e.txt} must already exist in
 * the working directory.
 */
public class ChannelPartBatchReadTest {

    /**
     * Runs both demos, separated by a divider line.
     *
     * @param args ignored
     * @throws Exception if the file cannot be read
     */
    public static void main(String[] args) throws Exception {
        fileChannelPartBatchReadTest1();
        System.out.println("===============");
        fileChannelPartBatchReadTest2();
    }

    /**
     * Performs three scattering reads of {@code e.txt} into two 2-byte buffers,
     * clearing the buffers in between, and prints each return value.
     *
     * @throws IOException if the file cannot be read
     */
    private static void fileChannelPartBatchReadTest1() throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(2);
        ByteBuffer byteBuffer1 = ByteBuffer.allocate(2);
        ByteBuffer[] byteBuffers = {byteBuffer, byteBuffer1};
        try (FileInputStream fis = new FileInputStream(new File("e.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            for (int attempt = 0; attempt < 3; attempt++) {
                long readLength = fileChannel.read(byteBuffers, 0, 2);
                System.out.println("readLength=" + readLength);
                byteBuffer.clear();
                byteBuffer1.clear();
            }
        }
    }

    /**
     * Reads {@code e.txt} from file offset 2 into two 2-byte buffers and prints
     * each backing array as chars on its own line.
     *
     * @throws IOException if the file cannot be read
     */
    private static void fileChannelPartBatchReadTest2() throws IOException {
        ByteBuffer[] byteBuffers = {ByteBuffer.allocate(2), ByteBuffer.allocate(2)};
        try (FileInputStream fis = new FileInputStream(new File("e.txt"));
             FileChannel fileChannel = fis.getChannel()) {
            fileChannel.position(2);
            fileChannel.read(byteBuffers, 0, 2);
        }
        for (ByteBuffer each : byteBuffers) {
            for (byte value : each.array()) {
                System.out.print((char) value);
            }
            System.out.println();
        }
    }
}
今天就介绍到这里,下一篇我们学习多路复用必备组件Selector。