PHP使用Redis的List(列表)命令实现消息队列

简介: 本篇笔记旨在使用Redis的List(列表)命令实现消息队列,生产者使用lPush命令发布消息,消费者使用rpoplpush命令获取消息,同时将消息放入监听队列,如果处理超时,监听者将把消息弹回消息队列

1.用到的List(列表)命令

命令 作用
lPush 将一个或多个值插入到列表头部
rpoplpush 弹出列表最后一个值,同时插入到另一个列表头部,并返回该值
lRem 删除列表内的给定值
lIndex 按索引获取列表内的值

2.队列的组成

名称 职责
生产者 发布消息
消费者 获取并处理消息
监听者 监听超时的消息,弹回原消息队列,确保消费者挂掉后或处理失败后消息能被其他消费者处理

3.php实现代码

生产者Producter.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/26
 * Time: 0:13
 */
try {
    //声明消息队列-list的键名
    $queueKey = 'testQueueKey';
    $redis = new Redis();
    $redis->connect('192.168.75.132', 6379);
    //向列表中push10条消息
    for ($i = 0;$i < 10;$i++){
        //为消息生成唯一标识
        $uniqid = uniqid(mt_rand(10000, 99999).getmypid().memory_get_usage(), true);
        $ret = $redis->lPush($queueKey, json_encode(array('uniqid' => $uniqid, 'key' => 'key-'.$i, 'value' => 'data')));
        var_dump($ret);
    }

} catch (Exception $e){
    echo $e->getMessage();
}

消费者Consumer.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/26
 * Time: 0:14
 */
try {
    //声明消息队列-list的键名
    $queueKey = 'testQueueKey';
    //声明监听者队列-list的键名
    $watchQueueKey = 'watchQueueKey';
    $redis = new Redis();
    $redis->connect('192.168.75.132', 6379);
    //队列先进先出,弹出最先加入的消息,同时放入监听队列
    while (true){
        $ret = $redis->rpoplpush($queueKey, $watchQueueKey);
        if ($ret === false){
            sleep(1);
        } else {
            $retArray = json_decode($ret, true);
            //将唯一id写入缓存设置有效期
            $redis->setex($retArray['uniqid'], 60, 0);
            //模拟失败
            $rand = mt_rand(0,9);
            if ($rand < 3){
                echo "failure:".$ret."\n";
            } else {
                //todo
                //处理成功移除消息
                $redis->lRem($watchQueueKey, $ret, 0);
                echo "success:".$ret."\n";
            }
        }
    }

} catch (Exception $e){
    echo $e->getMessage();
}

监听者Watcher.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/26
 * Time: 0:15
 */
try {
    //声明消息队列-list的键名
    $queueKey = 'testQueueKey';
    //声明监听者队列-list的键名
    $watchQueueKey = 'watchQueueKey';
    $redis = new Redis();
    $redis->connect('192.168.75.132', 6379);

    while (true){
        //取出列表尾部的一个值
        $ret = $redis->lIndex($watchQueueKey, -1);
        //如果不存在则休眠1秒
        if ($ret === false){
            sleep(1);
        } else {
            $retArray = json_decode($ret, true);
            $idCache = $redis->get($retArray['uniqid']);
            if ($idCache === false){
                //如果已过期,表示任务超时,弹回原队列
                $redis->rpoplpush($watchQueueKey, $queueKey);
                echo "rpoplpush:".$ret."\n";
            } else {
                //处理中,继续等待
                sleep(1);
            }
        }
    }

} catch (Exception $e){
    echo $e->getMessage();
}

4.执行队列

开启监听者php Watcher.php
开启消费者php Consumer.php
执行生产者php Producter.php
生产者输出

int(1)
int(2)
int(3)
int(4)
int(5)
int(6)
int(7)
int(8)
int(9)
int(10)

监听者输出

rpoplpush:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
rpoplpush:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
rpoplpush:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
rpoplpush:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}

消费者输出

success:{"uniqid":"47280267323557445c4bde640dbfb4.78962728","key":"key-0","value":"data"}
failure:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"39394267323642245c4bde640de992.34641654","key":"key-2","value":"data"}
success:{"uniqid":"41335267323642245c4bde640df980.38466514","key":"key-3","value":"data"}
failure:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
failure:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"43817267323642245c4bde640ec189.44008738","key":"key-7","value":"data"}
success:{"uniqid":"69276267323642245c4bde640ecb91.04877522","key":"key-8","value":"data"}
failure:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
success:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}

