消息队列

简介:

一个利用条件变量写的消息队列,基于双缓冲的,虽然相比三缓冲的差距不小,但是还是值得拿来学习一下条件变量。


/*
 * BufQueue.h
 *
 *  Created on: May 30, 2013
 *      Author: archy_yu
 */

#ifndef BUFQUEUE_H_
#define BUFQUEUE_H_

#include <list>
#include <pthread.h>
//#include "CommonStruct.h"

typedef  void* CommonItem;

#define BATPROCESS_NUM 5

class BufQueue
{
public:

    BufQueue();

    virtual ~BufQueue();

    int peek(CommonItem &item);

    int append(CommonItem item);

    std::list<CommonItem>* serial_read(std::list<CommonItem>* inlist);

    int set_read_event();

    void WaitReadEventByTimeOut(bool& isReadable);

private:

    std::list<CommonItem>* _write_list;

    std::list<CommonItem>* _read_list;

    pthread_mutex_t _write_mutex;

    pthread_mutex_t _read_mutex;

    pthread_cond_t _read_cond;

};

#endif /* BUFQUEUE_H_ */


/*
 * BufQueue.cpp
 *
 *  Created on: May 30, 2013
 *      Author: archy_yu
 */

#include "BufQueue.h"
#include <sys/time.h>
#include <time.h>

void maketimeout(struct timespec* tsp,long ns = 1)
{
    struct timeval now;

    //get the current time
    gettimeofday(&now,0);
    tsp->tv_sec = now.tv_sec;
    tsp->tv_nsec = now.tv_usec * 1000;  //usec to nsec
    tsp->tv_nsec += 1000*ns;

}

CommonItem PopMsgToPutFromList( std::list<CommonItem>* pList )
{
    if(pList == NULL)
    {
        return NULL;
    }

    if(pList->empty())
    {
        return NULL;
    }
    else
    {
        CommonItem item = pList->front();
        pList->pop_front();
        return item;
    }

}


BufQueue::BufQueue()
{
    pthread_mutex_init(&this->_write_mutex,NULL);
    pthread_mutex_init(&this->_read_mutex,NULL);
    pthread_cond_init(&this->_read_cond,NULL);

    this->_read_list = new std::list<CommonItem>();
    this->_write_list = new std::list<CommonItem>();

}

BufQueue::~BufQueue()
{
    pthread_mutex_destroy(&this->_write_mutex);
    pthread_mutex_destroy(&this->_read_mutex);
    pthread_cond_destroy(&this->_read_cond);

    this->_read_list->clear();
    this->_write_list->clear();

}

int BufQueue::peek(CommonItem &item)
{
    pthread_mutex_lock(&this->_write_mutex);
    item = PopMsgToPutFromList(this->_read_list);
    pthread_mutex_unlock(&this->_write_mutex);

    return 0;
}

int BufQueue::append(CommonItem item)
{
    if(item == NULL)
    {
        return 0;
    }

    bool issetread = false;

    pthread_mutex_lock(&this->_write_mutex);
    this->_write_list->push_back(item);
    issetread = (this->_write_list->size() > BATPROCESS_NUM);
    pthread_mutex_unlock(&this->_write_mutex);

    if(issetread)
    {
        this->set_read_event();
    }

    return 0;
}

std::list<CommonItem>* BufQueue::serial_read(std::list<CommonItem>* inQueue)
{
    pthread_mutex_lock(&this->_write_mutex);

    std::list<CommonItem>* tmplist = this->_write_list;
    this->_write_list = this->_read_list;
    this->_read_list = tmplist;

    tmplist = this->_read_list;
    this->_read_list = inQueue;

    pthread_mutex_unlock(&this->_write_mutex);
    return tmplist;
}

int BufQueue::set_read_event()
{
    pthread_mutex_lock(&this->_read_mutex);
    pthread_cond_signal(&this->_read_cond);
    pthread_mutex_unlock(&this->_read_mutex);
    return 0;
}

void BufQueue::WaitReadEventByTimeOut(bool& isReadable)
{
    pthread_mutex_lock(&this->_read_mutex);
    struct timespec ts;
    maketimeout(&ts,0);
    isReadable = (0 == pthread_cond_timedwait(&this->_read_cond,&this->_read_mutex, &ts));
    pthread_mutex_unlock(&this->_read_mutex);
}

 给出测试代码和用法


BufQueue _queue;

void* process(void* arg)
{

    int i=0;
    while(true)
    {
        int *j = new int();
        *j = i;
        _queue.append((void *)j);
        i ++;
    }
    return NULL;
}


