ZMQ/ZeroMQ的三种消息模式

简介: ZMQ/ZeroMQ的三种消息模式

ZMQ/ZeroMQ的三种消息模式


一、 Reuqest-Reply(请求-应答模式)

       1、使用Request-Reply模式,需要遵循一定的规律。

       2、客户端必要先发送消息,在接收消息;服务端必须先进行接收客户端发送过来的消息,在发送应答给客户端,如此循环

       3、服务端和客户端谁先启动,效果都是一样的。

       4、服务端在收到消息之前,会一直阻塞,等待客户端连上来。

创建一个客户端和服务端,客户端发送消息给服务端,服务端返回消息给客户端,客户端和服务器谁先启动都可以。

       server.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>
#define sleep(n)  Sleep(n)
#endif
int main () {
    //  Prepare our context and socket
    zmq::context_t context (2);
    zmq::socket_t socket (context, zmq::socket_type::rep);
    socket.bind ("tcp://*:5555");
    while (true) {
        zmq::message_t request;
        //  Wait for next request from client
        socket.recv (request, zmq::recv_flags::none);
        std::cout << "Received Hello" << std::endl;
        //  Do some 'work'
        sleep(1);
        //  Send reply back to client
        zmq::message_t reply (5);
        memcpy (reply.data (), "World", 5);
        socket.send (reply, zmq::send_flags::none);
    }
    return 0;
}

client.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>
int main ()
{
    //  Prepare our context and socket
    zmq::context_t context (1);
    zmq::socket_t socket (context, zmq::socket_type::req);
    std::cout << "Connecting to hello world server..." << std::endl;
    socket.connect ("tcp://localhost:5555");
    //  Do 10 requests, waiting each time for a response
    for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
        zmq::message_t request (5);
        memcpy (request.data (), "Hello", 5);
        std::cout << "Sending Hello " << request_nbr << "..." << std::endl;
        socket.send (request, zmq::send_flags::none);
        //  Get the reply.
        zmq::message_t reply;
        socket.recv (reply, zmq::recv_flags::none);
        std::cout << "Received World " << request_nbr << std::endl;
    }
    return 0;
}

二、Publisher-Subscriber(发布-订阅模式)

       Publisher-Subscriber模式,消息是单向流动的,发布者只能发布消息,不能接受消息;订阅者只能接受消息,不能发送消息。

       服务端发布消息的过程中,如果有订阅者退出,不影响发布者继续发布消息,当订阅者再次连接上来,收到的消息是后来发布的消息

       比较晚加入的订阅者,或者中途离开的订阅者,必然会丢掉一部分信息

       如果发布者停止,所有的订阅者会阻塞,等发布者再次上线的时候回继续接受消息。

       "慢连接": 我们不知道订阅者是何时开始接受消息的,就算启动"订阅者",在启动"发布者", "订阅者"还是会缺失一部分的消息,因为建立连接是需要时间的,虽然时间很短,但不是零。ZMQ在后台是进行异步的IO传输,在建立TCP连接的短短的时间段内,ZMQ就可以发送很多消息了。

publisher.cpp

#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#if (defined (WIN32))
#include <zhelpers.hpp>
#endif
#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
int main () {
    //  Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, zmq::socket_type::pub);
    publisher.bind("tcp://*:5556");
    publisher.bind("ipc://weather.ipc");        // Not usable on Windows.
    //  Initialize random number generator
    srandom ((unsigned) time (NULL));
    while (1) {
        int zipcode, temperature, relhumidity;
        //  Get values that will fool the boss
        zipcode     = within (100000);
        temperature = within (215) - 80;
        relhumidity = within (50) + 10;
        //  Send message to all subscribers
        zmq::message_t message(20);
        snprintf ((char *) message.data(), 20 ,
          "%05d %d %d", zipcode, temperature, relhumidity);
        publisher.send(message, zmq::send_flags::none);
    }
    return 0;
}

subscriber.cpp

