深入分析Channel和FileChannel

简介: 我是小假 期待与你的下一次相遇 ~

Java Channel FileChannel

NIO 中的 Buffer可以认为是装载数据的容器,有了容器,还需要传输数据的通道才能完成数据的传输。

Channel 可以认为它是本地 I/O 设备、网络 I/O 的通信桥梁,只有搭建了这座桥梁,数据才能被写入 Buffer 。

Channel

在 NIO 中,Channel 和 Buffer 是相辅相成的,只能从 Channel 读取数据到 Buffer 中,或者从 Buffer 写入数据到 Channle,如下图:

Channel 类似于 OIO 中的流(Stream),但是又有所区别:

  • 流是单向的,但 Channel 是双向的,可读可写。
  • 流是阻塞的,但 Channle 可以异步读写。
  • 流中的数据可以选择性的先读到缓存中,而 Channel 的数据总是要先读到一个 Buffer 中,或从 Buffer 中写入,如上图。

NIO 中通过 Channel 封装了对数据源的操作,通过 Channel 可以操作数据源,但是又不必关注数据源的具体物理结构,这个数据源可以是文件,也可以是socket。

Channel 的接口定义如下:

  1. publicinterface Channel extends Closeable {
  2.    public boolean isOpen();
  3.    public void close() throws IOException;
  4. }

Channel 接口仅定义两个方法:

  • isOpen():Channel 是否打开
  • close():关闭 Channel

它的主要实现有:

  • FileChannel:文件通道,用于文件的数据读写。
  • SocketChannel:套接字通道,能通过 TCP 读写网络中的数据。
  • ServerSocketChannel:服务器套接字通道,监听新进来的 TCP 连接,像 web 服务器那样,对每一个新进来的连接都会创建一个 SocketChannel
  • DatagramChannel:数据报通道,能通过 UDP 读写网络中的数据。

基本类图如下:

下面就 FileChannel 做详细介绍。

FileChannel

FileChannel 主要是用来读写和映射一个系统文件的 Channel,它是一个抽象类,具体由 FileChannelImpl 来实现。

