Beanstalkd队列 概述

简介:

本文对beanstalkd的介绍简短精炼,适合初次接触消息队列并想使用beanstalk的人。在我看过的一些介绍资料里,这篇文章介绍地最为精炼,本文大多内容修改自它:P。我也是第一次接触消息队列,使用beanstalkd的原因是组里比较资深的经理推荐,说道beanstalkd在某些大公司内都会使用,很轻量级,性能很高(是纯C写的)。


基本特性

同RabbitMQ,ZeroMQ相比,Beanstalkd是一个更加轻量级的消息队列,也可以理解为是一个工作队列,本质上是基于多个tube的workqueue,queue里存放的是job,每个job带一个递增的唯一id,以及消息体(byte[])。

Beanstalkd的job状态多样化,支持任务优先级 (priority), 延时 (delay), 超时重发 (time-to-run) 和预留 (buried), 能够很好的支持分布式的后台任务和定时任务处理。

它的内部实现采用 libevent, 服务器-客户端之间用类似 memcached 的轻量级通讯协议,因此有很高的性能。

beanstalkd 提供了 binlog 机制,启动的时候可以打开此功能。当重启 beanstalkd 时,当前任务状态能够从纪录的本地 binlog 中恢复。虽然没有单点的HA,但是带了容错。


两个重要概念

tube 类似于消息主题 (topic),在一个 Beanstalkd 中可以支持多个管道, 每个管道都有自己的发布者 (producer) 和消费者 (consumer),管道之间可以做到互不影响。

job 真正的消息体,带有唯一id(从1开始递增)和一个body(byte流),有丰富的可控状态,设计地非常好用。

下图详细阐述了job状态的流转流程,理解清楚下图对于使用beanstalkd有重要意义。


READY - 需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务
DELAYED - 延迟执行的任务, 当消费者处理任务后, 可以用将消息再次放回 DELAYED 队列延迟执行
RESERVED - 已经被消费者获取, 正在执行的任务(将不会被别的消费者拿到)。Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;
BURIED - 保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;
DELETED - 消息被彻底删除。Beanstalkd 不再维持这些消息。

以上这些状态可以通过实际启动beanstalkd后,用telnet连接并执行一些put,reserve操作或者直接编程体验的方式得到更好的理解。


其他概念

任务优先级 (priority)
任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn). 
 
延时任务 (delay)
有两种方式可以延时执行任务 (job): 生产者发布任务时指定延时;或者当任务处理完毕后, 消费者再次将任务放入队列延时执行 (RELEASE with <delay>)。这种机制可以实现分布式的 java.util.Timer,这种分布式定时任务的优势是:如果某个消费者节点故障,任务超时重发 (time-to-run) 能够保证任务转移到另外的节点执行。
 
任务超时重发 (time-to-run)
Beanstalkd 把任务返回给消费者以后:消费者必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR (time-to-run).
 
任务预留 (buried)
如果任务因为某些原因无法执行, 消费者可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick <n> 能够一次性把 n 条被保留的任务踢回队列。
 
Beanstalkd 协议
Beanstalkd 采用类 memcached 协议, 客户端通过文本命令与服务器交互。这些命令可以简单的分成三组: 
 
生产类 - use <tube> / put <priority> <delay> <ttr> [bytes]:  
生产者用 use 选择一个管道 (tube), 然后用 put 命令向管道发布任务 (job).
 
消费类 - watch <tubes> / reserve / delete <id> / release <id> <priority> <delay> / bury <id> / touch <id>
消费者用 watch 选择多个管道 (tube), 然后用 reserve 命令获取待执行的任务,这个命令是阻塞的。客户端直到有任务可执行才返回。当任务处理完毕后, 消费者可以彻底删除任务 (DELETE), 释放任务让别人处理 (RELEASE), 或者保留 (BURY) 任务。
 
维护类 - peek job / peek delayed / peek ready / peek buried / kick <n>
用于维护管道内的任务状态, 在不改变任务状态的条件下获取任务。可以用消费类命令改变这些任务的状态。
被保留 (buried) 的任务可以用 kick 命令 "踢" 回队列。


java客户端

在官方推荐的几个java的客户端里,我选择了 TrendrrBeanstalk,是基于MIT许可证的开源项目,源码一共六个.java文件。用起来很简单,但是 没有封装peek相关的操作。如果你有更好的java client的话欢迎交流沟通哈 :)  随意来个example。
public class BeanstalkExample {
	
