RabbitMQ笔记

简介: RabbitMQ笔记

依赖

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.16.0</version>
        </dependency>

简单模式

生产者

package com.imooc.mq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 构建简单模式的生产者,发送消息
 */
public class FooProducer {
    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂以及相关的参数配置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 2. 通过工程创建连接 Connection
        Connection connection = factory.newConnection();
        // 3. 创建管道 Channel
        Channel channel = connection.createChannel();
        // 4. 创建队列 Queue(简单模式不需要交换机Exchange)
        /**
         * queue: 队列名
         * durable: 是否持久化,true:重启之后,队列依然存在,false则不存在
         * exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false
         * autoDelete: 是否自动删除,true:当没有消费者的时候,则自动删除这个队列
         * arguments: map类型的其他参数
         */
        channel.queueDeclare("hello", true, false, false, null);
        // 5. 向队列发送消息
        /**
         * exchange: 交换机的名称,简单模式下没有,所以直接设置为 ""
         * routingKey: 路由key,映射路径,如果交换机没有,则路由key和队列名保持一致
         * props: 配置参数
         * body: 消息数据
         */
        String msg = "Hello 慕课网~~~";
        channel.basicPublish("", "hello", null, msg.getBytes());
        // 6. 释放资源
        channel.close();
        connection.close();
    }
}

消费者

package com.imooc.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * 构建简单模式的消费者,监听消费消息
 */
public class FooConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello", true, false, false, null);
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("consumerTag = " + consumerTag);
                System.out.println("envelope = " + envelope.toString());
                System.out.println("BasicProperties = " + properties.toString());
                System.out.println("body = " + new String(body));
            }
        };
        /**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume("hello", true, consumer);
    }
}

工作队列模式

生产者

package com.imooc.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 构建工作队列的生产者,发送消息
 */
public class WorkQueuesProducer {
    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂以及相关的参数配置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 2. 通过工程创建连接 Connection
        Connection connection = factory.newConnection();
        // 3. 创建管道 Channel
        Channel channel = connection.createChannel();
        // 4. 创建队列 Queue(简单模式不需要交换机Exchange)
        /**
         * queue: 队列名
         * durable: 是否持久化,true:重启之后,队列依然存在,false则不存在
         * exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false
         * autoDelete: 是否自动删除,true:当没有消费者的时候,则自动删除这个队列
         * arguments: map类型的其他参数
         */
        channel.queueDeclare("work_queue", true, false, false, null);
        // 5. 向队列发送消息
        /**
         * exchange: 交换机的名称,简单模式下没有,所以直接设置为 ""
         * routingKey: 路由key,映射路径,如果交换机没有,则路由key和队列名保持一致
         * props: 配置参数
         * body: 消息数据
         */
        for (int i = 0 ; i < 10 ; i ++) {
            String task = "开始上班,搬砖喽~ 开启任务[" + i + "]";
            channel.basicPublish("", "work_queue", null, task.getBytes());
        }
        // 6. 释放资源
        channel.close();
        connection.close();
    }
}

消费者A

package com.imooc.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * 构建工作队列模式的消费者,监听消费消息
 */
