什么是RabbitMQ?
RabbitMQ是一个消息代理 , 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。
RabbitMQ架构图
基本概念
Connection
Connection是物理TCP连接。Connection将应用与RabbitMQ连接在一起。Connection会执行认证、IP解析、路由等底层网络任务。应用与RabbitMQ版完成Connection建立大约需要15个TCP报文交互,因而会消耗大量的网络资源和RabbitMQ资源。
Channel
Channel是物理TCP连接中的虚拟连接。当应用通过Connection与RabbitMQ建立连接后,所有的AMQP协议操作(例如创建队列、发送消息、接收消息等)都会通过Connection中的Channel完成。Channel可以复用Connection,即一个Connection下可以建立多个Channel。Channel不能脱离Connection独立存在,而必须存活在Connection中。当某个Connection断开时,该Connection下的所有Channel都会断开。当大量应用需要与消息队列RabbitMQ版建立多个连接时,建议使用Channel来复用Connection,从而减少网络资源和RabbitMQ资源消耗。
Queue
队列(Queue)存储着即将被应用消费掉的消息。队列跟交换机共享某些属性,但是队列也有一些另外的属性。
- Name
- Durable(消息代理重启后,队列依旧存在)
- Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
- Auto-delete(当最后一个消费者退订后即被删除)
- Arguments(一些消息代理用他来完成类似与TTL的某些额外功能)
队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出。
Exchange
Exchange是消息队列RabbitMQ的消息路由代理。生产者向RabbitMQ发送消息时,不会直接将消息发送到Queue,而是先将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue。Exchange根据Binding Key、Routing Key以及Headers属性路由消息。
交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。AMQP 0-9-1的代理提供了四种交换机
Name(交换机类型) | Default pre-declared names(预声明的默认名称) |
Direct exchange(直连交换机) | (Empty string) and amq.direct |
Fanout exchange(扇型交换机) | amq.fanout |
Topic exchange(主题交换机) | amq.topic |
Headers exchange(头交换机) | amq.match (and amq.headers in RabbitMQ) |
除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:
- Name
- Durability (消息代理重启后,交换机是否还存在)
- Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)
- Arguments(依赖代理本身)
交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。
交换机类型
Direct Exchange
直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。
- 路由规则Direct Exchange根据Binding Key和Routing Key完全匹配的规则路由消息。
- 使用场景Direct Exchange适用于通过简单字符标识符区分消息的场景。Direct Exchange常用于单播路由。
Fanout Exchange
扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。
- 路由规则Fanout Exchange忽略Routing Key和Binding Key的匹配规则将消息路由到所有绑定的Queue。
- 使用场景Fanout Exchange适用于广播消息的场景。例如,分发系统使用Fanout Exchange来广播各种状态和配置更新。
Topic Exchange
主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
- 路由规则Topic Exchange根据Binding Key和Routing Key通配符匹配的规则路由消息。Topic Exchange支持的通配符包括星号(*)和井号(#)。星号(*)代表一个英文单词(例如cn)。井号(#)代表零个、一个或多个英文单词,英文单词间通过英文句号(.)分隔,例如cn.zj.hz。
- 使用场景Topic Exchange适用于通过通配符区分消息的场景。Topic Exchange常用于多播路由。例如,使用Topic Exchange分发有关于特定地理位置的数据。
Headers Exchange
头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
- 路由规则Headers Exchange可以被视为Direct Exchange的另一种表现形式。Headers Exchange可以像Direct Exchange一样工作,不同之处在于Headers Exchange使用Headers属性代替Routing Key进行路由匹配。您在绑定Headers Exchange和Queue时,可以设置绑定属性的键值对。然后,在向Headers Exchange发送消息时,设置消息的Headers属性键值对。Headers Exchange将根据消息Headers属性键值对和绑定属性键值对的匹配情况路由消息。匹配算法由一个特殊的绑定属性键值对控制。该属性为x-match,只有以下两种取值:
- all:所有除x-match以外的绑定属性键值对必须和消息Headers属性键值对匹配才会路由消息。
- any:只要有一组除x-match以外的绑定属性键值对和消息Headers属性键值对匹配就会路由消息。
- 以下两种情况下,认为消息Headers属性键值对和绑定属性键值对匹配:
- 消息Headers属性的键和值与绑定属性的键和值完全相同。
- 消息Headers属性的键和绑定属性的键完全相同,但绑定属性的值为空。
- 使用场景Headers Exchange适用于通过多组Headers属性区分消息的场景。Headers Exchange常用于多播路由。例如,涉及到分类或者标签的新闻更新。
RabbitMQ的通讯方式
1、"Hello World!"
2、Work Queues
3、Publish / Subscribe
4、Routing
5、Topics
6、RPC
AMQP协议相关知识
AMQP协议
AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。
消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers)。由于AMQP是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以存在于不同的设备上。
AMQP模型
消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
从安全角度考虑,网络是不可靠的,接收消息的应用也有可能在处理消息的时候失败。基于此原因,AMQP模块包含了一个消息确认(message acknowledgements)的概念:当一个消息从队列中投递给消费者后(consumer),消费者会通知一下消息代理(broker),这个可以是自动的也可以由处理消息的应用的开发者执行。当“消息确认”被启用的时候,消息代理不会完全将消息从队列中删除,直到它收到来自消费者的确认回执(acknowledgement)。
通过JAVA连接RabbitMQ
新增Vhost
先在图形化界面中新增一个用户,角色填administrator 退出切换到test用户登录
点击test用户名,进入页面后选择刚刚创建的vhost
pom
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>
RabbitMQClient
publicclassRabbitMQClient { publicstaticConnectionGetConnection(){ ConnectionFactoryfactory=newConnectionFactory(); //指定IP和端口号factory.setHost("192.168.0.108"); factory.setPort(5672); //用户名和密码factory.setUsername("test"); factory.setPassword("test"); //指定vHostfactory.setVirtualHost("/test"); Connectionconnection=null; try { connection=factory.newConnection(); } catch (IOExceptione) { e.printStackTrace(); } catch (TimeoutExceptione) { e.printStackTrace(); } returnconnection; } }
接下来debug测试一下
图形化界面也显示已经连接了