PHP使用Beanstalkd实例

简介: 本篇笔记记录了PHP使用Pheanstalk类连接Beanstalkd,实现任务(消息)的生产以及消费的过程

相关笔记:
Beanstalkd消息/任务队列
CentOS编译和yum安装Beanstalkd及service和systemctl管理
Composer在Windows和Linux的安装和使用
有关Beanstalkd的基本概念,编译和yum的安装方法已经在上述笔记中记录了,今天练习下PHP使用Beanstalkd的过程,我选择的是使用Pheanstalk类来连接Beanstalkd

1.使用Composer安装Pheanstalk

composer require pda/pheanstalk

2.实现代码

php查看beanstalkd状态脚本Status.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/21
 * Time: 10:32
 */
require "../vendor/autoload.php";

use Pheanstalk\Pheanstalk;

$pheanstalk = new Pheanstalk('192.168.75.135',11300);
print_r($pheanstalk->stats());

生产者代码Producter.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/20
 * Time: 16:30
 */
require "../vendor/autoload.php";

use Pheanstalk\Pheanstalk;

$pheanstalk = new Pheanstalk('192.168.75.135',11300);

for ($i=0;$i<50;$i++){
    $data = array(
        'key' => 'testkey'.$i,
        'value' => 'testvalue',
        'time' => time(),
    );
    $ret = $pheanstalk->putInTube('test-tube', json_encode($data), Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR);
    var_dump($ret);
}

消费者代码Consumer.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/20
 * Time: 16:31
 */
set_time_limit(0);
ini_set('default_socket_timeout', 900);
require "../vendor/autoload.php";

use Pheanstalk\Pheanstalk;

$pheanstalk = new Pheanstalk('192.168.75.135',11300);

while (true){
    $job = $pheanstalk
        ->watch('test-tube')
        ->ignore('default')
        ->reserve();
    if ($job){
        sleep(2);
        echo $job->getData();
        echo "\n";
        $pheanstalk->delete($job);
    }
}

打开命令行/终端窗口,执行生产者,会向tube写入50条任务

PS E:\repository\work\beanstalk> php .\Producter.php
int(101)
int(102)
int(103)
int(104)
int(105)
int(106)
int(107)
int(108)
int(109)
int(110)
int(111)
int(112)
int(113)
int(114)
......

由此可见,$pheanstalk->putInTube成功后返回的是job的id
查看状态

