beanstalkd消息队列在生产环境的应用

简介:

       Beanstalkd 是一个高性能的消息队列中间件,本博文宅鸟将介绍一下这个东东的使用。

一、先通过概念让大家了解Beanstalkd的特性和工作场景。

        Beanstalkd 是一个轻量级消息中间件,它最大特点是将自己定位为基于管道  (tube) 和任务 (job) 的工作队列 (work-queue):

Beanstalkd 支持任务优先级 (priority), 延时 (delay), 超时重发 (time-to-run) 和预留 (buried), 能够很好的支持分布式的后台任务和定时任务处理。

它的内部实现采用 libevent, 服务器-客户端之间用类似 memcached 的轻量级通讯协议,具有有很高的性能。

尽管是内存队列, beanstalkd 提供了 binlog 机制, 当重启 beanstalkd 时,当前任务状态能够从纪录的本地 binlog 中恢复。

管道 (tube):

       管道类似于消息主题 (topic), 在一个 Beanstalkd 中可以支持多个管道, 每个管道都有自己的发布者 (producer) 和消费者 (consumer). 管道之间互相不影响。

任务 (job):

       Beanstalkd 用任务 (job) 代替消息 (message) 的概念。与消息不同,任务有一系列状态:

Beanstalkd


READY- 需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;

DELAYED- 延迟执行的任务, 当消费者处理任务后, 可以用将消息再次放回 DELAYED 队列延迟执行;

RESERVED- 已经被消费者获取, 正在执行的任务。Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;

BURIED- 保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;

DELETED- 消息被彻底删除。Beanstalkd 不再维持这些消息。

任务优先级 (priority):

       任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn).

延时任务 (delay):

       有两种方式可以延时执行任务 (job): 生产者发布任务时指定延时;或者当任务处理完毕后, 消费者再次将任务放入队列延时执行 (RELEASE with <delay>)。这种机制可以实现分布式的 java.util.Timer,这种分布式定时任务的优势是:如果某个消费者节点故障,任务超时重发 (time-to-run) 能够保证任务转移到另外的节点执行。

任务超时重发 (time-to-run):

       Beanstalkd 把任务返回给消费者以后:消费者必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR (time-to-run).

任务预留 (buried):

       如果任务因为某些原因无法执行, 消费者可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick <n> 能够一次性把 n 条被保留的任务踢回队列。

Beanstalkd 协议:

       Beanstalkd 采用类 memcached 协议, 客户端通过文本命令与服务器交互。这些命令可以简单的分成三组:    

       生产类 - use <tube> / put <priority> <delay> <ttr> [bytes]:  

       生产者用 use 选择一个管道 (tube), 然后用 put 命令向管道发布任务 (job).    

       消费类 - watch <tubes> / reserve / delete <id> / release <id> <priority> <delay> / bury <id> / touch <id>

       消费者用 watch 选择多个管道 (tube), 然后用 reserve 命令获取待执行的任务,这个命令是阻塞的。客户端直到有任务可执行才返回。当任务处理完毕后, 消费者可以彻底删除任务 (DELETE), 释放任务让别人处理 (RELEASE), 或者保留 (BURY) 任务。

       维护类 - peek job / peek delayed / peek ready / peek buried / kick <n>

用于维护管道内的任务状态, 在不改变任务状态的条件下获取任务。可以用消费类命令改变这些任务的状态。

被保留 (buried) 的任务可以用 kick 命令 "踢" 回队列。

          协议文档: https://raw.github.com/kr/beanstalkd/master/doc/protocol.txt


Beanstalkd 不足:

Beanstalkd 没有提供主备同步 + 故障切换机制, 在应用中有成为单点的风险。实际应用中,可以用数据库为任务 (job) 提供持久化存储。

Beanstalkd


另外, 和 memcached 类似, Beanstalkd 依赖 libevent 的单线程事件分发机制, 不能有效利用多核 cpu 的性能。这一点可以通过单机部署多个实例克服。


二、部署安装:

Beanstalkd 的安装非常简单:

在Ubuntu和debian下使用下面命令:

1
sudo  apt-get  install  beanstalkd

安装后编辑配置文件:

1
vim  /etc/default/beanstalkd

163040445.jpg

把START=NO改为:START=yes即可

更多关于安装可以参考官网



通过命令可以启动、停止Beanstalk


1
2
3
/etc/init .d /beanstalkd  start
lsof  -i:11300
/etc/init .d /beanstalkd  stop


163604656.jpg


启动后,就可以通过客户端进行调用了:

Beanstalk支持多种客户端语言:

php,java,perl,c,c++,lua,python,go,ruby等等(了解更多可以来官网)。

我们将通过php给大家介绍在生产环境下面的使用。

就拿录视频制程序使用到的Beanstalk来给大家介绍:

先介绍一下程序结构:

视频录制程序分为两个方面,一个是产生录制任务的脚本(生产者),还有一个处理录制任务脚本(消费者)。

首先把php的客户端下载后,加入到项目中。下面把代码贴出来:


