Redis设计用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列。
它有几个阻塞式的API可以使用,正是这些阻塞式的API让其有能力做消息队列;
另外,做消息队列的其他特性例如FIFO(先入先出)也很容易实现,只需要一个list对象从头取数据,从尾部塞数据即可。
Redis能做消息队列还得益于其list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口,它们都是阻塞版的,所以可以用来做消息队列。(List : lpush / rpop)
一.生产者消费者模式
1.简介
1.使用list结构作为队列,rpush生产消息,lpop消费消息,当lpop没有消息的时候,要适当sleep一会再重试。
或者,不用sleep,直接用blpop指令,在没有消息的时候,它会阻塞住直到消息到来。但是redis没有akc功能
2.代码
1. @Component 2. @RequestMapping("/RedisApplication") 3. public class RedisApplication { 4. 5. @Autowired 6. private RedisTemplate redisTemplate; 7. 8. @RequestMapping(value = "/testFIFO") 9. public void testFIFO() throws InterruptedException { 10. System.out.println("---------------开始放入队列--------------"); 11. for (int i = 0; i < 5; i++) { 12. String arg = "key" + i; 13. redisTemplate.opsForList().leftPush("FIFOKEY", arg); 14. } 15. System.out.println("----------------放入队列停止------------"); 16. while (true) { 17. Object outKey = redisTemplate.opsForList().rightPop("FIFOKEY"); 18. if (outKey != null) { 19. System.out.println(outKey); 20. } else { 21. Thread.sleep(500); 22. } 23. } 24. } 25. }
二.发布订阅者模式
1.简介
使用pub/pub/sub主题订阅者模式,可以实现1:N的消息队列。
缺点:在消费者下线的情况下,生产的消息会丢失。此场景,建议用MQ。
2.代码
被请求的接口(被订阅者),使用redis队列,放于队列中。
1. package com.airboot.bootdemo.controller; 2. 3. import org.springframework.beans.factory.annotation.Autowired; 4. import org.springframework.data.redis.core.RedisTemplate; 5. import org.springframework.web.bind.annotation.GetMapping; 6. import org.springframework.web.bind.annotation.RequestMapping; 7. import org.springframework.web.bind.annotation.RestController; 8. 9. 10. @RestController 11. @RequestMapping("/SentRedisController") 12. public class SentRedisController { 13. 14. @Autowired 15. private RedisTemplate redisTemplate; 16. 17. /** 18. * redis生产者测试 19. * @param data 20. * @return 21. */ 22. @GetMapping("/send1") 23. String send1(String data) { 24. redisTemplate.convertAndSend("testkafka", data); 25. return "success"; 26. } 27. /** 28. * redis生产者测试 29. * @param data 30. * @return 31. */ 32. @GetMapping("/send2") 33. String send2(String data) { 34. redisTemplate.convertAndSend("testkafka1", data); 35. return "success"; 36. } 37. }
配置监听器。监听队列。
1. package com.airboot.bootdemo.config; 2. 3. import com.airboot.bootdemo.controller.RedisSubscriber; 4. import com.airboot.bootdemo.controller.RedisSubscriberTwo; 5. import org.springframework.beans.factory.annotation.Autowired; 6. import org.springframework.context.annotation.Bean; 7. import org.springframework.context.annotation.Configuration; 8. import org.springframework.data.redis.connection.RedisConnectionFactory; 9. import org.springframework.data.redis.core.RedisTemplate; 10. import org.springframework.data.redis.listener.PatternTopic; 11. import org.springframework.data.redis.listener.RedisMessageListenerContainer; 12. import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; 13. import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; 14. import org.springframework.data.redis.serializer.StringRedisSerializer; 15. 16. 17. @Configuration 18. public class RedisConfig { 19. 20. @Autowired 21. private RedisTemplate redisTemplate; 22. 23. //序列化通用设置 24. @Bean 25. public RedisTemplate redisTemplateInit() { 26. Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); 27. // 设置序列化Key的实例化对象 28. redisTemplate.setKeySerializer(new StringRedisSerializer()); 29. // 设置序列化Value的实例化对象 30. redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); 31. redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer); 32. redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); 33. return redisTemplate; 34. } 35. 36. //配置监听 配置使类RedisSubscriber RedisSubscriberTwo 去监听testkafka1, 37. //testkafka这两个队列 38. @Bean 39. RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, 40. RedisSubscriber listenerAdapter, 41. RedisSubscriberTwo listenerAdapter2){ 42. RedisMessageListenerContainer container = new RedisMessageListenerContainer(); 43. container.setConnectionFactory(connectionFactory); 44. //订阅了一个叫chat 的通道 45. container.addMessageListener(listenerAdapter, new PatternTopic("testkafka")); 46. container.addMessageListener(listenerAdapter, new PatternTopic("testkafka1"));//配置要订阅的订阅项 47. container.addMessageListener(listenerAdapter2, new PatternTopic("testkafka"));//配置要订阅的订阅项 48. //这个container 可以添加多个 messageListener 49. return container; 50. } 51. 52. }
可以多个消费者通过上面配置多个类的监听就可以。
1. package com.airboot.bootdemo.controller; 2. 3. import org.springframework.beans.factory.annotation.Autowired; 4. import org.springframework.data.redis.connection.Message; 5. import org.springframework.data.redis.core.RedisTemplate; 6. import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; 7. import org.springframework.stereotype.Component; 8. 9. @Component 10. public class RedisSubscriberTwo extends MessageListenerAdapter { 11. 12. @Autowired 13. private RedisTemplate<String, String> redisTemplate; 14. 15. @Override 16. public void onMessage(Message message, byte[] bytes) { 17. System.out.println(message); 18. byte[] body = message.getBody(); 19. byte[] channel = message.getChannel(); 20. String msg = redisTemplate.getStringSerializer().deserialize(body); 21. String topic = redisTemplate.getStringSerializer().deserialize(channel); 22. System.out.println("监听到topic为2" + topic + "的消息:" + msg); 23. } 24. } 25. 26. 27. package com.airboot.bootdemo.controller; 28. 29. import org.springframework.beans.factory.annotation.Autowired; 30. import org.springframework.data.redis.connection.Message; 31. import org.springframework.data.redis.core.RedisTemplate; 32. import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; 33. import org.springframework.stereotype.Component; 34. 35. @Component 36. public class RedisSubscriber extends MessageListenerAdapter { 37. // 38. @Autowired 39. private RedisTemplate<String, String> redisTemplate; 40. 41. @Override 42. public void onMessage(Message message, byte[] bytes) { 43. System.out.println(message); 44. byte[] body = message.getBody(); 45. byte[] channel = message.getChannel(); 46. String msg = redisTemplate.getStringSerializer().deserialize(body); 47. String topic = redisTemplate.getStringSerializer().deserialize(channel); 48. System.out.println("监听到topic为" + topic + "的消息:" + msg); 49. } 50. }
三.延时队列
1.简介
上面的例子我们已经了一个简易的消息队列。我们继续思考一个现实的场景,假定这些是一些游戏商品,它需要添加"延迟销售"特性,在未来某个时候才可以开始处理这些游戏商品数据。 那么要实现这个延迟的特性,我们需要修改现有队列的实现。
- 在消息数据的信息中包含延迟处理消息的执行时间,如果工作进程发现消息的执行时间还没到,那么它将会在短暂的等待之后重新把消息数据推入队列中。(延迟发送消息)
- 使用有序集合来存储这些需要延时消费的消息数据,将任务的执行时间设置为分值,在开启一个工作进程查找有序集合里面是否有可以立刻执行的任务,如果有的话就从有序集合中移除消息并且消费。
2.代码
1. package com.airboot.bootdemo.utils; 2. 3. import com.alibaba.fastjson.JSON; 4. import com.alibaba.fastjson.TypeReference; 5. import org.springframework.data.redis.core.RedisTemplate; 6. import org.springframework.data.redis.core.ZSetOperations; 7. 8. import java.lang.reflect.Type; 9. import java.util.Set; 10. import java.util.UUID; 11. 12. 13. public class RedisDelayQueue<T> { 14. 15. 16. private String queueKey; 17. 18. public RedisTemplate redisTemplate; 19. 20. // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference 21. private Type TaskType = new TypeReference<TaskItem<T>>() { 22. }.getType(); 23. 24. public RedisDelayQueue(RedisTemplate redisTemplate, String queueKey) { 25. this.queueKey = queueKey; 26. this.redisTemplate = redisTemplate; 27. } 28. 29. static class TaskItem<T> { 30. public String id; 31. public T msg; 32. } 33. 34. public void delay(T msg) { 35. TaskItem<T> item = new TaskItem<T>(); 36. //分配唯一的uuid 37. item.id = UUID.randomUUID().toString(); 38. item.msg = msg; 39. //fastjson序列化 40. String s = JSON.toJSONString(item); 41. ZSetOperations operations = redisTemplate.opsForZSet(); 42. //塞入延时队列,5s后再试 43. operations.add(queueKey, s, System.currentTimeMillis() + 5000); 44. } 45. 46. public void loop() { 47. while (!Thread.interrupted()) { 48. //只取一条 49. Set<String> values = redisTemplate.opsForZSet().rangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); 50. if (values.isEmpty()) { 51. try { 52. //歇会继续 53. Thread.sleep(500); 54. } catch (InterruptedException e) { 55. break; 56. } 57. continue; 58. } 59. String s = values.iterator().next(); 60. if (redisTemplate.opsForZSet().remove(queueKey, s) > 0) { 61. //多进程同时调用,只有一个会remove成功 62. TaskItem<T> task = JSON.parseObject(s, TaskType); 63. //执行业务逻辑 64. handleTask(task.msg); 65. } 66. } 67. } 68. 69. private void handleTask(T msg) { 70. System.out.println(msg); 71. } 72. }
调用。
1. @RequestMapping("/testDelayQueue") 2. public void testDelayQueue() { 3. RedisDelayQueue queue = new RedisDelayQueue(redisTemplate, "DelayQueue"); 4. Thread producer = new Thread() { 5. 6. @Override 7. public void run() { 8. for (int i = 0; i < 10; i++) { 9. queue.delay("DelayQueue" + i); 10. } 11. } 12. 13. }; 14. Thread consumer = new Thread() { 15. 16. @Override 17. public void run() { 18. queue.loop(); 19. } 20. 21. }; 22. producer.start(); 23. consumer.start(); 24. try { 25. producer.join(); 26. Thread.sleep(6000); 27. consumer.interrupt(); 28. consumer.join(); 29. } catch (InterruptedException e) { 30. } 31. }
3.原理
主要就是几个语句:
1. 放入redis中 ,权值为当前时间后5秒 2. operations.add(queueKey, s, System.currentTimeMillis() + 5000);
1. //获取时间小于当前时间的 在set中的第一条数据 所以在5秒之后就获取到了上面那条数据 2. Set<String> values = redisTemplate.opsForZSet().rangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
1. 移除上面查到的数据 模仿数据被消费 2. redisTemplate.opsForZSet().remove(queueKey, s)