ZeroMQ(java)中对IO的封装(StreamEngine)

简介: 哎,各种各样杂七杂八的事情。。。好久没有看代码了,其实要搞明白一个与IO相关的框架,最好的办法就是把它的I/0的读写两个过程搞清楚。。。例如在netty中,如果能将eventLoop的运行原理搞清楚,然后摸清楚整个I/O读写两个过程,那么也就差不太多了。

哎,各种各样杂七杂八的事情。。。好久没有看代码了,其实要搞明白一个与IO相关的框架,最好的办法就是把它的I/0的读写两个过程搞清楚。。。例如在netty中,如果能将eventLoop的运行原理搞清楚,然后摸清楚整个I/O读写两个过程,那么也就差不太多了。。。。

这次来看看ZeroMQ(java)中如何来处理I/O的,先来看看一个类型的定义,IOObject类型,这个类型应该扮演的是工具类的形象,前面看过在ZeroMQ中所谓的IO线程的定义,那么IOObject就是用于直接与IO线程交互的,或者说的更直接的一点就是它是与IO线程里的poller对象交互的。。。

那么先来看看IOObject的类图吧:



这张图应该将IOObject与IOThread以及Poller之间的关系表现的很清楚了吧。。。。IOObject实现了IPollEvents接口,那么也就代表它可以响应IO事件。。。不过其实它并不直接实现这些IO事件,而是将其委托给内部的一个IPollEvents对象。。只不过是做了一层代理而已。。。


好了,接下来来看看IOObject的代码吧,先来看看它的属性申明:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. private Poller poller;   //poller对象  
  2. private IPollEvents handler;   //用于执行事件回调的handler  

这个poller就是从IO线程里面获取过来的,handler就是刚刚提到的事件回调的处理对象。。。IOObject不过是对其进行了一层包装而已。。。

那么接下来来看看重要的方法定义:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. //在将一个IO对象加入到一个IO线程的时候,要注意确定当前IO对象之前没有加入到任何IO线程或者已经从别的IO线程上面退下来了  
  2. //将当前这个IO对象加入到IO线程上面去,说白了主要是获取这个IO线程的poller对象  
  3. public void plug(IOThread io_thread_) {  
  4.     assert (io_thread_ != null);  
  5.     assert (poller == null);  
  6.     poller = io_thread_.get_poller ();      //获取这个线程的poller对象  
  7. }  

这个方法用于将当前这个IO对象加入到一个IO线程,其实主要的是要获取这个IO线程的Poller对象。。好了,接下来再来看看如何注册channel以及事件吧:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. //在poller里面移除channel  
  2. public final void rm_fd(SelectableChannel handle) {  
  3.     poller.rm_fd(handle);  
  4. }  
  5. //给这个channel注册读取的事件  
  6. public final void set_pollin (SelectableChannel handle_) {  
  7.     poller.set_pollin (handle_);  
  8. }  
  9. //在这个channel上面注册写事件  
  10. public final void set_pollout (SelectableChannel handle_) {  
  11.     poller.set_pollout (handle_);  
  12. }  
  13. //注册链接事件  
  14. public final void set_pollconnect(SelectableChannel handle) {  
  15.     poller.set_pollconnect(handle);  
  16. }  
  17. //注册accept事件  
  18. public final void set_pollaccept(SelectableChannel handle) {  
  19.     poller.set_pollaccept(handle);  
  20. }  
  21. //取消读取事件的注册  
  22. public final void reset_pollin(SelectableChannel handle) {  
  23.     poller.reset_pollin (handle);  
  24. }  
  25.   
  26. //取消写事件的注册  
  27. public final void reset_pollout(SelectableChannel handle) {  
  28.     poller.reset_pollout (handle);  
  29. }  

这部分代码应该很简单吧,而且应该对IOObject的用处比较的清楚了,然后至于说IOObject对象如何响应in_event什么的,前面已经说过了,其实是委托给了handler对象来处理。。。好啦,IOObject的分析就算差不多了。。接下来来看看StreamEngine类型的实现吧,还是先来看看它初略的类图吧:



