zeromq_传说中最快的消息队列

简介:

Zeromq的资源:

Zeromq模式:

http://blog.codingnow.com/2011/02/zeromq_message_patterns.html

zeromq主页:

http://www.zeromq.org/

Zeromq Guild:

http://zguide.zeromq.org/page:all#Fixing-the-World

Zero wiki:

http://en.wikipedia.org/wiki/%C3%98MQ

zeromq系列:

http://iyuan.iteye.com/blog/972949

Zeromq资源阅读:

ØMQ(Zeromq) 是一个更为高效的传输层

优势是:

1 程序接口库是一个并发框架

2 在集群和超级计算机上表现得比TCP更快

3 通过inproc, IPC, TCP, 和 multicast进行传播消息

4 通过发散,订阅,流水线,请求的方式连接

5 对于不定规模的多核消息传输应用使用异步IO

6 有非常大并且活跃的开源社区

7 支持30+的语言

8 支持多种系统

 

Zeromq定义为“史上最快的消息队列”

从网络通信的角度看,它处于会话层之上,应用层之下。

ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.


Zeromq中传递的数据格式是由用户自己负责,就是说如果server发送的string是有带"\0"的,那么client就必须要知道有这个

 

Pub_Sub模式。

the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,使用proxy。

Zeromq示例:

1 获取例子

git clone --depth=1 git://github.com/imatix/zguide.git

2 服务器端:

(当服务器收到消息的时候,服务器回复“World”)

      1      

      2      

      3      

      4      

      5      

      6      

      7      

      8      

      9      

      10      

      11      

      12      

      13      

      14      

      15      

      16      

      17      

      18      

      19      

      20      

      21      

      22      

      23      

      24      

      25      

      26      

     

     

   /*

   *  Hello World server

   *  Binds REP socket to tcp://*:5555

   *  Expects "Hello" from client, replies with "World"

   * @author Ian Barber

   */

    

   $context = new ZMQContext(1);

    

   //  Socket to talk to clients

   $responder = new ZMQSocket($context, ZMQ::SOCKET_REP);

   $responder->bind("tcp://*:5555");

    

   while(true) {

       //  Wait for next request from client

       $request = $responder->recv();

       printf ("Received request: [%s]\n", $request);

    

       //  Do some 'work'

       sleep (1);

    

       //  Send reply back to client

       $responder->send("World");    

 

}

     

3 客户端:

(客户端发送消息)

      1      

      2      

      3      

      4      

      5      

      6      

      7      

      8      

      9      

      10      

      11      

      12      

      13      

      14      

      15      

      16      

      17      

      18      

      19      

      20      

      21      

      22      

      23      

     

     

   /*

   *  Hello World client

   *  Connects REQ socket to tcp://localhost:5555

   *  Sends "Hello" to server, expects "World" back

   * @author Ian Barber

   */

    

   $context = new ZMQContext();

    

   //  Socket to talk to server

   echo "Connecting to hello world server…\n";

   $requester = new ZMQSocket($context, ZMQ::SOCKET_REQ);

   $requester->connect("tcp://localhost:5555");

    

   for($request_nbr = 0; $request_nbr != 10; $request_nbr++) {

       printf ("Sending request %d…\n", $request_nbr);

       $requester->send("Hello");

        

       $reply = $requester->recv();

       printf ("Received reply %d: [%s]\n", $request_nbr, $reply);

 

}

     

      1      

 

天气气候订阅系统:(pub-sub)

1 server端:

      1      

      2      

      3      

      4      

      5      

      6      

      7      

      8      

      9      

      10      

      11      

      12      

      13      

      14      

      15      

      16      

      17      

      18      

      19      

      20      

      21      

      22      

      23      

      24      

     

     

   /*

   *  Weather update server

   *  Binds PUB socket to tcp://*:5556

   *  Publishes random weather updates

   * @author Ian Barber

   */

    

   //  Prepare our context and publisher

   $context = new ZMQContext();

   $publisher = $context->getSocket(ZMQ::SOCKET_PUB);

   $publisher->bind("tcp://*:5556");

   $publisher->bind("ipc://weather.ipc");

    

   while (true) {

       //  Get values that will fool the boss

       $zipcode     = mt_rand(0, 100000);

       $temperature = mt_rand(-80, 135);

       $relhumidity = mt_rand(10, 60);

    

       //  Send message to all subscribers

       $update = sprintf ("%05d %d %d", $zipcode, $temperature, $relhumidity);

       $publisher->send($update);

   }

     