public class WorkQueuesConsumerA {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work_queue", true, false, false, null);
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
        /**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume("work_queue", true, consumer);
    }
}

消费者B

package com.imooc.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * 构建工作队列模式的消费者,监听消费消息
 */
public class WorkQueuesConsumerB {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work_queue", true, false, false, null);
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
        /**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume("work_queue", true, consumer);
    }
}

发布订阅模式

生产者

package com.imooc.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 构建发布订阅模式的生产者,发送消息
 */
public class PubSubProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 创建交换机 Exchange
        /**
         * exchange: 交换机的名称
         * type: 交换机的类型
         *  FANOUT("fanout"): 广播模式,发布订阅,把消息发送给所有的绑定的队列
         *  DIRECT("direct"): 定向投递模式,把消息发送给指定的“routing key”的队列
         *  TOPIC("topic"): 通配符模式,把消息发送给符合的“routing pattern”的队列
         *  HEADERS("headers"): 使用率不多,参数匹配
         * durable: 是否持久化
         * autoDelete: 是否自动删除
         * internal: 内部意思,true:表示当前exchange是rabbitmq内部使用的,用户创建的队列不会消费该类型交换机下的消息,所以我们一般使用false即可
         * arguments: map类型的参数
         */
        String fanout_exchange = "fanout_exchange";
        channel.exchangeDeclare(fanout_exchange,
                BuiltinExchangeType.FANOUT,
                true, false, false, null);
        // 定义两个队列
        String fanout_queue_a = "fanout_queue_a";
        String fanout_queue_b = "fanout_queue_b";
        channel.queueDeclare(fanout_queue_a, true, false, false, null);
        channel.queueDeclare(fanout_queue_b, true, false, false, null);
        // 绑定交换机和队列
        channel.queueBind(fanout_queue_a, fanout_exchange, "");
        channel.queueBind(fanout_queue_b, fanout_exchange, "");
        for (int i = 0 ; i < 10 ; i ++) {
            String task = "开始上班,搬砖喽~ 开启任务[" + i + "]";
            channel.basicPublish(fanout_exchange, "", null, task.getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者A

package com.imooc.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * 构建发布订阅模式的消费者,监听消费消息
 */
public class PubSubConsumerA {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String fanout_queue_a = "fanout_queue_a";
        channel.queueDeclare(fanout_queue_a, true, false, false, null);
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
        /**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume(fanout_queue_a, true, consumer);
    }
}

消费者B

package com.imooc.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * 构建发布订阅模式的消费者,监听消费消息
 */
public class PubSubConsumerB {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String fanout_queue_b = "fanout_queue_b";
        channel.queueDeclare(fanout_queue_b, true, false, false, null);
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
        /**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume(fanout_queue_b, true, consumer);
    }
}

路由(定向)模式

生产者

package com.imooc.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 构建路由(定向)模式的生产者,发送消息
 */
public class RoutingProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 创建交换机 Exchange
        /**
         * exchange: 交换机的名称
         * type: 交换机的类型
         *  FANOUT("fanout"): 广播模式,发布订阅,把消息发送给所有的绑定的队列
         *  DIRECT("direct"): 定向投递模式,把消息发送给指定的“routing key”的队列
         *  TOPIC("topic"): 通配符模式,把消息发送给符合的“routing pattern”的队列
         *  HEADERS("headers"): 使用率不多,参数匹配
         * durable: 是否持久化
         * autoDelete: 是否自动删除
         * internal: 内部意思,true:表示当前exchange是rabbitmq内部使用的,用户创建的队列不会消费该类型交换机下的消息,所以我们一般使用false即可
         * arguments: map类型的参数
         */
        String routing_exchange = "routing_exchange";
        channel.exchangeDeclare(routing_exchange,
                BuiltinExchangeType.DIRECT,
                true, false, false, null);
        // 定义两个队列
        String routing_queue_order = "routing_queue_order";
        String routing_queue_pay = "routing_queue_pay";
        channel.queueDeclare(routing_queue_order, true, false, false, null);
        channel.queueDeclare(routing_queue_pay, true, false, false, null);
        // 绑定交换机和队列
        channel.queueBind(routing_queue_order, routing_exchange, "order_create");
        channel.queueBind(routing_queue_order, routing_exchange, "order_delete");
        channel.queueBind(routing_queue_order, routing_exchange, "order_update");
        channel.queueBind(routing_queue_pay, routing_exchange, "order_pay");
        String msg1 = "创建订单A";
        String msg2 = "创建订单B";
        String msg3 = "删除订单C";
        String msg4 = "修改订单D";
        String msg5 = "支付订单E";
        String msg6 = "支付订单F";
        channel.basicPublish(routing_exchange, "order_create", null, msg1.getBytes());
        channel.basicPublish(routing_exchange, "order_create", null, msg2.getBytes());
        channel.basicPublish(routing_exchange, "order_delete", null, msg3.getBytes());
        channel.basicPublish(routing_exchange, "order_update", null, msg4.getBytes());
        channel.basicPublish(routing_exchange, "order_pay", null, msg5.getBytes());
        channel.basicPublish(routing_exchange, "order_pay", null, msg6.getBytes());
        channel.close();
        connection.close();
    }
}

消费者A

package com.imooc.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * 构建路由模式的消费者,监听消费消息
 */
public class RoutingOrderConsumerA {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String routing_queue_order = "routing_queue_order";
        channel.queueDeclare(routing_queue_order, true, false, false, null);
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
        /**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume(routing_queue_order, true, consumer);
    }
}

消费者B

package com.imooc.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * 构建路由模式的消费者,监听消费消息
 */
public class RoutingPayConsumerB {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String routing_queue_pay = "routing_queue_pay";
        channel.queueDeclare(routing_queue_pay, true, false, false, null);
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
        /**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume(routing_queue_pay, true, consumer);
    }
}

通配符匹配模式 topic

生产者

package com.imooc.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 构建通配符模式的生产者,发送消息
 */
public class TopicsProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 创建交换机 Exchange
        /**
         * exchange: 交换机的名称
         * type: 交换机的类型
         *  FANOUT("fanout"): 广播模式,发布订阅,把消息发送给所有的绑定的队列
         *  DIRECT("direct"): 定向投递模式,把消息发送给指定的“routing key”的队列
         *  TOPIC("topic"): 通配符模式,把消息发送给符合的“routing pattern”的队列
         *  HEADERS("headers"): 使用率不多,参数匹配
         * durable: 是否持久化
         * autoDelete: 是否自动删除
         * internal: 内部意思,true:表示当前exchange是rabbitmq内部使用的,用户创建的队列不会消费该类型交换机下的消息,所以我们一般使用false即可
         * arguments: map类型的参数
         */
        String topics_exchange = "topics_exchange";
        channel.exchangeDeclare(topics_exchange,
                BuiltinExchangeType.TOPIC,
                true, false, false, null);
        // 定义两个队列
        String topics_queue_order = "topics_queue_order";
        String topics_queue_pay = "topics_queue_pay";
        channel.queueDeclare(topics_queue_order, true, false, false, null);
        channel.queueDeclare(topics_queue_pay, true, false, false, null);
        // 绑定交换机和队列 *表示通配一个词,#表示通配一个或多个词
        channel.queueBind(topics_queue_order, topics_exchange, "order.*");
        channel.queueBind(topics_queue_pay, topics_exchange, "*.pay.#");
        String msg1 = "创建订单A";
        String msg2 = "创建订单B";
        String msg3 = "删除订单C";
        String msg4 = "修改订单D";
        String msg5 = "支付订单E";
        String msg6 = "超市订单F";
        String msg7 = "慕课订单G";
        channel.basicPublish(topics_exchange, "order.create", null, msg1.getBytes());
        channel.basicPublish(topics_exchange, "order.create", null, msg2.getBytes());
        channel.basicPublish(topics_exchange, "order.delete", null, msg3.getBytes());
        channel.basicPublish(topics_exchange, "order.update", null, msg4.getBytes());
        // order.pay 能匹配到两个队列,注意看两个队列是否都能消费到
        channel.basicPublish(topics_exchange, "order.pay", null, msg5.getBytes());
        channel.basicPublish(topics_exchange, "imooc.pay.super.market", null, msg6.getBytes());
        channel.basicPublish(topics_exchange, "imooc.pay.course", null, msg7.getBytes());
        channel.close();
        connection.close();
    }
}

消费者A

package com.imooc.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * 构建通配符模式的消费者,监听消费消息
 */
public class TopicsOrderConsumerA {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String topics_queue_order = "topics_queue_order";
        channel.queueDeclare(topics_queue_order, true, false, false, null);
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
        /**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume(topics_queue_order, true, consumer);
    }
}

消费者B

package com.imooc.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * 构建通配符模式的消费者,监听消费消息
 */
public class TopicsPayConsumerB {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String topics_queue_pay = "topics_queue_pay";
        channel.queueDeclare(topics_queue_pay, true, false, false, null);
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
        /**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume(topics_queue_pay, true, consumer);
    }
}

SpringBoot集成RabbitMQ

依赖(生产和消费)

<!-- SpringBoot 整合RabbitMQ 依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置类 (生产者)

package com.imooc.api.mq;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ 的配置类
 */
@Configuration
public class RabbitMQSMSConfig {
    // 定义交换机的名称
    public static final String SMS_EXCHANGE = "sms_exchange";
    // 定义队列的名称
    public static final String SMS_QUEUE = "sms_queue";
    // 统一定义路由key
    public static final String ROUTING_KEY_SMS_SEND_LOGIN = "imooc.sms.send.login";
    // 创建交换机
    @Bean(SMS_EXCHANGE)
    public Exchange exchange() {
        return ExchangeBuilder
                    .topicExchange(SMS_EXCHANGE)
                    .durable(true)
                    .build();
    }
    // 创建队列
    @Bean(SMS_QUEUE)
    public Queue queue() {
//        return new Queue(SMS_QUEUE);
        return QueueBuilder
                .durable(SMS_QUEUE)
                .build();
    }
    // 创建绑定关系
    @Bean
    public Binding smsBinding(@Qualifier(SMS_EXCHANGE) Exchange exchange,
                              @Qualifier(SMS_QUEUE) Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("imooc.sms.#")
                .noargs();
    }
}

yml配置(生产和消费)

spring:
  rabbitmq:
    host: 192.168.233.128
    port: 5672
    virtual-host: /
    username: guest
    password: guest

生产者

@Autowired
    private RabbitTemplate rabbitTemplate;
    @PostMapping("getSMSCode")
        public GraceJSONResult getSMSCode(String mobile,
                HttpServletRequest request) throws Exception {
            if (StringUtils.isBlank(mobile)) {
                return GraceJSONResult.error();
            }
            // 获得用户ip
            String userIp = IPUtil.getRequestIp(request);
            // 限制用户只能在60s以内获得一次验证码
            redis.setnx60s(MOBILE_SMSCODE + ":" + userIp, mobile);
            String code = (int)((Math.random() * 9 + 1) * 100000) + "";
            // 使用消息队列异步解耦发送短信
            SMSContentQO contentQO = new SMSContentQO();
            contentQO.setMobile(mobile);
            contentQO.setContent(code);
            rabbitTemplate.convertAndSend(
                    "sms_exchange",
                    "imooc.sms.send.login",
                    GsonUtils.object2String(contentQO)
            );

GsonUtils

package com.imooc.utils;
import com.google.gson.*;
import com.google.gson.reflect.TypeToken;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class GsonUtils {
    /**
     * 不用创建对象,直接使用Gson.就可以调用方法
     */
    private static Gson gson = null;
    private static JsonParser jsonParser = null;
    /**
     * 判断gson对象是否存在了,不存在则创建对象
     */
    static {
        if (gson == null) {
            //gson = new Gson();
            // 当使用GsonBuilder方式时属性为空的时候输出来的json字符串是有键值key的,显示形式是"key":null,而直接new出来的就没有"key":null的
            gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
        }
        if(jsonParser == null ){
            jsonParser = new JsonParser();
        }
    }
    private GsonUtils() {}
    /**
     * json 转对象
     * @param strJson
     * @return
     */
    public static JsonObject string2Object(String strJson) {
        return jsonParser.parse(strJson).getAsJsonObject();
    }
    /**
     * 将对象转成json格式
     * @param object
     * @return String
     */
    public static String object2String(Object object) {
        String gsonString = null;
        if (gson != null) {
            gsonString = gson.toJson(object);
        }
        return gsonString;
    }
    /**
     * 将json转成特定的cls的对象
     * @param gsonString
     * @param cls
     * @return
     */
    public static <T> T stringToBean(String gsonString, Class<T> cls) {
        T t = null;
        if (gson != null) {
            //传入json对象和对象类型,将json转成对象
            t = gson.fromJson(gsonString, cls);
        }
        return t;
    }
//    public static <T> T stringToBean2(String gsonString, Class<T> cls) {
//
//        JsonParser jsonParser = new JsonParser();
//        JsonObject jsonObject = jsonParser.parse(gsonString).getAsJsonObject();
//
//        T t = null;
//        if (gson != null) {
//            //传入json对象和对象类型,将json转成对象
//            t = gson.fromJson(jsonObject, cls);
//        }
//        return t;
//    }
    /**
     * json字符串转成list
     * @param gsonString
     * @param cls
     * @return
     */
    public static <T> List<T> stringToList(String gsonString, Class<T> cls) {
        List<T> list = null;
        if (gson != null) {
            //根据泛型返回解析指定的类型,TypeToken<List<T>>{}.getType()获取返回类型
            list = gson.fromJson(gsonString, new TypeToken<List<T>>() {
            }.getType());
        }
        return list;
    }
    public static <T> List<T> stringToListAnother(String gsonString, Class<T> cls) {
        List<T> list = new ArrayList<>();
        JsonArray jsonArray = new JsonParser().parse(gsonString).getAsJsonArray();
        Gson gson = new Gson();
        for (JsonElement jsonElement : jsonArray) {
            list.add(gson.fromJson(jsonElement,cls));
        }
        return list;
    }
    /**
     * json字符串转成list中有map的
     * @param gsonString
     * @return
     */
    public static <T> List<Map<String, T>> stringToListMaps(String gsonString) {
        List<Map<String, T>> list = null;
        if (gson != null) {
            list = gson.fromJson(gsonString,
                    new TypeToken<List<Map<String, T>>>() {
                    }.getType());
        }
        return list;
    }
    /**
     * json字符串转成map的
     * @param gsonString
     * @return
     */
    public static <T> Map<String, T> stringToMaps(String gsonString) {
        Map<String, T> map = null;
        if (gson != null) {
            map = gson.fromJson(gsonString, new TypeToken<Map<String, T>>() {
            }.getType());
        }
        return map;
    }
    public static String jsonElementAsString(JsonElement jsonElement) {
        return jsonElement == null ? null : jsonElement.getAsString();
    }
    public static Integer jsonElementAsInt(JsonElement jsonElement) {
        return jsonElement == null ? null : jsonElement.getAsInt();
    }
}

消费者

/**
     * 监听队列,并且处理消息
     * @param payload
     * @param message
     */
    @RabbitListener(queues = {RabbitMQSMSConfig.SMS_QUEUE})
    public void watchQueue(String payload, Message message) throws Exception {
        log.info("payload = " + payload);
        String routingKey = message.getMessageProperties().getReceivedRoutingKey();
        log.info("routingKey = " + routingKey);
        String msg = payload;
        log.info("msg = " + msg);
        if (routingKey.equalsIgnoreCase(RabbitMQSMSConfig.ROUTING_KEY_SMS_SEND_LOGIN)) {
            // 此处为短信发送的消息消费处理
            SMSContentQO contentQO = GsonUtils.stringToBean(msg, SMSContentQO.class);
//            smsUtils.sendSMS(contentQO.getMobile(), contentQO.getContent());
        }
    }

生产端消息可靠性机制

setConfirmCallback交换机确认接收到消息机制

配置文件:

rabbitmq:
    host: 192.168.233.128
    port: 5672
    virtual-host: /
    username: guest
    password: guest
    publisher-confirm-type: correlated

生产者实现

rabbitTemplate.convertAndSend(
                "sms_exchange",
                "imooc.sms.send.login",
                GsonUtils.object2String(contentQO),
                new CorrelationData(UUID.randomUUID().toString())
        );
        // 定义confirm回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 回调函数
             * @param correlationData 相关性数据
             * @param ack 交换机是否成功接收到消息,true:成功
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData,
                                boolean ack,
                                String cause) {
                log.info("进入confirm");
                log.info("correlationData:{}", correlationData.getId());
                if (ack) {
                    log.info("交换机成功接收到消息~~ {}", cause);
                } else {
                    log.info("交换机接收消息失败~~失败原因: {}", cause);
                }
            }
        });

setReturnsCallback队列未接受到消息回退机制

配置文件

rabbitmq:
    host: 192.168.233.128
    port: 5672
    virtual-host: /
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true

生产者实现

rabbitTemplate.convertAndSend(
                "sms_exchange",
                "123imooc.sms.send.login",
                GsonUtils.object2String(contentQO),
                new CorrelationData(UUID.randomUUID().toString())
        );
        // 定义路由匹配不到时候 return回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.info("进入return");
                log.info(returned.toString());
            }
        });

这里把路由故意写为"123imooc.sms.send.login" 这匹配不到RabbitMQSMSConfig类中定义的路由,所以从交换机发布到对应的队列中

所以就会进入到setReturnsCallback方法中

消费端消息可靠性

ACK确认机制

none 默认自动确认
  manual 手动确认(业务处理完成后手动确认,最常用)
  auto 异常类型确认

配置文件

rabbitmq:
    host: 192.168.233.128
    port: 5672
    virtual-host: /
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual  # 手动ack确认

消费者实现

@RabbitListener(queues = {RabbitMQSMSConfig.SMS_QUEUE})
    public void watchQueue(Message message, Channel channel) throws Exception {
        try {
            String routingKey = message.getMessageProperties().getReceivedRoutingKey();
            log.info("routingKey = " + routingKey);
            int a = 1/0; // 模拟异常
            String msg = new String(message.getBody());
            log.info("msg = " + msg);
            /**
             * long deliveryTag: 消息投递的标签
             * boolean multiple: 批量确认所有消费者获得的消息
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),
                            true);
        } catch (Exception e) {
            e.printStackTrace();
            /**
             * long deliveryTag
             * boolean multiple
             * boolean requeue: true:重回队列 ;false:丢弃消息
             */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),
                    true,
                    false);
        }
    }

