页面发布:
业务流程:
1、管理员进入管理界面点击“页面发布”,前端请求cms页面发布接口
2、cms页面发布接口执行页面静态化,并将静态化页面(html文件)存储至GridFS中
3、静态化成功后,向消息队列发送页面发布的消息
页面发布的最终目标是将页面发布到服务器
通过消息队列将页面发布的消息发送给各个服务器
4、消息队列负责将消息发送给各个服务器上部署的Cms Client(Cms客户端)
在服务器上部署Cms Client(Cms客户端),客户端接收消息队列的通知
5、每个接收到页面发布消息的Cms Client从GridFS获取Html页面文件,并将Html文件存储在本地服务器
CmsClient根据页面发布的消息内容请求GridFS获取页面文件,存储在本地服务器
要实现上边页面发布的功能,有一个重要的环节就是由消息队列将页面发布的消息通知给各个服务器,下面讲解下消息队列的具体用法
RabbitMQ
MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com/
应用场景:
1、任务异步处理
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理,提高了应用程序的响应时间
2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合
市场上还有哪些消息队列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis
为什么使用RabbitMQ呢?
1、使用简单,功能强大
2、基于AMQP协议
3、社区活跃,文档完善
4、高并发性能好,这主要得益于Erlang语言
5、SpringBoot默认已集成RabbitMQ
AMQP:
AMQP是一套公开的消息队列协议,它旨在从协议层定义消息通信数据的标准格式,为的就是解决MQ市场上协议不统一的问题,RabbitMQ就是遵循AMQP标准协议开发的MQ服务
JMS:
JMS是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信,JMS和AMQP不同点在于JMS是Java语言专属的消息服务标准,它是在api层定义标准,并且只能用于Java应用,而AMQP是在协议层定义的标准,是跨语言的
快速入门
工作原理:
组成部分说明:
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过滤
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息
消息发布接收流程:
-----发送消息-----
1、生产者和Broker建立TCP连接
2、生产者和Broker建立通道
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发
4、Exchange将消息转发到指定的Queue(队列)
----接收消息-----
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者
5、消费者接收到消息
RabbitMQ由Erlang语言开发,Erlang语言用于并发及分布式系统的开发,安装RabbitMQ需 要安装Erlang/OTP,并保持版本匹配,如下图: RabbitMQ的下载地址:http://www.rabbitmq.com/download.html
本项目使用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本。
1、下载erlang 地址如下: http://erlang.org/download/otp_win64_20.3.exe 右键 otp_win64_20.3.exe,以管理员方式运行此文件,安装。 erlang安装完成需要配置erlang环境变量: ERLANG_HOME=D:\Program Files\erl9.3 在path中添 加%ERLANG_HOME%\bin;
2、安装RabbitMQ https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3 右键 rabbitmq-server-3.7.3.exe,以管理员方式运行此文件,安装。
安装成功后会自动创建RabbitMQ服务并且启动。
启动rabbitmq:管理员身份进入D:\Soft\RabbitMQ Server\rabbitmq_server-3.7.7\sbin
安装管理插件:rabbitmq-plugins.bat enable rabbitmq_management
rabbitmq-service.bat start 启动服务
启动成功 登录RabbitMQ 进入浏览器,输入:http://localhost:15672
初始账号和密码:guest/guest
搭建环境:
1、创建maven工程
创建生产者工程和消费者工程,分别加入RabbitMQ java client的依赖。
test-rabbitmq-producer:生产者工程
test-rabbitmq-consumer:消费者工程
pom中添加依赖:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp‐client</artifactId> <version>4.0.3</version><!‐‐此版本与spring boot 1.5.9版本匹配‐‐> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐logging</artifactId> </dependency>
2、生产者代码如下:
package com.xuecheng.test.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Author Barrett * @Date 2020/5/10 * @Description 生产者 */ public class Producer01 { //队列 private static final String QUEUE = "helloworld"; public static void main(String[] args) { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立新连接 connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 channel = connection.createChannel(); //声明队列,如果队列在mq 中没有则要创建 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE, true, false, false, null); //发送消息 //参数:String exchange, String routingKey, BasicProperties props, byte[] body /** * 参数明细: * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"") * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 * 3、props,消息的属性 * 4、body,消息内容 */ //消息内容 String message = "hello world 黑马程序员"; channel.basicPublish("", QUEUE, null, message.getBytes()); System.out.println("send to mq " + message); } catch (Exception e) { e.printStackTrace(); } finally { //关闭连接 //先关闭通道 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
3、消费者代码如下:
package com.xuecheng.test.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Author Barrett * @Date 2020/5/10 * @Description 消费者 */ public class Consumer01 { //队列 private static final String QUEUE = "helloworld"; public static void main(String[] args) throws IOException, TimeoutException { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); //监听队列 //声明队列,如果队列在mq 中没有则要创建 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE, true, false, false, null); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message = new String(body, "utf-8"); System.out.println("receive message:" + message); } }; //监听队列 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE, true, defaultConsumer); } }
测试:
1、先启动消费者Consumer01:
开启监听
2、启动生产者:
显示已启动发送消息
然后再查看消费者状态:
显示接收到了消息。
1、发送端操作流程
1)创建连接:`connection = connectionFactory.newConnection();` 2)创建通道:`channel = connection.createChannel();` 3)声明队列:`channel.queueDeclare(QUEUE, true, false, false, null);` 4)发送消息:`channel.basicPublish("", QUEUE, null, message.getBytes());`
2、接收端操作流程
1)创建连接:`connection = connectionFactory.newConnection();` 2)创建通道:`connection.createChannel();` 3)声明队列:`channel.queueDeclare(QUEUE, true, false, false, null);` 4)监听队列:`channel.basicConsume(QUEUE, true, defaultConsumer);` 5)接收消息:`handleDelivery` 6)ack回复
工作模式:
1、Work queues 2、Publish/Subscribe 3、Routing 4、Topics 5、Header 6、RPC
Work queues
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
测试:
1、使用入门程序,启动多个消费者。
2、生产者发送多个消息。
结果:复制Consumer01生成Consumer02,同时启动Consumer01和Consumer02,当Producer01第一次调用时候,Consumer01接收到信息,第二次调用时候Consumer02接收到信息。
结论:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询(依次询问)的方式将消息平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
Publish/Subscribe
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
一个消息被多个消费者订阅。
案例: 用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法 。
1、生产者
声明Exchange_fanout_inform交换机。
声明两个队列并且绑定到此交换机,绑定时不需要指定routingkey
发送消息时不需要指定routingkey
编写生产者代码:Producer02_publish
2、邮件发送消费者代码:Consumer02_subscribe_email
按照上边的代码,编写邮件通知的消费代码。
3、短信发送消费者
参考上边的邮件发送消费者代码编写。
启动Consumer02_subscribe_email和Producer02_publish
测试:
Consumer02_subscribe_email和Consumer02_subscribe_sms同时启动,当调用Producer02_publish时候,email和sms同时接收到消息。
思考:
1、work queues与publish/subscribe有什么区别?
区别:
1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
2)work queues的生产方是面向队列发送消息,publish/subscribe的生产方是面向交换机发送消息(底层使用默认 交换机)。
3)work queues不需要设置,实质上work queues会将队列绑定到默认的交换机,publish/subscribe需要设置队列和交换机的绑定 。
相同点:
所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
2、实际工作用work queues还是publish/subscribe?
建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大,并且发布订阅模式可以指定自己专用的交换机。
3.Routing
工作模式
路由模式:
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
代码:
1、生产者
声明exchange_routing_inform交换机。
声明两个队列并且绑定到此交换机,绑定时需要指定routingkey
发送消息时需要指定routingkey
创建Producer03_routing、Consumer03_routing_email和Consumer03_routing_sms;
使用生产者发送若干条消息,交换机根据routingkey转发消息到指定的队列。
先启动Consumer03_routing_email和Consumer03_routing_sms,然后启动Producer03_routing。
思考
1、Routing模式和Publish/subscibe有啥区别?
Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。
4.Topics
工作模式
路由模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
案例: 根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型的则两种通知都有效。
代码
1、生产者
声明交换机,指定topic类型,创建Producer04_topics。
2、消费者
队列绑定交换机指定通配符:
统配符规则: 中间以“.”分隔。
符号#可以匹配多个词,符号*可以匹配一个词语。
创建Consumer04_topics_email和Consumer04_topics_sms
测试:
使用生产者发送若干条消息,交换机根据routingkey统配符匹配并转发消息到指定的队列。
思考:
1、本案例的需求使用Routing工作模式能否实现?
使用Routing模式也可以实现本案例,共设置三个 routingkey,分别是email、sms、all,email队列绑定email和 all,sms队列绑定sms和all,这样就可以实现上边案例的功能,实现过程比topics复杂。
Topic模式更加强大,它可以实现Routing、publish/subscirbe模式的功能。
Header模式
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的key/value(键值对)匹配队列。
案例: 根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。
代码:
1、生产者
队列与交换机绑定的代码与之前不同,如下:
Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_type", "email"); Map<String, Object> headers_sms = new Hashtable<String, Object>(); headers_sms.put("inform_type", "sms"); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
通知:
String message = "email inform to user"+i; Map<String,Object> headers = new Hashtable<String, Object>(); headers.put("inform_type", "email");//匹配email通知消费者绑定的header //headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(headers); //Email通知 channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
发送邮件消费者:
channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS); Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_email", "email"); //交换机和队列绑定 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); //指定消费队列 channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
RPC
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。 2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
3、服务端将RPC方法 的结果发送到RPC响应队列
4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。
SpringBoot整合RibbitMQ
1、pom中添加如下依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐amqp</artifactId> </dependency>
2、yml文件中配置参数
server: port: 44000 spring: application: name: test-rabbitmq-producer rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtualHost: /
3、定义RabbitConfig类,配置Exchange、Queue、及绑定交换机。
编写RabbitmqConfig类:
package com.xuecheng.test.rabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author Barrett * @Date 2020/5/10 * @Description Rabbitmq配置类 */ @Configuration public class RabbitmqConfig { public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform"; public static final String ROUTINGKEY_EMAIL = "inform.#.email.#"; public static final String ROUTINGKEY_SMS = "inform.#.sms.#"; //声明交换机 @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM() { //durable(true) 持久化,mq重启之后交换机还在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } //声明QUEUE_INFORM_EMAIL队列 @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL() { return new Queue(QUEUE_INFORM_EMAIL); } //声明QUEUE_INFORM_SMS队列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS() { return new Queue(QUEUE_INFORM_SMS); } //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); } //ROUTINGKEY_SMS队列绑定交换机,指定routingKey @Bean public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); } }
4、生产者
package com.xuecheng.test.rabbitmq; import com.alibaba.fastjson.JSON; import com.xuecheng.test.rabbitmq.config.RabbitmqConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.HashMap; import java.util.Map; /** * @Author Barrett * @Date 2020/5/10 * @Description SpringBoot整合mq生产者 */ @SpringBootTest @RunWith(SpringRunner.class) public class Producer05_topics_springboot { @Autowired RabbitTemplate rabbitTemplate; //使用rabbitTemplate发送消息 @Test public void testSendEmail() { String message = "send email message to user"; /** * 参数: * 1、交换机名称 * 2、routingKey * 3、消息内容 */ rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.email", message); } //使用rabbitTemplate发送消息 @Test public void testSendPostPage() { Map message = new HashMap<>(); message.put("pageId", "5a795ac7dd573c04508f3a56"); //将消息对象转成json串 String messageString = JSON.toJSONString(message); //路由key,就是站点ID String routingKey = "5a751fab6abb5044e0d19ea1"; /** * 参数: * 1、交换机名称 * 2、routingKey * 3、消息内容 */ rabbitTemplate.convertAndSend("ex_routing_cms_postpage", routingKey, messageString); } }
5、消费者
pom中添加:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
6、使用@RabbitListener注解监听队列
package com.xuecheng.test.rabbitmq.mq; import com.rabbitmq.client.Channel; import com.xuecheng.test.rabbitmq.config.RabbitmqConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Author Barrett * @Date 2020/5/10 * @Description 使用@RabbitListener注解监听队列。 */ @Component public class ReceiveHandler { @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL}) public void send_email(String msg, Message message, Channel channel) { System.out.println("receive message is:" + msg); } }
7、测试:
启动消费者TestRabbitmqApplication工程,然后调用生产者的testSendEmail方法,会在消费者显示已消费。