我们看到消费者第一次执行时失败的消息,超时后又被弹回了消息队列,消费者有了再次执行的机会,监听者的职责就是确保消费者执行失败或挂掉后消息还能再弹回原队列得到再次执行
原文地址:https://www.jmsite.cn/blog-615.html

相关文章
|
5月前
|
存储 缓存 监控
Redis设计与实现——Redis命令参考与高级特性
Redis 是一个高性能的键值存储系统,支持丰富的数据类型(字符串、列表、哈希、集合等)和多种高级功能。本文档涵盖 Redis 的核心命令分类,包括数据类型操作、事务与脚本、持久化、集群管理、系统监控等。特别介绍了事务的原子性特性、Lua 脚本的执行方式及优势、排序机制、发布订阅模型以及慢查询日志和监视器工具的使用方法。适用于开发者快速掌握 Redis 常用命令及其应用场景,优化系统性能与可靠性。
|
1月前
|
存储 缓存 NoSQL
Redis基础命令与数据结构概览
Redis是一个功能强大的键值存储系统,提供了丰富的数据结构以及相应的操作命令来满足现代应用程序对于高速读写和灵活数据处理的需求。通过掌握这些基础命令,开发者能够高效地对Redis进行操作,实现数据存储和管理的高性能方案。
79 12
|
1月前
|
存储 消息中间件 NoSQL
【Redis】常用数据结构之List篇:从常用命令到典型使用场景
本文将系统探讨 Redis List 的核心特性、完整命令体系、底层存储实现以及典型实践场景,为读者构建从理论到应用的完整认知框架,助力开发者在实际业务中高效运用这一数据结构解决问题。
|
2月前
|
存储 缓存 人工智能
Redis六大常见命令详解:从set/get到过期策略的全方位解析
本文将通过结构化学习路径,帮助读者实现从命令语法掌握到工程化实践落地的能力跃迁,系统性提升 Redis 技术栈的应用水平。
|
3月前
|
NoSQL Redis
Lua脚本协助Redis分布式锁实现命令的原子性
利用Lua脚本确保Redis操作的原子性是分布式锁安全性的关键所在,可以大幅减少由于网络分区、客户端故障等导致的锁无法正确释放的情况,从而在分布式系统中保证数据操作的安全性和一致性。在将这些概念应用于生产环境前,建议深入理解Redis事务与Lua脚本的工作原理以及分布式锁的可能问题和解决方案。
154 8
|
5月前
|
存储 缓存 NoSQL
Redis中的常用命令-get&set&keys&exists&expire&ttl&type的详细解析
总的来说,这些Redis命令提供了处理存储在内存中的键值对的便捷方式。通过理解和运用它们,你可以更有效地在Redis中操作数据,使其更好地服务于你的应用。
382 17
|
5月前
|
消息中间件 NoSQL Linux
Redis的基本介绍和安装方式(包括Linux和Windows版本),以及常用命令的演示
Redis(Remote Dictionary Server)是一个高性能的开源键值存储数据库。它支持字符串、列表、散列、集合等多种数据类型,具有持久化、发布/订阅等高级功能。由于其出色的性能和广泛的使用场景,Redis在应用程序中常作为高速缓存、消息队列等用途。
879 16
|
PHP
php 执行命令函数
/** Method to execute a command in the terminal Uses : 1. system 2. passthru 3.
982 0
|
1月前
|
关系型数据库 MySQL PHP
PHP和Mysql前后端交互效果实现
本文介绍了使用PHP连接MySQL数据库的基本函数及其实现案例。内容涵盖数据库连接、选择数据库、执行查询、获取结果等常用操作,并通过用户登录和修改密码的功能实例,展示了PHP与MySQL的交互过程及代码实现。
176 0
PHP和Mysql前后端交互效果实现
|
6月前
|
关系型数据库 MySQL Linux
查看Linux、Apache、MySQL、PHP版本的技巧
以上就是查看Linux、Apache、MySQL、PHP版本信息的方法。希望这些信息能帮助你更好地理解和使用你的LAMP技术栈。
313 17