RabbitMQ系列(二)深入了解RabbitMQ工作原理及简单使用

简介: 深入了解RabbitMQ工作原理及简单使用RabbitMQ系列文章RabbitMQ在Ubuntu上的环境搭建深入了解RabbitMQ工作原理及简单使用RabbitMQ交换器Exchange介绍与实践RabbitMQ事务和Confirm发送方消息确认——深入解读使用Docker部署RabbitMQ集群你不知道的RabbitMQ集群架构全解RabbitMQ简介在介绍RabbitMQ之前实现要介绍一下MQ,MQ是什么?MQ全称是Message Queue,可以理解为消息队列的意思,简单来说就是消息以管道的方式进行传递。

深入了解RabbitMQ工作原理及简单使用

RabbitMQ系列文章

  1. RabbitMQ在Ubuntu上的环境搭建
  2. 深入了解RabbitMQ工作原理及简单使用
  3. RabbitMQ交换器Exchange介绍与实践
  4. RabbitMQ事务和Confirm发送方消息确认——深入解读
  5. 使用Docker部署RabbitMQ集群
  6. 你不知道的RabbitMQ集群架构全解

RabbitMQ简介

在介绍RabbitMQ之前实现要介绍一下MQ,MQ是什么?

MQ全称是Message Queue,可以理解为消息队列的意思,简单来说就是消息以管道的方式进行传递。

RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言的。

使用场景

在我们秒杀抢购商品的时候,系统会提醒我们稍等排队中,而不是像几年前一样页面卡死或报错给用户。

像这种排队结算就用到了消息队列机制,放入通道里面一个一个结算处理,而不是某个时间断突然涌入大批量的查询新增把数据库给搞宕机,所以RabbitMQ本质上起到的作用就是削峰填谷,为业务保驾护航。

为什么选择RabbitMQ

现在的市面上有很多MQ可以选择,比如ActiveMQ、ZeroMQ、Appche Qpid,那问题来了为什么要选择RabbitMQ?

  1. 除了Qpid,RabbitMQ是唯一一个实现了AMQP标准的消息服务器;
  2. 可靠性,RabbitMQ的持久化支持,保证了消息的稳定性;
  3. 高并发,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性;
  4. 集群部署简单,正是应为Erlang使得RabbitMQ集群部署变的超级简单;
  5. 社区活跃度高,根据网上资料来看,RabbitMQ也是首选;

工作机制

生产者、消费者和代理

在了解消息通讯之前首先要了解3个概念:生产者、消费者和代理。

生产者:消息的创建者,负责创建和推送数据到消息服务器;

消费者:消息的接收方,用于处理数据和确认消息;

代理:就是RabbitMQ本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。

消息发送原理

首先你必须连接到Rabbit才能发布和消费消息,那怎么连接和发送消息的呢?

你的应用程序和Rabbit Server之间会创建一个TCP连接,一旦TCP打开,并通过了认证,认证就是你试图连接Rabbit之前发送的Rabbit服务器连接信息和用户名和密码,有点像程序连接数据库,使用Java有两种连接认证的方式,后面代码会详细介绍,一旦认证通过你的应用程序和Rabbit就创建了一条AMQP信道(Channel)。

信道是创建在“真实”TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。

为什么不通过TCP直接发送命令?

对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。

如果我们每个请求都使用一条TCP连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。

img_9f2e4cfee02b63851e7dc41b32022571.png

你必须知道的Rabbit

想要真正的了解Rabbit有些名词是你必须知道的。

包括:ConnectionFactory(连接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键)。

ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;

Channel(信道):消息推送使用的通道;

Exchange(交换器):用于接受、分配消息;

Queue(队列):用于存储生产者的消息;

RoutingKey(路由键):用于把生成者的数据分配到交换器上;

BindingKey(绑定键):用于把交换器的消息绑定到队列上;

看到上面的解释,最难理解的路由键和绑定键了,那么他们具体怎么发挥作用的,请看下图:

img_0d58e30272d79d7148f4093ca0dd7cea.gif

