异步消息队列zeromq实现服务器间高性能通信

简介:

ZeroMQ 是一个很有个性的项目,它原来是定位为“史上最快消息队列”,所以名字里面有“MQ”两个字母,但是后来逐渐演变发展,慢慢淡化了消息队列的身影,改称为消息内核,或者消息层了。从网络通信的角度看,它处于会话层之上,应用层之下,有了它,你甚至不需要自己写一行的socket函数调用就能完成复杂的网络通信工作。


服务端 :

1
2
3
4
5
6
7
8
9
10
import  zeromq
import  console;
var context  =  zeromq.context()
var responder  =  context.zmq_socket_reply()  / / 创建套接字
responder.bind(   "tcp://*:5559" )
console.log( "服务端已启动" )
do {
console.log( "服务端收到消息" ,responder.recv() );
responder.send( "World" )
} while ( sleep ( 1 ) )


客户端:

1
2
3
4
5
6
7
8
9
import  zeromq
import  console;
var context  =  zeromq.context()
var requester  =  context.zmq_socket_request();
requester.connect(  "tcp://localhost:5559"  )
requester.send( "Hello" );   / / 发送消息
var  str  =  requester.recv();  / / 接收字符串
console.log ( "客户端收到消息 " str  );
context.term();  / / 关闭


三种基本模式(它有很多种)