其实觉得看一个类的类图,基本上就能看出这个类的很多情况,好了,不说闲话了,来看看它的属性的定义吧:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. private static final int GREETING_SIZE = 12;   //问候msg的大小,12个字节  (10字节的头,1字节的版本,1字节的socket类型)  
  2.   
  3. //  True iff we are registered with an I/O poller.  
  4. private boolean io_enabled;   //如果是true的话,表示当前已经注册到了poller上面去  
  5.   
  6. private SocketChannel handle;   //真正底层用于通信的socketChannel  
  7.   
  8. private ByteBuffer inbuf;  //接收数据的buf  
  9. private int insize;   //记录接收的数据的大小  
  10. private DecoderBase decoder;  //decoder  
  11.   
  12. private Transfer outbuf;   //outbuf  
  13. private int outsize;   //outbuf的大小  
  14. private EncoderBase encoder;  //encoder  
  15.   
  16. //  When true, we are still trying to determine whether  
  17. //  the peer is using versioned protocol, and if so, which  
  18. //  version.  When false, normal message flow has started.  
  19. private boolean handshaking;  //是否是在握手中,当值为false的时候代表握手已经完成了  
  20.   
  21. //  The receive buffer holding the greeting message  
  22. //  that we are receiving from the peer.  
  23. private final ByteBuffer greeting;  //用于接收问候msg的buf  
  24.   
  25. //  The send buffer holding the greeting message  
  26. //  that we are sending to the peer.  
  27. private final ByteBuffer greeting_output_buffer;  //用于发送问候msg的buf  
  28.   
  29. private SessionBase session;    //所属的session  
  30.   
  31.   
  32. private Options options;  //选项配置  
  33.   
  34. // String representation of endpoint  
  35. private String endpoint;   //这里一般是地址信息  
  36.   
  37. private boolean plugged;   //是否已经加入了  
  38. private boolean terminating;  //是否已经停止了  
  39.   
  40. // Socket  
  41. private SocketBase socket;  //所属的socket  
  42.   
  43. private IOObject io_object;    //拥有的IO对象  

这里面有很多重要的属性,例如handler是SocketChannel类型的,可以知道它才是实际上底层用于通信的,然后又inbuf以及outbuf,这两个东西是干嘛用的应该一眼就看出来了吧,然后还有encoder和decoder,呵呵,可以猜到,读取到的数据先要经过decoder的处理才提交给上层,发送出去的数据也会通过encoder处理成二进制再发送出去。。。然后还有一个io_objcet对象。。。

接下来来看看构造方法吧:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. //构造函数,第一个参数是底层的channel,  
  2. public StreamEngine (SocketChannel fd_, final Options options_, final String endpoint_)   
  3. {  
  4.     handle = fd_;  
  5.     inbuf = null;  
  6.     insize = 0;  
  7.     io_enabled = false;  
  8.     outbuf = null;  
  9.     outsize = 0;  
  10.     handshaking = true;  //初始化为ture,表示还没有完成握手  
  11.     session = null;  
  12.     options = options_;  
  13.     plugged = false;  
  14.     terminating = false;  
  15.     endpoint = endpoint_;  
  16.     socket = null;  
  17.     greeting = ByteBuffer.allocate (GREETING_SIZE);  //创建用于接收问候msg的buf  
  18.     greeting_output_buffer = ByteBuffer.allocate (GREETING_SIZE);   //创建用于发送握手信息的buf  
  19.     encoder = null;  
  20.     decoder = null;  
  21.   
  22.   
  23.     try {  
  24.         Utils.unblock_socket (handle);  //将底层的channel设置为非阻塞的  
  25.         if (options.sndbuf != 0) {  //设置底层的socket的发送缓冲大小  
  26.             handle.socket().setSendBufferSize((int)options.sndbuf);  
  27.         }  
  28.         if (options.rcvbuf != 0) {  //设置底层的socket的接收缓冲大小  
  29.             handle.socket().setReceiveBufferSize((int)options.rcvbuf);  
  30.         }  
  31.   
  32.     } catch (IOException e) {  
  33.         throw new ZError.IOException(e);  
  34.     }  
  35.   
  36. }  

