高性能消息中间件 RabbitMQ(三)

简介: 高性能消息中间件 RabbitMQ(三)

3.3 发布订阅模式

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)

特点

  1. 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
  2. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。

1、编写生产者

package com.zj.mq.publish;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建交换机fanout
        /*
        * 参数一:交换机名称
        * 参数二:交换机类型
        * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/
        channel.exchangeDeclare("exchangeFanout", BuiltinExchangeType.FANOUT,false);
        //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送)
        channel.queueDeclare("mailQueue", false,false,false,null);
        channel.queueDeclare("messageQueue", false,false,false,null);
        channel.queueDeclare("stationQueue", false,false,false,null);
        //6.交换机绑定队列
        /*
        * 参数一:队列名称
        * 参数二:交换机名称
        * 参数三:路由关键字,发布订阅模式不存在路由关键字*/
        channel.queueBind("mailQueue","exchangeFanout","");
        channel.queueBind("messageQueue","exchangeFanout","");
        channel.queueBind("stationQueue","exchangeFanout","");
        //7.往交换机发送消息
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("exchangeFanout","",null,("你好,MQ"+i).getBytes());
        }
        //8.关闭资源
        channel.close();
        connection.close();
    }
}

2、站内信消费者(其他同理)

package com.zj.mq.publish;
import com.rabbitmq.client.*;
import com.sun.deploy.ui.AboutDialog;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*站内信消费者*/
public class ConsumerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionaFactory = new ConnectionFactory();
        connectionaFactory.setHost("192.168.66.100");
        connectionaFactory.setPort(5672);
        connectionaFactory.setUsername("MQzhang");
        connectionaFactory.setPassword("MQzhang");
        connectionaFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionaFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
         * 参数一:监听的队列名
         * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
         * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
         * */
        channel.basicConsume("stationQueue",true,new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("发送站内信:"+message);
            }
        });
    }
}
发送站内信:你好,MQ0
发送站内信:你好,MQ1
发送站内信:你好,MQ2
发送站内信:你好,MQ3
发送站内信:你好,MQ4
发送站内信:你好,MQ5
发送站内信:你好,MQ6
发送站内信:你好,MQ7
发送站内信:你好,MQ8
发送站内信:你好,MQ9

当然也能创建多个消费者来监听同一个队列来提高消费速度。

3.4 路由模式

使用发布订阅模式时,所有消息都会发送到绑定的队列中(发送到绑定到交换机上的每个队列,队列再发送给消费者),但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用路由模式(Routing)完成这一需求。

特点

  1. 每个队列绑定路由关键字RoutingKey
  2. 生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模式使用direct交换机。

编写生产者

package com.zj.mq.route;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建交换机fanout
        /*
        * 参数一:交换机名称
        * 参数二:交换机类型
        * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/
        channel.exchangeDeclare("exchangeRoute", BuiltinExchangeType.DIRECT,false);
        //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送)
        /* 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数
         * */
        channel.queueDeclare("mailQueue", false,false,false,null);
        channel.queueDeclare("messageQueue", false,false,false,null);
        channel.queueDeclare("stationQueue", false,false,false,null);
        //6.交换机绑定队列
        /*
        * 参数一:队列名称
        * 参数二:交换机名称
        * 参数三:路由关键字,一个队列可以有多个路由关键字
        * */
        channel.queueBind("mailQueue","exchangeRoute","import");
        channel.queueBind("messageQueue","exchangeRoute","normal");
        channel.queueBind("stationQueue","exchangeRoute","import");
        //7.往交换机发送消息,路由关键字是import,表示交换机会将消息发送到带有import关键字的队列。
        channel.basicPublish("exchangeRoute","import",null,("你好,import MQ").getBytes());
        //路由关键字是normal表示交换机会将消息发送到带有normal关键字的队列
        channel.basicPublish("exchangeRoute","normal",null,("你好,normal MQ").getBytes());
        //8.关闭资源
        channel.close();
        connection.close();
    }
}

编写消费者

消费者还是和其他模式的消费者是一样的。这里以mailQuene举例子。

package com.zj.mq.route;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        *
        */
        channel.basicConsume("mailQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接受消息为:"+message);
            }
        });
    }
}

3.5 通配符模式

通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。

通配符规则:

  1. 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。
  2. 队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。

编写生产者

package com.zj.mq.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建交换机fanout
        /*
        * 参数一:交换机名称
        * 参数二:交换机类型
        * 参数三: 交换机是否持久化(关闭控制台是否还存在)*/
        channel.exchangeDeclare("exchangeTopic", BuiltinExchangeType.TOPIC,false);
        //5.创建三个队列(分别模拟邮件发送、短信发送、站内信发送)
        channel.queueDeclare("mailQueue", false,false,false,null);
        channel.queueDeclare("messageQueue", false,false,false,null);
        channel.queueDeclare("stationQueue", false,false,false,null);
        //6.交换机绑定队列
        /*
        * 参数一:队列名称
        * 参数二:交换机名称
        * 参数三:路由关键字,【#.mail.#】 表示:mail前后可以匹配多个单词*/
        channel.queueBind("mailQueue","exchangeTopic","#.mail.#");
        channel.queueBind("messageQueue","exchangeTopic","#.message.#");
        channel.queueBind("stationQueue","exchangeTopic","#.station.#");
        //7.往交换机发送消息到三个队列
        channel.basicPublish("exchangeTopic","mail.message.station",null,("你好,MQ").getBytes());
        //8.关闭资源
        channel.close();
        connection.close();
    }
}

编写消费者

也是和其他模式的消费者是一样的只需要监听消费者。

package com.zj.mq.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        *
        */
        channel.basicConsume("mailQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接受消息为:"+message);
            }
        });
    }
}

四、SpringBoot整合RabbitMQ

之前我们使用原生JAVA操作RabbitMQ较为繁琐,接下来我们使用SpringBoot整合RabbitMQ,简化代码编写。

1.创建SpringBoot项目,引入RabbitMQ起步依赖(springboot版本是2.7.0)

<!-- RabbitMQ起步依赖 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.编写配置文件

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
#日志格式
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

3.创建队列和交换机

SpringBoot整合RabbitMQ时,需要在配置类创建队列和交换机,写法如下:

package com.zj.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
    private final String EXCHANGE_NAME = "boot_topic_exchange";
    private final String QUEUE_NAME = "boot_queue";
    // 创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange getExchange() {
        return ExchangeBuilder
                .topicExchange(EXCHANGE_NAME) // 交换机类型和名称
                .durable(true) // 是否持久化
                .build();
    }
    // 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue() {
        return new Queue(QUEUE_NAME); // 队列名
    }
    // 交换机绑定队列
    @Bean
    public Binding bindMessageQueue(@Qualifier(EXCHANGE_NAME) Exchange exchange,
                                    @Qualifier(QUEUE_NAME) Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("#.message.#")
                .noargs();
    }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
23天前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
2月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
2月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
1月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
81 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
16天前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
21天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
62 8
|
16天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
30天前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
63 10
|
20天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
27天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践