01.RabbitMQ入门
1.1.什么是MQ
消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已
作用:应用程序“对”应用程序的通信方法。
1.2.应用场景
主要解决异步处理、应用解耦、流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构
1.2.1.异步处理
用户注册后,需要发注册邮件和注册短信
1.2.2.应用解耦
用户下单后,订单系统需要通知库存系统
1.2.3.流量削锋(重点)
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列
a、可以控制活动的人数
b、可以缓解短时间内高流量压垮应用
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
秒杀业务根据消息队列中的请求信息,再做后续处理
1.2.4.日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题
1.日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
2.Kafka消息队列,负责日志数据的接收,存储和转发
3.日志处理应用:订阅并消费kafka队列中的日志数据
1.3.主流MQ框架
MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。
本文主要介绍RabbitMq
RabbitMQ是以AMQP协议实现的一种消息中间件产品。AMQP是Advanced Message Queuing Protocol的简称,
它是一个面向消息中间件的开放式标准应用层协议。
MQ Broker
1.4.Docker安装部署RabbitMQ
注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面
1.4.1.查询镜像
docker search rabbitmq:management
1.4.2.获取镜像
docker pull rabbitmq:management
1.4.3.运行镜像
#方式一:默认guest用户,密码也是guest docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management #方式二:设置用户名和密码 docker run -d \ --name my-rabbitmq \ -p 5672:5672 -p 15672:15672 \ -v /data:/var/lib/rabbitmq \ --hostname my-rabbitmq-host \ -e RABBITMQ_DEFAULT_VHOST=my_vhost \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ --restart=always \ rabbitmq:management
参数说明:
-d:后台运行容器
-name:指定容器名
-p:指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号)
-v:映射目录或文件,启动了一个数据卷容器,数据卷路径为:/var/lib/rabbitmq,再将此数据卷映射到住宿主机的/data目录
–hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)
-e:指定环境变量;(
RABBITMQ_DEFAULT_VHOST:默认虚拟机名;
RABBITMQ_DEFAULT_USER:默认的用户名;
RABBITMQ_DEFAULT_PASS:默认用户名的密码)
–restart=always:当Docker重启时,容器能自动启动
rabbitmq:management:镜像名
注1:RABBITMQ_DEFAULT_VHOST=my_vhost,my_vhost名字请记好,在之后的编程中要用到,
如果启动时没指定,默认值为/
#4.进入RabbitMQ管理平台进行相关操作
注1:容器启动后,可以通过docker logs 窗口ID/容器名字 查看日志
docker logs my-rabbitmq
注2:停止并删除所有容器
docker stop $(docker ps -aq) && docker rm $(docker ps -aq)
1.5.RabbitMQ管理平台
后台地址:http://[宿主机IP]:15672
默认账号:guest/guest,用户也可以自己创建新的账号,例如:admin/admin
1.5.1.Virtual Hosts
就象mysql有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?RabbitMQ也有类似的权限管理。
在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,
每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通
另外,虚拟主机的限制有两个:最大连接数和最大队列数
1.5.2.RabbitMQ关键名词
Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程;
Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue。
Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue。
ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic;
Message Queue:消息队列,用于存储还未被消费者消费的消息;
Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等;body是真正需要发送的数据内容;
BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来。
1.6.MQ的核心概念
生产者、队列、消费者、消息
生产者生产消息并投递到队列中,
消费者可以从队列中获取消息并消费,
消息指的是各个服务之间要传递的数据
1.7.springboot整合rabbitmq
1.7.1.安装好rabbitmq,登陆RabbitMQ管理平台,新增管理用户并设置权限
1.7.1.1.新增用户
1.7.1.2.切换到springboot用户登陆,在All users中,点击Name为springboot, 进入权限设置页面
1.7.1.3.在权限设置页面,进入Permissions页面,点击“Set permission"
1.7.2.pom.xml添加rabbitmq依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.2</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>rabbitMQ01</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rabbitMQ01</name> <description>rabbitMQ01</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
1.7.3.rabbitmq连接配置
application.yml
server: port: 8080 spring: rabbitmq: host: localhost password: 123456 port: 5672 username: admin virtual-host: my_vhost
与启动容器时虚拟主机名字一致与启动容器时虚拟主机名字一致与启动容器时虚拟主机名字一致 spring.rabbitmq.virtual-host=my_vhost
1.7.4.创建Rabbit配置类RabbitConfig
新建一个config包,新建文件
package com.zjzaki.rabbitmq01.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue firstQueue() { // 创建一个队列,名称为:first return new Queue("first"); } }
1.7.5.创建消息产生者类
新建component包,新建文件 Sender
package com.zjzaki.rabbitmq01.components; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { rabbitTemplate.convertAndSend("first", "test rabbitmq message !!!"); } }
1.7.6.创建测试类
package com.zjzaki.rabbitmq01; import com.zjzaki.rabbitmq01.components.Sender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class RabbitmqTest { @Autowired private Sender sender; @Test public void testRabbitmq() throws Exception { sender.send(); } }
1.7.7.启动测试类测试
控制台出现如下信息, 表示连接成功
web端也可以看到
此时我们多运行几次
1.7.8.创建消息消费者
package com.zjzaki.rabbitmq01.components; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "first") public class Receiver { @RabbitHandler public void process(String msg) { System.out.println("receive msg : " + msg); } }
注解作用:
@RabbitListener注解:定义该类需要监听的队列
@RabbitHandler注解:指定对消息的处理
1.7.9.再次运行程序
控制台出现以上信息表示集成RabbitMQ完成