④. Topics通配符模式
- ①. 通配符规则:
# :
匹配一个或多个词
* :
匹配不多不少恰好1个词
- 举例:
item.#:能匹配item.insert.abc或者item.insert
item.*:只能匹配item.insert
②. 生产者
/** * 通配符模式:发送消息 */ public class Producer { //交换机名称 static final String TOPIC_EXCHAGE = "topic_exchage"; //队列名称 static final String TOPIC_QUEUE_1 = "topic_queue_1"; //队列名称 static final String TOPIC_QUEUE_2 = "topic_queue_2"; public static void main(String[] args) throws Exception { //1. 创建连接; Connection connection = ConnectionUtil.getConnection(); //2. 创建频道; Channel channel = connection.createChannel(); //3. 声明交换机;参数1:交换机名称,参数2:交换机类型(fanout,direct,topic) channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC); channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null); //5. 队列绑定到交换机上 channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.update"); channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.delete"); //5. 队列绑定到交换机上 channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null); channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*"); //6. 发送消息; String message = "商品新增。通配符模式 ;routing key 为 item.insert "; /** * 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机) * 参数2:路由key,简单模式中可以使用队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */ channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes()); System.out.println("已发送消息:" + message); message = "商品修改。通配符模式 ;routing key 为 item.update "; /** * 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机) * 参数2:路由key,简单模式中可以使用队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */ channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes()); System.out.println("已发送消息:" + message); message = "商品删除。通配符模式 ;routing key 为 item.delete "; /** * 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机) * 参数2:路由key,简单模式中可以使用队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */ channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes()); System.out.println("已发送消息:" + message); //6. 关闭资源 channel.close(); connection.close(); } }
③. 消费者
/** * 通配符模式;消费者接收消息 */ public class Consumer1 { public static void main(String[] args) throws Exception { //1. 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); //2. 创建频道; Channel channel = connection.createChannel(); //3. 声明交换机 channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC); //4. 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key为:" + envelope.getRoutingKey()); //交换机 System.out.println("交换机为:" + envelope.getExchange()); //消息id System.out.println("消息id为:" + envelope.getDeliveryTag()); //接收到的消息 System.out.println("消费者1 --- 接收到的消息为:" + new String(body, "utf-8")); } }; //6. 监听队列 /** * 参数1:队列名 * 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除; * 如果设置为false则需要手动确认 * 参数3:消费者 */ channel.basicConsume(Producer.TOPIC_QUEUE_1, true, defaultConsumer); } }