#include <zmq.hpp>
#include <iostream>
#include <sstream>
int main (int argc, char *argv[])
{
    zmq::context_t context (1);
    //  Socket to talk to server
    std::cout << "Collecting updates from weather server...\n" << std::endl;
    zmq::socket_t subscriber (context, zmq::socket_type::sub);
    subscriber.connect("tcp://localhost:5556");
    //  Subscribe to zipcode, default is NYC, 10001
  const char *filter = (argc > 1)? argv [1]: "10001 ";
    subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
    //  Process 100 updates
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        zmq::message_t update;
        int zipcode, temperature, relhumidity;
        subscriber.recv(update, zmq::recv_flags::none);
        std::istringstream iss(static_cast<char*>(update.data()));
    iss >> zipcode >> temperature >> relhumidity ;
    total_temp += temperature;
    }
    std::cout   << "Average temperature for zipcode '"<< filter
          <<"' was "<<(int) (total_temp / update_nbr) <<"F"
          << std::endl;
    return 0;
}

三、Push-Pull(平行管道模式/分布式处理)

       1、Ventilator:任务发布器会生成大量可以并行运算的任务。

       2、Worker:有一组worker会处理这些任务。

       3、Sink:结果接收器会在末端接收所有的Worker的处理结果,进行汇总。

       4、Worker上游和"任务发布器"相连,下游和"结果接收器"相连。

       5、"任务发布器" 和 "结果接收器"是这个网路结构中比较稳定的部分,由他们绑定至端点。

       6、Worker只是连接两个端点。

       7、需要等Worker全部启动后,在进行任务分发。Socket的连接会消耗一定时间(慢连接), 如果不尽兴同步的话,第一个Worker启动。

       8、会一下子接收很多任务。

       9、"任务分发器" 会向Worker均匀的分发任务(负载均衡机制)。

       10、"结果接收器" 会均匀地从Worker处收集消息(公平队列机制)。

taskvent.cpp

#include <zmq.hpp>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <iostream>
#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
int main (int argc, char *argv[])
{
    zmq::context_t context (1);
    //  Socket to send messages on
    zmq::socket_t  sender(context, ZMQ_PUSH);
    sender.bind("tcp://*:5557");
    std::cout << "Press Enter when the workers are ready: " << std::endl;
    getchar ();
    std::cout << "Sending tasks to workers...\n" << std::endl;
    //  The first message is "0" and signals start of batch
    zmq::socket_t sink(context, ZMQ_PUSH);
    sink.connect("tcp://localhost:5558");
    zmq::message_t message(2);
    memcpy(message.data(), "0", 1);
    sink.send(message);
    //  Initialize random number generator
    srandom ((unsigned) time (NULL));
    //  Send 100 tasks
    int task_nbr;
    int total_msec = 0;     //  Total expected cost in msecs
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        int workload;
        //  Random workload from 1 to 100msecs
        workload = within (100) + 1;
        total_msec += workload;
        message.rebuild(10);
        memset(message.data(), '\0', 10);
        sprintf ((char *) message.data(), "%d", workload);
        sender.send(message);
    }
    std::cout << "Total expected cost: " << total_msec << " msec" << std::endl;
    sleep (1);              //  Give 0MQ time to deliver
    return 0;
}

taskwork.cpp

#include "zhelpers.hpp"
#include <string>
int main (int argc, char *argv[])
{
    zmq::context_t context(1);
    //  Socket to receive messages on
    zmq::socket_t receiver(context, ZMQ_PULL);
    receiver.connect("tcp://localhost:5557");
    //  Socket to send messages to
    zmq::socket_t sender(context, ZMQ_PUSH);
    sender.connect("tcp://localhost:5558");
    //  Process tasks forever
    while (1) {
        zmq::message_t message;
        int workload;           //  Workload in msecs
        receiver.recv(&message);
        std::string smessage(static_cast<char*>(message.data()), message.size());
        std::istringstream iss(smessage);
        iss >> workload;
        //  Do the work
        s_sleep(workload);
        //  Send results to sink
        message.rebuild();
        sender.send(message);
        //  Simple progress indicator for the viewer
        std::cout << "." << std::flush;
    }
    return 0;
}