定义如下:

  1. package java.nio.channels;
  2. publicabstractclass FileChannel
  3.    extends AbstractInterruptibleChannel
  4.    implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel{
  5.    /**
  6.     * 初始化一个无参构造器.
  7.     */
  8.    protected FileChannel() { }
  9.    //打开或创建一个文件,返回一个文件通道来访问文件
  10.    public static FileChannel open(Path path,
  11.                                   Set<? extends OpenOption> options,
  12.                                   FileAttribute<?>... attrs)
  13.        throws IOException
  14.    {
  15.        FileSystemProvider provider = path.getFileSystem().provider();
  16.        return provider.newFileChannel(path, options, attrs);
  17.    }
  18.    privatestaticfinal FileAttribute<?>[] NO_ATTRIBUTES = new FileAttribute[0];
  19.    //打开或创建一个文件,返回一个文件通道来访问文件
  20.    public static FileChannel open(Path path, OpenOption... options)
  21.        throws IOException
  22.    {
  23.        Set<OpenOption> set = new HashSet<OpenOption>(options.length);
  24.        Collections.addAll(set, options);
  25.        return open(path, set, NO_ATTRIBUTES);
  26.    }
  27.    //从这个通道读入一个字节序列到给定的缓冲区
  28.    public abstract int read(ByteBuffer dst) throws IOException;
  29.    //从这个通道读入指定开始位置和长度的字节序列到给定的缓冲区
  30.    public abstract long read(ByteBuffer[] dsts, int offset, int length)
  31.        throws IOException;
  32.    /**
  33.     * 从这个通道读入一个字节序列到给定的缓冲区
  34.     */
  35.    public final long read(ByteBuffer[] dsts) throws IOException {
  36.        return read(dsts, 0, dsts.length);
  37.    }
  38.    /**
  39.     * 从给定的缓冲区写入字节序列到这个通道
  40.     */
  41.    public abstract int write(ByteBuffer src) throws IOException;
  42.    /**
  43.     * 从给定缓冲区的子序列向该信道写入字节序列
  44.     */
  45.    public abstract long write(ByteBuffer[] srcs, int offset, int length)
  46.        throws IOException;
  47.    /**
  48.     * 从给定的缓冲区写入字节序列到这个通道
  49.     */
  50.    public final long write(ByteBuffer[] srcs) throws IOException {
  51.        return write(srcs, 0, srcs.length);
  52.    }
  53.    /**
  54.     * 返回通道读写缓冲区中的开始位置
  55.     */
  56.    public abstract long position() throws IOException;
  57.    /**
  58.     * 设置通道读写缓冲区中的开始位置
  59.     */
  60.    public abstract FileChannel position(long newPosition) throws IOException;
  61.    /**
  62.     * 返回此通道文件的当前大小
  63.     */
  64.    public abstract long size() throws IOException;
  65.    /**
  66.     * 通过指定的参数size来截取通道的大小
  67.     */
  68.    public abstract FileChannel truncate(long size) throws IOException;
  69.    /**
  70.     * 强制将通道中的更新文件写入到存储设备(磁盘等)中
  71.     */
  72.    public abstract void force(boolean metaData) throws IOException;
  73.    /**
  74.     * 将当前通道中的文件写入到可写字节通道中
  75.     * position就是开始写的位置,long就是写的长度
  76.     */
  77.    public abstract long transferTo(long position, long count,
  78.                                    WritableByteChannel target)
  79.        throws IOException;
  80.    /**
  81.     * 将当前通道中的文件写入可读字节通道中
  82.   * position就是开始写的位置,long就是写的长度
  83.     */
  84.    public abstract long transferFrom(ReadableByteChannel src,
  85.                                      long position, long count)
  86.        throws IOException;
  87.    /**
  88.     * 从通道中读取一系列字节到给定的缓冲区中
  89.     * 从指定的读取开始位置position处读取
  90.     */
  91.    public abstract int read(ByteBuffer dst, long position) throws IOException;
  92.    /**
  93.     * 从给定的缓冲区写入字节序列到这个通道
  94.     * 从指定的读取开始位置position处开始写
  95.     */
  96.    public abstract int write(ByteBuffer src, long position) throws IOException;
  97.    // -- Memory-mapped buffers --
  98.    /**
  99.     * 一个文件映射模式类型安全枚举
  100.     */
  101.    publicstaticclass MapMode {
  102.        //只读映射模型
  103.        publicstaticfinal MapMode READ_ONLY
  104.            = new MapMode("READ_ONLY");
  105.        //读写映射模型
  106.        publicstaticfinal MapMode READ_WRITE
  107.            = new MapMode("READ_WRITE");
  108.        /**
  109.         * 私有模式(复制在写)映射
  110.         */
  111.        publicstaticfinal MapMode PRIVATE
  112.            = new MapMode("PRIVATE");
  113.        privatefinal String name;
  114.        private MapMode(String name) {
  115.            this.name = name;
  116.        }
  117.    }
  118.    /**
  119.     * 将该通道文件的一个区域直接映射到内存中
  120.     */
  121.    public abstract MappedByteBuffer map(MapMode mode,
  122.                                         long position, long size)
  123.        throws IOException;
  124.    /**
  125.     * 获取当前通道文件的给定区域上的锁
  126.     * 区域就是从position处开始,size长度
  127.     * shared为true代表获取共享锁,false代表获取独占锁
  128.     */
  129.    public abstract FileLock lock(long position, long size, boolean shared)
  130.        throws IOException;
  131.    /**
  132.     * 获取当前通道文件上的独占锁
  133.     */
  134.    public final FileLock lock() throws IOException {
  135.        return lock(0L, Long.MAX_VALUE, false);
  136.    }
  137.    /**
  138.     * 尝试获取给定的通道文件区域上的锁
  139.     * 区域就是从position处开始,size长度
  140.   * shared为true代表获取共享锁,false代表获取独占锁
  141.     */
  142.    public abstract FileLock tryLock(long position, long size, boolean shared)
  143.        throws IOException;
  144.    /**
  145.     * 尝试获取当前通道文件上的独占锁
  146.     */
  147.    public final FileLock tryLock() throws IOException {
  148.        return tryLock(0L, Long.MAX_VALUE, false);
  149.    }
  150. }

