这是一个利用条件变量实现的消息队列,基于双缓冲;虽然性能与三缓冲方案相比有不小的差距,但仍然值得拿来学习条件变量的用法。
/*
* BufQueue.h
*
* Created on: May 30, 2013
* Author: archy_yu
*/
#ifndef BUFQUEUE_H_
#define BUFQUEUE_H_
#include <list>
#include <pthread.h>
//#include "CommonStruct.h"
// Element type stored in the queue: an untyped pointer that the queue passes
// through without interpreting or freeing it.
typedef void* CommonItem;
// Batch threshold: BufQueue::append() raises the read event once the write
// list holds more than this many items.
#define BATPROCESS_NUM 5
/**
 * Message queue built on two std::list buffers (a write list and a read
 * list), a mutex protecting the buffers and a mutex/condition-variable pair
 * used as a "read event".
 *
 * The class is not copyable: it holds a pthread mutex/condition variable and
 * raw pointers to heap-allocated lists, and POSIX leaves the behaviour of
 * copied synchronisation objects undefined.
 */
class BufQueue
{
public:
    BufQueue();
    virtual ~BufQueue();

    // Removes the front element of the read list and stores it in `item`
    // (NULL when nothing could be popped).
    int peek(CommonItem &item);

    // Queues `item` on the write list; NULL items are ignored.
    int append(CommonItem item);

    // Hands the current write list to the caller, turns the previous read
    // list into the new write list and installs `inlist` as the read list.
    std::list<CommonItem>* serial_read(std::list<CommonItem>* inlist);

    // Signals the read condition variable.
    int set_read_event();

    // Waits on the read condition variable with a deadline and reports the
    // outcome through `isReadable`.
    void WaitReadEventByTimeOut(bool& isReadable);

private:
    // Declared but intentionally left undefined (pre-C++11 way of disabling
    // copies): any attempt to copy fails at compile or link time.
    BufQueue(const BufQueue&);
    BufQueue& operator=(const BufQueue&);

    std::list<CommonItem>* _write_list;  // buffer filled by append()
    std::list<CommonItem>* _read_list;   // buffer drained by peek()
    pthread_mutex_t _write_mutex;        // guards both list pointers
    pthread_mutex_t _read_mutex;         // paired with _read_cond
    pthread_cond_t _read_cond;           // the "read event"
};
#endif /* BUFQUEUE_H_ */
/*
* BufQueue.cpp
*
* Created on: May 30, 2013
* Author: archy_yu
*/
#include "BufQueue.h"
#include <sys/time.h>
#include <time.h>
/**
 * Fills *tsp with an absolute deadline derived from the current
 * gettimeofday() time plus an offset.
 *
 * @param tsp  Output timespec; must not be NULL.
 * @param ns   Offset to add. Despite the name, the value is multiplied by
 *             1000 before being added to the nanosecond field, i.e. it is
 *             effectively interpreted as microseconds. Defaults to 1.
 *
 * The result is normalised so that 0 <= tv_nsec < 1e9. Without that, any
 * offset that pushed tv_nsec past one second produced a timespec that
 * pthread_cond_timedwait() rejects with EINVAL.
 */