PS E:\repository\work\beanstalk> php Status.php
Pheanstalk\Response\ArrayResponse Object
(
    [_name:Pheanstalk\Response\ArrayResponse:private] => OK
    [storage:ArrayObject:private] => Array
        (
            [current-jobs-urgent] => 0
            [current-jobs-ready] => 50
            [current-jobs-reserved] => 0
            [current-jobs-delayed] => 0
            [current-jobs-buried] => 0
            ......

结果中显示处于ready待读取状态的job是50个
打开两个或以上命令行/终端窗口,执行消费者,模拟多消费者竞争
消费者1

PS E:\repository\work\beanstalk> php .\Consumer.php
{"key":"testkey0","value":"testvalue","time":1548039103}
{"key":"testkey1","value":"testvalue","time":1548039103}
{"key":"testkey2","value":"testvalue","time":1548039103}
{"key":"testkey4","value":"testvalue","time":1548039103}
{"key":"testkey6","value":"testvalue","time":1548039103}
{"key":"testkey8","value":"testvalue","time":1548039103}
{"key":"testkey10","value":"testvalue","time":1548039103}
{"key":"testkey12","value":"testvalue","time":1548039103}
{"key":"testkey14","value":"testvalue","time":1548039103}
{"key":"testkey16","value":"testvalue","time":1548039103}
{"key":"testkey18","value":"testvalue","time":1548039103}
{"key":"testkey20","value":"testvalue","time":1548039103}
{"key":"testkey22","value":"testvalue","time":1548039103}
{"key":"testkey24","value":"testvalue","time":1548039103}
{"key":"testkey26","value":"testvalue","time":1548039103}
{"key":"testkey28","value":"testvalue","time":1548039103}
{"key":"testkey30","value":"testvalue","time":1548039103}
{"key":"testkey32","value":"testvalue","time":1548039103}
{"key":"testkey34","value":"testvalue","time":1548039103}
{"key":"testkey36","value":"testvalue","time":1548039103}
{"key":"testkey38","value":"testvalue","time":1548039103}
{"key":"testkey40","value":"testvalue","time":1548039103}
{"key":"testkey42","value":"testvalue","time":1548039103}
{"key":"testkey44","value":"testvalue","time":1548039103}
{"key":"testkey46","value":"testvalue","time":1548039103}
{"key":"testkey48","value":"testvalue","time":1548039103}

消费者2

PS E:\repository\work\beanstalk> php .\Consumer.php
{"key":"testkey3","value":"testvalue","time":1548039103}
{"key":"testkey5","value":"testvalue","time":1548039103}
{"key":"testkey7","value":"testvalue","time":1548039103}
{"key":"testkey9","value":"testvalue","time":1548039103}
{"key":"testkey11","value":"testvalue","time":1548039103}
{"key":"testkey13","value":"testvalue","time":1548039103}
{"key":"testkey15","value":"testvalue","time":1548039103}
{"key":"testkey17","value":"testvalue","time":1548039103}
{"key":"testkey19","value":"testvalue","time":1548039103}
{"key":"testkey21","value":"testvalue","time":1548039103}
{"key":"testkey23","value":"testvalue","time":1548039103}
{"key":"testkey25","value":"testvalue","time":1548039103}
{"key":"testkey27","value":"testvalue","time":1548039103}
{"key":"testkey29","value":"testvalue","time":1548039103}
{"key":"testkey31","value":"testvalue","time":1548039103}
{"key":"testkey33","value":"testvalue","time":1548039103}
{"key":"testkey35","value":"testvalue","time":1548039103}
{"key":"testkey37","value":"testvalue","time":1548039103}
{"key":"testkey39","value":"testvalue","time":1548039103}
{"key":"testkey41","value":"testvalue","time":1548039103}
{"key":"testkey43","value":"testvalue","time":1548039103}
{"key":"testkey45","value":"testvalue","time":1548039103}
{"key":"testkey47","value":"testvalue","time":1548039103}
{"key":"testkey49","value":"testvalue","time":1548039103}

两个消费者竞争着完成了全部任务,由于我的beanstalkd启动时开启了binlog持久,所以beanstalkd重启后任务也不会丢失

3.需要注意的事项

1.创建job时,设置的超时时间Pheanstalk::DEFAULT_TTR一定要比消费者处理一个job的时间要长,否则job在超时之后会被tube更改为ready状态,被其他消费者获取,而此时当前消费者还在处理该job,这就出现了一个job被多个消费者重复执行的可怕现象
2.Pheanstalk的维护者发生了变化,在新版的Pheanstalk中是不支持长连接的,当客户端socket连接服务器时间超过php.ini中设置的default_socket_timeout时,如果未能从服务端tube获得job,连接将会被断开,所以消费者进程需要维护,以便在退出后可以重新开启进程,推荐使用supervisord维护消费者进程。
判断socket超时的代码

public function getLine($length = null)
    {
        $timeout = ini_get('default_socket_timeout');
        $timer   = microtime(true);
        do {
            $data = isset($length) ?
                $this->_wrapper()->fgets($this->_socket, $length) :
                $this->_wrapper()->fgets($this->_socket);

            if ($this->_wrapper()->feof($this->_socket)) {
                throw new Exception\SocketException('Socket closed by server!');
            }
            if (($data === false) && microtime(true) - $timer > $timeout) {
                $this->disconnect();
                throw new Exception\SocketException('Socket timed out!');
            }
        } while ($data === false);

        return rtrim($data);
    }

原文地址:https://www.jmsite.cn/blog-572.html

相关文章
|
10天前
|
设计模式 算法 数据库连接
PHP中的设计模式:提高代码的可维护性与扩展性本文旨在探讨PHP中常见的设计模式及其应用,帮助开发者编写出更加灵活、可维护和易于扩展的代码。通过深入浅出的解释和实例演示,我们将了解如何使用设计模式解决实际开发中的问题,并提升代码质量。
在软件开发过程中,设计模式是一套经过验证的解决方案模板,用于处理常见的软件设计问题。PHP作为流行的服务器端脚本语言,也有其特定的设计模式应用。本文将重点介绍几种PHP中常用的设计模式,包括单例模式、工厂模式和策略模式,并通过实际代码示例展示它们的具体用法。同时,我们还将讨论如何在实际项目中合理选择和应用这些设计模式,以提升代码的可维护性和扩展性。
|
13天前
|
设计模式 数据库连接 PHP
PHP中的设计模式:如何提高代码的可维护性与扩展性在软件开发领域,PHP 是一种广泛使用的服务器端脚本语言。随着项目规模的扩大和复杂性的增加,保持代码的可维护性和可扩展性变得越来越重要。本文将探讨 PHP 中的设计模式,并通过实例展示如何应用这些模式来提高代码质量。
设计模式是经过验证的解决软件设计问题的方法。它们不是具体的代码,而是一种编码和设计经验的总结。在PHP开发中,合理地使用设计模式可以显著提高代码的可维护性、复用性和扩展性。本文将介绍几种常见的设计模式,包括单例模式、工厂模式和观察者模式,并通过具体的例子展示如何在PHP项目中应用这些模式。
|
10天前
|
设计模式 SQL 安全
PHP中的设计模式:单例模式的深入探索与实践在PHP的编程实践中,设计模式是解决常见软件设计问题的最佳实践。单例模式作为设计模式中的一种,确保一个类只有一个实例,并提供全局访问点,广泛应用于配置管理、日志记录和测试框架等场景。本文将深入探讨单例模式的原理、实现方式及其在PHP中的应用,帮助开发者更好地理解和运用这一设计模式。
在PHP开发中,单例模式通过确保类仅有一个实例并提供一个全局访问点,有效管理和访问共享资源。本文详细介绍了单例模式的概念、PHP实现方式及应用场景,并通过具体代码示例展示如何在PHP中实现单例模式以及如何在实际项目中正确使用它来优化代码结构和性能。
|
5月前
|
网络安全 PHP
[网络安全/CTF] BUUCTF极客大挑战2019PHP解题详析(Dirsearch使用实例+php反序列化)
[网络安全/CTF] BUUCTF极客大挑战2019PHP解题详析(Dirsearch使用实例+php反序列化)
118 0
|
存储 网络安全 PHP
[CTF/网络安全]攻防世界unserialize3解题详析及php序列化反序列化实例讲解
序列化是指将数据结构或对象转换为可传输或可存储的格式的过程。这通常需要将数据转换为字节流或其他形式的编码格式,以便能够在不同的系统和应用程序之间进行传输或存储。
331 0
|
安全 PHP
CTF-PHP常见考点实例小结
CTF-PHP常见考点实例小结
CTF-PHP常见考点实例小结
|
缓存 移动开发 NoSQL
php结合redis实现高并发下的抢购、秒杀功能的实例
php结合redis实现高并发下的抢购、秒杀功能的实例
247 0
|
算法 PHP 数据安全/隐私保护
AES-128-CBC-Pkcs7Padding加密PHP实例
AES-128-CBC-Pkcs7Padding加密PHP实例
463 0
AES-128-CBC-Pkcs7Padding加密PHP实例
|
PHP
PHP实现Workerman实例 高性能PHP Socket即时通讯框架
PHP实现Workerman实例 高性能PHP Socket即时通讯框架
403 0
|
消息中间件 PHP Windows
PHP实现php-amqplib/php-amqplib实例RabbitMq
PHP实现php-amqplib/php-amqplib实例RabbitMq
225 0