这个比较有意思的就是将channel设置为了非阻塞的模式,然后设置了底层socket的发送以及接受缓冲的大小。。其余的就没啥意思了。。。

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. //将当前engine加入到IO线程以及session,其实这里最主要的事情是将channel注册到poller上面去  
  2. public void plug (IOThread io_thread_,  
  3.         SessionBase session_)  {  
  4.     assert (!plugged);  
  5.     plugged = true;  //标志位  
  6.       
  7.     //  Connect to session object.  
  8.     assert (session == null);  
  9.     assert (session_ != null);  
  10.     session = session_;    //当前所属的session  
  11.     socket = session.get_soket ();  //获取所属的scoekt,这个是ZMQ的socket  
  12.   
  13.     io_object = new IOObject(null);  //创建IO对象,  
  14.     io_object.set_handler(this);  //设置IO对象的事件回调  
  15.     //  Connect to I/O threads poller object.  
  16.     io_object.plug (io_thread_);  // 将IO对象搞到这个IO线程上面去,其实最主要的就是获取这个IO线程的poller对象  
  17.     io_object.add_fd (handle);   //将底层的channel加入  
  18.     io_enabled = true; //表示已经加入了  
  19.       
  20.     //  Send the 'length' and 'flags' fields of the identity message.  
  21.     //  The 'length' field is encoded in the long format.  
  22.     //设置发送的问候msg的信息  
  23.     greeting_output_buffer.put ((byte) 0xff);  
  24.     greeting_output_buffer.putLong (options.identity_size + 1);  
  25.     greeting_output_buffer.put ((byte) 0x7f);  
  26.   
  27.     io_object.set_pollin (handle);  //注册当前channel的读事件  
  28.     //  When there's a raw custom encoder, we don't send 10 bytes frame  
  29.     boolean custom = false;  
  30.     try {  
  31.         custom = options.encoder != null && options.encoder.getDeclaredField ("RAW_ENCODER") != null;  
  32.     } catch (SecurityException e) {  
  33.     } catch (NoSuchFieldException e) {  
  34.     }  
  35.       
  36.     if (!custom) {  
  37.         outsize = greeting_output_buffer.position ();  
  38.         outbuf = new Transfer.ByteBufferTransfer ((ByteBuffer) greeting_output_buffer.flip ());  //设置需要发送的buf,将问候信息发送出去  
  39.         io_object.set_pollout (handle);  
  40.     }          
  41.       
  42.     //  Flush all the data that may have been already received downstream.  
  43.     in_event ();  //看是否有数据读取了  
  44. }  

