Python通过amqp消息队列协议中的Qpid实现数据通信

简介:

简介:

    这两天看了消息队列通信,打算在配置平台上应用起来。以前用过zeromq但是这东西太快了,还有就是rabbitmq有点大,新浪的朋友推荐了qpid,简单轻便。自己总结了下文档,大家可以瞅瞅。


AMQP(消息队列协议Advanced Message Queuing Protocol)是一种消息协议 ,等同于JMS,但是JMS只是java平台的方案,AMQP是一个跨语言的协议。


AMQP 不分语言平台,主流的语言都支持,运维这边的perl,python,ruby更是支持,所以大家就放心用吧。


主流的消息队列通信类型:

1
2
3
4
5
6
7
点对点:A 发消息给 B。
广播:A 发给所有其他人的消息
组播:A 发给多个但不是所有其他人的消息。
Requester/response:类似访问网页的通信方式,客户端发请求并等待,服务端回复该请求
Pub-sub:类似杂志发行,出版杂志的人并不知道谁在看这本杂志,订阅的人并不关心谁在发表这本杂志。出版的人只管将信息发布出去,订阅的人也只在需要的时候收到该信息。
Store-and-forward:存储转发模型类似信件投递,写信的人将消息写给某人,但在将信件发出的时候,收信的人并不一定在家等待,也并不知道有消息给他。但这个消息不会丢失,会放在收信者的信箱中。这种模型允许信息的异步交换。
其他通信模型。。。


Publisher --->Exchange ---> MessageQueue --->Consumer


整个过程是异步的.Publisher,Consumer相互不知道对方的存在,Exchange负责交换/路由,依靠Routing Key,每个消息者有一个Routing Key,每个Binding将自已感兴趣的RoutingKey告诉Exchange,以便Exchange将相关的消息转发给相应的Queue !


几个概念

1
2
3
4
5
6
7
8
9
10
11
12
13
几个概念
Producer,Routing Key,Exchange,Binding,Queue,Consumer.
Producer: 消息的创建者,消息的发送者
Routing Key:唯一用来映射消息该进入哪个队列的标识
Exchange:负责消息的路由,交换
Binding:定义Queue和Exchange的映射关系
Queue:消息队列
Consumer:消息的使用者
Exchange类型
Fan-Out:类似于广播方式,不管RoutingKey
Direct:根据RoutingKey,进行关联投寄
Topic:类似于Direct,但是支持多个Key关联,以组的方式投寄。
       key以.来定义界限。类似于usea.news,usea.weather.这两个消息是一组的。


wKioL1MA096hj-AFAAENzUSW7c0736.jpg


QPID


Qpid 是 Apache 开发的一款面向对象的消息中间件,它是一个 AMQP 的实现,可以和其他符合 AMQP 协议的系统进行通信。Qpid 提供了 C++/Python/Java/C# 等主流编程语言的客户端库,安装使用非常方便。相对于其他的 AMQP 实现,Qpid 社区十分活跃,有望成为标准 AMQP 中间件产品。除了符合 AMQP 基本要求之外,Qpid 提供了很多额外的 HA 特性,非常适于集群环境下的消息通信!


基本功能外提供以下特性:


采用 Corosync(?)来保证集群环境下的Fault-tolerant(?) 特性

支持XML的Exchange,消息为XML时,彩用Xquery过滤

支持plugin

提供安全认证,可对producer/consumer提供身份认证

qpidd --port --no-data-dir --auth

port:端口

--no-data-dir:不指定数据目录

--auth:不启用安全身份认证



启动后自动创建一些Exchange,amp.topic,amp.direct,amp.fanout


tools:


Qpid-config:维护Queue,Exchange,内部配置

Qpid-route:配置broker Federation(联盟?集群?)

Qpid-tool:监控


咱们说完介绍了,这里就赶紧测试下。


服务器端的安装:


1
2
3
yum install qpid-cpp-server
yum install qpid-tools
/etc/init.d/qpidd start


发布端的测试代码