消费端消息拉取限流

配置文件

rabbitmq:
    host: 192.168.233.128
    port: 5672
    virtual-host: /
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual  # 手动ack确认
        prefetch: 2  # 每次每个消费者从mq中拉去的消息的数量,直到手动ack确认之后,才会拉取下一个消息

模拟场景:

生产者连续发10条消息

for (int i = 0 ; i < 10 ; i ++) {
            rabbitTemplate.convertAndSend(RabbitMQSMSConfig.SMS_EXCHANGE,
                    RabbitMQSMSConfig.ROUTING_KEY_SMS_SEND_LOGIN,
                    GsonUtils.object2String(contentQO),
                    new CorrelationData(UUID.randomUUID().toString()));
        }

消费者在手动ack的地方断点

查看RabbitMQ管理平台,每次确实只从队列中拉取2条消息

设置消息过期失效(移除消息)

在RabbitMQSMSConfig配置类中

// 创建队列
    @Bean(SMS_QUEUE)
    public Queue queue() {
        return QueueBuilder
                .durable(SMS_QUEUE)
                .withArgument("x-message-ttl", 30*1000)
                .build();
    }

.withArgument(“x-message-ttl”, 30*1000) 表示30秒后,消息自动失效

注意:只能对新的队列生效,对老的队列需要先在RabbitMQ管理平台中删除老队列