tasksink.cpp

#include <zmq.hpp>
#include <time.h>
#include <sys/time.h>
#include <iostream>
int main (int argc, char *argv[])
{
    //  Prepare our context and socket
    zmq::context_t context(1);
    zmq::socket_t receiver(context,ZMQ_PULL);
    receiver.bind("tcp://*:5558");
    //  Wait for start of batch
    zmq::message_t message;
    receiver.recv(&message);
    //  Start our clock now
    struct timeval tstart;
    gettimeofday (&tstart, NULL);
    //  Process 100 confirmations
    int task_nbr;
    int total_msec = 0;     //  Total calculated cost in msecs
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        receiver.recv(&message);
        if (task_nbr % 10 == 0)
            std::cout << ":" << std::flush;
        else
            std::cout << "." << std::flush;
    }
    //  Calculate and report duration of batch
    struct timeval tend, tdiff;
    gettimeofday (&tend, NULL);
    if (tend.tv_usec < tstart.tv_usec) {
        tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1;
        tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec;
    }
    else {
        tdiff.tv_sec = tend.tv_sec - tstart.tv_sec;
        tdiff.tv_usec = tend.tv_usec - tstart.tv_usec;
    }
    total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000;
    std::cout << "\nTotal elapsed time: " << total_msec << " msec\n" << std::endl;
    return 0;
}


相关文章
|
消息中间件 存储 网络协议
ZMQ/ZeroMQ简介
ZMQ/ZeroMQ简介
|
2月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
3月前
|
消息中间件 安全 Java
构建基于RabbitMQ的安全消息传输管道
【8月更文第28天】在分布式系统中,消息队列如RabbitMQ为应用间的数据交换提供了可靠的支持。然而,随着数据的敏感性增加,确保这些消息的安全传输变得至关重要。本文将探讨如何在RabbitMQ中实施一系列安全措施,包括加密通信、认证和授权机制,以保护敏感信息。
85 1
|
3月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
62 1
|
4月前
|
消息中间件 RocketMQ
消息队列 MQ使用问题之如何使用SockJS和Stomp与RabbitMQ建立连接
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 存储 安全
zeromq怎么一个端口发送多个主题
我们这里使用czmq4 版本处理。 在CZMQ的版本4中,在一个端口上发布多个订阅主题。这是通过使用PUB/SUB模式实现的。在这种模式下,一个或多个发布者将消息发布到一个或多个主题,然后一个或多个订阅者可以订阅一个或多个主题来接收消息。
170 0
|
XML 消息中间件 数据格式
[原创]AMQP-RabbitMQ/6/RPC模式/关注消息处理结果
[原创]AMQP-RabbitMQ/6/RPC模式/关注消息处理结果
[原创]AMQP-RabbitMQ/6/RPC模式/关注消息处理结果
|
消息中间件 缓存 JSON
我的mqtt协议和emqttd开源项目个人理解(13) - Hook使用和连接Kafka发送消息,使用brod库
我的mqtt协议和emqttd开源项目个人理解(13) - Hook使用和连接Kafka发送消息,使用brod库
359 0
|
消息中间件 Kafka 开发工具
我的mqtt协议和emqttd开源项目个人理解(12) - Hook使用和连接Kafka发送消息,使用ekaf库
我的mqtt协议和emqttd开源项目个人理解(12) - Hook使用和连接Kafka发送消息,使用ekaf库
476 0
|
存储 网络性能优化 数据库
我的mqtt协议和emqttd开源项目个人理解(8) - 客户端subscribe消息的源码分析
我的mqtt协议和emqttd开源项目个人理解(8) - 客户端subscribe消息的源码分析
155 0