3. I/O Handler的管理
3.1. IO句柄与Select_Reactor的分发集成
3.1.1. dispatch_io_handlers 函数
ace/Select_Reactor_T.cpp dispatch_io_handlers 函数
template <class ACE_SELECT_REACTOR_TOKEN> int ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_handlers (ACE_Select_Reactor_Handle_Set &dispatch_set, int &number_of_active_handles, int &number_of_handlers_dispatched) { ACE_TRACE ("ACE_Select_Reactor_T::dispatch_io_handlers"); // Handle output events (this code needs to come first to handle the // obscure case of piggy-backed data coming along with the final // handshake message of a nonblocking connection). if (this->dispatch_io_set (number_of_active_handles, number_of_handlers_dispatched, ACE_Event_Handler::WRITE_MASK, dispatch_set.wr_mask_, this->ready_set_.wr_mask_, &ACE_Event_Handler::handle_output) == -1) { number_of_active_handles -= number_of_handlers_dispatched; return -1; } // ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Select_Reactor_T::dispatch - EXCEPT\n"))); if (this->dispatch_io_set (number_of_active_handles, number_of_handlers_dispatched, ACE_Event_Handler::EXCEPT_MASK, dispatch_set.ex_mask_, this->ready_set_.ex_mask_, &ACE_Event_Handler::handle_exception) == -1) { number_of_active_handles -= number_of_handlers_dispatched; return -1; } // ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Select_Reactor_T::dispatch - READ\n"))); if (this->dispatch_io_set (number_of_active_handles, number_of_handlers_dispatched, ACE_Event_Handler::READ_MASK, dispatch_set.rd_mask_, this->ready_set_.rd_mask_, &ACE_Event_Handler::handle_input) == -1) { number_of_active_handles -= number_of_handlers_dispatched; return -1; } number_of_active_handles -= number_of_handlers_dispatched; return 0; }
3.1.2. dispatch_io_set 函数
ace/Select_Reactor_T.cpp dispatch_io_set 函数
template <class ACE_SELECT_REACTOR_TOKEN> int ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_set (int number_of_active_handles, int &number_of_handlers_dispatched, int mask, ACE_Handle_Set &dispatch_mask, ACE_Handle_Set &ready_mask, ACE_EH_PTMF callback) { ACE_TRACE ("ACE_Select_Reactor_T::dispatch_io_set"); ACE_HANDLE handle; ACE_Handle_Set_Iterator handle_iter (dispatch_mask); while ((handle = handle_iter ()) != ACE_INVALID_HANDLE && number_of_handlers_dispatched < number_of_active_handles) { ++number_of_handlers_dispatched; this->notify_handle (handle, mask, ready_mask, this->handler_rep_.find (handle), callback); // clear the bit from that dispatch mask, // so when we need to restart the iteration (rebuilding the iterator...) // we will not dispatch the already dispatched handlers this->clear_dispatch_mask (handle, mask); if (this->state_changed_) { handle_iter.reset_state (); this->state_changed_ = false; } } return 0; }
3.1.3. notify_handle 函数
ace/Select_Reactor_T.cpp notify_handle 函数
template <class ACE_SELECT_REACTOR_TOKEN> void ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::notify_handle (ACE_HANDLE handle, ACE_Reactor_Mask mask, ACE_Handle_Set &ready_mask, ACE_Event_Handler *event_handler, ACE_EH_PTMF ptmf) { ACE_TRACE ("ACE_Select_Reactor_T::notify_handle"); // Check for removed handlers. if (event_handler == 0) return; bool const reference_counting_required = event_handler->reference_counting_policy ().value () == ACE_Event_Handler::Reference_Counting_Policy::ENABLED; // Call add_reference() if needed. if (reference_counting_required) { event_handler->add_reference (); } int const status = (event_handler->*ptmf) (handle); if (status < 0) this->remove_handler_i (handle, mask); else if (status > 0) ready_mask.set_bit (handle); // Call remove_reference() if needed. if (reference_counting_required) event_handler->remove_reference (); }
如果回调函数 status<0, 则调用移除该 handle 的对应 mask:this->remove_handler_i(handle,mask) 。
如果回调函数 status>0, 这表明该 handle 需要继续被调度处理,则需要将该 handle 设置到 ready_set_ 中再次被派发。
3.2. handler注册
3.2.1. register_handler 函数流程
ace/Select_Reactor_T.cpp register_handler 函数
template <class ACE_SELECT_REACTOR_TOKEN> int ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler (ACE_HANDLE handle, ACE_Event_Handler *handler, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->register_handler_i (handle, handler, mask); }
3.2.2. register_handler_i 函数流程
ace/Select_Reactor_T.cpp register_handler 函数
template <class ACE_SELECT_REACTOR_TOKEN> int ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler_i (ACE_HANDLE handle, ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_T::register_handler_i"); // Insert the <handle, event_handle> tuple into the Handler // Repository. return this->handler_rep_.bind (handle, event_handler, mask); }
3.2.3. bind 函数
映射数据结构定义
行26实现了真正的关联操作,this->event_handlers_.bind(handle,event_handler,entry); 其中变量 event_handlers_ 在程序中定义为*map_type*, 在Unix下具体定义如下typedefACE_Array_Base<value_type>map_type;,也即使用handle 句柄值作为数组的索引,实现了map的功能。在windows下使用了ACE_Hash_Map_Manager_Ex作为map实现的类, typedefACE_Hash_Map_Manager_Ex<key_type,value_type,ACE_Hash<key_type>,std::equal_to<key_type>,ACE_Null_Mutex>map_type;,其中 key_type的定义为:typedefACE_HANDLEkey_type;,另外value_type定义为 typedefACE_Event_Handler*value_type;。
ace/Select_Reactor_Base.cpp bind 函数
// Bind the <ACE_Event_Handler *> to the <ACE_HANDLE>. int ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle, ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind"); if (event_handler == 0) return -1; if (handle == ACE_INVALID_HANDLE) handle = event_handler->get_handle (); if (this->invalid_handle (handle)) return -1; // Is this handle already in the Reactor? bool existing_handle = false; #if defined (ACE_WIN32) map_type::ENTRY * entry = 0; int const result = this->event_handlers_.bind (handle, event_handler, entry); if (result == -1) { return -1; } else if (result == 1) // Entry already exists. { // Cannot use a different handler for an existing handle. if (event_handler != entry->item ()) { return -1; } else { // Remember that this handle is already registered in the // Reactor. existing_handle = true; } } #else // Check if this handle is already registered. ACE_Event_Handler * const current_handler = this->event_handlers_[handle]; if (current_handler) { // Cannot use a different handler for an existing handle. if (current_handler != event_handler) return -1; // Remember that this handle is already registered in the // Reactor. existing_handle = true; } this->event_handlers_[handle] = event_handler; if (this->max_handlep1_ < handle + 1) this->max_handlep1_ = handle + 1; #endif /* ACE_WIN32 */ if (this->select_reactor_.is_suspended_i (handle)) { this->select_reactor_.bit_ops (handle, mask, this->select_reactor_.suspend_set_, ACE_Reactor::ADD_MASK); } else { this->select_reactor_.bit_ops (handle, mask, this->select_reactor_.wait_set_, ACE_Reactor::ADD_MASK); // Note the fact that we've changed the state of the <wait_set_>, // which is used by the dispatching loop to determine whether it can // keep going or if it needs to reconsult select(). // this->select_reactor_.state_changed_ = 1; } // If new entry, call add_reference() if needed. if (!existing_handle) event_handler->add_reference (); return 0; }
3.3. handler移除
3.3.1. remove_handler 函数流程
ace/Select_Reactor_T.cpp remove_handler 函数
template <class ACE_SELECT_REACTOR_TOKEN> int ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler (ACE_HANDLE handle, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->remove_handler_i (handle, mask); }
3.3.2. remove_handler_i 函数流程
ace/Select_Reactor_T.cpp remove_handler_i 函数
template <class ACE_SELECT_REACTOR_TOKEN> int ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler_i (ACE_HANDLE handle, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_T::remove_handler_i"); // Unbind this handle. return this->handler_rep_.unbind (handle, mask); }
3.3.3. unbind 函数
ace/Select_Reactor_Base.cpp unbind 函数
int ACE_Select_Reactor_Handler_Repository::unbind ( ACE_HANDLE handle, map_type::iterator pos, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind"); // Retrieve event handler before unbinding it from the map. The // iterator pointing to it will no longer be valid once the handler // is unbound. ACE_Event_Handler * const event_handler = (pos == this->event_handlers_.end () ? 0 : ACE_SELECT_REACTOR_EVENT_HANDLER (pos)); // Clear out the <mask> bits in the Select_Reactor's wait_set. this->select_reactor_.bit_ops (handle, mask, this->select_reactor_.wait_set_, ACE_Reactor::CLR_MASK); // And suspend_set. this->select_reactor_.bit_ops (handle, mask, this->select_reactor_.suspend_set_, ACE_Reactor::CLR_MASK); // Note the fact that we've changed the state of the <wait_set_>, // which is used by the dispatching loop to determine whether it can // keep going or if it needs to reconsult select(). // this->select_reactor_.state_changed_ = 1; // If there are no longer any outstanding events on this <handle> // then we can totally shut down the Event_Handler. bool const has_any_wait_mask = (this->select_reactor_.wait_set_.rd_mask_.is_set (handle) || this->select_reactor_.wait_set_.wr_mask_.is_set (handle) || this->select_reactor_.wait_set_.ex_mask_.is_set (handle)); bool const has_any_suspend_mask = (this->select_reactor_.suspend_set_.rd_mask_.is_set (handle) || this->select_reactor_.suspend_set_.wr_mask_.is_set (handle) || this->select_reactor_.suspend_set_.ex_mask_.is_set (handle)); bool complete_removal = false; if (!has_any_wait_mask && !has_any_suspend_mask) { #if defined (ACE_WIN32) if (event_handler != 0 && this->event_handlers_.unbind (pos) == -1) return -1; // Should not happen! #else this->event_handlers_[handle] = 0; if (this->max_handlep1_ == handle + 1) { // We've deleted the last entry, so we need to figure out // the last valid place in the array that is worth looking // at. ACE_HANDLE const wait_rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set (); ACE_HANDLE const wait_wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set (); ACE_HANDLE const wait_ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set (); ACE_HANDLE const suspend_rd_max = this->select_reactor_.suspend_set_.rd_mask_.max_set (); ACE_HANDLE const suspend_wr_max = this->select_reactor_.suspend_set_.wr_mask_.max_set (); ACE_HANDLE const suspend_ex_max = this->select_reactor_.suspend_set_.ex_mask_.max_set (); // Compute the maximum of six values. this->max_handlep1_ = wait_rd_max; if (this->max_handlep1_ < wait_wr_max) this->max_handlep1_ = wait_wr_max; if (this->max_handlep1_ < wait_ex_max) this->max_handlep1_ = wait_ex_max; if (this->max_handlep1_ < suspend_rd_max) this->max_handlep1_ = suspend_rd_max; if (this->max_handlep1_ < suspend_wr_max) this->max_handlep1_ = suspend_wr_max; if (this->max_handlep1_ < suspend_ex_max) this->max_handlep1_ = suspend_ex_max; ++this->max_handlep1_; } #endif /* ACE_WIN32 */ // The handle has been completely removed. complete_removal = true; } if (event_handler == 0) return -1; bool const requires_reference_counting = event_handler->reference_counting_policy ().value () == ACE_Event_Handler::Reference_Counting_Policy::ENABLED; // Close down the <Event_Handler> unless we've been instructed not // to. if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0) (void) event_handler->handle_close (handle, mask); // Call remove_reference() if the removal is complete and reference // counting is needed. if (complete_removal && requires_reference_counting) { (void) event_handler->remove_reference (); } return 0; }
警告
remove_handler 函数在后续调用 unbind 函数中,会触发对 handle_close 行107-108 的调用,需要注意,否则在 handle_close 函数中 使用 remove_handler 如果没有设置 DONT_CALL 标志,会导致 handle_close 的循环调用。
3.4. handler暂停
3.4.1. suspend_handler 函数
ace/Select_Reactor_T.cpp suspend_handler 函数
template <class ACE_SELECT_REACTOR_TOKEN> int ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::suspend_handler (ACE_HANDLE handle) { ACE_TRACE ("ACE_Select_Reactor_T::suspend_handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->suspend_i (handle); }
3.4.2. suspend_i 函数
ace/Select_Reactor_T.cpp suspend_i 函数
template <class ACE_SELECT_REACTOR_TOKEN> int ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::suspend_i (ACE_HANDLE handle) { ACE_TRACE ("ACE_Select_Reactor_T::suspend_i"); if (this->handler_rep_.find (handle) == 0) return -1; if (this->wait_set_.rd_mask_.is_set (handle)) { this->suspend_set_.rd_mask_.set_bit (handle); this->wait_set_.rd_mask_.clr_bit (handle); } if (this->wait_set_.wr_mask_.is_set (handle)) { this->suspend_set_.wr_mask_.set_bit (handle); this->wait_set_.wr_mask_.clr_bit (handle); } if (this->wait_set_.ex_mask_.is_set (handle)) { this->suspend_set_.ex_mask_.set_bit (handle); this->wait_set_.ex_mask_.clr_bit (handle); } // Kobi: we need to remove that handle from the // dispatch set as well. We use that function with all the relevant // masks - rd/wr/ex - all the mask. it is completely suspended this->clear_dispatch_mask (handle, ACE_Event_Handler::RWE_MASK); return 0; }
3.5. handler恢复
3.5.1. resume_handler 函数
ace/Select_Reactor_T.cpp resume_handler 函数
template <class ACE_SELECT_REACTOR_TOKEN> int ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_handler (ACE_HANDLE handle) { ACE_TRACE ("ACE_Select_Reactor_T::resume_handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->resume_i (handle); }
3.5.2. resume_handler_i 函数
ace/Select_Reactor_T.cpp resume_handler_i 函数
template <class ACE_SELECT_REACTOR_TOKEN> int ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_i (ACE_HANDLE handle) { ACE_TRACE ("ACE_Select_Reactor_T::resume_i"); if (this->handler_rep_.find (handle) == 0) return -1; if (this->suspend_set_.rd_mask_.is_set (handle)) { this->wait_set_.rd_mask_.set_bit (handle); this->suspend_set_.rd_mask_.clr_bit (handle); } if (this->suspend_set_.wr_mask_.is_set (handle)) { this->wait_set_.wr_mask_.set_bit (handle); this->suspend_set_.wr_mask_.clr_bit (handle); } if (this->suspend_set_.ex_mask_.is_set (handle)) { this->wait_set_.ex_mask_.set_bit (handle); this->suspend_set_.ex_mask_.clr_bit (handle); } return 0; }