Redis系列5-实现简单消息队列 1

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Redis系列5-实现简单消息队列

任务异步化

打开浏览器,输入地址,按下回车,打开了页面。于是一个HTTP请求(request)就由客户端发送到服务器,服务器处理请求,返回响应(response)内容。

我们每天都在浏览网页,发送大大小小的请求给服务器。有时候,服务器接到了请求,会发现他也需要给另外的服务器发送请求,或者服务器也需要做另外一些事情,于是最初们发送的请求就被阻塞了,也就是要等待服务器完成其他的事情。

更多的时候,服务器做的额外事情,并不需要客户端等待,这时候就可以把这些额外的事情异步去做。从事异步任务的工具有很多。主要原理还是处理通知消息,针对通知消息通常采取是队列结构。生产和消费消息进行通信和业务实现。


     基于内存的单线程数据库,使Redis的线程安全性与性能极高。而Redis的双向链表数据类型(List)天生就可作为消息队列存储消息.

在这里就不说消息队列的等等一些优点。但是补充一下Redis的List类型的几个命令,你可以指定将一个元素投送到列表的头部(左边)或者尾部(右边),当然也可以指定从列表的头部或尾部取出数据.


在项目中用到了redis作为缓存,再学习了ActiveMq之后想着用redis实现简单的消息队列,下面做记录。

  Redis的列表类型键可以用来实现队列,并且支持阻塞式读取,可以很容易的实现一个高性能的优先队列。同时在更高层面上,Redis还支持"发布/订阅"的消息模式,可以基于此构建一个聊天系统。

一、redis的列表类型天生支持用作消息队列。(类似于MQ的队列模型--任何时候都可以消费,一条消息只能消费一次)

    list操作参考:https://www.cnblogs.com/qlqwjy/p/7789125.html

   在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部(left)和尾部(right)添加新的元素。在插入时,如果该键并不存在,Redis将为该键创建一个新的链表。与此相反,如果链表中所有的元素均被移除,那么该键也将会被从数据库中删除。List中可以包含的最大元素数量是4294967295。

     从元素插入和删除的效率视角来看,如果我们是在链表的两头插入或删除元素,这将会是非常高效的操作,即使链表中已经存储了百万条记录,该操作也可以在常量时间内完成。然而需要说明的是,如果元素插入或删除操作是作用于链表中间,那将会是非常低效的。相信对于有良好数据结构基础的开发者而言,这一点并不难理解。(类似于java的ArrayList)


redis对list的操作命令中。L表示从左边(头部)开始插与弹出,R表示从右边(尾部)开始插与弹出。

 

1.redis中简单的操作list,简单的在命令行操作实现队列

(1)从左向右插入,从右向左弹出:


127.0.0.1:6379> lpush mylist a b c d
(integer) 4
127.0.0.1:6379> lrange mylist 0 -1
1) "d"
2) "c"
3) "b"
4) "a"
127.0.0.1:6379> rpop mylist
"a"
127.0.0.1:6379> rpop mylist
"b"

 

执行完   lpush mylist a b c d  之后数据结构如下:(满足先进先出的队列模式)

 

执行完第一次:rpop mylist之后数据结构如下:

 

(2)从右向左插入,从左向右弹出:


127.0.0.1:6379> rpush mylist2 a b c d
(integer) 4
127.0.0.1:6379> lrange mylist2 0 -1
1) "a"
2) "b"
3) "c"
4) "d"
127.0.0.1:6379> lpop mylist2
"a"
127.0.0.1:6379> lpop mylist2
"b"


 

执行完:rpush mylist2 a b c d之后的数据结构如下

 

第一次执行完   lpop mylist2  之后数据结构如下:(满足先进先出的队列模式)

 

2.JAVA程序实现消息队列

redis.properties


redis.url=localhost
redis.port=6379
redis.maxIdle=30
redis.minIdle=10
redis.maxTotal=100
redis.maxWait=10000

 

获取连接的工具类:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * @Author: cc
 * @Description
 * @Date: 21:32 2020/10/9
 */
public class JedisPoolUtils {
    private static JedisPool pool = null;
    static {
        //加载配置文件
        InputStream in = JedisPoolUtils.class.getClassLoader().getResourceAsStream("redis.properties");
        Properties pro = new Properties();
        try {
            pro.load(in);
        } catch (IOException e) {
            e.printStackTrace();
        }
        //获得池子对象
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxIdle(Integer.parseInt(pro.get("redis.maxIdle").toString()));//最大闲置个数
        poolConfig.setMaxWaitMillis(Integer.parseInt(pro.get("redis.maxWait").toString()));//最大闲置个数
        poolConfig.setMinIdle(Integer.parseInt(pro.get("redis.minIdle").toString()));//最小闲置个数
        poolConfig.setMaxTotal(Integer.parseInt(pro.get("redis.maxTotal").toString()));//最大连接数
        pool = new JedisPool(poolConfig, pro.getProperty("redis.url"), Integer.parseInt(pro.get("redis.port").toString()));
    }
    //获得jedis资源的方法
    public static Jedis getJedis() {
        return pool.getResource();
    }
    public static void main(String[] args) {
        Jedis jedis = getJedis();
        System.out.println(jedis);
    }
}
 (1)消息生产者:(开启5个线程生产消息)
