④. 消费者
/** * 路由模式;消费者接收消息 */ public class Consumer { static Runnable runnable=()->{ try{ //1. 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); //2. 创建频道; Channel channel = connection.createChannel(); //3. 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key为:" + envelope.getRoutingKey()); //交换机 System.out.println("交换机为:" + envelope.getExchange()); //消息id System.out.println("消息id为:" + envelope.getDeliveryTag()); //接收到的消息 System.out.println("消费者1 --- 接收到的消息为:" + new String(body, "utf-8")); } }; //4. 监听队列 /** * 参数1:队列名 * 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除; * 如果设置为false则需要手动确认 * 参数3:消费者 */ String name = Thread.currentThread().getName(); System.out.println("线程的名字:"+name); channel.basicConsume(name, true, defaultConsumer); }catch (Exception e){ e.printStackTrace(); } }; public static void main(String[] args) throws Exception { new Thread(runnable,Producer.DIRECT_QUEUE_INSERT).start(); new Thread(runnable,Producer.DIRECT_QUEUE_UPDATE).start(); } }
⑤. 测试
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击direct_exchange的交换机,可以查看到如下的绑定: