beanstalkd 消息队列的第一手资料

简介:
beanstalk 消息队列 小结 
协议说明和各状态转换情况
基本知识点:
  1. 对于beanstalk 消息队列中每条数据都为 job
  2. beanstalk service端 ,会维护 tubes[多个管道]
  3. client端可以监听,使用多 tube
  4. client端可以指定 use 管道[ client生成一个新的job时会把此job提交到 指定管道]
  5. client端可以指定 watch 管道 [ client接收处理job时会到 指定管道得到待处理的job]
官方示意图:
put            reserve               delete
-----> [READY] ---------> [RESERVED] --------> *poof*
一般情况:
1. 任务提交到service端,job 管理放入内存空间并为其标记状态 [READY] 
2. client通过轮训竞争得到次状态, job 改为  [RESERVED]
   2.1 当在默认时间 120 秒内没处理完 , job.stats.timeouts 就会大于 0 
      同时其他 轮训竞争client会拿到这个job【 注意了 每次timeouts时,在轮训的客户端就会得到次job,状态都为 ready,timeouts>0 】
3. 随便其中一台client处理完 job.delete   , 其他 client 中的此job 都会    *poof*  
deom - python beanstalkc 中 job.stats 参考:
使用 easy_install beanstalkc 
API 参考 : http://github.com/earl/beanstalkc/blob/master/TUTORIAL
刚生成的 beanstalk
{'buries': 0, 'releases': 0, 'tube': 'default', 'timeouts': 0, 'ttr': 120, 
'age': 6, 'pri': 2147483648L, 'delay': 0, ' state ': ' reserved ', ' time-left ':  114
'kicks': 0, 'id': 2}
以timeout了的 beanstalk,并且在其他client轮训到 job
{'buries': 0, 'releases': 0, 'tube': 'default', 'timeouts': 1, 'ttr': 120, 
'age': 417, 'pri': 2147483648L, 'delay': 0, ' state ': ' reserved ', ' time-left ':  110
'kicks': 0, 'id': 2}
{'buries': 0, 'releases': 0, 'tube': 'default', 'timeouts': 1, 'ttr': 120, 'age': 415, 
'pri': 2147483648L, 'delay': 0, ' state ': ' reserved ', ' time-left ':  4294967163L
'kicks': 0, 'id': 2}
当没所有client 的 job 都到期 了 状态
{'buries': 0, 'releases': 0, 'tube': 'default', 'timeouts': 2, 'ttr': 120, 
'age': 417, 'pri': 2147483648L, 'delay': 0, ' state ': ' ready ', ' time-left ':  4294967161L
'kicks': 0, 'id': 2}
{'buries': 0, 'releases': 0, 'tube': 'default', 'timeouts': 2, 'ttr': 120, 'age': 415, 
'pri': 2147483648L, 'delay': 0, ' state ': ' ready ', ' time-left ':  4294967163L
'kicks': 0, 'id': 2}
其中 client1 job.delete
client1 job.stats  *poof*
client2 job.stats  *poof*
比较全的状态说明 - [官方文档]
http://github.com/kr/beanstalkd/blob/v1.1/doc/protocol.txt?raw=true
官方示意图:
  

先简单说明下(完全自己理解的,欢迎拍砖。本人E人太差~看官档费劲,谅解下): 
job.stats状态 = [READY] 待处理,  [RESERVED] 正处理, [DELAYED]延迟状态 ,  [BURIED] 隐藏状态
1. 延迟提交
py.client1.put>>> beanstalk.put('yes!', delay=10)
py.client3.reserve>>> job = beanstalk.reserve()
# 等待 10  秒
2. 管道测试
put-job到service端 可以指定 put的tube管道
如: 
py.client1.put>>> beanstalk.use('foo') 
py.client1.put>>> beanstalk.put('hey!')
py.client2.reserve>>> job = beanstalk.reserve()
# 一直拥塞,应为 他 watch 管道 'default'
py.client3.reserve>>> beanstalk.watch('foo')
# beanstalk.ignore('bar') 放弃监听 bar
py.client3.reserve>>> job = beanstalk.reserve()
py.client3.reserve>>> job.body #输出 'hey!' 
3. 隐藏状态 现在吧 client 1/2/3 的 use watch 的管道都调回 default
py.client2.reserve>>> job = beanstalk.reserve()
py.client3.reserve>>> job = beanstalk.reserve()
py.client1.put>>> beanstalk.put('隐藏状态!')
py.client2.reserve>>> job.bury() #2 轮训得到 并且 修改 job 为隐藏状态
# 120 秒后 client3 没有轮训得到 此job 
py.client2.reserve>>> job.stats() 
{'buries': 1, 'releases': 0, 'tube': 'default', 'timeouts': 0, 'ttr': 120, 
'age': 188, 'pri': 2147483648L, 'delay': 0, 'state': 'buried',
'time-left': 4294967228L, 'kicks': 0, 'id': 11}
py.client2.reserve>>> beanstalk.kick( job.stats()['id'] ) #修改状态为 reserved
# 立刻 client3 得到 job
py.client3.reserve>>> job.stats()
{'buries': 1, 'releases': 0, 'tube': 'default', 'timeouts': 0, 'ttr': 120, 'age': 313, 
'pri': 2147483648L, 'delay': 0, 'state': 'reserved', 
'time-left': 110, 'kicks': 1, 'id': 11}
# 这时候 client2 / 3 同时 有 job 11 状态 'buries': 1,'timeouts': 0,'state': 'reserved'
4. peek 窥见
  可以得到 一个 stats - read 的 job ,其他 client 可以 job = beanstalk.reserve() 
  后马上 job.stats 会变成  [RESERVED] 
  py.client2.reserve>>> job = beanstalk.peek_ready()
  取得 job 并看 本 client 能 处理能
>>> job = beanstalk.peek(3)
>>> job.body
    'yes!'
>>> job.stats()['state']
    'ready'
这种形式西 job 不能 bury 等修改状态,但 可以 delete
peek 系类
peek_buried

peek_ready

本文转自博客园刘凯毅的博客,原文链接:beanstalkd 消息队列的第一手资料,如需转载请自行联系原博主。

目录
相关文章
|
4月前
|
消息中间件 网络性能优化 开发工具
消息队列 MQ使用问题之如何确保消息的唯一性
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
17天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
2月前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
37 0
手撸MQ消息队列——循环数组
|
3月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
157 1
|
4月前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ使用问题之一直连接master失败,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Prometheus 监控
消息队列 MQ使用问题之如何将旧集群的store目录迁移到新集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 安全 PHP
消息队列 MQ使用问题之如何获取PHP客户端代码
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。