mooon-agent接收状态机代码摘要-阿里云开发者社区

开发者社区> 一见蓝天> 正文

mooon-agent接收状态机代码摘要

简介: recv_machine.h 点击(此处)折叠或打开 #ifndef MOOON_AGENT_RECV_MACHINE_H #define MOOON_AGENT_RECV_MACHINE_H #include agent/message.
+关注继续查看
  • recv_machine.h

点击(此处)折叠或打开

  1. #ifndef MOOON_AGENT_RECV_MACHINE_H
  2. #define MOOON_AGENT_RECV_MACHINE_H
  3. #include agent/message.h>
  4. AGENT_NAMESPACE_BEGIN

  5. class CAgentThread;
  6. class CRecvMachine
  7. {
  8. private:
  9.     /***
  10.       * 接收状态值
  11.       */
  12.     typedef enum recv_state_t
  13.     {
  14.         rs_header, /** 接收消息头状态 */
  15.         rs_body /** 接收消息体状态 */
  16.     }TRecvState;

  17.     /***
  18.       * 接收状态上下文
  19.       */
  20.     struct RecvStateContext
  21.     {
  22.         const char* buffer; /** 当前的数据buffer */
  23.         size_t buffer_size; /** 当前的数据字节数 */
  24.         
  25.         RecvStateContext(const char* buf=NULL, size_t buf_size=0)
  26.          :buffer(buf)
  27.          ,buffer_size(buf_size)
  28.         {
  29.         }
  30.         
  31.         RecvStateContext(const RecvStateContext& other)
  32.          :buffer(other.buffer)
  33.          ,buffer_size(other.buffer_size)
  34.         {
  35.         }
  36.         
  37.         RecvStateContext& operator =(const RecvStateContext& other)
  38.         {
  39.             buffer = other.buffer;
  40.             buffer_size = other.buffer_size;
  41.             return *this;
  42.         }
  43.     };
  44.     
  45. public:
  46.     CRecvMachine(CAgentThread* thread);
  47.     util::handle_result_t work(const char* buffer, size_t buffer_size);
  48.     void reset();
  49.     
  50. private:
  51.     void set_next_state(recv_state_t next_state)
  52.     {
  53.         _recv_state = next_state;
  54.         _finished_size = 0;
  55.     }
  56.     
  57.     util::handle_result_t handle_header(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);
  58.     util::handle_result_t handle_body(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);
  59.     util::handle_result_t handle_error(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);
  60.        
  61. private:
  62.     CAgentThread* _thread; /** 需要通过CAgentThread取得CProcessorManager */
  63.     agent_message_header_t _header; /** 消息头,这个大小是固定的 */
  64.     recv_state_t _recv_state; /** 当前的接收状态 */
  65.     size_t _finished_size; /** 当前状态已经接收到的字节数,注意不是总的已经接收到的字节数,只针对当前状态 */
  66. };

  67. AGENT_NAMESPACE_END
  68. #endif // MOOON_AGENT_RECV_MACHINE_H


  • recv_machine.cpp