wKioL1MAxo6idPUMAAM2EoX3xTs484.jpg


一些测试代码转自: ibm 的开发社区 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!/usr/bin/env python
#xiaorui.cc
#转自ibm开发社区
import  optparse, time
from qpid.messaging  import  *
from qpid.util  import  URL
from qpid.log  import  enable, DEBUG, WARN
def nameval(st):
   idx = st.find( "=" )
   if  idx >=  0 :
     name = st[ 0 :idx]
     value = st[idx+ 1 :]
   else :
     name = st
     value = None
   return  name, value
parser = optparse.OptionParser(usage= "usage: %prog [options] ADDRESS [ CONTENT ... ]" ,
                                description= "Send messages to the supplied address." )
parser.add_option( "-b" "--broker" default = "localhost" ,
                   help= "connect to specified BROKER (default %default)" )
parser.add_option( "-r" "--reconnect" , action= "store_true" ,
                   help= "enable auto reconnect" )
parser.add_option( "-i" "--reconnect-interval" , type= "float" default = 3 ,
                   help= "interval between reconnect attempts" )
parser.add_option( "-l" "--reconnect-limit" , type= "int" ,
                   help= "maximum number of reconnect attempts" )
parser.add_option( "-c" "--count" , type= "int" default = 1 ,
                   help= "stop after count messages have been sent, zero disables (default %default)" )
parser.add_option( "-t" "--timeout" , type= "float" default =None,
                   help= "exit after the specified time" )
parser.add_option( "-I" "--id" , help= "use the supplied id instead of generating one" )
parser.add_option( "-S" "--subject" , help= "specify a subject" )
parser.add_option( "-R" "--reply-to" , help= "specify reply-to address" )
parser.add_option( "-P" "--property" , dest= "properties" , action= "append" default =[],
                   meta var = "NAME=VALUE" , help= "specify message property" )
parser.add_option( "-M" "--map" , dest= "entries" , action= "append" default =[],
                   meta var = "KEY=VALUE" ,
                   help= "specify map entry for message body" )
parser.add_option( "-v" , dest= "verbose" , action= "store_true" ,
                   help= "enable logging" )
opts, args = parser.parse_args()
if  opts.verbose:
   enable( "qpid" , DEBUG)
else :
   enable( "qpid" , WARN)
if  opts.id  is  None:
   spout_id = str(uuid4())
else :
   spout_id = opts.id
if  args:
   addr = args.pop( 0 )
else :
   parser.error( "address is required" )
content = None
if  args:
   text =  " " .join(args)
else :
   text = None
if  opts.entries:
   content = {}
   if  text:
     content[ "text" ] = text
   for  in  opts.entries:
     name, val = nameval(e)
     content[name] = val
else :
   content = text
conn = Connection(opts.broker,
                   reconnect=opts.reconnect,
                   reconnect_interval=opts.reconnect_interval,
                   reconnect_limit=opts.reconnect_limit)
try :
   conn.open()
   ssn = conn.session()
   snd = ssn.sender(addr)
   count =  0
   start = time.time()
   while  (opts.count ==  0  or count < opts.count) and \
         (opts.timeout  is  None or time.time() - start < opts.timeout):
     msg = Message(subject=opts.subject,
                   reply_to=opts.reply_to,
                   content=content)
     msg.properties[ "spout-id" ] =  "%s:%s"  % (spout_id, count)
     for  in  opts.properties:
       name, val = nameval(p)
       msg.properties[name] = val
     snd.send(msg)
     count +=  1
     print msg
except SendError, e:
   print e
except KeyboardInterrupt:
   pass
conn.close()


客户端的测试代码:

wKiom1MAxznzudhOAARm6bOwuRE241.jpg


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/usr/bin/env python
#xiaorui.cc
##转自ibm开发社区
import  optparse
from  qpid.messaging  import  *
from  qpid.util  import  URL
from  qpid.log  import  enable, DEBUG, WARN
parser  =  optparse.OptionParser(usage = "usage: %prog [options] ADDRESS ..." ,
                                description = "Drain messages from the supplied address." )
