前面的例子中,尽管我们使用了direct路由代替fanout路由解决了盲目广播的问题,但direct路由也有它的缺陷,他不能基于多个标准做路由转发。
在上面的日志系统中,如果不仅想基于日志等级做订阅,也想根据日志的发生源做订阅该怎么处理呢?这时候你可能想到了unix系统工具中的syslog服务,它不仅基于日志等级(info/warn/crit...)进行路由转发,也会根据操作(auth/cron/kern...)做路由转发。
如果是那样的话,日志系统就灵活多了,它不仅能够监听来自‘cron’的关键错误,也能监听来自'kern'的所有日志。其实主题交换机(topic exchange)就能解决这种问题。
主题交换机(Topic exchange)
主题交换机的路由代码不能是任意写的,必须是小树点分隔开的一组单词列表。这些单词可以随便写,但通常是与连接消息特征有关的单词。有效地路由代码应该是这样的“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由代码可以随便写,但是长度限制在255字节。
注意,绑定代码也必须在同一个表单中。topic交换机与direct交换机类似-具有特定路由代码的消息会传送给所有匹配绑定代码的队列,但有两个特殊的绑定代码:
* :它能替代一个单词
#:它能替代0或多个单词
该例子中,我们给所有的动物发送消息,符合由三个单词(第一个单词描述速度;第二个单词描述颜色;第三个单词描述物种)组成的路由代码将会发送消息:“<speed>.<colour>.<species>”。
我们创建了三个绑定:Q1使用“*.orange.*”绑定,Q2使用“*.*.rabbit”和“lazy.#”绑定。这些绑定的意义如下:
Q1描述了所有颜色为橙色的动物。
Q2描述了是兔子的动物和懒惰的动物。
这样,“quick.orange.rabbit”消息通过路由转发给Q1、Q2两个队列。"lazy.orange.elephant"消息也会转发给Q1、Q2两个队列。“quick.orange.fox”消息只会转发给Q1队列,"lazy.brown.fox"也只会转发给Q2队列。"lazy.pink.rabbit"会转发给Q2队列一次,尽管它匹配两个绑定。"quick.brown.fox"并不匹配任何一个队列就会被废弃。
如果我们打破规则,每次只发一个或四个单词的话,如“orange”或”quick.orange.male.rabbit“,这些消息不匹配任何绑定,就会被废弃。但如果发送”lazy.orange.male.rabbit“这样的消息的话,由于它匹配最后的绑定仍会被转发到Q2队列中。
主题交换机是一种非常强大的交换机,当它只绑定”#“时,它会接收所有的消息,与fanout交换机类似。当没有使用”*“和”#“符号时,主题交换机的作用等同与direct交换机。
源代码
EmitLogTopic.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package
com.favccxx.favrabbit;
import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
public
class
EmitLogTopic {
private
static
final
String EXCHANGE_NAME =
"topic_logs"
;
public
static
void
main(String[] argv) {
Connection connection =
null
;
Channel channel =
null
;
try
{
ConnectionFactory factory =
new
ConnectionFactory();
factory.setHost(
"localhost"
);
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,
"topic"
);
String[] routingKeys = {
"fast.orange.duck"
,
"slow.orange.fish"
,
"grey.rabbit"
,
"fast.black.rabbit"
,
"quick.white.rabbit"
,
"lazy.dog"
,
"lazy.black.pig"
};
String[] messages = {
"Hello"
,
"Guys"
,
"Girls"
,
"Babies"
};
for
(
int
i =
0
; i < routingKeys.length; i++) {
for
(
int
j =
0
; j < messages.length; j++) {
channel.basicPublish(EXCHANGE_NAME, routingKeys[i],
null
, messages[j].getBytes(
"UTF-8"
));
System.out.println(
" [x] Sent '"
+ routingKeys[i] +
"':'"
+ messages[j] +
"'"
);
}
}
}
catch
(Exception e) {
e.printStackTrace();
}
finally
{
if
(connection !=
null
) {
try
{
connection.close();
}
catch
(Exception ignore) {
}
}
}
}
}
|
ReceiveLogsTopic.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package
com.favccxx.favrabbit;
import
java.io.IOException;
import
com.rabbitmq.client.AMQP;
import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
com.rabbitmq.client.Consumer;
import
com.rabbitmq.client.DefaultConsumer;
import
com.rabbitmq.client.Envelope;
public
class
ReceiveLogsTopic {
private
static
final
String EXCHANGE_NAME =
"topic_logs"
;
public
static
void
main(String[] argv)
throws
Exception {
ConnectionFactory factory =
new
ConnectionFactory();
factory.setHost(
"localhost"
);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,
"topic"
);
String queueName = channel.queueDeclare().getQueue();
String[] bindingKeys = {
"*.orange.*"
,
"*.*.rabbit"
,
"lazy.#"
};
for
(
final
String bindingKey : bindingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
Consumer consumer =
new
DefaultConsumer(channel) {
@Override
public
void
handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte
[] body)
throws
IOException {
String message =
new
String(body,
"UTF-8"
);
System.out.println(
"["
+ bindingKey +
"] Received message :'"
+ message +
"' from routingKey : "
+ envelope.getRoutingKey());
}
};
channel.basicConsume(queueName,
true
, consumer);
}
}
}
|
运行消息发送器,在消息接收平台输出内容如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
[*.orange.*] Received message :
'Hello'
from routingKey : fast.orange.duck
[*.*.rabbit] Received message :
'Guys'
from routingKey : fast.orange.duck
[lazy.
#] Received message :'Girls' from routingKey : fast.orange.duck
[*.orange.*] Received message :
'Babies'
from routingKey : fast.orange.duck
[*.*.rabbit] Received message :
'Hello'
from routingKey : slow.orange.fish
[lazy.
#] Received message :'Guys' from routingKey : slow.orange.fish
[*.orange.*] Received message :
'Girls'
from routingKey : slow.orange.fish
[*.*.rabbit] Received message :
'Babies'
from routingKey : slow.orange.fish
[lazy.
#] Received message :'Hello' from routingKey : fast.black.rabbit
[*.orange.*] Received message :
'Guys'
from routingKey : fast.black.rabbit
[*.*.rabbit] Received message :
'Girls'
from routingKey : fast.black.rabbit
[lazy.
#] Received message :'Babies' from routingKey : fast.black.rabbit
[*.orange.*] Received message :
'Hello'
from routingKey : quick.white.rabbit
[*.*.rabbit] Received message :
'Guys'
from routingKey : quick.white.rabbit
[lazy.
#] Received message :'Girls' from routingKey : quick.white.rabbit
[*.orange.*] Received message :
'Babies'
from routingKey : quick.white.rabbit
[*.*.rabbit] Received message :
'Hello'
from routingKey : lazy.dog
[lazy.
#] Received message :'Guys' from routingKey : lazy.dog
[*.orange.*] Received message :
'Girls'
from routingKey : lazy.dog
[*.*.rabbit] Received message :
'Babies'
from routingKey : lazy.dog
[lazy.
#] Received message :'Hello' from routingKey : lazy.black.pig
[*.orange.*] Received message :
'Guys'
from routingKey : lazy.black.pig
[*.*.rabbit] Received message :
'Girls'
from routingKey : lazy.black.pig
[lazy.
#] Received message :'Babies' from routingKey : lazy.black.pig
|