第一次学习java的时候,学习到IO的时候总感觉很奇怪,他有三个基本字节流文件IO类,FileInputStream,FileOutputStream,RandomAccessFile。自己本身是从C 学起的,学到C++,unix编程,一直都是拿着文件指针或者文件描述符来进行操作,也是可以跳读的。感觉java的文件操作把c的给分开细化了,由于初学java,并没有仔细的去思考过这个问题。后来知道jvm还有直接内存,就很好奇直接内存到底是什么,为什么java nio中很多都和直接内存相关,我在看视频的时候,里面的老师讲java nio用直接内存拷贝文件,压根没有走到用户态,用户态程序发了一条指令,然后文件就从内核态进行拷贝了。听到这里,我感觉java玩出了新高度,我在C里完全没见过的高度。于是我找了openjdk的代码来阅读,看看是如何实现的,看过源码后,很多问题都解决了,也明白了很多都是谎言。
下面主要来说一下java的阻塞io--bio。主要是说linux的实现。里面有一些linux c的库函数,我会做简单的介绍。
文件操作的过程
具体说之前,必须先普及一个操作系统知识,文件的读取和写入的大概的流程。这里说的是一般情况,用户态没有办法直接操作文件,必须通过系统调用通过内核进行操作。例如读取文件,是从磁盘到内核主存再到用户主存,文件写入是先从用户主存到内核主存再到磁盘。内存映射也是操作的一种方法,这种情况就不需要内核进行数据的拷贝,用户态可以操作内存一样操作文件。
所以说视频上讲的先不进入用户态,直接内核态进行文件拷贝的说法,就有点比较匪夷所思了。
java的操作
以FileInputStream为例来说明,FileOutputStream,RandomAccessFile可以用类似的方法来查看。
FileInputStream
public FileInputStream(File file) throws FileNotFoundException {
String name = (file != null ? file.getPath() : null);
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkRead(name);
}
if (name == null) {
throw new NullPointerException();
}
if (file.isInvalid()) {
throw new FileNotFoundException("Invalid file path");
}
//文件描述符
fd = new FileDescriptor();
fd.attach(this);
path = name;
//打开文件
open(name);
}
private void open(String name) throws FileNotFoundException {
open0(name);
}
private native void open0(String name) throws FileNotFoundException;
java里也是维护了文件描述符的,你也看到了,他只是new了这么一个FileDescriptor对象,也没做什么操作。可能比较疑惑,但是写过jni的人都了解,jni是运行native反调java的。文件描述符的设置我们下面在native部分说明。
JNIEXPORT void JNICALL
Java_java_io_FileInputStream_open0(JNIEnv *env, jobject this, jstring path) {
fileOpen(env, this, path, fis_fd, O_RDONLY);
}
在open函数中,直接调用了fileOpen的方法,后面就直接找c的实现了,不会再单独从java找到调用jni的c的类。fileOpen在solaris\native\java\io\io_util_md.c中。
void
fileOpen(JNIEnv *env, jobject this, jstring path, jfieldID fid, int flags)
{
WITH_PLATFORM_STRING(env, path, ps) {
FD fd;
#if defined(__linux__) || defined(_ALLBSD_SOURCE)
char *p = (char *)ps + strlen(ps) - 1;
while ((p > ps) && (*p == '/'))
*p-- = '\0';
#endif
//打开文件
fd = handleOpen(ps, flags, 0666);
if (fd != -1) {
//设置文件表示符
SET_FD(this, fd, fid);
} else {
throwFileNotFoundException(env, path);
}
} END_PLATFORM_STRING(env, ps);
}
在fileOpen中打开了文件,并且把文件描述符设置回去了。这里才是java对象真正获取到文件描述符的地方。
#define open64 open
#define RESTARTABLE(_cmd, _result) do { \
do { \
_result = _cmd; \
} while((_result == -1) && (errno == EINTR)); \
} while(0)
FD
handleOpen(const char *path, int oflag, int mode) {
FD fd;
RESTARTABLE(open64(path, oflag, mode), fd);
if (fd != -1) {
struct stat64 buf64;
int result;
RESTARTABLE(fstat64(fd, &buf64), result);
if (result != -1) {
if (S_ISDIR(buf64.st_mode)) {
close(fd);
errno = EISDIR;
fd = -1;
}
} else {
close(fd);
fd = -1;
}
}
return fd;
}
为了方便阅读,我把重要的宏定义都列举了出来,open64实际就是open,RESTARTABLE其实做的就是把第一个方法运行结果赋值给第二个参数,说白了就是 fd=open64(path, oflag, mode),里面有循环保证运行。这里就能看到实际调用的就是open函数。
再说说read。
jint
readSingle(JNIEnv *env, jobject this, jfieldID fid) {
jint nread;
char ret;
FD fd = GET_FD(this, fid);
if (fd == -1) {
JNU_ThrowIOException(env, "Stream Closed");
return -1;
}
nread = IO_Read(fd, &ret, 1);
if (nread == 0) { /* EOF */
return -1;
} else if (nread == -1) { /* error */
JNU_ThrowIOExceptionWithLastError(env, "Read error");
}
return ret & 0xFF;
}
#define IO_Read handleRead
ssize_t
handleRead(FD fd, void *buf, jint len)
{
ssize_t result;
RESTARTABLE(read(fd, buf, len), result);
return result;
}
read中,你最后会找到一个叫IO_Read的函数,实际这个也是宏定义,上面代码中我把这个宏对应的代码贴出,你能看到最后调用的是read函数。宏声明在solaris\native\java\io\io_util_md.h中。这里确实比较绕,使用了宏,而不是直接调用方法。
java堆和native堆
FileOutputStream,RandomAccessFile也是同相同的方法去看,发现都是比较熟悉的系统api的调用。还有一个想说的就是数组的读取,在看到用数组读取的时候你能看到这样的代码,这个代码在read的实现中(带数组的重载)。
(*env)->SetByteArrayRegion(env, bytes, off, nread, (jbyte *)buf);
很多人不写jni,所以看着比较迷惑,这里把c的数组的值,赋值给java的数组,java的对象一般都是在java的堆中的,而native的代码是在native的栈或者堆中的,如果java想用,那么必须有个从native的堆到java的堆中拷贝的过程。这个麻烦的地方就是DirectByteBuffer存在的意义,DirectByteBuffer虽然是java堆中的对象,但是引用native的数据,DirectByteBuffer有点类似指针的意思。
FileChannel的读取
FileInputStream可以通过getChannel获取到FileChannel的对象,我们来看看FileChannel是怎么读取数据的。
private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
long position, NativeDispatcher nd)
throws IOException
{
int pos = bb.position();
int lim = bb.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
if (rem == 0)
return 0;
int n = 0;
if (position != -1) {
n = nd.pread(fd, ((DirectBuffer)bb).address() + pos,
rem, position);
} else {
n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
}
if (n > 0)
bb.position(pos + n);
return n;
}
在读取的时候会分开两种情况
#define pread64 pread
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz,
jobject fdo, jlong address, jint len)
{
jint fd = fdval(env, fdo);
void *buf = (void *)jlong_to_ptr(address);
return convertReturnVal(env, read(fd, buf, len), JNI_TRUE);
}
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_pread0(JNIEnv *env, jclass clazz, jobject fdo,
jlong address, jint len, jlong offset)
{
jint fd = fdval(env, fdo);
void *buf = (void *)jlong_to_ptr(address);
return convertReturnVal(env, pread64(fd, buf, len, offset), JNI_TRUE);
}
调用的也就是系统函数的read和pread。
使用FileChannel并且使用了DirectByteBuffer就可以省去拷贝到java堆空间的操作了,读取速度肯定是有提高的,但是java堆的堆空间是运行时就开辟出来的,native的得开始申请,这个也是有时间消耗的,所以具体的运行速度还是看情况的,单纯看文件读取到内存这块,毕竟还是省去了一部分操作,FileChannel效果更好。
map
#define mmap64 mmap
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this,
jint prot, jlong off, jlong len)
{
void *mapAddress = 0;
jobject fdo = (*env)->GetObjectField(env, this, chan_fd);
jint fd = fdval(env, fdo);
int protections = 0;
int flags = 0;
if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
protections = PROT_READ;
flags = MAP_SHARED;
} else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
protections = PROT_WRITE | PROT_READ;
flags = MAP_SHARED;
} else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
protections = PROT_WRITE | PROT_READ;
flags = MAP_PRIVATE;
}
//映射
mapAddress = mmap64(
0, /* Let OS decide location */
len, /* Number of bytes to map */
protections, /* File permissions */
flags, /* Changes are shared */
fd, /* File descriptor of mapped file */
off); /* Offset into file */
if (mapAddress == MAP_FAILED) {
if (errno == ENOMEM) {
JNU_ThrowOutOfMemoryError(env, "Map failed");
return IOS_THROWN;
}
return handle(env, -1, "Map failed");
}
return ((jlong) (unsigned long) mapAddress);
}
FileChannel的map使用的就是mmap,这个是真正把数据映射到内存了,不需要再经过内核态的数据拷贝了。
Files.copy和FileChannel.transferTo的比较
jdk7引入了Files这个类,方便了很多文件操作,但是很多人认为这个操作过于方便,不适合大文件等等,应该使用transferTo,transferFrom。
下面我们来看看两者从理论分析上哪个更快
public long transferTo(long position, long count,
WritableByteChannel target)
throws IOException
{
ensureOpen();
if (!target.isOpen())
throw new ClosedChannelException();
if (!readable)
throw new NonReadableChannelException();
if (target instanceof FileChannelImpl &&
!((FileChannelImpl)target).writable)
throw new NonWritableChannelException();
if ((position < 0) || (count < 0))
throw new IllegalArgumentException();
long sz = size();
if (position > sz)
return 0;
int icount = (int)Math.min(count, Integer.MAX_VALUE);
if ((sz - position) < icount)
icount = (int)(sz - position);
long n;
// Attempt a direct transfer, if the kernel supports it
if ((n = transferToDirectly(position, icount, target)) >= 0)
return n;
// Attempt a mapped transfer, but only to trusted channel types
if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
return n;
// Slow path for untrusted targets
return transferToArbitraryChannel(position, icount, target);
}
这里使用了三种不同的尝试去拷贝文件
transferToDirectly最后调用的是transferTo0
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
jobject srcFDO,
jlong position, jlong count,
jobject dstFDO)
{
jint srcFD = fdval(env, srcFDO);
jint dstFD = fdval(env, dstFDO);
#if defined(__linux__)
off64_t offset = (off64_t)position;
jlong n = sendfile64(dstFD, srcFD, &offset, (size_t)count);
if (n < 0) {
if (errno == EAGAIN)
return IOS_UNAVAILABLE;
if ((errno == EINVAL) && ((ssize_t)count >= 0))
return IOS_UNSUPPORTED_CASE;
if (errno == EINTR) {
return IOS_INTERRUPTED;
}
JNU_ThrowIOExceptionWithLastError(env, "Transfer failed");
return IOS_THROWN;
}
return n;
#elif defined (__solaris__)
sendfilevec64_t sfv;
size_t numBytes = 0;
jlong result;
sfv.sfv_fd = srcFD;
sfv.sfv_flag = 0;
sfv.sfv_off = (off64_t)position;
sfv.sfv_len = count;
result = sendfilev64(dstFD, &sfv, 1, &numBytes);
/* Solaris sendfilev() will return -1 even if some bytes have been
* transferred, so we check numBytes first.
*/
if (numBytes > 0)
return numBytes;
if (result < 0) {
if (errno == EAGAIN)
return IOS_UNAVAILABLE;
if (errno == EOPNOTSUPP)
return IOS_UNSUPPORTED_CASE;
if ((errno == EINVAL) && ((ssize_t)count >= 0))
return IOS_UNSUPPORTED_CASE;
if (errno == EINTR)
return IOS_INTERRUPTED;
JNU_ThrowIOExceptionWithLastError(env, "Transfer failed");
return IOS_THROWN;
}
return result;
#elif defined(__APPLE__)
off_t numBytes;
int result;
numBytes = count;
result = sendfile(srcFD, dstFD, position, &numBytes, NULL, 0);
if (numBytes > 0)
return numBytes;
if (result == -1) {
if (errno == EAGAIN)
return IOS_UNAVAILABLE;
if (errno == EOPNOTSUPP || errno == ENOTSOCK || errno == ENOTCONN)
return IOS_UNSUPPORTED_CASE;
if ((errno == EINVAL) && ((ssize_t)count >= 0))
return IOS_UNSUPPORTED_CASE;
if (errno == EINTR)
return IOS_INTERRUPTED;
JNU_ThrowIOExceptionWithLastError(env, "Transfer failed");
return IOS_THROWN;
}
return result;
#elif defined(_AIX)
jlong max = (jlong)java_lang_Integer_MAX_VALUE;
struct sf_parms sf_iobuf;
jlong result;
if (position > max)
return IOS_UNSUPPORTED_CASE;
if (count > max)
count = max;
memset(&sf_iobuf, 0, sizeof(sf_iobuf));
sf_iobuf.file_descriptor = srcFD;
sf_iobuf.file_offset = (off_t)position;
sf_iobuf.file_bytes = count;
result = send_file(&dstFD, &sf_iobuf, SF_SYNC_CACHE);
/* AIX send_file() will return 0 when this operation complete successfully,
* return 1 when partial bytes transfered and return -1 when an error has
* Occured.
*/
if (result == -1) {
if (errno == EWOULDBLOCK)
return IOS_UNAVAILABLE;
if ((errno == EINVAL) && ((ssize_t)count >= 0))
return IOS_UNSUPPORTED_CASE;
if (errno == EINTR)
return IOS_INTERRUPTED;
if (errno == ENOTSOCK)
return IOS_UNSUPPORTED;
JNU_ThrowIOExceptionWithLastError(env, "Transfer failed");
return IOS_THROWN;
}
if (sf_iobuf.bytes_sent > 0)
return (jlong)sf_iobuf.bytes_sent;
return IOS_UNSUPPORTED_CASE;
#else
return IOS_UNSUPPORTED_CASE;
#endif
}
这里最后发现使用是sendfile的调用
private static final long MAPPED_TRANSFER_SIZE = 8L*1024L*1024L;
private long transferToTrustedChannel(long position, long count,
WritableByteChannel target)
throws IOException
{
boolean isSelChImpl = (target instanceof SelChImpl);
if (!((target instanceof FileChannelImpl) || isSelChImpl))
return IOStatus.UNSUPPORTED;
// Trusted target: Use a mapped buffer
long remaining = count;
while (remaining > 0L) {
long size = Math.min(remaining, MAPPED_TRANSFER_SIZE);
try {
MappedByteBuffer dbb = map(MapMode.READ_ONLY, position, size);
try {
// ## Bug: Closing this channel will not terminate the write
int n = target.write(dbb);
assert n >= 0;
remaining -= n;
if (isSelChImpl) {
// one attempt to write to selectable channel
break;
}
assert n > 0;
position += n;
} finally {
unmap(dbb);
}
} catch (ClosedByInterruptException e) {
// target closed by interrupt as ClosedByInterruptException needs
// to be thrown after closing this channel.
assert !target.isOpen();
try {
close();
} catch (Throwable suppressed) {
e.addSuppressed(suppressed);
}
throw e;
} catch (IOException ioe) {
// Only throw exception if no bytes have been written
if (remaining == count)
throw ioe;
break;
}
}
return count - remaining;
}
transferToTrustedChannel是通过了mmap,一次最大是使用8m。
transferToArbitraryChannel下面代码有个一次分配的最大值8192。只选取长度小的来申请空间。
private static final int TRANSFER_SIZE = 8192;
private long transferFromArbitraryChannel(ReadableByteChannel src,
long position, long count)
throws IOException
{
// Untrusted target: Use a newly-erased buffer
int c = (int)Math.min(count, TRANSFER_SIZE);
ByteBuffer bb = Util.getTemporaryDirectBuffer(c);
long tw = 0; // Total bytes written
long pos = position;
try {
Util.erase(bb);
while (tw < count) {
bb.limit((int)Math.min((count - tw), (long)TRANSFER_SIZE));
// ## Bug: Will block reading src if this channel
// ## is asynchronously closed
int nr = src.read(bb);
if (nr <= 0)
break;
bb.flip();
int nw = write(bb, pos);
tw += nw;
if (nw != nr)
break;
pos += nw;
bb.clear();
}
return tw;
} catch (IOException x) {
if (tw > 0)
return tw;
throw x;
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
}
重要的方法就是里面的read和write了。
private int readInternal(ByteBuffer dst, long position) throws IOException {
assert !nd.needsPositionLock() || Thread.holdsLock(positionLock);
int n = 0;
int ti = -1;
try {
begin();
ti = threads.add();
if (!isOpen())
return -1;
do {
n = IOUtil.read(fd, dst, position, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
threads.remove(ti);
end(n > 0);
assert IOStatus.check(n);
}
}
read走到了IOUtil.read,最后就是上面readIntoNativeBuffer的方法,最后调用的就是底层的read和pread。write最后走到的就是pwrite和write的系统调用。方法的位置在solaris\native\sun\nio\ch\FileDispatcherImpl.c
Files的实现在sun\nio\fs\UnixCopyFile.java中调用了native方法transfer。
JNIEXPORT void JNICALL
Java_sun_nio_fs_UnixCopyFile_transfer
(JNIEnv* env, jclass this, jint dst, jint src, jlong cancelAddress)
{
char buf[8192];
volatile jint* cancel = (jint*)jlong_to_ptr(cancelAddress);
for (;;) {
ssize_t n, pos, len;
RESTARTABLE(read((int)src, &buf, sizeof(buf)), n);
if (n <= 0) {
if (n < 0)
throwUnixException(env, errno);
return;
}
if (cancel != NULL && *cancel != 0) {
throwUnixException(env, ECANCELED);
return;
}
pos = 0;
len = n;
do {
char* bufp = buf;
bufp += pos;
RESTARTABLE(write((int)dst, bufp, len), n);
if (n == -1) {
throwUnixException(env, errno);
return;
}
pos += n;
len -= n;
} while (len > 0);
}
}
这里的buffer也一样是8192。系统调用也是read和write。
相比之下transferTo的效果要更好一些。
笔者以前根据jdk7的IO特性,写了一个工具包https://gitee.com/xpbob/commonIO里面有响应的代码,可以在不同的环境下做一下测试。
总结
java bio中最终都是系统函数的调用,外面说的各种神奇的地方或多或少都有偏差,所以想更好的理解java,一定的c功底还是需要的。
很多人理解java nio直接就是非阻塞io,其实nio是new io的简称,从代码的角度看,旧的io是所有的数据都在java堆中的,而新的io其实更多的io数据在直接内存里,减少了native堆到java堆的拷贝。