1. 请求应答模式(req 和 rep

消息双向的,有来有往,req端请求的消息,rep端必须答复给req

2. 订阅发布模式 (sub 和 pub

消息单向的,有去无回的。可按照发布端可发布制定主题的消息,订阅端可订阅喜欢的主题,订阅端只会收到自己已经订阅的主题。发布端发布一条消息,可被多个订阅端同事收到。

3. push pull模式

消息单向的,也是有去无回的。push的任何一个消息,始终只会有一个pull端收到消息.

后续的代理模式和路由模式等都是在三种基本模式上面的扩展或变异。


阻塞 和 非阻塞


以上三种基本模式都支持阻塞模式和非阻塞模式。

req 和 rep的阻塞模式是这样的(其实跟原生的socket实现也非常像)


大家用过socket的,客户端要是先启动的话,会连接失败,或者是短时间内有超时问题。


224720261.png


服务端


1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Echo server program
import  socket
HOST =  ''                  # Symbolic name meaning all available interfaces
PORT =  50007               # Arbitrary non-privileged port
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((HOST, PORT))
s.listen( 1 )
conn, addr = s.accept()
print  'Connected by' , addr
while  1 :
data = conn.recv( 1024 )
if  not data:  break
conn.sendall(data)
conn.close()


客户端:

1
2
3
4
5
6
7
8
9
10
# Echo client program
import  socket
HOST  =  'daring.cwi.nl'     # The remote host
PORT  =  50007               # The same port as used by the server
=  socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
s.sendall( 'Hello, world' )
data  =  s.recv( 1024 )
s.close()
print  'Received' repr (data)


如果使用ActiveMQ/RabbitMQ之类的有代理MQ系统,只要保证MQ代理最先启动, 就可以保证系统的正常运行。而对于无代理的ZeroMQ来说,似乎比较难办。 在刚刚开始使用ZeroMQ时,我也一直担心这个问题,总是小心翼翼地首先启动调 用bind指令的程序,然后启动执行connect指令的程序。这样其实只是利用了 ZeroMQ的高速数据传输能力,以及ZeroMQ对IPC和socket的良好封装特性,还是 没有解决进程启动顺序的问题。后来,偶然实验了一下,发现bind程序和 connect程序无论谁先启动,其实都不影响整个系统的正常运行。


咱们再用mq测试下

1
2
3
4
5
6
7
8
9
10
11
12
13
import  zmq
context = zmq.Context()
# Socket to talk to server
print  "Connecting to hello world server…"
socket = context.socket(zmq.REQ)
socket.connect ( "tcp://localhost:5555" )
# Do  10  requests, waiting  each  time  for  a response
for  request  in  range ( 10 ):
print  "Sending request " , request, "…"
socket.send ( "Hello" )
# Get the reply.
message = socket.recv()
print  "Received reply " , request,  "[" , message,  "]"



1
2
3
4
5
6
7
8
9
10
11
12
13
import  zmq
import  time
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind( "tcp://*:5555" )
while  True:
#  Wait  for  next request from client
message = socket.recv()
print  "Received request: " , message
#  Do some  'work'
time.sleep ( 1 )        #   Do some  'work'
#  Send reply back to client
socket.send( "World" )



225025331.png


可以看出发布者绑定绑定一个端口,订阅者通过连接发布者接受订阅的消息。

官网描述这种模式要注意以下几点:

####

这里的Publish-Subscribe模型是一个很典型的PUB-SUB模型,即发布者(Publisher)只能发送数据,它发送时指明发送数据的类型,而订阅者(Subscriber)则只接收它关心的类型的消息。

###

1. pub/sub模式下,sub事实上可以连接多个pub,每次只连接一个connect,所以接收到的消息可以是叫错的,以至于不会单个pub掩盖了其他pub


2. 如果存在某个pub没有被任何sub连接,则该pub会丢弃所有的消息


3. 如果你采用tcp的连接方式,sub很慢,消息将会堆积在pub,后期会对该问题有个较好的解决


4. 目前的而版本,过滤发生在sub端,而不是pub端,意思就是说一个pub会发送所有的消息到所有的sub, 由sub决定是要drop这个msg.


zeromq是lib库,部署完成后可自行编写server和client,编译时指定-lzmq即可


读速度 | 写速度
send 1000000 message cost 659 ms | recv 50000 message cost 36 ms
send 1000000 message cost 597 ms | recv 50000 message cost 33 ms
send 1000000 message cost 735 ms | recv 50000 message cost 34 ms
send 1000000 message cost 727 ms | recv 50000 message cost 33 ms
send 1000000 message cost 741 ms | recv 50000 message cost 33 ms
send 1000000 message cost 798 ms | recv 50000 message cost 32 ms
send 1000000 message cost 665 ms | recv 50000 message cost 34 ms
平均读取速度 147w | 平均写入速度 144w /s



为题提高性能 可以用gevent框架


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import  gevent
from  gevent_zeromq  import  zmq
# Global Context
context  =  zmq.Context()  #它是GreenContext的一个简写,确保greenlet化socket
def  server():
server_socket  =  context.socket(zmq.REQ)  #创建一个socket,使用mq类型模式REQ/REP(请求/回复,服务器是请求),还有PUB/SUB(发布/订阅),push/pull等
server_socket.bind( "tcp://127.0.0.1:5000" #绑定socket
for  request  in  range ( 1 , 10 ):
server_socket.send( "Hello" )
print ( 'Switched to Server for ' , request)
server_socket.recv()   #这里发生上下文切换
def  client():
client_socket  =  context.socket(zmq.REP)  (客户端是回复)
client_socket.connect( "tcp://127.0.0.1:5000" )   #连接server的socket端口
for  request  in  range ( 1 , 10 ):
client_socket.recv()
print ( 'Switched to Client for ' , request)
client_socket.send( "World" )
publisher  =  gevent.spawn(server)
client     =  gevent.spawn(client)
gevent.joinall([publisher, client])




 本文转自 rfyiamcool 51CTO博客,原文链接:http://blog.51cto.com/rfyiamcool/1207004,如需转载请自行联系原作者



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 存储 监控
消息队列通信的优缺点
【10月更文挑战第29天】消息队列通信具有诸多优点,如解耦性强、异步通信、缓冲削峰等,能够有效地提高系统的灵活性、可扩展性和稳定性。但同时也存在一些缺点,如系统复杂性增加、性能开销、数据一致性挑战和实时性受限等。在实际应用中,需要根据具体的业务需求和场景,权衡其优缺点,合理地选择和使用消息队列通信机制,以实现系统的高效运行和优化。
|
6月前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 存储 供应链
进程间通信方式-----消息队列通信
【10月更文挑战第29天】消息队列通信是一种强大而灵活的进程间通信机制,它通过异步通信、解耦和缓冲等特性,为分布式系统和多进程应用提供了高效的通信方式。在实际应用中,需要根据具体的需求和场景,合理地选择和使用消息队列,以充分发挥其优势,同时注意其可能带来的复杂性和性能开销等问题。
|
2月前
|
网络协议 Unix Linux
一个.NET开源、快速、低延迟的异步套接字服务器和客户端库
一个.NET开源、快速、低延迟的异步套接字服务器和客户端库
102 4
|
2月前
|
存储 监控 NoSQL
Redis的实现二: c、c++的网络通信编程技术,让服务器处理多个client
本文讨论了在C/C++中实现服务器处理多个客户端的技术,重点介绍了事件循环和非阻塞IO的概念,以及如何在Linux上使用epoll来高效地监控和管理多个文件描述符。
38 0
|
4月前
|
API Windows
揭秘网络通信的魔法:Win32多线程技术如何让服务器化身超级英雄,同时与成千上万客户端对话!
【8月更文挑战第16天】在网络编程中,客户/服务器模型让客户端向服务器发送请求并接收响应。Win32 API支持在Windows上构建此类应用。首先要初始化网络环境并通过`socket`函数创建套接字。服务器需绑定地址和端口,使用`bind`和`listen`函数准备接收连接。对每个客户端调用`accept`函数并在新线程中处理。客户端则通过`connect`建立连接,双方可通过`send`和`recv`交换数据。多线程提升服务器处理能力,确保高效响应。
59 6
|
4月前
|
网络协议 安全 Unix
6! 用Python脚本演示TCP 服务器与客户端通信过程!
6! 用Python脚本演示TCP 服务器与客户端通信过程!
|
4月前
|
运维 安全 网络安全
运维笔记:基于阿里云跨地域服务器通信
运维笔记:基于阿里云跨地域服务器通信
209 1
|
4月前
|
网络协议 C# 开发者
WPF与Socket编程的完美邂逅:打造流畅网络通信体验——从客户端到服务器端,手把手教你实现基于Socket的实时数据交换
【8月更文挑战第31天】网络通信在现代应用中至关重要,Socket编程作为其实现基础,即便在主要用于桌面应用的Windows Presentation Foundation(WPF)中也发挥着重要作用。本文通过最佳实践,详细介绍如何在WPF应用中利用Socket实现网络通信,包括创建WPF项目、设计用户界面、实现Socket通信逻辑及搭建简单服务器端的全过程。具体步骤涵盖从UI设计到前后端交互的各个环节,并附有详尽示例代码,助力WPF开发者掌握这一关键技术,拓展应用程序的功能与实用性。
155 0
|
4月前
|
Rust 安全 开发者
惊爆!Xamarin 携手机器学习,开启智能应用新纪元,个性化体验与跨平台优势完美融合大揭秘!
【8月更文挑战第31天】随着互联网的发展,Web应用对性能和安全性要求不断提高。Rust凭借卓越的性能、内存安全及丰富生态,成为构建高性能Web服务器的理想选择。本文通过一个简单示例,展示如何使用Rust和Actix-web框架搭建基本Web服务器,从创建项目到运行服务器全程指导,帮助读者领略Rust在Web后端开发中的强大能力。通过实践,读者可以体验到Rust在性能和安全性方面的优势,以及其在Web开发领域的巨大潜力。
47 0