消息队列

简介:

一个利用条件变量写的消息队列,基于双缓冲的,虽然相比三缓冲的差距不小,但是还是值得拿来学习一下条件变量。


/*
 * BufQueue.h
 *
 *  Created on: May 30, 2013
 *      Author: archy_yu
 */

#ifndef BUFQUEUE_H_
#define BUFQUEUE_H_

#include <list>
#include <pthread.h>
//#include "CommonStruct.h"

typedef  void* CommonItem;

#define BATPROCESS_NUM 5

class BufQueue
{
public:

    BufQueue();

    virtual ~BufQueue();

    int peek(CommonItem &item);

    int append(CommonItem item);

    std::list<CommonItem>* serial_read(std::list<CommonItem>* inlist);

    int set_read_event();

    void WaitReadEventByTimeOut(bool& isReadable);

private:

    std::list<CommonItem>* _write_list;

    std::list<CommonItem>* _read_list;

    pthread_mutex_t _write_mutex;

    pthread_mutex_t _read_mutex;

    pthread_cond_t _read_cond;

};

#endif /* BUFQUEUE_H_ */


/*
 * BufQueue.cpp
 *
 *  Created on: May 30, 2013
 *      Author: archy_yu
 */

#include "BufQueue.h"
#include <sys/time.h>
#include <time.h>

void maketimeout(struct timespec* tsp,long ns = 1)
{
    struct timeval now;

    //get the current time
    gettimeofday(&now,0);
    tsp->tv_sec = now.tv_sec;
    tsp->tv_nsec = now.tv_usec * 1000;  //usec to nsec
    tsp->tv_nsec += 1000*ns;

}

CommonItem PopMsgToPutFromList( std::list<CommonItem>* pList )
{
    if(pList == NULL)
    {
        return NULL;
    }

    if(pList->empty())
    {
        return NULL;
    }
    else
    {
        CommonItem item = pList->front();
        pList->pop_front();
        return item;
    }

}


BufQueue::BufQueue()
{
    pthread_mutex_init(&this->_write_mutex,NULL);
    pthread_mutex_init(&this->_read_mutex,NULL);
    pthread_cond_init(&this->_read_cond,NULL);

    this->_read_list = new std::list<CommonItem>();
    this->_write_list = new std::list<CommonItem>();

}

BufQueue::~BufQueue()
{
    pthread_mutex_destroy(&this->_write_mutex);
    pthread_mutex_destroy(&this->_read_mutex);
    pthread_cond_destroy(&this->_read_cond);

    this->_read_list->clear();
    this->_write_list->clear();

}

int BufQueue::peek(CommonItem &item)
{
    pthread_mutex_lock(&this->_write_mutex);
    item = PopMsgToPutFromList(this->_read_list);
    pthread_mutex_unlock(&this->_write_mutex);

    return 0;
}

int BufQueue::append(CommonItem item)
{
    if(item == NULL)
    {
        return 0;
    }

    bool issetread = false;

    pthread_mutex_lock(&this->_write_mutex);
    this->_write_list->push_back(item);
    issetread = (this->_write_list->size() > BATPROCESS_NUM);
    pthread_mutex_unlock(&this->_write_mutex);

    if(issetread)
    {
        this->set_read_event();
    }

    return 0;
}

std::list<CommonItem>* BufQueue::serial_read(std::list<CommonItem>* inQueue)
{
    pthread_mutex_lock(&this->_write_mutex);

    std::list<CommonItem>* tmplist = this->_write_list;
    this->_write_list = this->_read_list;
    this->_read_list = tmplist;

    tmplist = this->_read_list;
    this->_read_list = inQueue;

    pthread_mutex_unlock(&this->_write_mutex);
    return tmplist;
}

int BufQueue::set_read_event()
{
    pthread_mutex_lock(&this->_read_mutex);
    pthread_cond_signal(&this->_read_cond);
    pthread_mutex_unlock(&this->_read_mutex);
    return 0;
}

void BufQueue::WaitReadEventByTimeOut(bool& isReadable)
{
    pthread_mutex_lock(&this->_read_mutex);
    struct timespec ts;
    maketimeout(&ts,0);
    isReadable = (0 == pthread_cond_timedwait(&this->_read_cond,&this->_read_mutex, &ts));
    pthread_mutex_unlock(&this->_read_mutex);
}

 给出测试代码和用法


BufQueue _queue;

void* process(void* arg)
{

    int i=0;
    while(true)
    {
        int *j = new int();
        *j = i;
        _queue.append((void *)j);
        i ++;
    }
    return NULL;
}


int main(int argc,char* argv[])
{
    pthread_t pid;
    pthread_create(&pid,0,process,0);

    long long int start = TimeKit::get_tick();

    while(true)
    {

        list<void *>* queue_to_read = new list<void *>();

        bool read = false;
        _queue.wait_readevent_by_timeout(read);

        if(read)
        {
            queue_to_read = _queue.serial_read(queue_to_read);

            list<void *>::iterator iter;
            for(iter = queue_to_read->begin();iter != queue_to_read->end();iter ++)
            {
                int* j = (int*)(*iter);

                if(100000 <= (*j))
                {
                    long long int end = TimeKit::get_tick();
                    printf("%ld",(end - start));
                    return 0;
                }
                printf("%d\n",(*j));
            }

        }

    }


    /*
    _recv_net_msg_queue = new DuplexList();
    _send_net_msg_queue = new DuplexList();

    InputMonitor _recv_thread(_recv_net_msg_queue);
    OutPutMonitor _send_thread(_send_net_msg_queue);

    _recv_thread.open("192.168.9.221",6000);

    _recv_thread.start();
    _send_thread.start();

    int count = 0;

    while(true)
    {
        MessageBlock* mb = NULL;
        if(_recv_net_msg_queue->peek((CommonItem &)mb) == 0)
        {
            if(count % 1000 == 0)
            {
//                printf("process %d msg\n",count);
            }
            mb->msg_type(NORMAL_MSG_TYPE);
            _send_net_msg_queue->append(mb);
            count ++;
        }
        else
        {
            usleep(2);
        }
    }
*/
    return 0;
}

有兴趣的可以测试下,有什么问题可以联系我!


相关文章
|
7月前
|
消息中间件 微服务
消息队列的适用场景
消息队列的适用场景
81 0
|
7月前
|
消息中间件 负载均衡 数据挖掘
以12306讲解为什么要使用消息队列
以12306讲解为什么要使用消息队列
82 0
|
消息中间件 前端开发 大数据
一、消息队列
一、消息队列
|
消息中间件 数据库
|
消息中间件 Java 数据库
消息队列(五)
消息队列(五)
169 0
消息队列(五)
|
消息中间件 存储 缓存
消息队列(三)
消息队列(三)
128 0
消息队列(三)
|
消息中间件 存储 缓存
消息队列(六)
消息队列(六)
243 0
消息队列(六)
|
消息中间件 存储 中间件
消息队列(四)
消息队列(四)
207 0
消息队列(四)
|
消息中间件 网络协议
消息队列(二)
消息队列(二)
154 0
消息队列(二)
|
消息中间件 SQL 关系型数据库
消息队列
消息队列
239 0