初识RabbitMQ
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的优点:开源、性能优秀、稳定性保障、提供可靠性消息投递模式、返回模式、与SpringAMQP完美整合、api丰富、在保证数据不丢失的情况下做到高可靠性、可用性。
AMQP高级消息队列协议(定义):是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
AMQP核心概念:
RabbitMQ重要概念
Server:又称Broker,接受客户端连接,实现AMQP实体服务
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
Virtual Host:虚拟主机,一个broker里可以开设多个Virtual Host,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
Linux下安装RabbitMQ
http://www.rabbitmq.com/releases/
Erlang下载地址:http://www.rabbitmq.com/releases/erlang/
RabbitMQ下载地址:https://www.rabbitmq.com/download.html
版本对应关系:https://www.rabbitmq.com/which-erlang.html
安装rabbitmq所需要的依赖包
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
安装Erlang出错,因为本系统是centos6.8所带的glibc只有到2.14所以要先升级我们的glibc,编写一个sh脚本,然后执行,下载的过程比较久。如果本脚本不行的话可以在脚本的最后添加强制安装 --force --nodeps
#! /bin/sh # update glibc to 2.17 for CentOS 6 wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-2.17-55.el6.x86_64.rpm wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-common-2.17-55.el6.x86_64.rpm wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-devel-2.17-55.el6.x86_64.rpm wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-headers-2.17-55.el6.x86_64.rpm sudo rpm -Uvh glibc-2.17-55.el6.x86_64.rpm \ glibc-common-2.17-55.el6.x86_64.rpm \ glibc-devel-2.17-55.el6.x86_64.rpm \ glibc-headers-2.17-55.el6.x86_64.rpm
安装Erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
安装socat
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
安装rabbitmq
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
修改核心配置文件,一个json格式的配置文件,rpm安装的路径是默认的,修改配置文件让rabbitmq可以直接访问
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
{loopback_users, [<<"guest">>]}修改为{loopback_users, [guest]}
服务的启动: rabbitmq-server start & 服务的停止: rabbitmqctl stop_ app 管理插件: rabbitmq-plugins enable rabbitmq_ management
启动:
安装可视化插件
rabbitmq-plugins enable rabbitmq_management
ip+端口15672访问,管控台默认的端口为15672,5672是通信端口java端用的在配置文件里面有,默认的账号和密码为guest
基本命令
rabbitmqctl stop_app:关闭应用 rabbitmqctl start_app:启动应用 rabbitmqctl status:节点状态 rabbitmqctl add user username password: 添加用户 rabbitmqctl list users:列出所有用户 rabbitmqctl delete user username: 删除用户 rabbitmqctl clear permissions -p vhostpath username:清除用户权限 rabbitmqctl list user permissions username:列出用户权限 rabbitmqctl change_ password username newpassword:修改密码 rabbitmqctl set_ permissions -p vhostpath username".*"".*"".*". 设置用户权限 rabbitmqctl add _vhost vhostpath:创建虚拟主机 rabbitmqctl list vhosts:列出所有虚拟主机 rabbitmqctl list_ permissions -p vhostpath:列出虚拟主机上所有权限 rabbitmqctl list_ queues:查看所有队列信息 rabbitmqctl -p vhostpath purge_ queue blue:清除队列里的消息 rabbitmqctl reset:移除所有数据,要在rabbitmqctl stop_ app之后使用 rabbitmqctl join _cluster <clusternode> [--ram] :组成集群命令 rabbitmqctl change cluster node type disc |修改集群节点的存储形式 rabbitmqctl forget cluster_ node [--offline] 忘记节点(摘除节点) rabbitmqctl rename_ cluster node oldnode1 newnode1 [oldnode2][newnode2..]. (修改节 点名称)
创建一个springboot项目测试一下
需要引入rabbitmq的包
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
生成者
package com.smxy.quickstart; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Procuder { public static void main(String[] args) throws Exception { //1 创建一个ConnectionFactory, 并进行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.254.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 通过连接工厂创建连接 Connection connection = connectionFactory.newConnection(); //3 通过connection创建一个Channel Channel channel = connection.createChannel(); //4 通过Channel发送数据 for(int i=0; i < 5; i++){ String msg = "Hello RabbitMQ!"; //1 exchange 2 routingKey channel.basicPublish("", "test001", null, msg.getBytes()); } //5 记得要关闭相关的连接 channel.close(); connection.close(); } }
消费者
package com.smxy.quickstart; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer { public static void main(String[] args) throws Exception { //1 创建一个ConnectionFactory, 并进行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.254.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 通过连接工厂创建连接 Connection connection = connectionFactory.newConnection(); //3 通过connection创建一个Channel Channel channel = connection.createChannel(); //4 声明(创建)一个队列 String queueName = "test001"; channel.queueDeclare(queueName, true, false, false, null); //5 创建消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6 设置Channel channel.basicConsume(queueName, true, queueingConsumer); while(true){ //7 获取消息 Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费端: " + msg); //Envelope envelope = delivery.getEnvelope(); } } }
启动后
交换机属性:
Name:交换机名称
Type:交换机类型direct、topic、fanout、headers
Durability:是否需要持久化,true为持久化
Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange。
Internal:当前Exchange是否用于RabbitMQ内部使用,默认False
Arguments:扩展参数。
四种交换机类型
Direct Exchange(直连交换机)
1对1-----一个消息只能被一个消费者消费
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
注意: Direct模式可以使用RabbitMQ自带的Exchange: default
Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。
消费者:
public class Consumer4DirectExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.254.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct"; //表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示声明了一个队列 channel.queueDeclare(queueName, false, false, false, null); //建立一个绑定关系: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
生产者
public class Producer4DirectExchange { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.254.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct"; //5 发送 String msg = "Hello World RabbitMQ 4 Direct Exchange Message 111 ... "; channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); } }
Topic Exchange(主题交换机)
1对多-----一个消息可以被多个消费者消费
所有发送到Topic Exchange的消息被转发到所有关系RouteKey中指定Topic的Queue上。
Exchange将RouteKey和某Topic进行模糊匹配。队列需要绑定一个Topic。
路由关键字通配符
#:可以匹配一个或者多个词
*:可以匹配一个词
Fanout 是最快的
Fanout Exchange(扇型交换机)
广播
不处理路由键,只需要简单的将队列绑定到交换机上。
发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
Headers exchange(头交换机)
类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
此交换机有个重要参数:”x-match”
当”x-match”为“any”时,消息头的任意一个值被匹配就可以满足条件
当”x-match”设置为“all”的时候,就需要消息头的所有值都匹配成功
消息队列:
Durability:是否持久化,durable:是,Transient:否
Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除。
Message设置
package com.bfxy.rabbitmq.api.message; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Procuder { public static void main(String[] args) throws Exception { //1 创建一个ConnectionFactory, 并进行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.254.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 通过连接工厂创建连接 Connection connection = connectionFactory.newConnection(); //3 通过connection创建一个Channel Channel channel = connection.createChannel(); Map<String, Object> headers = new HashMap<>(); headers.put("my1", "111"); headers.put("my2", "222"); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2)//发送到服务器后,重启服务后消息还存在 .contentEncoding("UTF-8") .expiration("10000")//过期时间 .headers(headers) .build(); //4 通过Channel发送数据 for(int i=0; i < 5; i++){ String msg = "Hello RabbitMQ!"; //1 exchange 2 routingKey channel.basicPublish("", "test001", properties, msg.getBytes()); } //5 记得要关闭相关的连接 channel.close(); connection.close(); } }
package com.bfxy.rabbitmq.api.message; import java.util.Map; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer { public static void main(String[] args) throws Exception { //1 创建一个ConnectionFactory, 并进行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.254.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 通过连接工厂创建连接 Connection connection = connectionFactory.newConnection(); //3 通过connection创建一个Channel Channel channel = connection.createChannel(); //4 声明(创建)一个队列 String queueName = "test001"; channel.queueDeclare(queueName, true, false, false, null); //5 创建消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6 设置Channel channel.basicConsume(queueName, true, queueingConsumer); while(true){ //7 获取消息 Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费端: " + msg); Map<String, Object> headers = delivery.getProperties().getHeaders(); System.err.println("headers get my1 value: " + headers.get("my1")); //Envelope envelope = delivery.getEnvelope(); } } }