开发者社区 > 云原生 > 消息队列 > 正文

System V消息队列与posix消息队列测试

我已经测试了两种情况:

系统V消息队列:发送1,000,000条消息“ hello”,msgsend()并从另一个进程获取它们msgget() posix消息队列:使用发送1,000,000条消息“ hello”,mq_send()并使用mq_receive() 然后在两种情况下,我都已计算出将消息发送到队列的CPU处理时间。

系统V mq CPU推送时间:0.8(秒)

posix mq CPU推送时间:1.4(秒)(!)

系统V的代码:

#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <time.h>

#define EXPERIMENT_COUNT 100

clock_t start, end;
double cpu_time_useda_avg = 0;
double cpu_time_used;

struct mesg_buffer { 
    long mesg_type; 
    char mesg_text[100]; 
} message; 

int run_sending_app()
{
    key_t key; 
    int msgid; 

    // ftok to generate unique key 
    key = ftok("/var/tmp/progfile", 65); 

    // msgget creates a message queue 
    // and returns identifier 
    msgid = msgget(key, 0666 | IPC_CREAT); 
    message.mesg_type = 1; 

    // printf("Write Data : "); 
    // fgets(message.mesg_text, sizeof(message.mesg_text), stdin);

    strcpy(message.mesg_text, "hello");
    int e,i;

    for( e = 0; e < EXPERIMENT_COUNT; e++ )
    {
        start = clock();

        for(i = 0; i != 1000000; i++)
            msgsnd(msgid, &message, sizeof(message), 0); 

        end = clock();
        cpu_time_used = ((double) (end - start)) / CLOCKS_PER_SEC;

        printf("cpu_time_used = %f\n", cpu_time_used);

        cpu_time_useda_avg += cpu_time_used;
    }

    printf("cpu_time_useda_avg = %f\n", cpu_time_useda_avg/EXPERIMENT_COUNT);

    // // display the message 
    // printf("Data send is : %s \n", message.mesg_text); 

    return 0; 
}

int run_receiving_app()
{
    key_t key; 
    int msgid;

    // ftok to generate unique key 
    key = ftok("/var/tmp/progfile", 65); 

    // msgget creates a message queue 
    // and returns identifier 
    msgid = msgget(key, 0666 | IPC_CREAT); 

    while(1)
    {
        // msgrcv to receive message 
        msgrcv(msgid, &message, sizeof(message), 0, 0);
        // // display the message 
        // printf("Data Received is : %s \n",  
        //                 message.mesg_text); 
    }


    // to destroy the message queue 
    msgctl(msgid, IPC_RMID, NULL); 

    return 0; 
}

int main(int argc, char const *argv[])
{   
    if( argc > 1 )
        if( !strcasecmp(argv[1], "cli") )
            run_sending_app();
        else if( !strcasecmp(argv[1], "serv") )
            run_receiving_app();

    return 0;
}

Posix的代码:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <errno.h>
#include <mqueue.h>
#include <time.h>

#define EXPERIMENT_COUNT 100

clock_t start, end;
double cpu_time_useda_avg = 0;
double cpu_time_used;

#define QUEUE_NAME  "/test_queue"
#define MAX_SIZE    1024
#define MSG_STOP    "exit"

#define CHECK(x) \
    do { \
        if (!(x)) { \
            fprintf(stderr, "%s:%d: ", __func__, __LINE__); \
            perror(#x); \
            exit(-1); \
        } \
    } while (0) \

int mq_run_server()
{
    mqd_t mq;
    struct mq_attr attr;
    char buffer[MAX_SIZE + 1];
    int must_stop = 0;

    /* initialize the queue attributes */
    attr.mq_flags = 0;
    attr.mq_maxmsg = 1000000;
    attr.mq_msgsize = MAX_SIZE;
    attr.mq_curmsgs = 0;

    /* create the message queue */
    // mq_unlink(QUEUE_NAME); exit(0);

    mq = mq_open(QUEUE_NAME, O_CREAT | O_RDONLY, 0644, &attr);
    CHECK((mqd_t)-1 != mq);

    do {
        ssize_t bytes_read;

        /* receive the message */
        bytes_read = mq_receive(mq, buffer, MAX_SIZE, NULL);
        CHECK(bytes_read >= 0);

        buffer[bytes_read] = '\0';
        if (! strncmp(buffer, MSG_STOP, strlen(MSG_STOP)))
        {
            must_stop = 1;
        }
        else
        {
            // printf("Received: %s\n", buffer);
        }
    } while (!must_stop);

    /* cleanup */
    CHECK((mqd_t)-1 != mq_close(mq));
    CHECK((mqd_t)-1 != mq_unlink(QUEUE_NAME));

    return 0;
}

int mq_run_client()
{
    mqd_t mq;
    char buffer[MAX_SIZE];

    /* open the mail queue */
    mq = mq_open(QUEUE_NAME, O_WRONLY);
    CHECK((mqd_t)-1 != mq);


    strcpy(buffer, "hello");
    int e,i;

    for( e = 0; e < EXPERIMENT_COUNT; e++ )
    {
        start = clock();

        for(i = 0; i != 1000000; i++)
            mq_send(mq, buffer, MAX_SIZE, 0);

        end = clock();
        cpu_time_used = ((double) (end - start)) / CLOCKS_PER_SEC;

        printf("cpu_time_used = %f\n", cpu_time_used);

        cpu_time_useda_avg += cpu_time_used;
    }

    printf("cpu_time_useda_avg = %f\n", cpu_time_useda_avg/EXPERIMENT_COUNT);


    // printf("Send to server (enter \"exit\" to stop it):\n");
    // 
    // do {
    //     printf("> ");
    //     fflush(stdout);

    //     memset(buffer, 0, MAX_SIZE);
    //     fgets(buffer, MAX_SIZE, stdin);

    //     /* send the message */
    //     CHECK(0 <= mq_send(mq, buffer, MAX_SIZE, 0));

    // } while (strncmp(buffer, MSG_STOP, strlen(MSG_STOP)));

    /* cleanup */
    CHECK((mqd_t)-1 != mq_close(mq));

    return 0;
}

int main(int argc, char const *argv[])
{
    if( argc > 1 )
        if( !strcasecmp(argv[1], "serv") )
            mq_run_server();
        else if( !strcasecmp(argv[1], "cli") )
            mq_run_client();

    return 0;
}

在这两种情况下,如果我运行./a.out serv,服务器端都会运行,如果我运行./a.out cli,则客户端会运行。

我的问题

为什么与System V MQ相比,Posix MQ性能如此低,而http://man7.org/linux/man-pages/man7/mq_overview.7.html说Posix MQ 与System V MQ 非常相似?

展开
收起
几许相思几点泪 2019-12-29 20:40:36 3227 0
1 条回答
写回答
取消 提交回答
  • 尝试更改:

    mq_send(mq, buffer, MAX_SIZE, 0);
    
    

    mq_send(mq, buffer, 104, 0);
    
    

    要么

    #define MAX_SIZE 104
    
    

    这样您就可以比较相同的数据量。

    2019-12-29 20:40:56
    赞同 展开评论 打赏

多个子产品线联合打造金融级高可用消息服务以及对物联网的原生支持,覆盖多行业。

热门讨论

热门文章

相关电子书

更多
企业互联网架构之消息队列 立即下载
基于消息队列RocketMQ的大型分布式应用上云最佳实践 立即下载
云原生消息队列Apache RocketMQ 立即下载