这个方法用于将当前IO对象注册到IO线程上面去,并且还要管理session,可以看到这里主要是利用IOObject对象,用于在poller对象上面注册channel,以及读写事件。。。另外还有对握手信息的处理。。。好了,握手这部分的内容,因为现在还没有看,不知道具体的流程是啥样的,就先放一下。。。再来看两个重要的方法定义吧:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. //当底层的chanel有数据可以读取的时候的回调方法  
  2. public void in_event ()  {  
  3.     if (handshaking)  
  4.         if (!handshake ())  
  5.             return;  
  6.       
  7.     assert (decoder != null);  
  8.     boolean disconnection = false;  
  9.   
  10.     //  If there's no data to process in the buffer...  
  11.     if (insize == 0) {  //如果inbuf里面没有数据需要处理  
  12.   
  13.         //  Retrieve the buffer and read as much data as possible.  
  14.         //  Note that buffer can be arbitrarily large. However, we assume  
  15.         //  the underlying TCP layer has fixed buffer size and thus the  
  16.         //  number of bytes read will be always limited.  
  17.         inbuf = decoder.get_buffer ();  //从解码器里面获取buf,用于写入读取的数据,因为在已经设置了底层socket的TCP接收缓冲区的大小  
  18.         insize = read (inbuf);  //用于将发送过来的数据写到buf中去,并记录大小  
  19.         inbuf.flip();  //这里准备从buf里面读取数据了  
  20.   
  21.         //  Check whether the peer has closed the connection.  
  22.         if (insize == -1) {  //如果是-1的话,表示底层的socket连接已经出现了问题  
  23.             insize = 0;  
  24.             disconnection = true;  
  25.         }  
  26.     }  
  27.   
  28.     //  Push the data to the decoder.  
  29.     int processed = decoder.process_buffer (inbuf, insize);  //解析这些读取到的数据  
  30.   
  31.     if (processed == -1) {  
  32.         disconnection = true;  
  33.     } else {  
  34.   
  35.         //  Stop polling for input if we got stuck.  
  36.         if (processed < insize)  //如果处理的数据居然还没有读到的数据多,那么取消读取事件的注册  
  37.             io_object.reset_pollin (handle);  
  38.   
  39.         //  Adjust the buffer.  
  40.         insize -= processed;  //还剩下没有处理的数据的大小  
  41.     }  
  42.   
  43.     //  Flush all messages the decoder may have produced.  
  44.     session.flush ();  //将decoder解析出来的数据交给session  
  45.   
  46.     //  An input error has occurred. If the last decoded message  
  47.     //  has already been accepted, we terminate the engine immediately.  
  48.     //  Otherwise, we stop waiting for socket events and postpone  
  49.     //  the termination until after the message is accepted.  
  50.     if (disconnection) {   //表示已经断开了连接,那么需要处理一下  
  51.         if (decoder.stalled ()) {  
  52.             io_object.rm_fd (handle);  
  53.             io_enabled = false;  
  54.         } else  
  55.             error ();  
  56.     }  
  57.   
  58. }  
  59.   
  60. //表示可以写数据了  
  61. public void out_event ()   {  
  62.     //  If write buffer is empty, try to read new data from the encoder.  
  63.     if (outsize == 0) {  //需要写的数据量为0  
  64.   
  65.         //  Even when we stop polling as soon as there is no  
  66.         //  data to send, the poller may invoke out_event one  
  67.         //  more time due to 'speculative write' optimisation.  
  68.         if (encoder == null) {  
  69.              assert (handshaking);  
  70.              return;  
  71.         }  
  72.           
  73.         outbuf = encoder.get_data (null);  //从encoder里面获取数据  
  74.         outsize = outbuf.remaining();  
  75.         //  If there is no data to send, stop polling for output.  
  76.         if (outbuf.remaining() == 0) {   //如果确实没有数据要写,那么取消写事件的注册  
  77.             io_object.reset_pollout (handle);  
  78.               
  79.             // when we use custom encoder, we might want to close  
  80.             if (encoder.is_error()) {  
  81.                 error();  
  82.             }  
  83.   
  84.             return;  
  85.         }  
  86.     }  
  87.   
  88.     //  If there are any data to write in write buffer, write as much as  
  89.     //  possible to the socket. Note that amount of data to write can be  
  90.     //  arbitratily large. However, we assume that underlying TCP layer has  
  91.     //  limited transmission buffer and thus the actual number of bytes  
  92.     //  written should be reasonably modest.  
  93.     int nbytes = write (outbuf);  //写数据  
  94.   
  95.     //  IO error has occurred. We stop waiting for output events.  
  96.     //  The engine is not terminated until we detect input error;  
  97.     //  this is necessary to prevent losing incomming messages.  
  98.     if (nbytes == -1) {  //如果-1,那么表示底层用到的socket其实已经出现了问题  
  99.         io_object.reset_pollout (handle);  //取消写事件的注册  
  100.   
  101.         if (terminating)  
  102.             terminate ();  
  103.   
  104.         return;  
  105.     }  
  106.   
  107.     outsize -= nbytes;  //这里更新需要写的数据的数量  
  108.   
  109.     //  If we are still handshaking and there are no data  
  110.     //  to send, stop polling for output.  
  111.     if (handshaking)  
  112.         if (outsize == 0)  
  113.             io_object.reset_pollout (handle);  
  114.       
  115.     // when we use custom encoder, we might want to close after sending a response  
  116.     if (outsize == 0) {  
  117.         if (encoder != null && encoder.is_error ()) {  
  118.             error();  
  119.             return;  
  120.         }  
  121.         if (terminating)  
  122.             terminate ();  
  123.     }  
  124.   
  125. }  

这两个方法是用于相应IO事件的,前面提到的IOObject将IO事件其实委托给了内部的handler来处理,其实这个handler对象就是SteamEngine对象,也就是底层的channel有数据可以读写的时候,将会用上面的两个方法来处理。这里就可以看到读写事件最原始的处理流程了,而且也看到了encoder以及decoder的用处。。。这里代码应该还算是比较的简单,由于这部分还涉及到与上层的session对象之间的交互,这个还要等到以后来分析。。。


好了,那么到这里ZeroMQ中IO的处理流程也就算是有了基本的了解了。。。。

