ZMQ之处理多个套接字(zmq_poll函数)

简介: ZMQ之处理多个套接字(zmq_poll函数)

ZMQ之处理多个套接字(zmq_poll函数)


在之前的示例中,主程序的循环体内会做以下几件事:

               1、等待套接字的消息。

               2、处理消息。

               3、返回第一步。

       如果我们想要读取多个套接字中的消息呢?最简单的方法是将套接字连接到多个端点上,让ZMQ使用公平队列的机制来接受消息。如果不同端点上的套接字类型是一致的,那可以使用这种方法。但是,如果一个套接字的类型是PULL,另一个是PUB怎么办?如果现在开始混用套接字类型,那将来就没有可靠性可言了。

       正确的方法应该是使用zmq_poll()函数。更好的方法是将zmq_poll()包装成一个框架,编写一个事件驱动的反应器,但这个就比较复杂了,我们这里暂不讨论。

       我们先不使用zmq_poll(),而用NOBLOCK(非阻塞)的方式来实现从多个套接字读取消息的功能。下面将气象信息服务和并行处理这两个示例结合起来:        

       msreader: Multiple socket reader in C

#include "zhelpers.h"
int main (void) 
{
    //  准备上下文和套接字
    void *context = zmq_init (1);
    //  连接至任务分发器
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");
    //  连接至天气服务
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5556");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
    //  处理从两个套接字中接收到的消息
    //  这里我们会优先处理从任务分发器接收到的消息
    while (1) {
        //  处理等待中的任务
        int rc;
        for (rc = 0; !rc; ) {
            zmq_msg_t task;
            zmq_msg_init (&task);
            if ((rc = zmq_recv (receiver, &task, ZMQ_NOBLOCK)) == 0) {
                //  处理任务
            }
            zmq_msg_close (&task);
        }
        //  处理等待中的气象更新
        for (rc = 0; !rc; ) {
            zmq_msg_t update;
            zmq_msg_init (&update);
            if ((rc = zmq_recv (subscriber, &update, ZMQ_NOBLOCK)) == 0) {
                //  处理气象更新
            }
            zmq_msg_close (&update);
        }
        // 没有消息,等待1毫秒
        s_sleep (1);
    }
    //  程序不会运行到这里,但还是做正确的退出清理工作
    zmq_close (receiver);
    zmq_close (subscriber);
    zmq_term (context);
    return 0;
}

这种方式的缺点之一是,在收到第一条消息之前会有1毫秒的延迟,这在高压力的程序中还是会构成问题的。此外,你还需要翻阅诸如nanosleep()的函数,不会造成循环次数的激增。

       示例中将任务分发器的优先级提升了,你可以做一个改进,轮流处理消息,正如ZMQ内部做的公平队列机制一样。

       下面,让我们看看如何用zmq_poll()来实现同样的功能:

       mspoller: Multiple socket poller in C

#include "zhelpers.h"
int main (void) 
{
    void *context = zmq_init (1);
    //  连接任务分发器
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");
    //  连接气象更新服务
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5556");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
    //  初始化轮询对象
    zmq_pollitem_t items [] = {
        { receiver, 0, ZMQ_POLLIN, 0 },
        { subscriber, 0, ZMQ_POLLIN, 0 }
    };
    //  处理来自两个套接字的消息
    while (1) {
        zmq_msg_t message;
        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {
            zmq_msg_init (&message);
            zmq_recv (receiver, &message, 0);
            //  处理任务
            zmq_msg_close (&message);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            zmq_msg_init (&message);
            zmq_recv (subscriber, &message, 0);
            //  处理气象更新
            zmq_msg_close (&message);
        }
    }
    //  程序不会运行到这儿
    zmq_close (receiver);
    zmq_close (subscriber);
    zmq_term (context);
    return 0;
}
相关文章
|
6月前
|
消息中间件 Unix Linux
【ZMQ polling机制】ZMQ异步接收机制以及与epoll/select的对比分析
【ZMQ polling机制】ZMQ异步接收机制以及与epoll/select的对比分析
422 0
|
12月前
|
消息中间件 C语言
一个使用zmq_recv 接收 5555端口的demo
这个程序会一直运行,接收来自5555端口的消息,并打印出来。每接收到一个消息,它会等待1秒,然后发送一个"World"的回复。
75 0
|
消息中间件 网络协议
ZMQ与TCP的区别
ZMQ与TCP的区别
ZMQ与TCP的区别
|
数据库
为socket的recv/send设置超时
为socket的recv/send设置超时
257 0
stream_socket_accept设置非阻塞,socket_accept设置非阻塞
stream_socket_accept设置非阻塞,socket_accept设置非阻塞
196 0