ZMQ/ZeroMQ的三种消息模式
一、 Reuqest-Reply(请求-应答模式)
1、使用Request-Reply模式,需要遵循一定的规律。
2、客户端必要先发送消息,在接收消息;服务端必须先进行接收客户端发送过来的消息,在发送应答给客户端,如此循环
3、服务端和客户端谁先启动,效果都是一样的。
4、服务端在收到消息之前,会一直阻塞,等待客户端连上来。
创建一个客户端和服务端,客户端发送消息给服务端,服务端返回消息给客户端,客户端和服务器谁先启动都可以。
server.cpp
#include <zmq.hpp> #include <string> #include <iostream> #ifndef _WIN32 #include <unistd.h> #else #include <windows.h> #define sleep(n) Sleep(n) #endif int main () { // Prepare our context and socket zmq::context_t context (2); zmq::socket_t socket (context, zmq::socket_type::rep); socket.bind ("tcp://*:5555"); while (true) { zmq::message_t request; // Wait for next request from client socket.recv (request, zmq::recv_flags::none); std::cout << "Received Hello" << std::endl; // Do some 'work' sleep(1); // Send reply back to client zmq::message_t reply (5); memcpy (reply.data (), "World", 5); socket.send (reply, zmq::send_flags::none); } return 0; }
client.cpp
#include <zmq.hpp> #include <string> #include <iostream> int main () { // Prepare our context and socket zmq::context_t context (1); zmq::socket_t socket (context, zmq::socket_type::req); std::cout << "Connecting to hello world server..." << std::endl; socket.connect ("tcp://localhost:5555"); // Do 10 requests, waiting each time for a response for (int request_nbr = 0; request_nbr != 10; request_nbr++) { zmq::message_t request (5); memcpy (request.data (), "Hello", 5); std::cout << "Sending Hello " << request_nbr << "..." << std::endl; socket.send (request, zmq::send_flags::none); // Get the reply. zmq::message_t reply; socket.recv (reply, zmq::recv_flags::none); std::cout << "Received World " << request_nbr << std::endl; } return 0; }
二、Publisher-Subscriber(发布-订阅模式)
Publisher-Subscriber模式,消息是单向流动的,发布者只能发布消息,不能接受消息;订阅者只能接受消息,不能发送消息。
服务端发布消息的过程中,如果有订阅者退出,不影响发布者继续发布消息,当订阅者再次连接上来,收到的消息是后来发布的消息
比较晚加入的订阅者,或者中途离开的订阅者,必然会丢掉一部分信息
如果发布者停止,所有的订阅者会阻塞,等发布者再次上线的时候回继续接受消息。
"慢连接": 我们不知道订阅者是何时开始接受消息的,就算启动"订阅者",在启动"发布者", "订阅者"还是会缺失一部分的消息,因为建立连接是需要时间的,虽然时间很短,但不是零。ZMQ在后台是进行异步的IO传输,在建立TCP连接的短短的时间段内,ZMQ就可以发送很多消息了。
publisher.cpp
#include <zmq.hpp> #include <stdio.h> #include <stdlib.h> #include <time.h> #if (defined (WIN32)) #include <zhelpers.hpp> #endif #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0)) int main () { // Prepare our context and publisher zmq::context_t context (1); zmq::socket_t publisher (context, zmq::socket_type::pub); publisher.bind("tcp://*:5556"); publisher.bind("ipc://weather.ipc"); // Not usable on Windows. // Initialize random number generator srandom ((unsigned) time (NULL)); while (1) { int zipcode, temperature, relhumidity; // Get values that will fool the boss zipcode = within (100000); temperature = within (215) - 80; relhumidity = within (50) + 10; // Send message to all subscribers zmq::message_t message(20); snprintf ((char *) message.data(), 20 , "%05d %d %d", zipcode, temperature, relhumidity); publisher.send(message, zmq::send_flags::none); } return 0; }
subscriber.cpp
#include <zmq.hpp> #include <iostream> #include <sstream> int main (int argc, char *argv[]) { zmq::context_t context (1); // Socket to talk to server std::cout << "Collecting updates from weather server...\n" << std::endl; zmq::socket_t subscriber (context, zmq::socket_type::sub); subscriber.connect("tcp://localhost:5556"); // Subscribe to zipcode, default is NYC, 10001 const char *filter = (argc > 1)? argv [1]: "10001 "; subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter)); // Process 100 updates int update_nbr; long total_temp = 0; for (update_nbr = 0; update_nbr < 100; update_nbr++) { zmq::message_t update; int zipcode, temperature, relhumidity; subscriber.recv(update, zmq::recv_flags::none); std::istringstream iss(static_cast<char*>(update.data())); iss >> zipcode >> temperature >> relhumidity ; total_temp += temperature; } std::cout << "Average temperature for zipcode '"<< filter <<"' was "<<(int) (total_temp / update_nbr) <<"F" << std::endl; return 0; }
三、Push-Pull(平行管道模式/分布式处理)
1、Ventilator:任务发布器会生成大量可以并行运算的任务。
2、Worker:有一组worker会处理这些任务。
3、Sink:结果接收器会在末端接收所有的Worker的处理结果,进行汇总。
4、Worker上游和"任务发布器"相连,下游和"结果接收器"相连。
5、"任务发布器" 和 "结果接收器"是这个网路结构中比较稳定的部分,由他们绑定至端点。
6、Worker只是连接两个端点。
7、需要等Worker全部启动后,在进行任务分发。Socket的连接会消耗一定时间(慢连接), 如果不尽兴同步的话,第一个Worker启动。
8、会一下子接收很多任务。
9、"任务分发器" 会向Worker均匀的分发任务(负载均衡机制)。
10、"结果接收器" 会均匀地从Worker处收集消息(公平队列机制)。
taskvent.cpp
#include <zmq.hpp> #include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <iostream> #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0)) int main (int argc, char *argv[]) { zmq::context_t context (1); // Socket to send messages on zmq::socket_t sender(context, ZMQ_PUSH); sender.bind("tcp://*:5557"); std::cout << "Press Enter when the workers are ready: " << std::endl; getchar (); std::cout << "Sending tasks to workers...\n" << std::endl; // The first message is "0" and signals start of batch zmq::socket_t sink(context, ZMQ_PUSH); sink.connect("tcp://localhost:5558"); zmq::message_t message(2); memcpy(message.data(), "0", 1); sink.send(message); // Initialize random number generator srandom ((unsigned) time (NULL)); // Send 100 tasks int task_nbr; int total_msec = 0; // Total expected cost in msecs for (task_nbr = 0; task_nbr < 100; task_nbr++) { int workload; // Random workload from 1 to 100msecs workload = within (100) + 1; total_msec += workload; message.rebuild(10); memset(message.data(), '\0', 10); sprintf ((char *) message.data(), "%d", workload); sender.send(message); } std::cout << "Total expected cost: " << total_msec << " msec" << std::endl; sleep (1); // Give 0MQ time to deliver return 0; }
taskwork.cpp
#include "zhelpers.hpp" #include <string> int main (int argc, char *argv[]) { zmq::context_t context(1); // Socket to receive messages on zmq::socket_t receiver(context, ZMQ_PULL); receiver.connect("tcp://localhost:5557"); // Socket to send messages to zmq::socket_t sender(context, ZMQ_PUSH); sender.connect("tcp://localhost:5558"); // Process tasks forever while (1) { zmq::message_t message; int workload; // Workload in msecs receiver.recv(&message); std::string smessage(static_cast<char*>(message.data()), message.size()); std::istringstream iss(smessage); iss >> workload; // Do the work s_sleep(workload); // Send results to sink message.rebuild(); sender.send(message); // Simple progress indicator for the viewer std::cout << "." << std::flush; } return 0; }
tasksink.cpp
#include <zmq.hpp> #include <time.h> #include <sys/time.h> #include <iostream> int main (int argc, char *argv[]) { // Prepare our context and socket zmq::context_t context(1); zmq::socket_t receiver(context,ZMQ_PULL); receiver.bind("tcp://*:5558"); // Wait for start of batch zmq::message_t message; receiver.recv(&message); // Start our clock now struct timeval tstart; gettimeofday (&tstart, NULL); // Process 100 confirmations int task_nbr; int total_msec = 0; // Total calculated cost in msecs for (task_nbr = 0; task_nbr < 100; task_nbr++) { receiver.recv(&message); if (task_nbr % 10 == 0) std::cout << ":" << std::flush; else std::cout << "." << std::flush; } // Calculate and report duration of batch struct timeval tend, tdiff; gettimeofday (&tend, NULL); if (tend.tv_usec < tstart.tv_usec) { tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1; tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec; } else { tdiff.tv_sec = tend.tv_sec - tstart.tv_sec; tdiff.tv_usec = tend.tv_usec - tstart.tv_usec; } total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000; std::cout << "\nTotal elapsed time: " << total_msec << " msec\n" << std::endl; return 0; }