若转载请注明出处!若有疑问,请回复交流!
目录
相关文章
|
3月前
|
安全 Java 编译器
Java的封装详解
封装和多态是面向对象编程(OOP)的重要概念。封装通过私有属性和公共方法实现数据隐藏和保护,使类的内部细节对外部不可见;多态则通过方法重载和重写实现同一方法在不同对象上的不同表现形式,增强了代码的灵活性和可维护性。两者结合使用,可以使Java程序更加安全、灵活且易于维护。
255 82
|
3月前
|
Java
Java的封装详解
封装是Java中实现数据隐藏和保护的核心机制。它通过将对象的状态和行为结合并限制外部直接访问,确保类的内部细节对外不可见,仅能通过公共方法访问和修改对象状态。封装带来了数据隐藏、提高代码可维护性和增强安全性等好处。在Java中,封装主要通过将属性设为私有并提供getter和setter方法来实现。这种方式不仅保护了数据完整性,还允许在修改类内部实现时不影响外部代码,从而提升程序的健壮性和可读性。
289 80
|
2月前
|
存储 缓存 Java
java基础:IO流 理论与代码示例(详解、idea设置统一utf-8编码问题)
这篇文章详细介绍了Java中的IO流,包括字符与字节的概念、编码格式、File类的使用、IO流的分类和原理,以及通过代码示例展示了各种流的应用,如节点流、处理流、缓存流、转换流、对象流和随机访问文件流。同时,还探讨了IDEA中设置项目编码格式的方法,以及如何处理序列化和反序列化问题。
86 1
java基础:IO流 理论与代码示例(详解、idea设置统一utf-8编码问题)
|
3月前
|
Java 编译器
封装,继承,多态【Java面向对象知识回顾①】
本文回顾了Java面向对象编程的三大特性:封装、继承和多态。封装通过将数据和方法结合在类中并隐藏实现细节来保护对象状态,继承允许新类扩展现有类的功能,而多态则允许对象在不同情况下表现出不同的行为,这些特性共同提高了代码的复用性、扩展性和灵活性。
封装,继承,多态【Java面向对象知识回顾①】
|
3月前
|
SQL Java 编译器
Java——类与对象(封装)
封装是面向对象编程中的概念,指将数据(属性)和相关操作(方法)组合成独立单元(类),使外部无法直接访问对象的内部状态,只能通过提供的方法进行交互,从而保护数据安全。例如,手机将各种组件封装起来,只暴露必要的接口供外部使用。实现封装时,使用`private`关键字修饰成员变量,并提供`get`和`set`方法进行访问和修改。此外,介绍了包的概念、导入包的方式及其注意事项,以及`static`关键字的使用,包括静态变量和方法的初始化与代码块的加载顺序。
50 10
Java——类与对象(封装)
|
3月前
|
安全 Java API
【Java面试题汇总】Java基础篇——String+集合+泛型+IO+异常+反射(2023版)
String常量池、String、StringBuffer、Stringbuilder有什么区别、List与Set的区别、ArrayList和LinkedList的区别、HashMap底层原理、ConcurrentHashMap、HashMap和Hashtable的区别、泛型擦除、ABA问题、IO多路复用、BIO、NIO、O、异常处理机制、反射
【Java面试题汇总】Java基础篇——String+集合+泛型+IO+异常+反射(2023版)
|
2月前
|
Java 数据处理 开发者
揭秘Java IO流:字节流与字符流的神秘面纱!
揭秘Java IO流:字节流与字符流的神秘面纱!
39 1
|
2月前
|
自然语言处理 Java 数据处理
Java IO流全解析:字节流和字符流的区别与联系!
Java IO流全解析:字节流和字符流的区别与联系!
95 1
|
3月前
|
安全 Java 数据安全/隐私保护
Java 封装怎么理解
封装是Java中的一种重要机制,它将对象的状态(数据)和行为(方法)打包在一起并控制外部访问权限,以保护数据不被随意修改。封装的主要目的包括数据保护、接口设计和增强模块性。通过使用`private`、`protected`及`public`等访问控制修饰符,结合getter和setter方法,可以有效隐藏对象内部实现细节。下面是一个简单的`BankAccount`类示例,展示了如何通过封装保护类的内部状态,确保数据安全和一致性,简化类的使用。理解封装有助于编写高质量代码和设计优秀程序架构。
40 9
|
3月前
|
Java 数据安全/隐私保护
Java 封装详解
在 Java 中,封装是面向对象编程的关键特性,通过将对象的状态(数据)和行为(方法)结合并利用访问控制保护数据,防止外部随意访问和修改。主要特点包括访问控制(如 `private` 和 `protected`)、数据隐藏及方法暴露(如 getter 和 setter)。封装的优点在于保护数据、隐藏实现细节、易于维护以及提高代码可读性。下面是一个简单的 `Person` 类封装示例,展示了如何通过 getter 和 setter 控制对类内部状态的访问,并进行合法性检查。总结而言,封装有助于构建清晰、易用且可维护的代码结构,是编写高质量 Java 程序的重要原则。
48 7