redis实现队列

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis设计用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列。它有几个阻塞式的API可以使用,正是这些阻塞式的API让其有能力做消息队列; 另外,做消息队列的其他特性例如FIFO(先入先出)也很容易实现,只需要一个list对象从头取数据,从尾部塞数据即可。


image.png

Redis设计用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列。

它有几个阻塞式的API可以使用,正是这些阻塞式的API让其有能力做消息队列;

另外,做消息队列的其他特性例如FIFO(先入先出)也很容易实现,只需要一个list对象从头取数据,从尾部塞数据即可。

Redis能做消息队列还得益于其list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口,它们都是阻塞版的,所以可以用来做消息队列。(List : lpush / rpop)

一.生产者消费者模式

1.简介

1.使用list结构作为队列,rpush生产消息,lpop消费消息,当lpop没有消息的时候,要适当sleep一会再重试。

或者,不用sleep,直接用blpop指令,在没有消息的时候,它会阻塞住直到消息到来。但是redis没有akc功能

2.代码

1. @Component
2. @RequestMapping("/RedisApplication")
3. public class RedisApplication {
4. 
5. @Autowired
6. private RedisTemplate redisTemplate;
7. 
8. @RequestMapping(value = "/testFIFO")
9. public void testFIFO() throws InterruptedException {
10.         System.out.println("---------------开始放入队列--------------");
11. for (int i = 0; i < 5; i++) {
12. String arg = "key" + i;
13.             redisTemplate.opsForList().leftPush("FIFOKEY", arg);
14.         }
15.         System.out.println("----------------放入队列停止------------");
16. while (true) {
17. Object outKey = redisTemplate.opsForList().rightPop("FIFOKEY");
18. if (outKey != null) {
19.                 System.out.println(outKey);
20.             } else {
21.                 Thread.sleep(500);
22.             }
23.         }
24.     }
25. }

二.发布订阅者模式

1.简介

使用pub/pub/sub主题订阅者模式,可以实现1:N的消息队列。

缺点:在消费者下线的情况下,生产的消息会丢失。此场景,建议用MQ。

2.代码

被请求的接口(被订阅者),使用redis队列,放于队列中。

1. package com.airboot.bootdemo.controller;
2. 
3. import org.springframework.beans.factory.annotation.Autowired;
4. import org.springframework.data.redis.core.RedisTemplate;
5. import org.springframework.web.bind.annotation.GetMapping;
6. import org.springframework.web.bind.annotation.RequestMapping;
7. import org.springframework.web.bind.annotation.RestController;
8. 
9. 
10. @RestController
11. @RequestMapping("/SentRedisController")
12. public class SentRedisController {
13. 
14. @Autowired
15. private RedisTemplate redisTemplate;
16. 
17. /**
18.      * redis生产者测试
19.      * @param data
20.      * @return
21.      */
22. @GetMapping("/send1")
23.     String send1(String data) {
24.         redisTemplate.convertAndSend("testkafka", data);
25. return "success";
26.     }
27. /**
28.      * redis生产者测试
29.      * @param data
30.      * @return
31.      */
32. @GetMapping("/send2")
33.     String send2(String data) {
34.         redisTemplate.convertAndSend("testkafka1", data);
35. return "success";
36.     }
37. }

配置监听器。监听队列。

1. package com.airboot.bootdemo.config;
2. 
3. import com.airboot.bootdemo.controller.RedisSubscriber;
4. import com.airboot.bootdemo.controller.RedisSubscriberTwo;
5. import org.springframework.beans.factory.annotation.Autowired;
6. import org.springframework.context.annotation.Bean;
7. import org.springframework.context.annotation.Configuration;
8. import org.springframework.data.redis.connection.RedisConnectionFactory;
9. import org.springframework.data.redis.core.RedisTemplate;
10. import org.springframework.data.redis.listener.PatternTopic;
11. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
12. import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
13. import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
14. import org.springframework.data.redis.serializer.StringRedisSerializer;
15. 
16. 
17. @Configuration
18. public class RedisConfig {
19. 
20. @Autowired
21. private RedisTemplate redisTemplate;
22. 
23. //序列化通用设置
24. @Bean
25. public RedisTemplate redisTemplateInit() {
26.         Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
27. // 设置序列化Key的实例化对象
28.         redisTemplate.setKeySerializer(new StringRedisSerializer());
29. // 设置序列化Value的实例化对象
30.         redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
31.         redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer);
32.         redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
33. return redisTemplate;
34.     }
35. 
36. //配置监听 配置使类RedisSubscriber RedisSubscriberTwo 去监听testkafka1, 
37. //testkafka这两个队列
38. @Bean
39.     RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
40.                                             RedisSubscriber listenerAdapter,
41.                                             RedisSubscriberTwo listenerAdapter2){
42.         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
43.         container.setConnectionFactory(connectionFactory);
44. //订阅了一个叫chat 的通道
45.         container.addMessageListener(listenerAdapter, new PatternTopic("testkafka"));
46.         container.addMessageListener(listenerAdapter, new PatternTopic("testkafka1"));//配置要订阅的订阅项
47.         container.addMessageListener(listenerAdapter2, new PatternTopic("testkafka"));//配置要订阅的订阅项
48. //这个container 可以添加多个 messageListener
49. return container;
50.     }
51. 
52. }