测试方法

可以先将消费者注释掉,或者不启动消费者

生产者生产的消息,过一段时间在RabbitMQ管理平台看看是否自动消失

死信队列

以下3中消息都会进入死信队列

1、 ttl超时被丢弃的消息

2、超过队列长度被丢弃的消息

3、手动nack或者reject,并且requeue为false的消息

死信队列配置类

package com.imooc.api.mq;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ 的配置类(用于死信队列的配置)
 */
@Configuration
public class RabbitMQSMSConfig_Dead {
    // 定义交换机的名称
    public static final String SMS_EXCHANGE_DEAD = "sms_exchange_dead";
    // 定义队列的名称
    public static final String SMS_QUEUE_DEAD = "sms_queue_dead";
    // 统一定义路由key
    public static final String ROUTING_KEY_SMS_DEAD = "dead.sms.display";
    // 创建交换机
    @Bean(SMS_EXCHANGE_DEAD)
    public Exchange exchange() {
        return ExchangeBuilder
                    .topicExchange(SMS_EXCHANGE_DEAD)
                    .durable(true)
                    .build();
    }
    // 创建队列
    @Bean(SMS_QUEUE_DEAD)
    public Queue queue() {
        return QueueBuilder
                .durable(SMS_QUEUE_DEAD)
                .build();
    }
    // 创建绑定关系
    @Bean
    public Binding smsDeadBinding(@Qualifier(SMS_EXCHANGE_DEAD) Exchange exchange,
                              @Qualifier(SMS_QUEUE_DEAD) Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("dead.sms.*")
                .noargs();
    }
}

