背景
前一段时间一直在关注一些nio的相关技术,也和公司的架构师交流了一下,学到了一些相关原理,比如nio的优势和劣势。以及一些排查nio bug问题的方式,受益量多。为自己做一下技术储备,以后可以多玩玩nio的相关技术
知识点
unix网络编程第6章: 几种unix下的I/O模型。
- 阻塞I/O模型 (java io)
- 非阻塞I/O模型
- I/O复用 (java 5的nio)
- 信号驱动I/O
- 异步I/O (java7的aio)
1.#include<sys/select.h>
2.
3.int select(int maxfdp1, fd_set *restrict readfds, fd_set *restrict writefds,fd_set *restrict exceptfds, struct timeval* restrict tvptr);
1.#include <sys/select.h>
2.
3.int pselect(int maxfdp1, fd_set *restrict readfds, fd_set *restrict writefds,fd_set *restrict exceptfds, const struct timespec *restrict tsptr,const sigset_t *restrict sigmask);
1.#include <poll.h>
2.
3.int poll(struct pollfd fdarray[], nfds_t nfds, int timeout);
1.struct pollfd {
2. int fd; /* file descriptor to check, or <0 to ignore */
3. short events; /* events of interest on fd */
4. short revents; /* events that occurred on fd */
5.};
2. nfds代表pollfd的长度
返回值:0超时,-1出错, 正数代表准备好的描述符
同样变种的有ppoll函数,具体可以见man ppoll,两者的区别和select/pselect区别一样,多了时间精度的支持+信号屏蔽字
和select系列的区别点,poll不再受限于select中位数组的长度限制,我们可以将关心的描述符添加到poolfd中。
再看epoll函数,是对select/poll的一个增强版:
1. 因为它不会复用文件描述符集合来传递结果而迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合
2. 另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
epoll的除了提供select/poll 那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
电平触发(Level Triggered): select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息
边沿触发(Edge Triggered: 只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发。
几个接口:
1.int epoll_create(int size);
2.
3.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
4.1. 第一个参数是epoll_create()的返回值.
5.2. 第二个参数表示动作,用三个宏来表示:
6. EPOLL_CTL_ADD:注册新的fd到epfd中;
7. EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
8. EPOLL_CTL_DEL:从epfd中删除一个fd;
9.3. 第三个参数是需要监听的fd
10.4. 第四个参数是告诉内核需要监听什么事
11.events可以是以下几个宏的集合:
12. EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
13. EPOLLOUT:表示对应的文件描述符可以写;
14. EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
15. EPOLLERR:表示对应的文件描述符发生错误;
16. EPOLLHUP:表示对应的文件描述符被挂断;
17. EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
18.
19.int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
20.1. 等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,
21.2. 参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有 说法说是永久阻塞)。
22.该函数返回需要处理的事件数目,如返回0表示已超时。
1.public static SelectorProvider create() {
2. PrivilegedAction pa = new GetPropertyAction("os.name");
3. String osname = (String) AccessController.doPrivileged(pa);
4. if ("SunOS".equals(osname)) {
5. return new sun.nio.ch.DevPollSelectorProvider();
6. }
7.
8. // use EPollSelectorProvider for Linux kernels >= 2.6
9. if ("Linux".equals(osname)) {
10. pa = new GetPropertyAction("os.version");
11. String osversion = (String) AccessController.doPrivileged(pa);
12. String[] vers = osversion.split("\\.", 0);
13. if (vers.length >= 2) {
14. try {
15. int major = Integer.parseInt(vers[0]);
16. int minor = Integer.parseInt(vers[1]);
17. if (major > 2 || (major == 2 && minor >= 6)) {
18. return new sun.nio.ch.EPollSelectorProvider();
19. }
20. } catch (NumberFormatException x) {
21. // format not recognized
22. }
23. }
24. }
25.
26. return new sun.nio.ch.PollSelectorProvider();
27. }
比较明显: 如果当前是sunos系统,直接使用DevPoll,在linux 2.6内核下,使用Epoll模型,否则使用Poll。
DevPoll估计是sunos自己整的一套poll模型,公司一般使用redhat系列,内核2.6.18,64位主机。所以就介绍下Epoll的实现
java实现类: EPollSelectorImpl
1.// wake up使用的两描述符
2.protected int fd0;
3.protected int fd1;
4.
5.// The poll object , native的实现
6.EPollArrayWrapper pollWrapper;
7.
8.// Maps from file descriptors to keys , 文件描述符和SelectKey的关系
9.private HashMap fdToKey;
1.PollSelectorImpl类: ==============
2.
3.protected int doSelect(long timeout) //具体执行epoll调用
4. throws IOException
5. {
6. if (closed)
7. throw new ClosedSelectorException();
8. processDeregisterQueue();
9. try {
10. begin();
11. pollWrapper.poll(timeout);
12. } finally {
13. end();
14. }
15. processDeregisterQueue();
16. int numKeysUpdated = updateSelectedKeys();
17. if (pollWrapper.interrupted()) {
18. // Clear the wakeup pipe
19. pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
20. synchronized (interruptLock) {
21. pollWrapper.clearInterrupted();
22. IOUtil.drain(fd0);
23. interruptTriggered = false;
24. }
25. }
26. return numKeysUpdated;
27. }
28.
29.继续看下EPollArrayWrappe : ==========================
30.
31. int poll(long timeout) throws IOException {
32. updateRegistrations();
33. updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
34. for (int i=0; i<updated; i++) { // 这里判断下当前响应的描述符是否为fd0,后面再细说
35. if (getDescriptor(i) == incomingInterruptFD) {
36. interruptedIndex = i;
37. interrupted = true;
38. break;
39. }
40. }
41. return updated;
42. }
43.<span style="white-space: normal;">
44.继续看下EPollArrayWrapper 的native实现: epollWait(): ====================
45.</span><span style="white-space: normal;"><pre class="java" name="code">JNIEXPORT jint JNICALL
46.Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
47. jlong address, jint numfds,
48. jlong timeout, jint epfd)
49.{
50. struct epoll_event *events = jlong_to_ptr(address);
51. int res;
52.
53. if (timeout <= 0) { /* Indefinite or no wait */
54. RESTARTABLE((*epoll_wait_func)(epfd, events, numfds, timeout), res);
55. } else { /* Bounded wait; bounded restarts */
56. res = iepoll(epfd, events, numfds, timeout);
57. }
58.
59. if (res < 0) {
60. JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
61. }
62. return res;
63.}
64.
65.static int iepoll(int epfd, struct epoll_event *events, int numfds, jlong timeout)
66.{
67. jlong start, now;
68. int remaining = timeout;
69. struct timeval t;
70. int diff;
71.
72. gettimeofday(&t, NULL);
73. start = t.tv_sec * 1000 + t.tv_usec / 1000; //转化为ns单位
74.
75. for (;;) {
76. int res = (*epoll_wait_func)(epfd, events, numfds, timeout);
77. if (res < 0 && errno == EINTR) { //处理异常
78. if (remaining >= 0) {
79. gettimeofday(&t, NULL);
80. now = t.tv_sec * 1000 + t.tv_usec / 1000;
81. diff = now - start;
82. remaining -= diff;
83. if (diff < 0 || remaining <= 0) {
84. return 0;
85. }
86. start = now;
87. }
88. } else {
89. return res;
90. }
91. }
92.}</pre>
93.</span>
1.EPollSelectorImpl类:
2.
3. EPollSelectorImpl(SelectorProvider sp) {
4. super(sp);
5. int[] fdes = new int[2];
6. IOUtil.initPipe(fdes, false);
7. fd0 = fdes[0];
8. fd1 = fdes[1];
9. pollWrapper = new EPollArrayWrapper();
10. pollWrapper.initInterrupt(fd0, fd1); // 设置中断的两个描述符
11. fdToKey = new HashMap();
12. }
13. public Selector wakeup() {
14. synchronized (interruptLock) {
15. if (!interruptTriggered) {
16. pollWrapper.interrupt(); //调用warpper进行中断
17. interruptTriggered = true;
18. }
19. }
20. return this;
21. }
22.
23.继续看下EPollArrayWrapper :
24.
25. void initInterrupt(int fd0, int fd1) {
26. outgoingInterruptFD = fd1; //保存pipeline的描述符
27. incomingInterruptFD = fd0;
28. epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN); //注册到epoll上。
29. }
30. public void interrupt() {
31. interrupt(outgoingInterruptFD); //调用native方法
32. }
33.
34.继续看下EPollArrayWrapper.c native实现:
35.
36.JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
37.{
38. int fakebuf[1];
39. fakebuf[0] = 1;
40. if (write(fd, fakebuf, 1) < 0) { //发送一字节的内容,让epoll_wait()能得到及时响应
41. JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
42. }
43.}
实现方式也是挺简单的,弄了两个fd,一个往另一个写1byte的内容,促使epoll_wait能得到响应。
异步I/O模型:
暂时还未真实用过,只是大致看过其api,有兴趣可以自己baidu。
最后
后续会起一篇,单独针对nio在服务端和客户端使用上的注意点,主要是吸收了一些大牛门的经验,需要做个总结,消化一下。