可以多个消费者通过上面配置多个类的监听就可以。

1. package com.airboot.bootdemo.controller;
2. 
3. import org.springframework.beans.factory.annotation.Autowired;
4. import org.springframework.data.redis.connection.Message;
5. import org.springframework.data.redis.core.RedisTemplate;
6. import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
7. import org.springframework.stereotype.Component;
8. 
9. @Component
10. public class RedisSubscriberTwo extends MessageListenerAdapter {
11. 
12. @Autowired
13. private RedisTemplate<String, String> redisTemplate;
14. 
15. @Override
16.     public void onMessage(Message message, byte[] bytes) {
17. System.out.println(message);
18.         byte[] body = message.getBody();
19.         byte[] channel = message.getChannel();
20. String msg = redisTemplate.getStringSerializer().deserialize(body);
21. String topic = redisTemplate.getStringSerializer().deserialize(channel);
22. System.out.println("监听到topic为2" + topic + "的消息:" + msg);
23.     }
24. }
25. 
26. 
27. package com.airboot.bootdemo.controller;
28. 
29. import org.springframework.beans.factory.annotation.Autowired;
30. import org.springframework.data.redis.connection.Message;
31. import org.springframework.data.redis.core.RedisTemplate;
32. import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
33. import org.springframework.stereotype.Component;
34. 
35. @Component
36. public class RedisSubscriber extends MessageListenerAdapter {
37. //
38. @Autowired
39. private RedisTemplate<String, String> redisTemplate;
40. 
41. @Override
42.     public void onMessage(Message message, byte[] bytes) {
43. System.out.println(message);
44.         byte[] body = message.getBody();
45.         byte[] channel = message.getChannel();
46. String msg = redisTemplate.getStringSerializer().deserialize(body);
47. String topic = redisTemplate.getStringSerializer().deserialize(channel);
48. System.out.println("监听到topic为" + topic + "的消息:" + msg);
49.     }
50. }

三.延时队列

1.简介

上面的例子我们已经了一个简易的消息队列。我们继续思考一个现实的场景,假定这些是一些游戏商品,它需要添加"延迟销售"特性,在未来某个时候才可以开始处理这些游戏商品数据。 那么要实现这个延迟的特性,我们需要修改现有队列的实现。

  1. 在消息数据的信息中包含延迟处理消息的执行时间,如果工作进程发现消息的执行时间还没到,那么它将会在短暂的等待之后重新把消息数据推入队列中。(延迟发送消息)
  2. 使用有序集合来存储这些需要延时消费的消息数据,将任务的执行时间设置为分值,在开启一个工作进程查找有序集合里面是否有可以立刻执行的任务,如果有的话就从有序集合中移除消息并且消费。

2.代码