2 client端:

      1      

      2      

      3      

      4      

      5      

      6      

      7      

      8      

      9      

      10      

      11      

      12      

      13      

      14      

      15      

      16      

      17      

      18      

      19      

      20      

      21      

      22      

      23      

      24      

      25      

      26      

      27      

      28      

     

     

   /*

   *  Weather update client

   *  Connects SUB socket to tcp://localhost:5556

   *  Collects weather updates and finds avg temp in zipcode

   * @author Ian Barber

   */

    

   $context = new ZMQContext();

    

   //  Socket to talk to server

   echo "Collecting updates from weather server…", PHP_EOL;

   $subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB);

   $subscriber->connect("tcp://localhost:5556");

    

   //  Subscribe to zipcode, default is NYC, 10001

   $filter = $_SERVER['argc'] > 1 ? $_SERVER['argv'][1] : "10001";

   $subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter);

    

   //  Process 100 updates

   $total_temp = 0;

   for ($update_nbr = 0; $update_nbr < 100; $update_nbr++) {

       $string = $subscriber->recv();

       sscanf ($string, "%d %d %d", $zipcode, $temperature, $relhumidity);

       $total_temp += $temperature;

   }

   printf ("Average temperature for zipcode '%s' was %dF\n",

       $filter, (int) ($total_temp / $update_nbr));

     

      1      

------------------------

      1      

pub-sub的proxy模式:

      1      

图示是:

clip_image001_thumb[2]

Proxy节点的代码:

      1      

      2      

      3      

      4      

      5      

      6      

      7      

      8      

      9      

      10      

      11      

      12      

      13      

      14      

      15      

      16      

      17      

      18      

      19      

      20      

      21      

      22      

      23      

      24      

      25      

      26      

      27      

      28      

      29      

      30      

      31      

      32      

     

     

   /*

   *  Weather proxy device

   * @author Ian Barber

   */

    

   $context = new ZMQContext();

    

   //  This is where the weather server sits

   $frontend = new ZMQSocket($context, ZMQ::SOCKET_SUB);

   $frontend->connect("tcp://192.168.55.210:5556");

    

   //  This is our public endpoint for subscribers

   $backend = new ZMQSocket($context, ZMQ::SOCKET_PUB);

   $backend->bind("tcp://10.1.1.0:8100");

    

   //  Subscribe on everything

   $frontend->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");

    

   //  Shunt messages out to our own subscribers

   while(true) {

       while(true) {

           //  Process all parts of the message

           $message = $frontend->recv();

           $more = $frontend->getSockOpt(ZMQ::SOCKOPT_RCVMORE);

           $backend->send($message, $more ? ZMQ::SOCKOPT_SNDMORE : 0);

           if(!$more) {

               break; // Last message part

           }

       }

 

}

     

其实就是proxy同时是作为pub又作为sub的


本文转自轩脉刃博客园博客,原文链接:http://www.cnblogs.com/yjf512/archive/2012/03/03/2378024.html,如需转载请自行联系原作者

相关文章
|
6月前
|
消息中间件 网络性能优化 开发工具
消息队列 MQ使用问题之如何确保消息的唯一性
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
4月前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
58 0
手撸MQ消息队列——循环数组
|
5月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
197 1
|
6月前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ使用问题之一直连接master失败,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Prometheus 监控
消息队列 MQ使用问题之如何将旧集群的store目录迁移到新集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 安全 PHP
消息队列 MQ使用问题之如何获取PHP客户端代码
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。