这里我使用Redis的发布/订阅(Pub/Sub)功能实现一个简单的消息队列,基本的命令有publish、subscribe等。需要注意:发布/订阅不会保存消息,如果发布时该频道没有订阅者,消息会被直接丢弃。
在Jedis中,有对应的java方法,但是只能发布字符串消息。为了传输对象,需要将对象进行序列化,并封装成字符串进行处理。
使用Redis实现消息队列
1.封装一个消息对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/**
 * Serializable payload exchanged over a Redis channel: a title plus an info text.
 *
 * <p>The spelling "titile" is part of the public accessor names and is kept as-is
 * so that existing callers continue to compile.
 */
public class Message implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Title of the message (identifier keeps the original spelling). */
    private String titile;

    /** Free-form body text of the message. */
    private String info;

    /**
     * Creates a message with both fields populated.
     *
     * @param titile the title; stored without validation
     * @param info   the body text; stored without validation
     */
    public Message(String titile, String info) {
        this.titile = titile;
        this.info = info;
    }

    /** @return the current title */
    public String getTitile() {
        return this.titile;
    }

    /** @param titile the new title */
    public void setTitile(String titile) {
        this.titile = titile;
    }

    /** @return the current body text */
    public String getInfo() {
        return this.info;
    }

    /** @param info the new body text */
    public void setInfo(String info) {
        this.info = info;
    }
}
|
2.为这个消息对象提供序列化方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
/**
 * Helpers that push objects through Java's built-in object serialization.
 */
public class MessageUtil {

    /**
     * Serializes {@code obj} with an {@link ObjectOutputStream} and decodes the
     * resulting bytes as text in the given charset.
     *
     * <p>NOTE(review): the serialization stream is binary. Decoding it with a
     * multi-byte charset such as UTF-8 replaces byte sequences that are not valid
     * in that charset, so the original bytes cannot be recovered from the String.
     * A single-byte charset such as ISO-8859-1 maps every byte to exactly one
     * char and therefore round-trips.
     *
     * @param obj     the object to serialize; may be {@code null}
     * @param charset name of the charset used to decode the serialized bytes
     * @return the serialized form of {@code obj} decoded as a String
     * @throws IOException if serialization fails or the charset name is not
     *                     supported ({@code UnsupportedEncodingException})
     */
    public static String convertToString(Object obj, String charset) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        ObjectOutputStream objectOut = new ObjectOutputStream(buffer);
        try {
            objectOut.writeObject(obj);
            // ObjectOutputStream buffers internally; flush before reading the bytes
            // so the result never depends on when the stream happens to drain.
            objectOut.flush();
            return buffer.toString(charset);
        } finally {
            // Closing the outer stream also closes the wrapped buffer.
            objectOut.close();
        }
    }

    /**
     * Deserializes a single object from a Java serialization stream.
     *
     * <p>SECURITY: Java-native deserialization of data from an untrusted source
     * can execute attacker-chosen code paths. Only pass bytes that originate from
     * a trusted producer.
     *
     * @param bytes a complete serialization stream containing one object
     * @return the deserialized object (possibly {@code null})
     * @throws IOException            if the bytes are not a valid serialization stream
     * @throws ClassNotFoundException if the class of the serialized object is not
     *                                available on the classpath
     */
    public static Object convertToMessage(byte[] bytes)
            throws IOException, ClassNotFoundException {
        ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(bytes));
        try {
            return objectIn.readObject();
        } finally {
            objectIn.close();
        }
    }
}
|
3.创建Jedis连接池(连接参数从classpath下的redis.properties读取)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
/**
 * Builds the Jedis connection pool from the {@code redis} resource bundle.
 */
public class RedisUtil {

    /** Pool shared by all callers; created on first use, guarded by the class lock. */
    private static JedisPool sharedPool;

    /**
     * Returns the shared Jedis connection pool, creating it on the first call.
     *
     * <p>Earlier this method built a brand-new pool on every invocation, so each
     * caller ended up with its own pool that was never destroyed. The pool is now
     * created once and reused; callers must not destroy the returned pool while
     * others may still be using it.
     *
     * <p>Settings are read from the {@code redis} resource bundle using the keys
     * {@code host}, {@code port}, {@code timeout}, {@code maxActive},
     * {@code maxWait}, {@code testOnBorrow} and {@code testOnReturn}. No password
     * is passed to the pool.
     *
     * @return the shared pool
     * @throws java.util.MissingResourceException if the bundle or one of the keys is missing
     * @throws NumberFormatException if a numeric setting is not a valid integer
     */
    public static synchronized JedisPool getJedisPool() {
        if (sharedPool == null) {
            sharedPool = createPool();
        }
        return sharedPool;
    }

    /** Reads the bundle and constructs a new pool from its settings. */
    private static JedisPool createPool() {
        ResourceBundle bundle = ResourceBundle.getBundle("redis");

        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxActive(intSetting(bundle, "maxActive"));
        config.setMaxWait(intSetting(bundle, "maxWait"));
        config.setTestOnBorrow(booleanSetting(bundle, "testOnBorrow"));
        config.setTestOnReturn(booleanSetting(bundle, "testOnReturn"));

        String host = bundle.getString("host");
        int port = intSetting(bundle, "port");
        int timeout = intSetting(bundle, "timeout");
        return new JedisPool(config, host, port, timeout);
    }

    /** Parses an integer setting, tolerating surrounding whitespace in the value. */
    private static int intSetting(ResourceBundle bundle, String key) {
        return Integer.parseInt(bundle.getString(key).trim());
    }