正常队列配置类中添加死信队列配置

package com.imooc.api.mq;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ 的配置类
 */
@Configuration
public class RabbitMQSMSConfig {
    // 定义交换机的名称
    public static final String SMS_EXCHANGE = "sms_exchange";
    // 定义队列的名称
    public static final String SMS_QUEUE = "sms_queue";
    // 统一定义路由key
    public static final String ROUTING_KEY_SMS_SEND_LOGIN = "imooc.sms.send.login";
    // 创建交换机
    @Bean(SMS_EXCHANGE)
    public Exchange exchange() {
        return ExchangeBuilder
                    .topicExchange(SMS_EXCHANGE)
                    .durable(true)
                    .build();
    }
    // 创建队列
    @Bean(SMS_QUEUE)
    public Queue queue() {
        return QueueBuilder
                .durable(SMS_QUEUE)
                .withArgument("x-message-ttl", 30*1000)  // 队列中消息过期时间,过期进入死信队列
                .withArgument("x-dead-letter-exchange",
                        RabbitMQSMSConfig_Dead.SMS_EXCHANGE_DEAD)    // 死信队列交换机
                .withArgument("x-dead-letter-routing-key",
                        RabbitMQSMSConfig_Dead.ROUTING_KEY_SMS_DEAD)  // 死信队列路由
                .withArgument("x-max-length", 6)    // 队列消息个数最大值,超出的进入死信队列
                .build();
    }
    // 创建绑定关系
    @Bean
    public Binding smsBinding(@Qualifier(SMS_EXCHANGE) Exchange exchange,
                              @Qualifier(SMS_QUEUE) Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("imooc.sms.#")
                .noargs();
    }
}

