消息队列

简介:

一个利用条件变量写的消息队列,基于双缓冲的,虽然相比三缓冲的差距不小,但是还是值得拿来学习一下条件变量。


/*
 * BufQueue.h
 *
 *  Created on: May 30, 2013
 *      Author: archy_yu
 */

#ifndef BUFQUEUE_H_
#define BUFQUEUE_H_

#include <list>
#include <pthread.h>
//#include "CommonStruct.h"

typedef  void* CommonItem;

#define BATPROCESS_NUM 5

class BufQueue
{
public:

    BufQueue();

    virtual ~BufQueue();

    int peek(CommonItem &item);

    int append(CommonItem item);

    std::list<CommonItem>* serial_read(std::list<CommonItem>* inlist);

    int set_read_event();

    void WaitReadEventByTimeOut(bool& isReadable);

private:

    std::list<CommonItem>* _write_list;

    std::list<CommonItem>* _read_list;

    pthread_mutex_t _write_mutex;

    pthread_mutex_t _read_mutex;

    pthread_cond_t _read_cond;

};

#endif /* BUFQUEUE_H_ */


/*
 * BufQueue.cpp
 *
 *  Created on: May 30, 2013
 *      Author: archy_yu
 */

#include "BufQueue.h"
#include <sys/time.h>
#include <time.h>

void maketimeout(struct timespec* tsp,long ns = 1)
{
    struct timeval now;

    //get the current time
    gettimeofday(&now,0);
    tsp->tv_sec = now.tv_sec;
    tsp->tv_nsec = now.tv_usec * 1000;  //usec to nsec
    tsp->tv_nsec += 1000*ns;

}

CommonItem PopMsgToPutFromList( std::list<CommonItem>* pList )
{
    if(pList == NULL)
    {
        return NULL;
    }

    if(pList->empty())
    {
        return NULL;
    }
    else
    {
        CommonItem item = pList->front();
        pList->pop_front();
        return item;
    }

}


BufQueue::BufQueue()
{
    pthread_mutex_init(&this->_write_mutex,NULL);
    pthread_mutex_init(&this->_read_mutex,NULL);
    pthread_cond_init(&this->_read_cond,NULL);

    this->_read_list = new std::list<CommonItem>();
    this->_write_list = new std::list<CommonItem>();

}

BufQueue::~BufQueue()
{
    pthread_mutex_destroy(&this->_write_mutex);
    pthread_mutex_destroy(&this->_read_mutex);
    pthread_cond_destroy(&this->_read_cond);

    this->_read_list->clear();
    this->_write_list->clear();

}

int BufQueue::peek(CommonItem &item)
{
    pthread_mutex_lock(&this->_write_mutex);
    item = PopMsgToPutFromList(this->_read_list);
    pthread_mutex_unlock(&this->_write_mutex);

    return 0;
}

int BufQueue::append(CommonItem item)
{
    if(item == NULL)
    {
        return 0;
    }

    bool issetread = false;

    pthread_mutex_lock(&this->_write_mutex);
    this->_write_list->push_back(item);
    issetread = (this->_write_list->size() > BATPROCESS_NUM);
    pthread_mutex_unlock(&this->_write_mutex);

    if(issetread)
    {
        this->set_read_event();
    }

    return 0;
}

std::list<CommonItem>* BufQueue::serial_read(std::list<CommonItem>* inQueue)
{
    pthread_mutex_lock(&this->_write_mutex);

    std::list<CommonItem>* tmplist = this->_write_list;
    this->_write_list = this->_read_list;
    this->_read_list = tmplist;

    tmplist = this->_read_list;
    this->_read_list = inQueue;

    pthread_mutex_unlock(&this->_write_mutex);
    return tmplist;
}

int BufQueue::set_read_event()
{
    pthread_mutex_lock(&this->_read_mutex);
    pthread_cond_signal(&this->_read_cond);
    pthread_mutex_unlock(&this->_read_mutex);
    return 0;
}

void BufQueue::WaitReadEventByTimeOut(bool& isReadable)
{
    pthread_mutex_lock(&this->_read_mutex);
    struct timespec ts;
    maketimeout(&ts,0);
    isReadable = (0 == pthread_cond_timedwait(&this->_read_cond,&this->_read_mutex, &ts));
    pthread_mutex_unlock(&this->_read_mutex);
}

 给出测试代码和用法


BufQueue _queue;

void* process(void* arg)
{

    int i=0;
    while(true)
    {
        int *j = new int();
        *j = i;
        _queue.append((void *)j);
        i ++;
    }
    return NULL;
}