import redis.clients.jedis.Jedis;
/**
 * @Author: cc
 * @Description
 * @Date: 21:29 2020/10/9
 */
public class MessageProducer extends Thread {
    public static final String MESSAGE_KEY = "message:queue";
    private volatile int count;
    public void putMessage(String message) {
        Jedis jedis = JedisPoolUtils.getJedis();
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(Thread.currentThread().getName() + " put message,size=" + size + ",count=" + count);
        count++;
    }
    @Override
    public synchronized void run() {
        for (int i = 0; i < 5; i++) {
            putMessage("message" + count);
        }
    }
    public static void main(String[] args) {
        MessageProducer messageProducer = new MessageProducer();
        Thread t1 = new Thread(messageProducer, "thread1");
        Thread t2 = new Thread(messageProducer, "thread2");
        Thread t3 = new Thread(messageProducer, "thread3");
        Thread t4 = new Thread(messageProducer, "thread4");
        Thread t5 = new Thread(messageProducer, "thread5");
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
    }
}


结果:(证明了redis是单线程操作,只能一个一个操作)

thread1 put message,size=1,count=0
thread1 put message,size=2,count=1
thread1 put message,size=3,count=2
thread1 put message,size=4,count=3
thread1 put message,size=5,count=4
thread3 put message,size=6,count=5
thread3 put message,size=7,count=6
thread3 put message,size=8,count=7
thread3 put message,size=9,count=8
thread3 put message,size=10,count=9
thread4 put message,size=11,count=10
thread4 put message,size=12,count=11
thread4 put message,size=13,count=12
thread4 put message,size=14,count=13
thread4 put message,size=15,count=14
thread5 put message,size=16,count=15
 redis后台查看:
127.0.0.1:6379> lrange message:queue 0 -1
 1) "message24"
 2) "message23"
 3) "message22"
 4) "message21"
 5) "message20"
 6) "message19"
 7) "message18"
 8) "message17"
 9) "message16"
10) "message15"
11) "message14"
12) "message13"
13) "message12"
14) "message11"
15) "message10"
16) "message9"
17) "message8"
18) "message7"
19) "message6"
20) "message5"
21) "message4"
22) "message3"
23) "message2"
24) "message1"
25) "message0"


 

(2)消息消费者:(开启两个线程消费消息)

import redis.clients.jedis.Jedis;
/**
 * @Author: cc
 * @Description
 * @Date: 22:34 2020/10/9
 */
public class MessageConsumer implements Runnable {
    public static final String MESSAGE_KEY = "message:queue";
    private volatile int count;
    public void consumerMessage() {
        Jedis jedis = JedisPoolUtils.getJedis();
        String message = jedis.rpop(MESSAGE_KEY);
        System.out.println(Thread.currentThread().getName() + " consumer message,message=" + message + ",count=" + count);
        count++;
    }
    @Override
    public void run() {
        while (true) {
            consumerMessage();
        }
    }
    public static void main(String[] args) {
        MessageConsumer messageConsumer = new MessageConsumer();
        Thread t1 = new Thread(messageConsumer, "thread6");
        Thread t2 = new Thread(messageConsumer, "thread7");
        t1.start();
        t2.start();
    }
}


 

结果:(满足先进先出的规则)--虽然消息已经消费完了,但是仍然在不停的rpop,所以造成浪费


thread6 consumer message,message=message0,count=0
thread6 consumer message,message=message1,count=1
thread6 consumer message,message=message2,count=2
thread6 consumer message,message=message3,count=3
thread7 consumer message,message=message4,count=4
thread6 consumer message,message=message5,count=5
thread7 consumer message,message=message6,count=6
thread6 consumer message,message=message7,count=7
thread7 consumer message,message=message8,count=8
thread6 consumer message,message=message9,count=9
thread7 consumer message,message=message10,count=10
thread6 consumer message,message=message11,count=11
thread7 consumer message,message=message12,count=12
thread6 consumer message,message=message13,count=13
thread7 consumer message,message=message14,count=14
thread6 consumer message,message=message15,count=15
thread7 consumer message,message=message16,count=16
thread6 consumer message,message=message17,count=16
thread7 consumer message,message=message18,count=18
thread6 consumer message,message=message19,count=19
thread7 consumer message,message=message20,count=20
thread6 consumer message,message=message21,count=20
thread7 consumer message,message=message22,count=22
thread6 consumer message,message=message23,count=22
thread7 consumer message,message=message24,count=24
thread6 consumer message,message=null,count=25
thread7 consumer message,message=null,count=26
thread6 consumer message,message=null,count=27
thread7 consumer message,message=null,count=28
thread6 consumer message,message=null,count=28
thread7 consumer message,message=null,count=30
thread6 consumer message,message=null,count=31
...

 

   但上述例子中消息消费者有一个问题存在,即需要不停的调用rpop方法查看List中是否有待处理消息。每调用一次都会发起一次连接,这会造成不必要的浪费。也许你会使用Thread.sleep()等方法让消费者线程隔一段时间再消费,但这样做有两个问题:

   1)、如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

   2)、如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销。

 

