RabbitMQ
MQ定义
消息队列,遵循FIFO先进先出原理,实现上下游的逻辑解耦和物理解耦的消息通信服务,消息发送上游只需要依赖MQ,不需要依赖其他服务。
MQ应用场景
1.流量消峰
以逻辑图的形式进行展现
2.应用解耦
3.异步处理
MQ分类
1.ActiveMQ
优点:单机吞吐量万级,时效性MS(毫秒)级,基于主从架构实现高可用性,消息可靠性叫低的概率丢失数据(一般不会丢失数据)
缺点:官方社区现在对ActiveMQ 5x 维护越来越少,高吞吐量场景较少使用(阿帕奇公司)。
2.Kafka
大数据的杀手锏,大数据领域专用,为大数据而生的消息中间件,百万级TPS的吞吐量
优点:吞吐量高,时效性MS(毫秒)级可用性非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,有优秀的第三方kafka Web管理界面,日志领域比较成熟。
缺点:队列越多,load(加载)越高,发送消息响应时间变长,使用短轮询的方式,时效性取决于轮询间隔时间,消息失败不支持重试。
3.RocketMQ
阿里巴巴的开源产品,Java 语言实现,设计时参考了 Kafka,并做出了自己的-些改进,被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。
优点:单机吞吐量十万级,可用性非常高,分布式架构,扩展性好,支持10亿级别的消息堆积。
缺点:支持客户端语言少,目前是java和c++,没有在 MQ核心中去实现JMS 等接口,有些系统要迁移需要修改大量代码。
4.RabbitMQ
是在AMQP(高级消息队列协议)基础上完成的,可重复的企业消息系统,当前最主流的消息中间件之一。
优点:由erlang(一浪)语言(开发的)的高并发特性,性能较好,吞吐量万级,功能比较完善,支持多种语言。
缺点:收费。
MQ消息应答方式
手动应答
Channel.basicAck(用于肯定)
RabbitMQ 已知道该消息并且成功的处理消息,可以将消息进行删除
Channel.basicNack(用于否定确认)
Channel.basicReject(用于否定确认)
与Channel.basicNack 相比少一个参数
不处理该消息直接拒绝,可以将消息进行删除
手动应答好处:可以批量应答并减少网络拥堵
自动应答
这种方式适用于在某种速率能够处理这些消息的情况下使用
MQ消息应答重新入队
逻辑图
MQ队列持久化
当RabbitMQ服务关闭,或者出现其他问题导致服务关闭时,此时MQ中的队列会丢失,为了避免丢失,在生产者创建消息队列时,将其设置为持久化,具体操作如下
我们只需要将queueDeclare方法中的第二个参数设置为true即可,再次回到rabbitMQ管理界面进行查看
队列:是MQ当中的一个组件叫队列,比如hello队列
消息:是生产者发送过来的数据
boolean durable = true;//需要让Queue(困 队列)进行持久化 channel.queueDeclare(task_queue_name,durable,false,false,null);
D就是持久化的标识,表示已经将当前队列设置为持久化,当我们重启rabbitMQ服务时,队列不会丢失
MQ消息持久化
将消息标记为持久化并不能完全保证不会丢失消息。告诉 RabbitMQ 将消息保存到磁盘
将basicPublish方法中第三个参数设置为MessageProperties.PERSISTENT_TEXT_PLAIN
//设置生产者发送的消息为持久化消息,将消息保存到磁盘中,一般保存到内存中 channel.basicPublish("",task_queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,massage.getBytes("UTF-8"));
MQ不公平分发
RabbitMQ发消息采用轮训分发消息(你一下我一下),假如两个消费者在执行任务,消费者1处理任务非常快,而消费者2处理任务非常慢,就会到这消费者1在处理完后,处于空闲状态,这种分配方式不是太好,我们只需要设置接收消息时要求信道设置basicQos(1),不是轮训分发,轮训分发默认为0
//设置不公平分发 channel.basicQos(1);
MQ发布确认
生产者将消息发送到MQ中,MQ将消息保存到磁盘后,告知生产者已经将消息保存到磁盘中
单个发布确认
同步确认发布的方式,发布完第一个后,等待确认消息收到后,在发送第二个
缺点:发布速度特别慢
批量发布确认
提高吞吐量,批量发布,出现问题后,不知道具体是哪个消息出现问题
异步发布确认
最佳性能和资源使用,在出现错误的情况下可以很好地控制。
RabbitMQ命令
安装erlang
需要RabbitMQ安装包私信我
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
安装socat
yum install socat -y
安装RabbitMQ
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
设置RabbitMQ服务开机自动启动
chkconfig rabbitmq-server on
启动RabbitMQ服务
/sbin/service rabbitmq-server start 或者 systemctl restart rabbitmq-server
查看RabbitMQ状态
systemctl status rabbitmq-server 或者 /sbin/service rabbitmq-server status
关闭RabbitMQ服务
/sbin/service rabbitmq-server stop
查看RabbitMQ用户
rabbitmqctl list_users
查看防火墙状态
systemctl status firewalld
创建RabbitMQ用户
// 第一步:创建账号
rabbitmqctl add_user admin 123
// 第二步:设置用户角色
rabbitmqctl set_user_tags admin administrator
// 第三步:设置用户权限
rabbitmqctl set_permissions -p “/” admin “." ".” “.*”