libev与多线程
libev支持多线程,也是one loop per-thread的模型。
但是要手动的启动和管理多个libev的细节。
我采用的方式是: N个libev的数据套接字线程 + 1个libev的监听套接字线程。
即,1个主线程里跑1个libev,称为main_libev。负责驱动accept socket,同时还注册1个timer用于定时统计链接相关的数据。
其他N个线程里分别各自的libev,称为data_libev,负责驱动数据的socket。
当有新连接到来时,main_libev的回调里通过enqueue操作把新的fd传给N个线程的任意一个,这个操作是线程安全的。
同时给这个线程发送一个异步事件(async_send操作),告知其可以接客了。
data_libev在async的回调里,取出新的fd,然后加入到自己的ev中,并设置相应的回调。
data_libev的封装
data_libev被封装在类中,同时提供ev_run(),receive_fd(),add_newfds()等操作。
typedef std::list<int> FDList; typedef FDList::iterator FDItr; class LibEVPerThread { public: LibEVPerThread() { m_n_fd_received = 0; m_n_fd_added = 0; pthread_mutex_init(&m_lock, NULL); m_evloop_ptr = ev_loop_new(EVBACKEND_EPOLL); } bool set_id(unsigned int id) { m_id = id; } int ev_run() { ev_async_init(&m_async, async_cb); ev_async_start(m_evloop_ptr, &m_async); ev_set_userdata(m_evloop_ptr, this); printf("begin to ev_run()\n"); ::ev_run(m_evloop_ptr, 0); printf("end to ev_run()\n"); } bool receive_fd(int fd) { printf("ID: %u ----> receive a fd:%d\n", m_id, fd); enque_fd(fd); ev_async_send(m_evloop_ptr, &m_async); } int add_newfds() { FDList tmpfds = deque_fd(); for (FDItr itr=tmpfds.begin(); itr!=tmpfds.end(); ++itr) { m_n_fd_added ++; add_fd(*itr); printf("ID: %u ----> added a fd:%d, totoal: %u\n", m_id, *itr, m_n_fd_added); } } private: int enque_fd(int fd) { pthread_mutex_lock(&m_lock); m_sockfds.push_back(fd); pthread_mutex_unlock(&m_lock); } FDList deque_fd() { FDList tmp; pthread_mutex_lock(&m_lock); tmp.swap(m_sockfds); pthread_mutex_unlock(&m_lock); return tmp; } bool add_fd(int fd) { struct ev_io *evio_ptr = new ev_io(); ev_io_init(evio_ptr, message_cb, fd, EV_READ); ev_io_start(m_evloop_ptr, evio_ptr); } private: pthread_mutex_t m_lock; FDList m_sockfds; struct ev_async m_async; struct ev_loop *m_evloop_ptr; pthread_t m_tid; unsigned int m_n_fd_added; unsigned int m_n_fd_received; unsigned int m_id; };
ev_run() 启动libev。
receive_fd() 提供给外界的接口,负责在新的fd到来后,添加到这个类中,并同时libev。
add_newfds() 提供给自己使用,负责在async的回调中,把新的fd添加到到自己的libev中。
ev_run()启动的时候只有一个async的事件,在它的回调里调用libev类的add_newfds()。
void async_cb(EV_P_ ev_async *w, int events) { EVThreadInfo *ev_thread_info = (EVThreadInfo*)ev_userdata(EV_A); ev_thread_info->add_newfds(); }
main_libev
在main()的线程里使用EV_DEFAULT,监视accept socket的事件。
struct ev_loop *main_evloop = EV_DEFAULT; int accept_fd = init_accept_sock(); ev_io accept_ev; ev_io_init (&accept_ev, on_connection_cb, accept_fd, EV_READ); ev_io_start (main_evloop, &accept_ev);
在它的回调函数on_connection_cb里,首先accept接收一个新的套接字,然后robin的方式选择一个libev类,调用libev类的receive_fd,把新的套接字发送给libev类,并通知。
void on_connection_cb(EV_P_ ev_io *w, int events) { int newfd = accept(w->fd, NULL, 0); EVThreadInfo* ev_thread_info = &g_thread_info_prt[g_robin%g_n_thread]; ev_thread_info->receive_fd(newfd); g_robin++; }
实验和待处理问题
启动服务端
[zunbao.fengzb@rt1f07470.tbc /home/zunbao.fengzb/echo_client_server_libev] $./multi_loop_server 6089 24
启动客户端
[CHECK@ump137035.sqa.cm4 ~/rds_proxy_test/proxy/echo_client_server_libev] $./multi_loop_client 10.98.109.161 6089 24 85
实验结果
in timer callback, qps 1026115 in timer callback, qps 1016615 in timer callback, qps 1025216
client每次发送1个字节。
每个数据包的大小= 1Byte载荷 + IP头部20Byte + tcp头部20Byte+tcp选项12Byte = 53Byte
同时,每个数据包都要回一个ack包,ack包大小= IP头部20Byte + tcp头部20Byte = 40Byte