生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/usr/bin/php
<?php
require_once  'Configuration.php' ;
require_once  'Record.class.php' ;
require_once  'BeanStalk.class.php' ;
$now =time();
$model  new  RecordModel ();
$records = $model ->checkStartRecord( $now );
//print_r($records);
//exit();
$beanstalk  = BeanStalk::open (  array  (
             'servers'  =>  array  (
                 Configuration:: $record_config [ 'beanStak' ]
                 ),
             'select'  =>  'random peek'
             ) );
$beanstalk ->use_tube (  'records'  );
foreach  $records  as  $record  ) {
     $beanstalk ->put ( 0, 0, 10, json_encode (  $record  ) );
}
?>


消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<?php
require_once ( 'config.php' );
require_once ( 'func.php' );
require_once ( 'BeanStalk.class.php' );
$beanstalk  = BeanStalk::open( array (
             'servers'        =>  array $config [ 'beanStak' ] ),
             'select'         =>  'random peek'
             ));
$beanstalk ->watch( 'records' );
while (true){
     //$beanstalk->watch('records');
     $job  $beanstalk ->reserve_with_timeout();
     if ( is_object ( $job )){
         $data = $job ->get();
         $json =json_decode( $data ,true);
         print_r( $json );
         if (! empty ( $json [ "live_name" ])&&! empty ( $json [ "start_time" ])&&! empty ( $json [ "end_time" ])&&! empty ( $json [ "vod_id" ])){
             //print_r($json);
             if (! empty ( $json [ "afterplay" ])&& $json [ "afterplay" ]==1)
             $cmd = "{$config['afterplaycmd']} {$json[" live_name "]} {$json[" vod_id "]} {$json['start_time']} {$json['end_time']}" ;
             else
             $cmd = "{$config['recordcmd']} {$json[" live_name "]} {$json[" vod_id "]} {$json['start_time']} {$json['end_time']}" ;
                
             echo  $cmd ;
             $chkcmd = "ps -ef |grep '" . $cmd . "'  |grep -v 'grep'|wc -l" ;
             //$chkcmd="ps -ef |wc -l";
             //echo $chkcmd;
             $count =system( $chkcmd );
             //echo $count;
             if ( $count ==0)
             {
                 //system($cmd);
                 exec ( $cmd , $res , $rc );
                 //print_r($res);
                 //print_r($rc);
             }
             Beanstalk:: delete ( $job );    // Delete the job.
             $info = array ();
             $info [ "vod_id" ]= $json [ 'vod_id' ];
             $info [ "record_msg" ]= "startjob" ;
             $data = array ();
             $data [ "type" ]= "reciveRecords" ;
             $data [ "message" ]= $info ;
             $url = $config [ 'recordStatus' ];
             $httpcode  = 200;
             $result  = test_api( $httpcode , $url , "post" ,json_encode( $data ));
             print_r( $data );
         }
         //$beanstalk->watch('records');
     }
     sleep(1);
}
?>

下面我们介绍一个可以管理Beanstalk的php工具,地址如下

https://github.com/jimbojsb/bstools

把该工具安装后,就可以查看Beanstalk的各种情况了


165839569.jpg


165651192.jpg


到此结束,不足之处欢迎拍砖




本文转自birdinroom 51CTO博客,原文链接:http://blog.51cto.com/birdinroom/1344109,如需转载请自行联系原作者

相关文章
|
7月前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
243 2
|
7月前
|
消息中间件 监控 大数据
Kafka消息队列架构与应用场景探讨:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Kafka的消息队列架构,包括Broker、Producer、Consumer、Topic和Partition等核心概念,以及消息生产和消费流程。此外,还介绍了Kafka在微服务、实时数据处理、数据管道和数据仓库等场景的应用。针对面试,文章解析了Kafka与传统消息队列的区别、实际项目挑战及解决方案,并展望了Kafka的未来发展趋势。附带Java Producer和Consumer的代码示例,帮助读者巩固技术理解,为面试做好准备。
749 0
|
7月前
|
消息中间件 NoSQL Java
Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】
Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】
1152 1
|
6月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 存储 Java
Java中的消息队列应用与性能优化
Java中的消息队列应用与性能优化
|
6月前
|
消息中间件 Arthas 监控
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
|
5月前
|
消息中间件 监控 Java
在Java应用中实现微服务间的消息队列通信
在Java应用中实现微服务间的消息队列通信
|
5月前
|
消息中间件 存储 Java
Java中的消息队列应用与性能优化
Java中的消息队列应用与性能优化
|
7月前
|
消息中间件 存储 传感器
Kafka消息队列原理及应用详解
【5月更文挑战第6天】Apache Kafka是高性能的分布式消息队列,常用于实时数据管道和流应用。它提供高性能、持久化、分布式和可伸缩的消息处理,支持解耦、异步通信和流量控制。Kafka的核心概念包括Broker、Topic、Partition、Producer、Consumer和Consumer Group。其特点是高吞吐、低延迟、数据持久化、分布式架构和容错性。常见应用包括实时数据流处理、日志收集、消息传递和系统间数据交换。
|
6月前
|
消息中间件 Serverless Windows
消息队列 MQ产品使用合集之MQTT协议是否可以应用于社交软件的系统通知场景
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。