死信队列消费者

package com.imooc.mq;
import com.imooc.api.mq.RabbitMQSMSConfig_Dead;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * 死信队列监听消费者
 */
@Slf4j
@Component
public class RabbitMQSMSConsumer_Dead {
    /**
     *
     * @param message
     * @param channel
     * @throws Exception
     */
    @RabbitListener(queues = {RabbitMQSMSConfig_Dead.SMS_QUEUE_DEAD})
    public void watchQueue(Message message, Channel channel) throws Exception {
        log.info("++++++++++++++++++++++++++++++");
        String routingKey = message.getMessageProperties().getReceivedRoutingKey();
        log.info("routingKey = " + routingKey);
        String msg = new String(message.getBody());
        log.info("msg = " + msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),
                        true);
        log.info("++++++++++++++++++++++++++++++");
    }
}

测试方法:正常的消费者关闭,设置队列长度为6,然后发10条消息,会发现,首先有4条超过队列长度的消息,进入死信队列消费者,然后,到了消息过期时间后,又有6条消息进入死信队列中被死信消费者消费

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
传感器 消息中间件 运维
Mqtt开发笔记:Mqtt服务器搭建
Mqtt开发笔记:Mqtt服务器搭建
Mqtt开发笔记:Mqtt服务器搭建
|
4月前
|
消息中间件 JSON Java
|
8月前
|
消息中间件 Java API
【微服务系列笔记】MQ消息可靠性
消息可靠性涉及防止丢失,包括生产者发送时丢失、未到达队列以及消费者消费失败处理后丢失。 确保RabbitMQ消息可靠性的方法有:开启生产者确认机制,确保消息到达队列;启用消息持久化以防止未消费时丢失;使用消费者确认机制,如设置为auto,由Spring确认处理成功后ack。此外,可开启消费者失败重试机制,多次失败后将消息投递到异常交换机。
138 1
|
传感器 消息中间件 运维
统信UOS系统开发笔记(八):在统信UOS上编译搭建mqtt基础环境(版本使用QMQTT::Clinet)
统信uos使用到mqtt开发,需要重新编译mqtt,本篇描述统信uos20上的mqtt源码编译和环境搭建。这里mqtt版本是使用QMQTT::CLIENT来操作的,这里笔者已知Qt的版本分为两个:一个是QMQTT::QMQTTCLIENT,一个是QTMQTT:CLIENT,对应不同的类和方式,请根据需求选择)
|
消息中间件 存储 监控
RabbitMQ进阶学习复习笔记
RabbitMQ进阶学习复习笔记
267 0
RabbitMQ进阶学习复习笔记
|
消息中间件 编解码 监控
ACP互联网架构认证笔记 MQ消息队列服务
MQ是消息服务中间件,基于高可用分布式集群技术,是消费模式基于发布订阅模式的消息系统。
280 0
|
消息中间件 Kafka
kafka和rabbitMQ简单对比笔记
rabbitMQ对于可靠性的设计更好,有ack消息确认机制,当ack消息确认后,该消息从队列删除,不管是真正的磁盘删除还是修改消息状态,都要涉及到文件的寻址,所以会稍慢
131 0
|
消息中间件 传感器 网络协议
Mqtt开发笔记:windows下C++ ActiveMQ客户端介绍、编译和使用
项目需求,需要使用到mqtt协议,之前编译QtMqtt库,不支持队列模式queue(点对点),只支持订阅/发布者模式.,所以使用C++ ActiveMQ实现。
Mqtt开发笔记:windows下C++ ActiveMQ客户端介绍、编译和使用
|
物联网 开发者 Python
【开发者笔记】MQTT python测试笔记
MQTT是基于订阅/发布的物联网协议。 python测试需要一个发送进程和接收进程,即一个发送客户端和一个接收客户端,如果这两个客户端工作在同一个topic下,那么就能进行消息互通了。 服务器用“iot.eclipse.org”就好了,避免了自己搭建服务器,然后流程还可以跑通。
3380 0