Routing Topic
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符!这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
、 item.#
、item.*
。
统配符
* (star) can substitute for exactly one word. 匹配不多不少恰好1个词 # (hash) can substitute for zero or more words. 匹配[0-n]个词 a.# 可以匹配 a.b、a.b.c、a.b.c.d 等只要是a.开头的情况 a.* 只能匹配 a.b 这种后面只有一个单词的情况
通配符可以出现在
注意:RoutingKey的任意位置。
创建生产者
public class MyProducer { @Test public void test() throws Exception { // 交换机 String exchange = "logs_topic"; // 创建工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setVirtualHost("/"); factory.setHost("xuewei.world"); factory.setUsername("xuewei"); factory.setPassword("123456"); factory.setPort(5672); // 创建连接和通道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(exchange, "topic"); // 发布消息 channel.basicPublish(exchange, "a.b", null, "a.b".getBytes()); channel.basicPublish(exchange, "a.b.c", null, "a.b.c".getBytes()); channel.basicPublish(exchange, "a.b.c.d", null, "a.b.c.d" .getBytes()); channel.basicPublish(exchange, "a.b.c.d.e", null, "a.b.c.d.e".getBytes()); } }
创建消费者1
public class MyConsumer1 { public static void main(String[] args) throws Exception { // 指定交换机 String exchange = "logs_topic"; // 创建工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setVirtualHost("/"); factory.setHost("xuewei.world"); factory.setUsername("xuewei"); factory.setPassword("123456"); factory.setPort(5672); // 创建连接和通道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 绑定交换机 channel.exchangeDeclare(exchange, "topic"); // 创建临时队列 String queue = channel.queueDeclare().getQueue(); // 将临时队列绑定exchange channel.queueBind(queue, exchange, "a.*"); channel.queueBind(queue, exchange, "#.d.#"); // 处理消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1: " + new String(body)); // TODO 业务处理 } }); } }
创建消费者2
public class MyConsumer2 { public static void main(String[] args) throws Exception { // 指定交换机 String exchange = "logs_topic"; // 创建工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setVirtualHost("/"); factory.setHost("xuewei.world"); factory.setUsername("xuewei"); factory.setPassword("123456"); factory.setPort(5672); // 创建连接和通道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 绑定交换机 channel.exchangeDeclare(exchange, "topic"); // 创建临时队列 String queue = channel.queueDeclare().getQueue(); // 将临时队列绑定exchange channel.queueBind(queue, exchange, "#.b.#"); // 处理消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2: " + new String(body)); // TODO 业务处理 } }); } }
生产者生产的消息:a.b
、a.b.c
、a.b.c.d
、a.b.c.d.e
消费者1接受的消息规则为:
channel.queueBind(queue, exchange, "a.*"); channel.queueBind(queue, exchange, "#.d.#");
所以消费者1将会接收到:a.b
、a.b.c.d
、a.b.c.d.e
消费者2接受的消息规则为:
channel.queueBind(queue, exchange, "#.b.#");
所以消费者2将会接收到:a.b
、a.b.c
、a.b.c.d
、a.b.c.d.e