补充:brpop和blpop实现阻塞读取(重要)

  也就是上面的操作需要一直调用rpop命令或者lpop命令才可以实现不停的监听且消费消息。为了解决这一问题,redis提供了阻塞命令 brpop和blpop。下面以brpop命名为例进行试验:

  brpop命令可以接收多个键,其完整的命令格式为 BRPOP key [key ...] timeout,如:brpop key1 0。意义是同时检测多个键,如果所有键都没有元素则阻塞,如果其中一个有元素则从该键中弹出该元素(会按照key的顺序进行读取,可以实现具有优先级的队列)。例如下面试验:

开启两个客户端,第一个客户端中采用brpop阻塞读取两个键:

127.0.0.1:6379> brpop mylist1 mylist2 0

第二个客户端增加mylist1 :

127.0.0.1:6379> lpush mylist1 1 2
(integer) 2

 

 

则在第一个客户端显示:

127.0.0.1:6379> brpop mylist1 mylist2 0
1) "mylist1"
2) "1"
(56.31s)

 

也就是brpop会阻塞队列,并且每次也是弹出一个消息,如果没有消息会阻塞。

 

如果多个键都有元素则按照从左到右读取第一个键中的一个元素,例如我们现在queue1和queue2各自添加一个元素:

127.0.0.1:6379> lpush queue1 1 2
(integer) 2
127.0.0.1:6379> lpush queue2 3 4
(integer) 2

 

然后执行brpop命令:(会返回读取的key和value,第一个是返回的key,第二个是value)

127.0.0.1:6379> brpop queue1 queue2 2
1) "queue1"
2) "1"

 

  借此特性可以实现区分优先级的任务队列。也就是brpop会按照key的顺序依次读取一个数据。

 

 

改造上面代码实现阻塞读取:

import redis.clients.jedis.Jedis;
import java.util.List;
/**
 * @Author: cc
 * @Description
 * @Date: 22:34 2020/10/9
 */
public class MessageConsumer implements Runnable {
    public static final String MESSAGE_KEY = "message:queue";
    private volatile int count;
    private Jedis jedis = JedisPoolUtils.getJedis();
    public void consumerMessage() {
        List<String> brpop = jedis.brpop(0, MESSAGE_KEY);//0是timeout,返回的是一个集合,第一个是消息的key,第二个是消息的内容
        System.out.println(brpop);
    }
    @Override
    public void run() {
        while (true) {
            consumerMessage();
        }
    }
    public static void main(String[] args) {
        MessageConsumer messageConsumer = new MessageConsumer();
        Thread t1 = new Thread(messageConsumer, "thread6");
        Thread t2 = new Thread(messageConsumer, "thread7");
        t1.start();
        t2.start();
    }
}

  然后可以运行Customer,清空控制台,可以看到程序没有任何输出,阻塞在了brpop这儿。然后在打开Redis的客户端,输入指令client list,可以查看当前的连接个数。

  当启动生产者生产消息之后,消费者会自动消费消息,而且消费者会阻塞直到有消息。

[message:queue, message0]
[message:queue, message1]
[message:queue, message2]
[message:queue, message3]
[message:queue, message4]
[message:queue, message5]
[message:queue, message6]
[message:queue, message7]
[message:queue, message8]
[message:queue, message9]
[message:queue, message10]
[message:queue, message11]
[message:queue, message12]
[message:queue, message13]
[message:queue, message14]
[message:queue, message15]
[message:queue, message16]
[message:queue, message17]
[message:queue, message18]
[message:queue, message19]
[message:queue, message20]
[message:queue, message21]
[message:queue, message22]
[message:queue, message23]
[message:queue, message24]
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
89 6
|
6月前
|
消息中间件 存储 负载均衡
Redis使用ZSET实现消息队列使用总结二
Redis使用ZSET实现消息队列使用总结二
87 0
|
4月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
118 20
剖析 Redis List 消息队列的三种消费线程模型
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
39 2
|
3月前
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
59 0
|
5月前
|
消息中间件 NoSQL Redis
Redis Stream消息队列之基本语法与使用方式
这篇文章详细介绍了Redis Stream消息队列的基本语法和使用方式,包括消息的添加、读取、删除、修剪以及消费者组的使用和管理,强调了其在消息持久化和主备复制方面的优势。
94 0
|
6月前
|
消息中间件 存储 NoSQL
Redis使用ZSET实现消息队列使用总结一
Redis使用ZSET实现消息队列使用总结一
151 0
|
消息中间件 NoSQL Go
|
26天前
|
存储 缓存 NoSQL
解决Redis缓存数据类型丢失问题
解决Redis缓存数据类型丢失问题
171 85
|
1天前
|
存储 缓存 NoSQL
云端问道21期方案教学-应对高并发,利用云数据库 Tair(兼容 Redis®*)缓存实现极速响应
云端问道21期方案教学-应对高并发,利用云数据库 Tair(兼容 Redis®*)缓存实现极速响应