parser.add_option( "-b" "--broker" , default = "localhost" ,
                   help = "connect to specified BROKER (default %default)" )
parser.add_option( "-c" "--count" type = "int" ,
                   help = "number of messages to drain" )
parser.add_option( "-f" "--forever" , action = "store_true" ,
                   help = "ignore timeout and wait forever" )
parser.add_option( "-r" "--reconnect" , action = "store_true" ,
                   help = "enable auto reconnect" )
parser.add_option( "-i" "--reconnect-interval" type = "float" , default = 3 ,
                   help = "interval between reconnect attempts" )
parser.add_option( "-l" "--reconnect-limit" type = "int" ,
                   help = "maximum number of reconnect attempts" )
parser.add_option( "-t" "--timeout" type = "float" , default = 0 ,
                   help = "timeout in seconds to wait before exiting (default %default)" )
parser.add_option( "-p" "--print" , dest = "format" , default = "%(M)s" ,
                   help = "format string for printing messages (default %default)" )
parser.add_option( "-v" , dest = "verbose" , action = "store_true" ,
                   help = "enable logging" )
opts, args  =  parser.parse_args()
if  opts.verbose:
   enable( "qpid" , DEBUG)
else :
   enable( "qpid" , WARN)
if  args:
   addr  =  args.pop( 0 )
else :
   parser.error( "address is required" )
if  opts.forever:
   timeout  =  None
else :
   timeout  =  opts.timeout
class  Formatter:
   def  __init__( self , message):
     self .message  =  message
     self .environ  =  { "M" self .message,
                     "P" self .message.properties,
                     "C" self .message.content}
   def  __getitem__( self , st):
     return  eval (st,  self .environ)
conn  =  Connection(opts.broker,
                   reconnect = opts.reconnect,
                   reconnect_interval = opts.reconnect_interval,
                   reconnect_limit = opts.reconnect_limit)
try :
   conn. open ()
   ssn  =  conn.session()
   rcv  =  ssn.receiver(addr)
   count  =  0
   while  not  opts.count  or  count < opts.count:
     try :
       msg  =  rcv.fetch(timeout = timeout)
       print  opts. format  %  Formatter(msg)
       count  + =  1
       ssn.acknowledge()
     except  Empty:
       break
except  ReceiverError, e:
   print  e
except  KeyboardInterrupt:
   pass
conn.close()


Browse 模式的意思是,浏览的意思,一个特殊的需求,我访问了一次,别人也能访问。

Consume 模式的意思是,我浏览了一次后,删除这一条。别人就访问不到啦。

这个是浏览模式:

wKiom1MAx87zwLEFAAbCk_wE0PY041.jpg


sub-pub 通信的例子


Pub-sub 是另一种很有用的通信模型。恐怕它的名字就源于出版发行这种现实中的信息传递方式吧,publisher 就是出版商,subscriber 就是订阅者。


1
2
3
4
5
服务端
qpid-config add exchange topic news-service
./spout news-service/news xiaorui.cc
客户端:
./drain -t  120  news-service/#.news


PUB端,创建TOPIC点 !

wKiom1MAzGDg3nZbAAPRnFen8Y4988.jpg


SUB端,也就是接收端!

wKiom1MAzG2Tsf9YAAO76LbxAS8638.jpg


总结:

 qpid挺好用的,比rabbitmq要轻型,比zeromq保险点! 各方面的文档也都很健全,值得一用。    话说,这三个消息队列我也都用过,最一开始用的是redis的pubsub做日志收集和信息通知,后来在做集群相关的项目的时候,我自己搞了一套zeromq的分布式任务分发,和saltstack很像,当然了远没有万人用的salt强大。  rabbitmq的用法,更是看中他的安全和持久化,当然性能真的不咋地。

关于qpid的性能我没有亲自做大量的测试,但是听朋友说,加持久化可以到7k,不加持久化可以到1500左右。






 本文转自 rfyiamcool 51CTO博客,原文链接:http://blog.51cto.com/rfyiamcool/1359631,如需转载请自行联系原作者