打开 FileChannel

在使用 FileChannle 之前必须要先打开它,但是无法直接打开一个 FileChannel,需要通过使用一个 InputStream、OutputStream、RandomAcessFile 来获取一个 FileChannel 实例,如下:

  1. RandomAccessFile accessFile = new RandomAccessFile("/Users/chenssy/Documents/FileChannel.txt","rw");
  2. FileChannel fileChannel = accessFile.getChannel();

调用 getChannel() 即可获取 FileChannel 实例,源码如下:

  1. public final FileChannel getChannel() {
  2.    synchronized (this) {
  3.        if (channel == null) {
  4.            channel = FileChannelImpl.open(fd, path, true, rw, this);
  5.        }
  6.        return channel;
  7.    }
  8. }

getChnnel() 方法很简单,直接调用 FileChannelImpl 的静态方法 open()

  1. public static FileChannel open(Path path,
  2.        Set<? extends OpenOption> options,
  3.        FileAttribute<?>... attrs) throws IOException{
  4.    FileSystemProvider provider = path.getFileSystem().provider();
  5.    return provider.newFileChannel(path, options, attrs);
  6. }

从 FileChannel 读数据

调用 FileChannel 的 read() 方法即可从 FileChannel 中获取数据,当然不是直接获取,而是需要先写入到 Buffer 中,所以调用 read() 之前,需要分配一个 Buffer,然后调用 read() ,该方法返回 int 表示有多少数据读取到了 Buffer 中了,如果返回 -1 表示已经到文件末尾了。

  1. ByteBuffer buffer = ByteBuffer.allocate(1024);
  2. int readCount = fileChannel.read(buffer);