点击(此处)折叠或打开

  1. #include "recv_machine.h"
  2. #include "agent_thread.h"
  3. AGENT_NAMESPACE_BEGIN

  4. CRecvMachine::CRecvMachine(CAgentThread* thread)
  5.  :_thread(thread)
  6. {
  7.     set_next_state(rs_header);
  8. }

  9. // 状态机入口函数
  10. // 状态机工作原理:-> rs_header -> rs_body -> rs_header
  11. // -> rs_header -> rs_error -> rs_header
  12. // -> rs_header -> rs_body -> rs_error -> rs_header
  13. // 参数说明:
  14. // buffer - 本次收到的数据,注意不是总的
  15. // buffer_size - 本次收到的数据字节数
  16. util::handle_result_t CRecvMachine::work(const char* buffer, size_t buffer_size)
  17. {
  18.     RecvStateContext next_ctx(buffer, buffer_size);
  19.     util::handle_result_t hr = util::handle_continue;
  20.     
  21.     // 状态机循环条件为:util::handle_continue == hr
  22.     while (util::handle_continue == hr)
  23.     {
  24.         RecvStateContext cur_ctx(next_ctx);
  25.         
  26.         switch (_recv_state)
  27.         {
  28.         case rs_header:
  29.             hr = handle_header(cur_ctx, &next_ctx);
  30.             break;
  31.         case rs_body:
  32.             hr = handle_body(cur_ctx, &next_ctx);
  33.             break;
  34.         default:
  35.             hr = handle_error(cur_ctx, &next_ctx);
  36.             break;
  37.         }
  38.     }
  39.         
  40.     return hr;
  41. }

  42. void CRecvMachine::reset()
  43. {
  44.     set_next_state(rs_header);
  45. }

  46. // 处理消息头部
  47. // 参数说明:
  48. // cur_ctx - 当前上下文,
  49. // cur_ctx.buffer为当前收到的数据buffer,包含了消息头,但也可能包含了消息体。
  50. // cur_ctx.buffer_size为当前收到字节数
  51. // next_ctx - 下一步上下文,
  52. // 由于cur_ctx.buffer可能包含了消息体,所以在一次接收receive动作后,
  53. // 会涉及到消息头和消息体两个状态,这里的next_ctx实际为下一步handle_body的cur_ctx
  54. util::handle_result_t CRecvMachine::handle_header(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)
  55. {
  56.     if (_finished_size + cur_ctx.buffer_size sizeof(agent_message_header_t))
  57.     {
  58.         memcpy(reinterpret_castchar*>(&_header) + _finished_size
  59.               ,cur_ctx.buffer
  60.               ,cur_ctx.buffer_size);
  61.               
  62.         _finished_size += cur_ctx.buffer_size;
  63.         return util::handle_continue;
  64.     }
  65.     else
  66.     {
  67.         size_t need_size = sizeof(agent_message_header_t) - _finished_size;
  68.         memcpy(reinterpret_castchar*>(&_header) + _finished_size
  69.               ,cur_ctx.buffer
  70.               ,need_size);
  71.               
  72.         // TODO: Check header here
  73.         
  74.         size_t remain_size = cur_ctx.buffer_size - need_size;
  75.         if (remain_size > 0)
  76.         {
  77.             next_ctx->buffer = cur_ctx.buffer + need_size;
  78.             next_ctx->buffer_size = cur_ctx.buffer_size - need_size;
  79.         }
  80.         
  81.         // 只有当包含消息体时,才需要进行状态切换,
  82.         // 否则维持rs_header状态不变
  83.         if (_header.size > 0)
  84.         {
  85.             // 切换状态
  86.             set_next_state(rs_body);
  87.         }
  88.         else
  89.         {
  90.             CProcessorManager* processor_manager = _thread->get_processor_manager();
  91.             if (!processor_manager->on_message(_header, 0, NULL, 0))
  92.             {
  93.                 return util::handle_error;
  94.             }
  95.         }
  96.                 
  97.         return (remain_size > 0)
  98.               ? util::handle_continue // 控制work过程是否继续循环
  99.               : util::handle_finish;
  100.     }
  101. }

  102. // 处理消息体
  103. // 参数说明:
  104. // cur_ctx - 当前上下文,
  105. // cur_ctx.buffer为当前收到的数据buffer,包含了消息体,但也可能包含了消息头。
  106. // cur_ctx.buffer_size为当前收到字节数
  107. // next_ctx - 下一步上下文,
  108. // 由于cur_ctx.buffer可能包含了消息头,所以在一次接收receive动作后,
  109. // 会涉及到消息头和消息体两个状态,这里的next_ctx实际为下一步handle_header的cur_ctx
  110. util::handle_result_t CRecvMachine::handle_body(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)
  111. {
  112.     CProcessorManager* processor_manager = _thread->get_processor_manager();
  113.     
  114.     if (_finished_size + cur_ctx.buffer_size _header.size)
  115.     {
  116.         if (!processor_manager->on_message(_header, _finished_size, cur_ctx.buffer, cur_ctx.buffer_size))
  117.         {
  118.             return util::handle_error;
  119.         }
  120.         
  121.         _finished_size += cur_ctx.buffer_size;
  122.         return util::handle_continue;
  123.     }
  124.     else
  125.     {
  126.         size_t need_size = _header.size - _finished_size;
  127.         if (!processor_manager->on_message(_header, _finished_size, cur_ctx.buffer, need_size))
  128.         {
  129.             return util::handle_error;
  130.         }
  131.         
  132.         // 切换状态
  133.         set_next_state(rs_header);
  134.         
  135.         size_t remain_size = cur_ctx.buffer_size - need_size;
  136.         if (remain_size > 0)
  137.         {
  138.             next_ctx->buffer = cur_ctx.buffer + need_size;
  139.             next_ctx->buffer_size = cur_ctx.buffer_size - need_size;
  140.             return util::handle_continue;
  141.         }

  142.         return util::handle_finish;
  143.     }
  144. }

  145. util::handle_result_t CRecvMachine::handle_error(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)
  146. {
  147.     //AGENT_LOG_ERROR("Network error.\n");
  148.     set_next_state(rs_header); // 无条件切换到rs_header,这个时候应当断开连接重连接
  149.     return util::handle_error;
  150. }

  151. AGENT_NAMESPACE_END

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
+关注
一见蓝天
擅长面向对象设计、C++程序开发,在金山和华为呆过,互联网从业十多年,有分布式计算和存储系统经验。
409
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载