相关文章
|
5月前
|
数据采集 存储 JSON
从零到一构建网络爬虫帝国:HTTP协议+Python requests库深度解析
【7月更文挑战第31天】在网络数据的海洋中,使用Python的`requests`库构建网络爬虫就像探索未知的航船。HTTP协议指导爬虫与服务器交流,收集信息。HTTP请求包括请求行、头和体,响应则含状态行、头和体。`requests`简化了发送各种HTTP请求的过程。
93 4
|
2月前
|
测试技术 网络安全 数据安全/隐私保护
Paramiko是一个用于处理SSHv2协议的Python库
Paramiko是一个用于处理SSHv2协议的Python库
37 3
|
3月前
|
数据采集 JSON API
🎓Python网络请求新手指南:requests库带你轻松玩转HTTP协议
本文介绍Python网络编程中不可或缺的HTTP协议基础,并以requests库为例,详细讲解如何执行GET与POST请求、处理响应及自定义请求头等操作。通过简洁易懂的代码示例,帮助初学者快速掌握网络爬虫与API开发所需的关键技能。无论是安装配置还是会话管理,requests库均提供了强大而直观的接口,助力读者轻松应对各类网络编程任务。
123 3
|
3月前
|
机器学习/深度学习 JSON API
HTTP协议实战演练场:Python requests库助你成为网络数据抓取大师
在数据驱动的时代,网络数据抓取对于数据分析、机器学习等至关重要。HTTP协议作为互联网通信的基石,其重要性不言而喻。Python的`requests`库凭借简洁的API和强大的功能,成为网络数据抓取的利器。本文将通过实战演练展示如何使用`requests`库进行数据抓取,包括发送GET/POST请求、处理JSON响应及添加自定义请求头等。首先,请确保已安装`requests`库,可通过`pip install requests`进行安装。接下来,我们将逐一介绍如何利用`requests`库探索网络世界,助你成为数据抓取大师。在实践过程中,务必遵守相关法律法规和网站使用条款,做到技术与道德并重。
57 2
|
3月前
|
数据采集 存储 JSON
从零到一构建网络爬虫帝国:HTTP协议+Python requests库深度解析
在网络数据的海洋中,网络爬虫遵循HTTP协议,穿梭于互联网各处,收集宝贵信息。本文将从零开始,使用Python的requests库,深入解析HTTP协议,助你构建自己的网络爬虫帝国。首先介绍HTTP协议基础,包括请求与响应结构;然后详细介绍requests库的安装与使用,演示如何发送GET和POST请求并处理响应;最后概述爬虫构建流程及挑战,帮助你逐步掌握核心技术,畅游数据海洋。
75 3
|
3月前
|
数据采集 网络协议 API
HTTP协议大揭秘!Python requests库实战,让网络请求变得简单高效
【9月更文挑战第13天】在数字化时代,互联网成为信息传输的核心平台,HTTP协议作为基石,定义了客户端与服务器间的数据传输规则。直接处理HTTP请求复杂繁琐,但Python的`requests`库提供了一个简洁强大的接口,简化了这一过程。HTTP协议采用请求与响应模式,无状态且结构化设计,使其能灵活处理各种数据交换。
86 8
|
2月前
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
42 0
|
2月前
|
缓存 网络协议 Linux
Python渗透测试之ARP毒化和协议应用
Python渗透测试之ARP毒化和协议应用
36 0
|
3月前
|
Python
HTTP协议不再是迷!Python网络请求实战,带你走进网络世界的奥秘
本文介绍了HTTP协议,它是互联网信息传递的核心。作为客户端与服务器通信的基础,HTTP请求包括请求行、头和体三部分。通过Python的`requests`库,我们可以轻松实现HTTP请求。本文将指导你安装`requests`库,并通过实战示例演示如何发送GET和POST请求。无论你是想获取网页内容还是提交表单数据,都能通过简单的代码实现。希望本文能帮助你在Python网络请求的道路上迈出坚实的一步。
71 0