RabbitMQ的使用与分析

简介: 1、简介对于系统间的解耦合,无非是采用MQ(消息系统)的方式,而在众多的MQ方案中,使用的比较广泛且性能较高的开源项目则是RabbitMQ了。

1、简介

对于系统间的解耦合,无非是采用MQ(消息系统)的方式,而在众多的MQ方案中,使用的比较广泛且性能较高的开源项目则是RabbitMQ了。

RabbitMQ是采用Erlang语言编写,天生的支持高并发,而且内部有不同其它MQ的独特设计。


2、安装

为了学习与使用rabbitMQ,我们首先需要在我们的机器上安装它的服务。这里我们以windows32系统为例,需要otp_win32_R16B02.exe和rabbitmq-server-3.2.1.exe两个安装文件,很容易就能下载到,官网上也有。这两个的安装是按常规下一步直到结束。安装完成后到rabbitMQ的根目录下的sbin目录下,执行

rabbitmq-plugins enable rabbitmq_management安装web插件,然后到计算机管理里找到rabbitmq的service,选择并启动,再执行下安装的sbin目录里的rabbitmqctl.bat文件,等服务都启动好后就可以进入web控制台了,在浏览器中输入http://127.0.0.1/55672,用户名和密码默认都是guest。如果进不去,那只

有两种可能,一是服务没启起来,二是插件没安装生效。多试上面几步就可以了。

                                               图1--控制台安装web插件

                                                 图2-web插件控制台


3、RabbitMQ的独特设计

RabbitMQ里不仅仅有Queue,还有其他一些概念,像Vhost,exchange。MQ的生产者直接连的是exchange,exchange就像一个路由器,通过一定的规则和Queue进行绑定,消费者直接连接的是Queue。ecchange与Queue有多种绑定规则,具体如下:

Fanout Exchange:

                                                   图3--扇出型路由

Topic Exchange:

                                                图4--主题式路由

DirectExchange:

                                           图5--直连型路由

HeadersExchange。

如果要像其他MQ一样用,只要选择directExchange就可以了,由于复杂的场景,我在项目中运用的是topicExchange,可以根据不同的routekey路由到不同的队列里,从而

能达到负载均衡的作用。

RabbitMQ对权限也做了很好的设计,具体的权限粒度如图所示:

                                                         图6--权限控制粒度

4、RabbitMQ的使用

为了使用的简单,先说明结合spring的使用方式,利用spring的IOC自动管理Bean。

需要引入以下的配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
                
 	<bean id="mqPropertyConfig"
		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="order" value="2" />
        <property name="ignoreUnresolvablePlaceholders" value="true" />		
		<property name="locations">
			<list>
				<value>classpath:conf/mq/mq_$[envName].properties
				</value>
			</list>
		</property>
		<property name="placeholderPrefix" value="${" />
		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
	</bean>
	
	<!-- 连接服务配置 -->
 	<rabbit:connection-factory id="connectionFactory" host="${apns.mq.Host}" username="${apns.mq.username}" password="${apns.mq.password}" virtual-host="${apns.mq.vhost}" port="${apns.mq.Port}" channel-cache-size="${apns.mq.cachesize}"/>
 	
	<rabbit:admin id="amqpadmin" connection-factory="connectionFactory"/>
	
	<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

</beans>

在java代码中的引用示例如下:

<pre name="code" class="java">private AmqpTemplate amqpTemplate = context.getBean("amqpTemplate", AmqpTemplate.class);
 
 
amqpTemplate.convertAndSend(exchange, routeKey, message);
当然,exchange和Queue以及他们的绑定也可以直接用代码实现,具体例子如下:

        AmqpAdmin amqpAdmin = context.getBean(AmqpAdmin.class);
        amqpAdmin.declareExchange(new TopicExchange(ENT_EXCHANGE, true, false));
        amqpAdmin.declareExchange(new TopicExchange(DEV_EXCHANGE, true, false));
        Map<Integer, String> map = RouteKeyUtil.getAllQueues();
        for(String name : map.values()){
            /**申明企业证书的队列并绑定至对应的exchange*/
            amqpAdmin.declareQueue(new Queue(ENTERPRISE + name, true, false, false));
            amqpAdmin.declareBinding(new Binding(ENTERPRISE + name, DestinationType.QUEUE, ENT_EXCHANGE, name, null));
            /**申明开发者证书的队列并绑定至对应的exchange*/
            amqpAdmin.declareQueue(new Queue(DEVELOPER + name, true, false, false));
            amqpAdmin.declareBinding(new Binding(DEVELOPER + name, DestinationType.QUEUE, DEV_EXCHANGE, name, null));
        }
我在项目中只引入了两个JAR文件:Maven的中央仓库里就能下载到(http://search.maven.org)

也可以直接用API而不需要spring。感兴趣的可以到官网上查看相关文档。

最后推荐一本RabbitMQ的相关书《RabbitMQ in Action》,不过这本书是英文的,网上也有电子版的。




相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
11月前
|
消息中间件 架构师 Java
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
|
存储 消息中间件 缓存
RocketMQ原理—3.源码设计简单分析下
本文介绍了Producer作为生产者是如何创建出来的、启动时是如何准备好相关资源的、如何从拉取Topic元数据的、如何选择MessageQueue的、与Broker是如何进行网络通信的,Broker收到一条消息后是如何存储的、如何实时更新索引文件的、如何实现同步刷盘以及异步刷盘的、如何清理存储较久的磁盘数据的,Consumer作为消费者是如何创建和启动的、消费者组的多个Consumer会如何分配消息、Consumer会如何从Broker拉取一批消息。
516 11
RocketMQ原理—3.源码设计简单分析下
|
消息中间件 Java 数据管理
RocketMQ原理—2.源码设计简单分析上
本文介绍了NameServer的启动脚本、启动时会解析哪些配置、如何初始化Netty网络服务器、如何启动Netty网络服务器,介绍了Broker启动时是如何初始化配置的、BrokerController的创建以及包含的组件、BrokerController的初始化、启动、Broker如何把自己注册到NameServer上、BrokerOuterAPI是如何发送注册请求的,介绍了NameServer如何处理Broker的注册请求、Broker如何发送定时心跳
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
1288 2
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
593 1
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
1164 0
RocketMQ—一次连接namesvr失败的案例分析
|
数据采集 监控 物联网
MQTT协议在智能制造中的应用案例与效益分析
【6月更文挑战第8天】MQTT协议在智能制造中的应用案例与效益分析
679 1
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
656 0
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
435 0
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
1194 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景