int main(int argc,char* argv[])
{
    pthread_t pid;
    pthread_create(&pid,0,process,0);

    long long int start = TimeKit::get_tick();

    while(true)
    {

        list<void *>* queue_to_read = new list<void *>();

        bool read = false;
        _queue.wait_readevent_by_timeout(read);

        if(read)
        {
            queue_to_read = _queue.serial_read(queue_to_read);

            list<void *>::iterator iter;
            for(iter = queue_to_read->begin();iter != queue_to_read->end();iter ++)
            {
                int* j = (int*)(*iter);

                if(100000 <= (*j))
                {
                    long long int end = TimeKit::get_tick();
                    printf("%ld",(end - start));
                    return 0;
                }
                printf("%d\n",(*j));
            }

        }

    }


    /*
    _recv_net_msg_queue = new DuplexList();
    _send_net_msg_queue = new DuplexList();

    InputMonitor _recv_thread(_recv_net_msg_queue);
    OutPutMonitor _send_thread(_send_net_msg_queue);

    _recv_thread.open("192.168.9.221",6000);

    _recv_thread.start();
    _send_thread.start();

    int count = 0;

    while(true)
    {
        MessageBlock* mb = NULL;
        if(_recv_net_msg_queue->peek((CommonItem &)mb) == 0)
        {
            if(count % 1000 == 0)
            {
//                printf("process %d msg\n",count);
            }
            mb->msg_type(NORMAL_MSG_TYPE);
            _send_net_msg_queue->append(mb);
            count ++;
        }
        else
        {
            usleep(2);
        }
    }
*/
    return 0;
}

有兴趣的可以测试下,有什么问题可以联系我!


相关文章
|
JavaScript 小程序
js数组去重的10种有效方法 vue 数组去重
js数组去重的10种有效方法 vue 数组去重
286 1
|
9月前
|
存储 编解码 搜索推荐
文生图架构设计原来如此简单之社区与共享机制
工作流共享是文生图应用社区建设的核心功能,它使用户能够分享创作经验和技巧,形成知识共享生态。工作流序列化与存储设计需要解决复杂工作流的高效表示问题。
220 10
|
存储 人工智能 大数据
TDengine 用户大会精彩回顾:AI+数据驱动汽车、能源、烟草、电力应用的未来
TDengine用户大会在京成功举办,聚焦“时序数据助你决胜AI时代”。涛思数据创始人陶建辉携手中科院院士王怀民等业界领袖,探讨时序数据最新进展及AI技术应用。会上发布了《时序大数据平台-TDengine核心原理与实战》一书,为企业与开发者提供宝贵指南。自2019年开源以来,TDengine已拥有57万用户实例,Star数达23.1k。王怀民赞赏TDengine全面创新,立足全球市场。大会还涉及数据库智能化运维、能源行业数字化转型等议题,并设有三大专场,深入讨论海量数据应用、智能制造新能源及新型电力系统,展示了TDengine在各领域的应用潜力与技术革新。
381 0
TDengine 用户大会精彩回顾:AI+数据驱动汽车、能源、烟草、电力应用的未来
|
运维 前端开发 C#
一套以用户体验出发的.NET8 Web开源框架
一套以用户体验出发的.NET8 Web开源框架
336 7
一套以用户体验出发的.NET8 Web开源框架
|
SQL 运维 监控
南大通用GBase 8a MPP Cluster Linux端SQL进程监控工具
南大通用GBase 8a MPP Cluster Linux端SQL进程监控工具
|
存储 前端开发 JavaScript
前端中对象的深度应用与最佳实践
前端对象应用涉及在网页开发中使用JavaScript等技术创建和操作对象,以实现动态交互效果。通过定义属性和方法,对象可以封装数据和功能,提升代码的组织性和复用性,是现代Web开发的核心技术之一。
|
域名解析 运维 负载均衡
LVS+Keepalived 负载均衡(二)28-1
【8月更文挑战第28天】LVS+Keepalived 负载均衡 配置 LVS VIP
275 5
|
算法 数据挖掘 数据处理
第三届 Data-Juicer 数据挑战赛:ModelScope-Sora“数据导演”创意竞速
欢迎进入第三届 Data-Juicer 数据挑战赛:“ModelScope-Sora 文生视频大模型数据挑战赛”。
|
SQL 消息中间件 JSON
flink kafka connector源码解读(超详细)
为了掌握Flink自定义Connector,本文直接从源码出发,研究Flink的kafka connector是如何实现的?
1621 0
flink kafka connector源码解读(超详细)