关于更多交换器的信息,我们在后面再讲。

消息持久化

Rabbit队列和交换器有一个不可告人的秘密,就是默认情况下重启服务器会导致消息丢失,那么怎么保证Rabbit在重启的时候不丢失呢?答案就是消息持久化。

当你把消息发送到Rabbit服务器的时候,你需要选择你是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须满足3个条件:

  1. 投递消息的时候durable设置为true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),参数2设置为true持久化;
  2. 设置投递模式deliveryMode设置为2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),参数3设置为存储纯文本到磁盘;
  3. 消息已经到达持久化交换器上;
  4. 消息已经到达持久化的队列;

持久化工作原理

Rabbit会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit会把这条消息标识为等待垃圾回收。

持久化的缺点

消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。

所以使用者要根据自己的情况,选择适合自己的方式。

虚拟主机

每个Rabbit都能创建很多vhost,我们称之为虚拟主机,每个虚拟主机其实都是mini版的RabbitMQ,拥有自己的队列,交换器和绑定,拥有自己的权限机制。

vhost特性

  1. RabbitMQ默认的vhost是“/”开箱即用;

  2. 多个vhost是隔离的,多个vhost无法通讯,并且不用担心命名冲突(队列和交换器和绑定),实现了多层分离;

  3. 创建用户的时候必须指定vhost;

vhost操作

可以通过rabbitmqctl工具命令创建:

rabbitmqctl add_vhost[vhost_name]

删除vhost:

rabbitmqctl delete_vhost[vhost_name]

查看所有的vhost:

rabbitmqctl list_vhosts

环境搭建

前文我们已经介绍了Ubuntu搭建RabbitMQ的步骤:RabbitMQ在Ubuntu上的环境搭建

如果你是在Windows10上去安装那就更简单了,先放下载地址:

Erlang/Rabbit Server百度网盘链接:https://pan.baidu.com/s/1TnKDV-ZuXLiIgyK8c8f9dg 密码:wct9

当然也可去Erlang和Rabbit官网去下,就是速度比较慢。我的百度云Rabbit最新版本:3.7.6,Erlang版本:20.2,注意:不要下载最新的Erlang,在Windows10上打开扩展插件有问题,打不开。

  1. 安装Erlang;

  2. 安装Rabbit Server;

  3. 进入安装目录\sbin下,使用命令“rabbitmq-plugins enable rabbitmq_management”启动网页管理插件;

  4. 重启Rabbit服务;

使用:http://localhost:15672进行测试,默认的登陆账号为:guest,密码为:guest

重复安装Rabbit Server的坑

如果不是第一次在Windows上安装Rabbit Server一定要把Rabbit和Erlang卸载干净之后,找到注册表:HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv 删除其下的所有项。

不然会出现Rabbit安装之后启动不了的情况,理论上卸载的顺序也是先Rabbit在Erlang。

代码实现

java版实现,使用maven项目,创建可以查看:MyEclipse2017破解设置与maven项目搭建

项目创建成功之后,添加Rabbit Client jar包,只需要在pom.xml里面配置,如下信息:

 <dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.2.0</version>
</dependency>

java实现代码分为两个类,第一个是创建Rabbit连接,第二是应用类使用最简单的方式发布和消费消息。

Rabbit的连接,两种方式:

方式一:

public static Connection GetRabbitConnection() {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(Config.UserName);
    factory.setPassword(Config.Password);
    factory.setVirtualHost(Config.VHost);
    factory.setHost(Config.Host);
    factory.setPort(Config.Port);
    Connection conn = null;
    try {
        conn = factory.newConnection();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return conn;
}

方式二:

public static Connection GetRabbitConnection2() {
    ConnectionFactory factory = new ConnectionFactory();
    // 连接格式:amqp://userName:password@hostName:portNumber/virtualHost
    String uri = String.format("amqp://%s:%s@%s:%d%s", Config.UserName, Config.Password, Config.Host, Config.Port,
            Config.VHost);
    Connection conn = null;
    try {
        factory.setUri(uri);
        factory.setVirtualHost(Config.VHost);
        conn = factory.newConnection();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return conn;
}

第二部分:应用类,使用最简单的方式发布和消费消息

public static void main(String[] args) {
    Publisher(); // 推送消息

    Consumer(); // 消费消息
}

/**
 * 推送消息
 */
public static void Publisher() {
    // 创建一个连接
    Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
    if (conn != null) {
        try {
            // 创建通道
            Channel channel = conn.createChannel();
            // 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
            channel.queueDeclare(Config.QueueName, false, false, false, null);
            String content = String.format("当前时间:%s", new Date().getTime());
            // 发送内容【参数说明:参数一:交换机名称;参数二:队列名称,参数三:消息的其他属性-routing headers,此属性为MessageProperties.PERSISTENT_TEXT_PLAIN用于设置纯文本消息存储到硬盘;参数四:消息主体】
            channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8"));
            System.out.println("已发送消息:" + content);
            // 关闭连接
            channel.close();
            conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

/**
 * 消费消息
 */
public static void Consumer() {
    // 创建一个连接
    Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
    if (conn != null) {
        try {
            // 创建通道
            Channel channel = conn.createChannel();
            // 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
            channel.queueDeclare(Config.QueueName, false, false, false, null);

            // 创建订阅器,并接受消息
            channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey(); // 队列名称
                    String contentType = properties.getContentType(); // 内容类型
                    String content = new String(body, "utf-8"); // 消息正文
                    System.out.println("消息正文:" + content);
                    channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息【参数说明:参数一:该消息的index;参数二:是否批量应答,true批量确认小于index的消息】
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

代码里面已经写了很详细的注释,在这里也不过多的介绍了。

执行效果,如图:

img_bf56e6a2162435c1038e745cac23e9c4.png

img_fa0be433d68c8212b2b0b3b1a564ccb1.png
如果本文对你有所帮助,请打赏——1元就足够感动我:)
支付宝打赏 微信打赏
联系邮箱:intdb@qq.com
我的GitHub: https://github.com/vipstone
关注公众号: img_9bde0f31ac4a0eca10b1bd7414b78faf.png


作者: 王磊
出处: http://vipstone.cnblogs.com/
本文版权归作者和博客园共有,欢迎转载,请标明出处。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
10月前
|
消息中间件 存储 缓存
RocketMQ工作原理详解及开发实例
RocketMQ工作原理详解及开发实例
506 0
|
27天前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
25 0
|
7月前
|
消息中间件 存储 网络协议
从原理到实战,手把手教你在项目中使用RabbitMQ
RabbitMQ 的文章之前写过,但是当时给的示例是 Demo 版的,这篇文章主要是结合之前写的理论知识,将 RabbitMQ 集成到技术派项目中。 话不多说,上文章目录: 下面我们先回顾一下理论知识,如果对这块知识已经清楚的同学,可以直接跳到实战部分。 1. 消息队列 1.1 消息队列模式 消息队列目前主要 2 种模式,分别为“点对点模式”和“发布/订阅模式”。 点对点模式 一个具体的消息只能由一个消费者消费,多个生产者可以向同一个消息队列发送消息,但是一个消息在被一个消息者处理的时候,这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。 需要额外注意的是,如果消费者
446 5
|
27天前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
48 1
|
5月前
|
消息中间件 Kafka 测试技术
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
93 0
|
1月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
|
1月前
|
消息中间件 存储 缓存
【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶
【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶
|
2月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
362 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
|
6月前
|
消息中间件 存储 监控
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
100 0
|
7月前
|
消息中间件 存储 网络协议
面试跳槽季惊艳面试官的回答:谈谈你对RabbitMQ工作原理的理解?
一个5年工作经验的小伙伴,在面试的时候被这样一个问题。谈谈你对RabbitMQ架构原理的理解。当时,这位小伙伴只解答说,我只会用,原理并没有关注过。那今天我给大家来分享一下我的理解。
62 1

热门文章

最新文章