FileChannel 仅定义了方法,具体实现在 FileChannelImpl,如下:

  1. public int read(ByteBuffer dst) throws IOException {
  2.    ensureOpen();
  3.    if (!readable)
  4.        thrownew NonReadableChannelException();
  5.        // 加锁
  6.    synchronized (positionLock) {
  7.        int n = 0;
  8.        int ti = -1;
  9.        try {
  10.            begin();
  11.            ti = threads.add();
  12.            if (!isOpen())
  13.                return0;
  14.            do {
  15.                // 通过IOUtil.read实现
  16.                n = IOUtil.read(fd, dst, -1, nd);
  17.            } while ((n == IOStatus.INTERRUPTED) && isOpen());
  18.            return IOStatus.normalize(n);
  19.        } finally {
  20.            threads.remove(ti);
  21.            end(n > 0);
  22.            assert IOStatus.check(n);
  23.        }
  24.    }
  25. }
  • 首先确保该 Channel 是打开的
  • 然后加锁,主要是因为写入缓冲区需要保证线程安全
  • 最后通过 IOUtils.read() 实现
  1. static int read(FileDescriptor fd, ByteBuffer dst, long position, NativeDispatcher nd) throws IOException{
  2.  // 1 申请一块临时堆外DirectByteBuffer
  3.  ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
  4.  try {
  5.      // 2 先往DirectByteBuffer写入数据,提高效率
  6.      int n = readIntoNativeBuffer(fd, bb, position, nd);
  7.      bb.flip();
  8.      if (n > 0)
  9.          // 3 再拷贝到传入的buffer
  10.          dst.put(bb);
  11.      return n;
  12.  } finally {
  13.      Util.offerFirstTemporaryDirectBuffer(bb);
  14.  }
  15. }
  • 首先申请一块临时的堆外 DirectByteBuffer
  • 然后先往 DirectByteBuffer 写入数据,因为这样能够提高效率,为什么会提高效率,后文分析。
  • 最后拷贝到 ByteBuffer
    写数据到 FileChannel
    read()方法是从 FileChannel 中读取数据,那 write()方法则是从 ByteBuffer中读取数据写入到 Channel 中。调用 write() 需要先申请一个 ByteBuffer ,如下:
  1. ByteBuffer buffer = ByteBuffer.allocate(1024);
  2. fileChannel.write(buffer);
  • 同样,实现是在 FileChannelImpl 中。
  1. public int write(ByteBuffer src) throws IOException {
  2.  ensureOpen();
  3.  if (!writable)
  4.      thrownew NonWritableChannelException();
  5.  synchronized (positionLock) {
  6.      int n = 0;
  7.      int ti = -1;
  8.      try {
  9.          begin();
  10.          ti = threads.add();
  11.          if (!isOpen())
  12.              return0;
  13.          do {
  14.              n = IOUtil.write(fd, src, -1, nd);
  15.          } while ((n == IOStatus.INTERRUPTED) && isOpen());
  16.          return IOStatus.normalize(n);
  17.      } finally {
  18.          threads.remove(ti);
  19.          end(n > 0);
  20.          assert IOStatus.check(n);
  21.      }
  22.  }
  23. }
  • read() 方法实现一模一样,先确定该 Channel 是打开的,然后加锁,最后调用 IOUtil 的 write()
  1. static int write(FileDescriptor fd, ByteBuffer src, long position, NativeDispatcher nd)
  2. throws IOException{
  3. if (src instanceof DirectBuffer)
  4.     return writeFromNativeBuffer(fd, src, position, nd);
  5. int pos = src.position();
  6. int lim = src.limit();
  7. assert (pos <= lim);
  8. int rem = (pos <= lim ? lim - pos : 0);
  9. // 2 否则构造一块跟传入缓冲区一样大小的DirectBuffer
  10. ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
  11. try {
  12.     bb.put(src);
  13.     bb.flip();
  14.     src.position(pos);
  15.     // 3 调用writeFromNativeBuffer读取
  16.     int n = writeFromNativeBuffer(fd, bb, position, nd);
  17.     if (n > 0) {
  18.         // now update src
  19.         src.position(pos + n);
  20.     }
  21.     return n;
  22. } finally {
  23.     Util.offerFirstTemporaryDirectBuffer(bb);
  24. }
  25. }
  • 首先判断传入的 Buffer 是否为 DirectBuffer,如果是的话,就直接写入
  • 否则则构造一块跟传入 Buffer 一样大小的 DirectBuffer
  • 最后调用 writeFromNativeBuffer()
    关闭 FileChannel
    保持好习惯,用完了一定要记得关闭:close()
  1. public final void close() throws IOException {
  2.  synchronized (closeLock) {
  3.      if (!open)
  4.          return;
  5.      open = false;
  6.      implCloseChannel();
  7.  }
  8. }
  • 调用 implCloseChannel() 释放 Channel。
  1. protected void implCloseChannel() throws IOException {
  2.  // 释放文件锁
  3.  if (fileLockTable != null) {
  4.      for (FileLock fl: fileLockTable.removeAll()) {
  5.          synchronized (fl) {
  6.              if (fl.isValid()) {
  7.                  //释放锁
  8.                  nd.release(fd, fl.position(), fl.size());
  9.                  ((FileLockImpl)fl).invalidate();
  10.              }
  11.          }
  12.      }
  13.  }
  14.  // 通知当前通道所有被阻塞线程
  15.  threads.signalAndWait();
  16.  if (parent != null) {
  17.      ((java.io.Closeable)parent).close();
  18.  } else {
  19.      nd.close(fd);
  20.  }
  21. }
  • 关闭 FileChannel 时,需要释放所有锁和文件流。
    示例

    读数据
  1. public static void main(String[] args) throws Exception {
  2.  RandomAccessFile accessFile = new RandomAccessFile("/Users/chenssy/Documents/FileChannel.txt","rw");
  3.  FileChannel fileChannel = accessFile.getChannel();
  4.  ByteBuffer buffer = ByteBuffer.allocate(1024);
  5.  fileChannel.read(buffer);
  6.  System.out.println(new String(buffer.array()));
  7.  fileChannel.close();
  8. }

  • 写数据
  1. public static void main(String[] args) throws Exception {
  2.  String fileContent = "写入数据";
  3.  RandomAccessFile accessFile = new RandomAccessFile("/Users/chenssy/Documents/FileChannel.txt","rw");
  4.  FileChannel fileChannel = accessFile.getChannel();
  5.  ByteBuffer buffer = ByteBuffer.allocate(1024);
  6.  buffer.put(fileContent.getBytes("UTF-8"));
  7.  buffer.flip();
  8.  fileChannel.write(buffer);
  9.  fileChannel.close();
  10. }