void maketimeout(struct timespec* tsp,long ns = 1)
{
    const long long kNsecPerSec = 1000000000LL;

    struct timeval now;
    gettimeofday(&now,0);

    // Do the arithmetic in 64 bits so a large offset cannot overflow `long`.
    long long nsec = static_cast<long long>(now.tv_usec) * 1000
                   + static_cast<long long>(ns) * 1000;

    // Carry whole seconds out of the nanosecond part.
    long long carry = nsec / kNsecPerSec;
    nsec %= kNsecPerSec;
    if(nsec < 0)
    {
        // Negative offsets: borrow one second so tv_nsec stays non-negative.
        nsec += kNsecPerSec;
        --carry;
    }

    tsp->tv_sec = now.tv_sec + static_cast<time_t>(carry);
    tsp->tv_nsec = static_cast<long>(nsec);
}
// Detaches and returns the first element of *pList.
// Returns NULL when pList itself is NULL or when the list has no elements.
CommonItem PopMsgToPutFromList( std::list<CommonItem>* pList )
{
    // Nothing to hand out: missing list or empty list.
    if(NULL == pList || pList->empty())
    {
        return NULL;
    }

    CommonItem head = pList->front();
    pList->pop_front();
    return head;
}
// Initialises both mutexes and the read condition variable with default
// attributes, then allocates the two (initially empty) buffers.
BufQueue::BufQueue()
{
    pthread_mutex_init(&_write_mutex, NULL);
    pthread_mutex_init(&_read_mutex, NULL);
    pthread_cond_init(&_read_cond, NULL);

    _read_list = new std::list<CommonItem>;
    _write_list = new std::list<CommonItem>;
}
// Destroys the synchronisation objects and releases both buffers.
//
// The buffers are heap-allocated in the constructor, so they are deleted
// here; merely clearing them leaked the list objects. A list installed
// through serial_read() replaces one of the queue's own buffers and is
// therefore released here as well, which means such lists must have been
// allocated with `new`.
//
// The stored items themselves are NOT freed: they are untyped pointers whose
// ownership stays with the producers/consumers.
BufQueue::~BufQueue()
{
    pthread_mutex_destroy(&_write_mutex);
    pthread_mutex_destroy(&_read_mutex);
    pthread_cond_destroy(&_read_cond);

    // Guard against both pointers referring to the same list (possible if a
    // caller handed the same list to serial_read() twice in a row).
    if(_write_list != _read_list)
    {
        delete _write_list;   // delete on NULL is a no-op
    }
    delete _read_list;        // may be NULL after serial_read(NULL)

    _write_list = NULL;
    _read_list = NULL;
}
// Pops one item from the queue into `item`.
//
// Items are taken from the read buffer. When the read buffer has been
// drained and the write buffer holds data, the two buffers are swapped
// first, so that items queued with append() actually become visible here.
// Previously peek() only ever looked at the read buffer, which append()
// never fills.
//
// @param item  Receives the popped item, or NULL when the queue is empty.
// @return 0 when an item was popped, -1 when nothing was available.
//         (append() rejects NULL items, so NULL unambiguously means empty.)
int BufQueue::peek(CommonItem &item)
{
    pthread_mutex_lock(&_write_mutex);

    // Swap only when both buffers exist, so a NULL buffer can never end up
    // as the write list that append() dereferences.
    if(_read_list != NULL && _write_list != NULL
       && _read_list->empty() && !_write_list->empty())
    {
        std::list<CommonItem>* drained = _read_list;
        _read_list = _write_list;
        _write_list = drained;
    }

    item = PopMsgToPutFromList(_read_list);

    pthread_mutex_unlock(&_write_mutex);

    return (item != NULL) ? 0 : -1;
}
// Adds `item` to the back of the write buffer.
// NULL items are silently dropped. Once the write buffer holds more than
// BATPROCESS_NUM items the read event is raised (after the buffer lock has
// been released). Always returns 0.
int BufQueue::append(CommonItem item)
{
    if(NULL == item)
    {
        return 0;
    }

    pthread_mutex_lock(&_write_mutex);
    _write_list->push_back(item);
    const bool batch_ready = (_write_list->size() > BATPROCESS_NUM);
    pthread_mutex_unlock(&_write_mutex);

    // Signal outside the buffer lock.
    if(batch_ready)
    {
        set_read_event();
    }
    return 0;
}
std::list<CommonItem>* BufQueue::serial_read(std::list<CommonItem>* inQueue)
{
pthread_mutex_lock(&this->_write_mutex);
std::list<CommonItem>* tmplist = this->_write_list;
this->_write_list = this->_read_list;
this->_read_list = tmplist;
tmplist = this->_read_list;
this->_read_list = inQueue;
pthread_mutex_unlock(&this->_write_mutex);
return tmplist;
}
int BufQueue::set_read_event()
{
pthread_mutex_lock(&this->_read_mutex);
pthread_cond_signal(&this->_read_cond);
pthread_mutex_unlock(&this->_read_mutex);
return 0;
}
void BufQueue::WaitReadEventByTimeOut(bool& isReadable)
{
pthread_mutex_lock(&this->_read_mutex);
struct timespec ts;
maketimeout(&ts,0);
isReadable = (0 == pthread_cond_timedwait(&this->_read_cond,&this->_read_mutex, &ts));
pthread_mutex_unlock(&this->_read_mutex);
}
下面给出测试代码和用法:
// Queue shared between the producer thread (process) and the consumer loop
// in main().
BufQueue _queue;
// Producer thread entry point: endlessly appends heap-allocated ints holding
// 0, 1, 2, ... to the shared queue. `arg` is not used.
void* process(void* arg)
{
    for(int i = 0; ; ++i)
    {
        int* boxed = new int(i);
        _queue.append(boxed);   // int* converts implicitly to void*
    }
    return NULL;
}
// Consumer side of the demo: starts the producer thread, then repeatedly
// polls the read event and drains whole batches from the queue, printing
// every value until one >= 100000 shows up, at which point the elapsed tick
// count is printed and the program exits.
//
// Fixes over the previous version:
//   - calls BufQueue::WaitReadEventByTimeOut, the method the class actually
//     declares;
//   - qualifies std::list;
//   - prints the long long tick difference with %lld;
//   - reuses one spare list instead of allocating (and leaking) one per loop
//     iteration, and frees each payload after it has been processed.
int main(int argc,char* argv[])
{
    pthread_t pid;
    pthread_create(&pid,0,process,0);
    long long int start = TimeKit::get_tick();

    // serial_read() trades lists: the queue keeps the one passed in and
    // returns the filled one, which is recycled as the next spare.
    std::list<void *>* spare = new std::list<void *>();
    while(true)
    {
        bool read = false;
        _queue.WaitReadEventByTimeOut(read);
        if(!read)
        {
            continue;
        }

        std::list<void *>* batch = _queue.serial_read(spare);
        bool done = false;
        for(std::list<void *>::iterator iter = batch->begin();iter != batch->end();++iter)
        {
            int* j = static_cast<int*>(*iter);
            if(!done)
            {
                if(100000 <= (*j))
                {
                    long long int end = TimeKit::get_tick();
                    printf("%lld",(end - start));
                    done = true;   // keep iterating only to free the rest
                }
                else
                {
                    printf("%d\n",(*j));
                }
            }
            delete j;   // payloads were allocated with new in process()
        }

        if(done)
        {
            delete batch;
            return 0;
        }

        batch->clear();
        spare = batch;
    }
    /*
    _recv_net_msg_queue = new DuplexList();
    _send_net_msg_queue = new DuplexList();
    InputMonitor _recv_thread(_recv_net_msg_queue);
    OutPutMonitor _send_thread(_send_net_msg_queue);
    _recv_thread.open("192.168.9.221",6000);
    _recv_thread.start();
    _send_thread.start();
    int count = 0;
    while(true)
    {
    MessageBlock* mb = NULL;
    if(_recv_net_msg_queue->peek((CommonItem &)mb) == 0)
    {
    if(count % 1000 == 0)
    {
    // printf("process %d msg\n",count);
    }
    mb->msg_type(NORMAL_MSG_TYPE);
    _send_net_msg_queue->append(mb);
    count ++;
    }
    else
    {
    usleep(2);
    }
    }
    */
    return 0;
}
有兴趣的可以测试下,有什么问题可以联系我!