RabbitMQ 是一个可靠且成熟的消息传递和流代理,易于部署在云环境、本地和本地计算机上。它目前被全球数百万人使用。通过确认消息传递和跨集群复制消息的功能,您可以使用 RabbitMQ 确保您的消息是安全的。
RabbitMQ安装
推荐使用docker安装部署,及其方便。
1搜索镜像
docker search rabbitmq
2拉取镜像
docker pull rabbitmq
此为安装最新版,如果需要安装其他版本在rabbitmq后面跟上版本号即可 docker pull rabbitmq:3.7.7-management
说明
docker pull rabbitmq:版本号 -management
3搭建并运行容器
docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5673:5672 rabbitmq
-d 后台运行容器;
--name 指定容器名;
-p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
-v 映射目录或文件;
--hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
-e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;
RABBITMQ_DEFAULT_PASS:默认用户名的密码)
4查看正在运行的容器
docker ps
5进入容器内部
docker exec -it 容器id或者容器名 /bin/bssh
6运行容器
rabbitmq-plugins enable rabbitmq_management
7浏览器运行
http://ip:15672/
账户密码默认guest
现在rabbitmq已经安装完成
具体使用
安装依赖(注意,下面示例基于springboot工程的支付下单案例实现)
<!--消息队列依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
消息发送配置
@Slf4j @Configuration public class PayNotifyConfig implements ApplicationContextAware { //交换机 public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout"; //支付结果通知消息类型 public static final String MESSAGE_TYPE = "payresult_notify"; //支付通知队列 public static final String PAYNOTIFY_QUEUE = "paynotify_queue"; //声明交换机,且持久化 @Bean(PAYNOTIFY_EXCHANGE_FANOUT) public FanoutExchange paynotify_exchange_fanout() { // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false); } //支付通知队列,且持久化 @Bean(PAYNOTIFY_QUEUE) public Queue course_publish_queue() { return QueueBuilder.durable(PAYNOTIFY_QUEUE).build(); } //交换机和支付通知队列绑定 @Bean public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 设置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 投递失败,记录日志 log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString()); }); } }
mq监听配置,监听订单支付成功
@Slf4j @Service public class ReceivePayNotifyService { @Autowired private RabbitTemplate rabbitTemplate; @Resource private IOrderService orderService; //监听消息队列接收支付结果通知 @RabbitListener(queues = PayNotifyConfig.PAYNOTIFY_QUEUE) public void receive(Message message, Channel channel) { // try { // Thread.sleep(5000); // } catch (InterruptedException e) { // throw new RuntimeException(e); // } String orderId = new String(message.getBody(), StandardCharsets.UTF_8); try { log.info("支付完成,发货并记录,开始。订单:{}", orderId); orderService.deliverGoods(orderId); } catch (Exception e) { log.error("支付完成,发货并记录,失败。订单:{}", orderId, e); } } }1. @Slf4
消息发送
rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT,"", tradeNo);