相关文章
|
XML 缓存 Java
搞透 IOC、Spring IOC ,看这篇就够了!
本文详细解析了Spring框架的核心内容——IOC(控制反转)及其依赖注入(DI)的实现原理,帮助读者理解如何通过IOC实现组件解耦,提高程序的灵活性和可维护性。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
Java Linux iOS开发
如何配置 Java 环境变量:设置 JAVA_HOME 和 PATH
本文详细介绍如何在Windows和Linux/macOS系统上配置Java环境变量。
21019 12
|
11月前
|
人工智能 安全 Java
掌握Java反射:在项目中高效应用反射机制
Java反射是一种强大功能,允许程序在运行时动态获取类信息、创建对象、调用方法和访问字段,提升程序灵活性。它在框架开发、动态代理、注解处理等场景中广泛应用,如Spring和Hibernate。但反射也存在性能开销、安全风险和代码复杂性,应谨慎使用。
244 0
|
应用服务中间件 nginx 索引
|
API Android开发 iOS开发
web: 手机键盘自动获取短信验证码,点击自动填充输入框
web: 手机键盘自动获取短信验证码,点击自动填充输入框
1841 0
|
缓存 Dubbo Java
3步让Dubbo项目快速集成Sentinel
在微服务系统中,缓存、限流、熔断是保证系统高可用的三板斧。本文通过3个步骤,让Dubbo项目快速集成使用Sentinel实现系统限流。
|
Ubuntu Shell 数据安全/隐私保护
ubuntu创建用户命令
可以用adduser和useradd来添加用户,用userdel来删除用户。最简单的命令:sudo adduser test会自动同名组,创建/home/test/,从etc/skel/复制文件,并设定密码和相关初始身份信息原始一点的命令:sudo useradd -mk /home/test2 -...
1903 0
|
Java 程序员
Java四舍五入大揭秘:Math.round(11.5)为何等于12?
小米是一位热爱技术的29岁程序员,他在文章中探讨了一道常见的Java面试题——Math.round(11.5)和Math.round(-11.5)的结果及其背后的原理。通过详细解析,小米揭示了Java中四舍五入的特殊规则,并介绍了Math.round()的内部实现机制,即对正数加0.5后向下取整,对负数则先减0.5再向下取整。文章还对比了Math.ceil、Math.floor和Math.rint的不同之处,帮助读者更好地理解和记忆这些数学函数。
471 11
|
安全 Python
【Python】 已解决:(pip提示)[notice] To update, run: python.exe -m pip install --upgrade pip
【Python】 已解决:(pip提示)[notice] To update, run: python.exe -m pip install --upgrade pip
1795 0
【Python】 已解决:(pip提示)[notice] To update, run: python.exe -m pip install --upgrade pip
|
Java API 开发者
如何触发Thread.sleep抛出的InterruptedException?
其实说来也简单,就是Thread.sleep所在的线程被interrupt了就抛出InterruptedException,但由于历史遗留问题这个十分罕见的情况变成了所有开发者都需要处理的常情,从这个细节的设计可以看出Thread这个类的历史遗留相当严重,其实java早已无法完全兼容旧版本,删除废弃了很多API,但仍然无法改良这些关键部分的设计。
856 0