③. 生产者
/** * 发布与订阅模式:发送消息 */ public class Producer { //交换机名称 static final String FANOUT_EXCHANGE = "fanout_exchange"; //队列名称 static final String FANOUT_QUEUE_1 = "fanout_queue_1"; //队列名称 static final String FANOUT_QUEUE_2 = "fanout_queue_2"; public static void main(String[] args) throws Exception { //1. 创建连接; Connection connection = ConnectionUtil.getConnection(); //2. 创建频道; Channel channel = connection.createChannel(); //3. 声明交换机;参数1:交换机名称,参数2:交换机类型(fanout,direct,topic) channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); //4. 声明队列; /** * 参数1:队列名称 * 参数2:是否定义持久化队列(消息会持久化保存在服务器上) * 参数3:是否独占本连接 * 参数4:是否在不使用的时候队列自动删除 * 参数5:其它参数 */ channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null); channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null); //5. 队列绑定到交换机;参数1:队列名称,参数2:交换机名称,参数3:路由key channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, ""); channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, ""); //6. 发送消息; for(int i = 1; i<=10; i++) { String message = "你好!小兔纸。发布订阅模式 --- " + i; /** * 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机) * 参数2:路由key,简单模式中可以使用队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */ channel.basicPublish(FANOUT_EXCHANGE, "", null, message.getBytes()); System.out.println("已发送消息:" + message); } //6. 关闭资源 channel.close(); connection.close(); } }
④. 消费者
/** * 发布与订阅模式;消费者接收消息 */ public class Consumer { private static Runnable runnable=()->{ try{ //1. 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); //2. 创建频道; Channel channel = connection.createChannel(); //3.申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 // channel.queueDeclare("queue1", false, false, false, null); //6. 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key为:" + envelope.getRoutingKey()); //交换机 System.out.println("交换机为:" + envelope.getExchange()); //消息id System.out.println("消息id为:" + envelope.getDeliveryTag()); //接收到的消息 System.out.println("消费者1 --- 接收到的消息为:" + new String(body, "utf-8")); } }; //6. 监听队列 /** * 参数1:队列名 * 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除; * 如果设置为false则需要手动确认 * 参数3:消费者 */ channel.basicConsume(Thread.currentThread().getName(), true, defaultConsumer); }catch (Exception e){ e.getLocalizedMessage(); } }; public static void main(String[] args) { new Thread(runnable, Producer.FANOUT_QUEUE_1).start(); new Thread(runnable, Producer.FANOUT_QUEUE_2).start(); } }