int main(int argc,char* argv[])
{
    pthread_t pid;
    pthread_create(&pid,0,process,0);

    long long int start = TimeKit::get_tick();

    while(true)
    {

        list<void *>* queue_to_read = new list<void *>();

        bool read = false;
        _queue.wait_readevent_by_timeout(read);

        if(read)
        {
            queue_to_read = _queue.serial_read(queue_to_read);

            list<void *>::iterator iter;
            for(iter = queue_to_read->begin();iter != queue_to_read->end();iter ++)
            {
                int* j = (int*)(*iter);

                if(100000 <= (*j))
                {
                    long long int end = TimeKit::get_tick();
                    printf("%ld",(end - start));
                    return 0;
                }
                printf("%d\n",(*j));
            }

        }

    }


    /*
    _recv_net_msg_queue = new DuplexList();
    _send_net_msg_queue = new DuplexList();

    InputMonitor _recv_thread(_recv_net_msg_queue);
    OutPutMonitor _send_thread(_send_net_msg_queue);

    _recv_thread.open("192.168.9.221",6000);

    _recv_thread.start();
    _send_thread.start();

    int count = 0;

    while(true)
    {
        MessageBlock* mb = NULL;
        if(_recv_net_msg_queue->peek((CommonItem &)mb) == 0)
        {
            if(count % 1000 == 0)
            {
//                printf("process %d msg\n",count);
            }
            mb->msg_type(NORMAL_MSG_TYPE);
            _send_net_msg_queue->append(mb);
            count ++;
        }
        else
        {
            usleep(2);
        }
    }
*/
    return 0;
}

有兴趣的可以测试下,有什么问题可以联系我!


相关文章
|
10天前
|
人工智能 开发工具 iOS开发
Claude Code 新手完全上手指南:安装、国产模型配置与常用命令全解
Claude Code 是一款运行在终端环境中的 AI 编程助手,能够直接在命令行中完成代码生成、项目分析、文件修改、命令执行、Git 管理等开发全流程工作。它最大的特点是**任务驱动、终端原生、轻量高效、多模型兼容**,无需图形界面、不依赖 IDE 插件,能够深度融入开发者日常工作流。
3214 9
|
13天前
|
Shell API 开发工具
Claude Code 快速上手指南(新手友好版)
AI编程工具卷疯啦!Claude Code凭借任务驱动+终端原生的特性,成了开发者的效率搭子。本文从安装、登录、切换国产模型到常用命令,手把手带新手快速上手,全程避坑,30分钟独立用起来。
3255 22
|
2天前
|
人工智能 自然语言处理 文字识别
阿里云百炼Qwen3.7-Max简介:能力、优势、支持订阅计划参考
Qwen3.7-Max是阿里云百炼面向智能体时代推出的新一代旗舰模型,对标GPT-5.5、Claude Opus 4.7等闭源旗舰。该模型支持百万级token上下文窗口,具备顶级推理能力、多模态搜索与视觉理解增强、流式输出低延迟响应等核心优势,覆盖编程、办公、长周期自主执行等复杂场景。同时支持OpenAI接口兼容,便于系统快速迁移。用户可通过Token Plan团队版、Coding Plan或节省计划等订阅方式灵活调用,适合企业级高要求场景使用。
|
6天前
|
人工智能 Linux BI
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
JeecgBoot AI专题研究 一键脚本:Claude Code + JeecgBoot Skills + DeepSeek 全平台接入 一行命令装好 Claude Code + JeecgBoot Skills + DeepSeek 接入,无需翻墙使用 Claude Code,支持 Wind
2260 4
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
|
25天前
|
人工智能 JSON 供应链
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
LucianaiB分享零成本畅用JVS Claw教程(学生认证享7个月使用权),并开源GeoMind项目——将JVS改造为科研与产业地理情报可视化AI助手,支持飞书文档解析、地理编码与腾讯地图可视化,助力产业关系图谱构建。
23595 15
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
|
12天前
|
人工智能 JSON BI
DeepSeek V4-Pro 接入 Claude Code 完全实战:体验、测试与关键避坑指南
Claude Code 作为当前主流的 AI 编程辅助工具,凭借强大的代码理解、工程执行与自动化能力深受开发者喜爱,但原生模型的使用成本相对较高。为了在保持能力的同时进一步降低开销,不少开发者开始寻找兼容度高、价格更友好的替代模型。DeepSeek V4 系列的发布带来了新的选择,该系列包含 V4-Pro 与 V4-Flash 两款模型,并提供了与 Anthropic 完全兼容的 API 接口,理论上只需简单修改配置,即可让 Claude Code 无缝切换为 DeepSeek 引擎。
2747 3
|
4天前
|
人工智能 自然语言处理 安全
Claude Code 全攻略:命令大全+三种模式+记忆体系+实战工作流完整手册
Claude Code 是当前最流行的终端级 AI 编程助手,能够直接在命令行中完成代码生成、项目理解、文件修改、命令执行、错误修复等全流程开发工作。它不依赖图形界面、不占用额外资源,却能深度理解项目结构,自动生成规范代码,大幅提升研发效率。
828 2
|
11天前
|
人工智能 安全 开发工具
Claude Code 官方工作原理与使用指南
Claude Code 不是传统代码补全工具,而是 Anthropic 推出的终端 AI 代理,具备代理循环、双驱动架构(模型+工具)、全局项目感知、6 种权限模式等核心能力,本文基于官方文档系统解析其工作原理与高效使用技巧。
1502 0