开发者社区> 邴越> 正文

Redis笔记(七)Java实现Redis消息队列

简介: 这里我使用Redis的发布、订阅功能实现简单的消息队列,基本的命令有publish、subscribe等。 在Jedis中,有对应的java方法,但是只能发布字符串消息。为了传输对象,需要将对象进行序列化,并封装成字符串进行处理。
+关注继续查看

这里我使用Redis的发布、订阅功能实现简单的消息队列,基本的命令有publish、subscribe等。

在Jedis中,有对应的java方法,但是只能发布字符串消息。为了传输对象,需要将对象进行序列化,并封装成字符串进行处理。


使用Redis实现消息队列

1.封装一个消息对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Message implements Serializable{
 
private static final long serialVersionUID = 1L;
 
private String titile;
private String info;
 
public Message(String titile,String info){
this.titile=titile;
this.info=info;
}
 
public String getTitile() {
return titile;
}
public void setTitile(String titile) {
this.titile = titile;
}
public String getInfo() {
return info;
}
public void setInfo(String info) {
this.info = info;
}
}

  

2.为这个消息对象提供序列化方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MessageUtil {
 
//convert To String
public static String convertToString(Object obj,String charset) throws IOException{
 
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream oo = new ObjectOutputStream(bo);
oo.writeObject(obj);
String str = bo.toString(charset);
bo.close();
oo.close();
return str;
}
 
//convert To Message
public static Object convertToMessage(byte[] bytes) throws Exception{
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream sIn = new ObjectInputStream(in);
return sIn.readObject();
 
}
}

  

3.从Jedis连接池中获取连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class RedisUtil {
 
/**
* Jedis connection pool
* @Title: config
*/
public static JedisPool getJedisPool(){
ResourceBundle bundle=ResourceBundle.getBundle("redis");
String host=bundle.getString("host");
int port=Integer.valueOf(bundle.getString("port"));
int timeout=Integer.valueOf(bundle.getString("timeout"));
//  String password=bundle.getString("password");
 
JedisPoolConfig config=new JedisPoolConfig();
config.setMaxActive(Integer.valueOf(bundle.getString("maxActive")));
config.setMaxWait(Integer.valueOf(bundle.getString("maxWait")));
config.setTestOnBorrow(Boolean.valueOf(bundle.getString("testOnBorrow")));
config.setTestOnReturn(Boolean.valueOf(bundle.getString("testOnReturn")));
 
JedisPool pool=new JedisPool(config, host, port, timeout);
 
return pool;
}
}

  

4.创建Provider类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Producer {
 
private Jedis jedis;
private JedisPool pool;
 
public Producer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
}
 
 
public void provide(String channel,Message message) throws IOException{
String str1=MessageUtil.convertToString(channel,"UTF-8");
String str2=MessageUtil.convertToString(message,"UTF-8");
jedis.publish(str1, str2);
}
 
//close the connection
public void close() throws IOException {
//将Jedis对象归还给连接池,关闭连接
pool.returnResource(jedis);
}
}

  

5.创建Consumer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class Consumer {
 
private Jedis jedis;
private JedisPool pool;
 
public Consumer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
}
 
 
public void consum(String channel) throws IOException{
JedisPubSub jedisPubSub = new JedisPubSub() {
// 取得订阅的消息后的处理
public void onMessage(String channel, String message) {
System.out.println("Channel:"+channel);
System.out.println("Message:"+message.toString());
}
 
// 初始化订阅时候的处理
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("onSubscribe:"+channel);
}
 
// 取消订阅时候的处理
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println("onUnsubscribe:"+channel);
}
 
// 初始化按表达式的方式订阅时候的处理
public void onPSubscribe(String pattern, int subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
 
// 取消按表达式的方式订阅时候的处理
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
 
// 取得按表达式的方式订阅的消息后的处理
public void onPMessage(String pattern, String channel, String message) {
System.out.println(pattern + "=" + channel + "=" + message);
}
};
 
jedis.subscribe(jedisPubSub, channel);
}
 
//close the connection
public void close() throws IOException {
//将Jedis对象归还给连接池
pool.returnResource(jedis);
}
}

  

6.测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args){
 
Message msg=new Message("hello!""this is the first message!");
 
Producer producer=new Producer();
Consumer consumer=new Consumer();
try {
producer.provide("chn1",msg);
consumer.consum("chn1");
catch (IOException e) {
e.printStackTrace();
}
}

  

 


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
redis灵魂拷问:如何使用stream实现消息队列
redis灵魂拷问:如何使用stream实现消息队列
213 0
redis 实现消息队列及常用命令(三)|学习笔记
快速学习 redis 实现消息队列及常用命令(三)
121 0
基于Redis实现消息队列
基于Redis实现消息队列
187 0
【Redis】浅尝Redis Stream做消息队列
SpringBoot整合Redis5.0新特性Redis Stream
424 0
SpringBoot整合Redis实现消息队列
SpringBoot整合Redis实现消息队列
199 0
别再用 Redis List 实现消息队列了,Stream 专为队列而生
Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。
246 0
Redis 竟然能用 List 实现消息队列
今天,码哥结合消息队列的特点一步步带大家分析使用 Redis 的 List 作为消息队列的实现原理,并分享如何把 SpringBoot 与 Redission 整合运用到项目中。
298 0
Redis和消息队列使用实战
 消息队列是在乐视这边非常普遍使用的技术。在我们部门内部,不同的项目使用的消息队列实现也不一样。下面是支付系统的流转图
106 0
AspNetCore结合Redis实践消息队列
这是年中首发在博客园上的文章,个人觉得是AspNetCore结合Redis做的一次比较优秀的消息队列重构,其中对于点对点/发布-订阅的思路应该也是面试必考题。
242 0
Redis消息队列发展历程
Redis是目前最受欢迎的kv类数据库,当然它的功能越来越多,早已不限定在kv场景,消息队列就是Redis中一个重要的功能。Redis从2010年发布1.0版本就具备一个消息队列的雏形,随着10多年的迭代,其消息队列的功能也越来越完善,作为一个全内存的消息队列,适合应用与要求高吞吐、低延时的场景。本文将来盘一下Redis消息队列功能的发展历程,历史版本有哪些不足,后续版本是如何来解决这些问题的。
1157 0
+关注
邴越
关注分布式系统及高可用架构,探讨职业规划,实践持续学习,公众号「架构进化论」
文章
问答
文章排行榜
最热
最新
相关电子书
更多
消息队列 Kafka 版差异化特性
立即下载
消息队列kafka介绍
立即下载
企业互联网架构之消息队列
立即下载