	protected static Log log = LogFactory.getLog(BeanstalkExample.class);
	
	public static void main(String[] args) {
		try {
			clientExample();
			//pooledExample();
		} catch (BeanstalkException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	/**
	 * Example for using an unpooled client
	 * 
	 * @throws BeanstalkException
	 */
	public static void clientExample() throws BeanstalkException {
		BeanstalkClient client = new BeanstalkClient("ip", 8300, "example");
		
		//client.put(1l, 0, 5000, "this is some data".getBytes());
		log.info(client.tubeStats());
		//while(client.reserve(60) != null) {
			BeanstalkJob job = client.reserve(60);
			log.info("Get job: " + job.getId());
			client.deleteJob(job);
		//}
		
		client.close(); // closes the connection
	}

	public static void pooledExample() throws BeanstalkException {
		BeanstalkPool pool = new BeanstalkPool("jx-crm-flare00.jx.baidu.com", 8300, 30, // poolsize
				"example" // tube to use
		);
		
		BeanstalkClient client = pool.getClient();
		
		client.put(1l, 0, 5000, "this is some data".getBytes());
		BeanstalkJob job = client.reserve(60);
		
		client.deleteJob(job);
		
		client.close(); // returns the connection to the pool
	}
}


(全文完)

目录
相关文章
|
开发工具 git
iterm2 oh-my-zsh 自动提示命令
iterm2 oh-my-zsh 自动提示命令
iterm2 oh-my-zsh 自动提示命令
|
Shell Linux C语言
Makefile基础教程(自动生成依赖关系)
Makefile基础教程(自动生成依赖关系)
368 0
|
NoSQL PHP Redis
Mac PHP安装Redis扩展
php安装redis的扩展 采用pecl命令进行安装; pecl命令,在使用brew 安装php时,已经为我们安装上了,这里我们直接使用即可。 我们先进入php的bin目录看下命令是否存在,对应路径如下: cd /opt/homebrew/Cellar/php@7.3/7.3.32 这里的7.3为我通过brew install [php@7.3]安装的php具体版本号,大家可以通过ls命令查看文件夹下是否存在pecl命令
1567 0
|
2月前
|
人工智能 自然语言处理 监控
软件开发效率低,如何通过AI技术实现软件开发的全面智能化?—— 解析大模型和RAG技术的关键作用
三桥君指出大模型与RAG技术正推动软件工程智能化变革,覆盖需求分析、代码生成、测试和CI/CD全流程。AI可自动解析需求、生成代码、检测缺陷并优化部署,显著提升效率与质量。RAG技术通过检索增强生成,使AI具备知识库实时调用能力。二者的结合让开发者向架构师角色转型,降低人力成本。本文AI专家三桥君从技术融合、场景应用及行业影响展开分析,指出智能化将重塑软件开发范式,开启软件工程新黄金时代。
280 0
|
消息中间件 数据采集 关系型数据库
离线数仓(三)【业务日志采集平台搭建】(2)
离线数仓(三)【业务日志采集平台搭建】
|
人工智能 分布式计算 安全
开源无处不在,发展创新下又有何弊端
开源无处不在,发展创新下又有何弊端
2215 0
|
数据采集 存储 消息中间件
iLogtail开源之路
2022年6月底,阿里云iLogtail代码完整开源,正式发布了完整功能的iLogtail社区版。iLogtail作为阿里云SLS官方标配的采集器,多年以来一直稳定服务阿里集团、蚂蚁集团以及众多公有云上的企业客户,目前已经有千万级的安装量,每天采集数十PB的可观测数据,广泛应用于线上监控、问题分析/定位、运营分析、安全分析等多种场景。此次完整开源,iLogtail社区版首次在内核能力上与企业版完全对齐,开发者可以构建出与企业版性能相当的iLogtail云原生可观测性数据采集器。
1265 1
iLogtail开源之路
|
分布式计算 运维 大数据
MaxCompute资源管理——使用成本优化功能实现包年包月计算资源降本增效
MaxCompute提供成本优化(计算资源优化推荐)功能,可基于实际作业请求量和资源配置期望,对包年包月一级Quota类型的计算资源生成更优的资源配置方案,帮助进一步提升计算资源利用率,优化计算成本。本文我们一起通过典型场景案例来看看如何通过成本优化(计算资源优化推荐)功能提供降本增效的参考建议。
839 1
|
缓存 安全 前端开发