    /** Parses a boolean setting, tolerating surrounding whitespace in the value. */
    private static boolean booleanSetting(ResourceBundle bundle, String key) {
        return Boolean.parseBoolean(bundle.getString(key).trim());
    }
}
|
4.创建Producer类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
/**
 * Publishes {@link Message} objects to a Redis channel over a pooled Jedis connection.
 *
 * <p>The connection is borrowed from the pool in the constructor and handed back
 * by {@link #close()}.
 */
public class Producer {

    /**
     * Charset used to turn the serialized bytes into a String. ISO-8859-1 maps
     * every byte to exactly one char, so a subscriber can rebuild the bytes with
     * {@code message.getBytes("ISO-8859-1")} (assuming the client delivers the
     * String unchanged). Decoding the binary stream as UTF-8, as was done before,
     * irreversibly replaces bytes that are not valid UTF-8.
     */
    private static final String PAYLOAD_CHARSET = "ISO-8859-1";

    private Jedis jedis;
    private JedisPool pool;

    /** Borrows a connection from the pool supplied by {@code RedisUtil}. */
    public Producer() {
        pool = RedisUtil.getJedisPool();
        jedis = pool.getResource();
    }

    /**
     * Serializes {@code message} and publishes it on {@code channel}.
     *
     * <p>The channel name is sent as given. Previously the name itself was run
     * through Java serialization, which produced a different channel name than
     * the one subscribers use, so published messages never reached them.
     *
     * @param channel the channel name to publish on; must not be {@code null}
     * @param message the message to serialize and send
     * @throws IOException              if the message cannot be serialized
     * @throws IllegalArgumentException if {@code channel} is {@code null}
     * @throws IllegalStateException    if this producer has already been closed
     */
    public void provide(String channel, Message message) throws IOException {
        if (channel == null) {
            throw new IllegalArgumentException("channel must not be null");
        }
        if (jedis == null) {
            throw new IllegalStateException("producer is closed");
        }
        String payload = MessageUtil.convertToString(message, PAYLOAD_CHARSET);
        jedis.publish(channel, payload);
    }

    /**
     * Returns the borrowed connection to the pool. Calling this more than once
     * has no further effect.
     *
     * @throws IOException declared for compatibility with existing callers
     */
    public void close() throws IOException {
        if (jedis != null) {
            pool.returnResource(jedis);
            // Drop the reference so the connection cannot be used or returned twice.
            jedis = null;
        }
    }
}
|
5.创建Consumer类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
/**
 * Subscribes to a Redis channel over a pooled Jedis connection and prints every
 * pub/sub event to standard output.
 */
public class Consumer {

    private Jedis jedis;
    private JedisPool pool;

    /** Borrows a connection from the pool supplied by {@code RedisUtil}. */
    public Consumer() {
        this.pool = RedisUtil.getJedisPool();
        this.jedis = this.pool.getResource();
    }

    /**
     * Subscribes to {@code channel} with a listener that prints each event.
     *
     * @param channel the channel name to subscribe to
     * @throws IOException declared for callers; not thrown by the visible code
     */
    public void consum(String channel) throws IOException {
        JedisPubSub listener = new PrintingListener();
        this.jedis.subscribe(listener, channel);
    }

    /**
     * Hands the borrowed connection back to the pool.
     *
     * @throws IOException declared for callers; not thrown by the visible code
     */
    public void close() throws IOException {
        this.pool.returnResource(this.jedis);
    }

    /** Listener that writes pub/sub callbacks to standard output. */
    private static class PrintingListener extends JedisPubSub {

        // Called when a channel subscription is established.
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.println("onSubscribe:" + channel);
        }

        // Called when a channel subscription is cancelled.
        public void onUnsubscribe(String channel, int subscribedChannels) {
            System.out.println("onUnsubscribe:" + channel);
        }

        // Called for each message received on a subscribed channel.
        public void onMessage(String channel, String message) {
            System.out.println("Channel:" + channel);
            System.out.println("Message:" + message.toString());
        }

        // Called when a pattern subscription is established; intentionally silent.
        public void onPSubscribe(String pattern, int subscribedChannels) {
        }

        // Called when a pattern subscription is cancelled; intentionally silent.
        public void onPUnsubscribe(String pattern, int subscribedChannels) {
        }

        // Called for each message received through a pattern subscription.
        public void onPMessage(String pattern, String channel, String message) {
            System.out.println(pattern + "=" + channel + "=" + message);
        }
    }
}
|
6.测试方法
1
2
3
4
5
6
7
8
9
10
11
12
13
|
/**
 * Demo entry point: subscribes to "chn1" and publishes one message to it.
 *
 * <p>Redis pub/sub does not store messages, so a message published before any
 * subscriber exists is dropped. The original order (publish, then subscribe)
 * therefore lost the message and then blocked forever waiting for another one.
 * Here the publish runs on a background thread after a short delay, while the
 * main thread performs the blocking subscribe.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final String channel = "chn1";
    final Message msg = new Message("hello!", "this is the first message!");
    final Producer producer = new Producer();
    Consumer consumer = new Consumer();

    Thread publisher = new Thread(new Runnable() {
        public void run() {
            try {
                // Crude delay so the subscription below is registered before we
                // publish; good enough for a demo, not a real synchronization.
                Thread.sleep(1000L);
                producer.provide(channel, msg);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    producer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }, "redis-publisher");
    publisher.setDaemon(true);
    publisher.start();

    try {
        // Blocks the main thread while the subscription is active.
        consumer.consum(channel);
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            consumer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
|