1. package com.airboot.bootdemo.utils;
2. 
3. import com.alibaba.fastjson.JSON;
4. import com.alibaba.fastjson.TypeReference;
5. import org.springframework.data.redis.core.RedisTemplate;
6. import org.springframework.data.redis.core.ZSetOperations;
7. 
8. import java.lang.reflect.Type;
9. import java.util.Set;
10. import java.util.UUID;
11. 
12. 
13. public class RedisDelayQueue<T> {
14. 
15. 
16. private String queueKey;
17. 
18. public RedisTemplate redisTemplate;
19. 
20. // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
21. private Type TaskType = new TypeReference<TaskItem<T>>() {
22.     }.getType();
23. 
24. public RedisDelayQueue(RedisTemplate redisTemplate, String queueKey) {
25. this.queueKey = queueKey;
26. this.redisTemplate = redisTemplate;
27.     }
28. 
29. static class TaskItem<T> {
30. public String id;
31. public T msg;
32.     }
33. 
34. public void delay(T msg) {
35. TaskItem<T> item = new TaskItem<T>();
36. //分配唯一的uuid
37.         item.id = UUID.randomUUID().toString();
38.         item.msg = msg;
39. //fastjson序列化
40. String s = JSON.toJSONString(item);
41.         ZSetOperations operations = redisTemplate.opsForZSet();
42. //塞入延时队列,5s后再试
43.         operations.add(queueKey, s, System.currentTimeMillis() + 5000);
44.     }
45. 
46. public void loop() {
47. while (!Thread.interrupted()) {
48. //只取一条
49. Set<String> values = redisTemplate.opsForZSet().rangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
50. if (values.isEmpty()) {
51. try {
52. //歇会继续
53. Thread.sleep(500);
54.                 } catch (InterruptedException e) {
55. break;
56.                 }
57. continue;
58.             }
59. String s = values.iterator().next();
60. if (redisTemplate.opsForZSet().remove(queueKey, s) > 0) {
61. //多进程同时调用,只有一个会remove成功
62. TaskItem<T> task = JSON.parseObject(s, TaskType);
63. //执行业务逻辑
64. handleTask(task.msg);
65.             }
66.         }
67.     }
68. 
69. private void handleTask(T msg) {
70. System.out.println(msg);
71.     }
72. }

调用。

1. @RequestMapping("/testDelayQueue")
2. public void testDelayQueue() {
3. RedisDelayQueue queue = new RedisDelayQueue(redisTemplate, "DelayQueue");
4. Thread producer = new Thread() {
5. 
6. @Override
7. public void run() {
8. for (int i = 0; i < 10; i++) {
9.                     queue.delay("DelayQueue" + i);
10.                 }
11.             }
12. 
13.         };
14. Thread consumer = new Thread() {
15. 
16. @Override
17. public void run() {
18.                 queue.loop();
19.             }
20. 
21.         };
22.         producer.start();
23.         consumer.start();
24. try {
25.             producer.join();
26.             Thread.sleep(6000);
27.             consumer.interrupt();
28.             consumer.join();
29.         } catch (InterruptedException e) {
30.         }
31.     }

3.原理

主要就是几个语句:

1. 放入redis中 ,权值为当前时间后5秒
2. operations.add(queueKey, s, System.currentTimeMillis() + 5000);
1. //获取时间小于当前时间的 在set中的第一条数据 所以在5秒之后就获取到了上面那条数据
2. Set<String> values = redisTemplate.opsForZSet().rangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
1. 移除上面查到的数据 模仿数据被消费
2. redisTemplate.opsForZSet().remove(queueKey, s)


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
消息中间件 NoSQL Java
别再用 Redis List 实现消息队列了,Stream 专为队列而生
别再用 Redis List 实现消息队列了,Stream 专为队列而生
92 0
|
1月前
|
存储 NoSQL API
【小小思考】Redis实现去重任务队列
【2月更文挑战第1天】思考一下如何用Redis实现去重的任务队列,主要有List 、List + Set/Hash/Bloom Filter、ZSet、Lua和开源库等方式。
124 1
|
6月前
|
NoSQL Redis
redis队列
redis队列
32 0
|
8月前
|
消息中间件 NoSQL Java
Redis实现延迟队列,我研究了两种方案,发现并不简单
前段时间有个小项目需要使用延迟任务,谈到延迟任务,我脑子第一时间一闪而过的就是使用消息队列来做,比如RabbitMQ的死信队列又或者RocketMQ的延迟队列,但是奈何这是一个小项目,并没有引入MQ,我也不太想因为一个延迟任务就引入MQ,增加系统复杂度,所以这个方案直接就被pass了。
|
10月前
|
NoSQL Go Redis
Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库
Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库
348 0
|
10月前
|
NoSQL Go Redis
Redis与异步队列
使用Redis可以很方便地实现异步队列。
72 0
|
11月前
|
存储 NoSQL Go
基于redis实现延迟队列
基于redis实现延迟队列
|
11月前
|
消息中间件 监控 NoSQL
RocketMq普通消息,死信队列,消息幂等性(redis)
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
188 0
|
消息中间件 存储 缓存
把Redis当作队列来用,真的合适吗?下
把Redis当作队列来用,真的合适吗?下
189 1
把Redis当作队列来用,真的合适吗?下
|
消息中间件 NoSQL JavaScript
用 Redis 实现延迟队列,我研究了两种方案,发现并不简单 下
用 Redis 实现延迟队列,我研究了两种方案,发现并不简单 下

热门文章

最新文章