RabbitMQ消息中间件

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
性能测试 PTS,5000VUM额度
简介: RabbitMQ消息中间件

初识RabbitMQ        

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。


RabbitMQ的优点:开源、性能优秀、稳定性保障、提供可靠性消息投递模式、返回模式、与SpringAMQP完美整合、api丰富、在保证数据不丢失的情况下做到高可靠性、可用性。


AMQP高级消息队列协议(定义):是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

AMQP核心概念:

RabbitMQ重要概念

20190121202014272.png

Server:又称Broker,接受客户端连接,实现AMQP实体服务

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

Virtual Host:虚拟主机,一个broker里可以开设多个Virtual Host,用作不同用户的权限分离。

producer:消息生产者,就是投递消息的程序。

consumer:消息消费者,就是接受消息的程序。

channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

Linux下安装RabbitMQ

http://www.rabbitmq.com/releases/

Erlang下载地址:http://www.rabbitmq.com/releases/erlang/

RabbitMQ下载地址:https://www.rabbitmq.com/download.html

版本对应关系:https://www.rabbitmq.com/which-erlang.html

安装rabbitmq所需要的依赖包

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

安装Erlang出错,因为本系统是centos6.8所带的glibc只有到2.14所以要先升级我们的glibc,编写一个sh脚本,然后执行,下载的过程比较久。如果本脚本不行的话可以在脚本的最后添加强制安装 --force --nodeps

#! /bin/sh
# update glibc to 2.17 for CentOS 6
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-2.17-55.el6.x86_64.rpm
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-common-2.17-55.el6.x86_64.rpm
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-devel-2.17-55.el6.x86_64.rpm
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-headers-2.17-55.el6.x86_64.rpm
sudo rpm -Uvh glibc-2.17-55.el6.x86_64.rpm \
glibc-common-2.17-55.el6.x86_64.rpm \
glibc-devel-2.17-55.el6.x86_64.rpm \
glibc-headers-2.17-55.el6.x86_64.rpm

安装Erlang

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

安装socat

rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm

安装rabbitmq

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

修改核心配置文件,一个json格式的配置文件,rpm安装的路径是默认的,修改配置文件让rabbitmq可以直接访问

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

{loopback_users, [<<"guest">>]}修改为{loopback_users, [guest]}

20190123214643194.png

服务的启动: rabbitmq-server start &
服务的停止: rabbitmqctl stop_ app
管理插件: rabbitmq-plugins enable rabbitmq_ management

启动:

20190123221049665.png

安装可视化插件

rabbitmq-plugins enable rabbitmq_management

ip+端口15672访问,管控台默认的端口为15672,5672是通信端口java端用的在配置文件里面有,默认的账号和密码为guest

20190123222124101.png

基本命令

rabbitmqctl stop_app:关闭应用
rabbitmqctl start_app:启动应用
rabbitmqctl status:节点状态
rabbitmqctl add user username password: 添加用户
rabbitmqctl list users:列出所有用户
rabbitmqctl delete user username: 删除用户
rabbitmqctl clear permissions -p vhostpath username:清除用户权限
rabbitmqctl list user permissions username:列出用户权限
rabbitmqctl change_ password username newpassword:修改密码
rabbitmqctl set_ permissions -p vhostpath username".*"".*"".*". 设置用户权限
rabbitmqctl add _vhost vhostpath:创建虚拟主机
rabbitmqctl list vhosts:列出所有虚拟主机
rabbitmqctl list_ permissions -p vhostpath:列出虚拟主机上所有权限
rabbitmqctl list_ queues:查看所有队列信息
rabbitmqctl -p vhostpath purge_ queue blue:清除队列里的消息
rabbitmqctl reset:移除所有数据,要在rabbitmqctl stop_ app之后使用
rabbitmqctl join _cluster <clusternode> [--ram] :组成集群命令
rabbitmqctl change cluster node type disc |修改集群节点的存储形式
rabbitmqctl forget cluster_ node [--offline] 忘记节点(摘除节点)
rabbitmqctl rename_ cluster node oldnode1 newnode1 [oldnode2][newnode2..]. (修改节 点名称)

创建一个springboot项目测试一下

需要引入rabbitmq的包

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>

生成者

package com.smxy.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Procuder {
    public static void main(String[] args) throws Exception {
        //1 创建一个ConnectionFactory, 并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.254.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();
        //3 通过connection创建一个Channel
        Channel channel = connection.createChannel();
        //4 通过Channel发送数据
        for(int i=0; i < 5; i++){
            String msg = "Hello RabbitMQ!";
            //1 exchange   2 routingKey
            channel.basicPublish("", "test001", null, msg.getBytes());
        }
        //5 记得要关闭相关的连接
        channel.close();
        connection.close();
    }
}

消费者

package com.smxy.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1 创建一个ConnectionFactory, 并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.254.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();
        //3 通过connection创建一个Channel
        Channel channel = connection.createChannel();
        //4 声明(创建)一个队列
        String queueName = "test001";
        channel.queueDeclare(queueName, true, false, false, null);
        //5 创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //6 设置Channel
        channel.basicConsume(queueName, true, queueingConsumer);
        while(true){
            //7 获取消息
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消费端: " + msg);
            //Envelope envelope = delivery.getEnvelope();
        }
    }
}

启动后

20190127230213437.png

交换机属性:

Name:交换机名称


Type:交换机类型direct、topic、fanout、headers


Durability:是否需要持久化,true为持久化


Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange。


Internal:当前Exchange是否用于RabbitMQ内部使用,默认False


Arguments:扩展参数。

四种交换机类型

Direct Exchange(直连交换机)

1对1-----一个消息只能被一个消费者消费

所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue

注意: Direct模式可以使用RabbitMQ自带的Exchange: default

Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。

消费者:

public class Consumer4DirectExchange {
  public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        connectionFactory.setHost("192.168.254.129");
        connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_direct_exchange";
    String exchangeType = "direct";
    String queueName = "test_direct_queue";
    String routingKey = "test.direct";
    //表示声明了一个交换机
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    //表示声明了一个队列
    channel.queueDeclare(queueName, false, false, false, null);
    //建立一个绑定关系:
    channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
  }
}

生产者

public class Producer4DirectExchange {
  public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.254.129");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 创建Connection
    Connection connection = connectionFactory.newConnection();
    //3 创建Channel
    Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_direct_exchange";
    String routingKey = "test.direct";
    //5 发送
    String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
    channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());     
  }
}

Topic Exchange(主题交换机)

1对多-----一个消息可以被多个消费者消费

所有发送到Topic Exchange的消息被转发到所有关系RouteKey中指定Topic的Queue上。


Exchange将RouteKey和某Topic进行模糊匹配。队列需要绑定一个Topic。


路由关键字通配符


#:可以匹配一个或者多个词


*:可以匹配一个词


Fanout 是最快的


Fanout Exchange(扇型交换机)


广播


不处理路由键,只需要简单的将队列绑定到交换机上。


发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。


Headers exchange(头交换机)

类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

此交换机有个重要参数:”x-match”


当”x-match”为“any”时,消息头的任意一个值被匹配就可以满足条件

当”x-match”设置为“all”的时候,就需要消息头的所有值都匹配成功


消息队列:


Durability:是否持久化,durable:是,Transient:否


Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除。

Message设置

package com.bfxy.rabbitmq.api.message;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Procuder {
  public static void main(String[] args) throws Exception {
    //1 创建一个ConnectionFactory, 并进行配置
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.254.129");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 通过连接工厂创建连接
    Connection connection = connectionFactory.newConnection();
    //3 通过connection创建一个Channel
    Channel channel = connection.createChannel();
    Map<String, Object> headers = new HashMap<>();
    headers.put("my1", "111");
    headers.put("my2", "222");
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)//发送到服务器后,重启服务后消息还存在
        .contentEncoding("UTF-8")
        .expiration("10000")//过期时间
        .headers(headers)
        .build();
    //4 通过Channel发送数据
    for(int i=0; i < 5; i++){
      String msg = "Hello RabbitMQ!";
      //1 exchange   2 routingKey
      channel.basicPublish("", "test001", properties, msg.getBytes());
    }
    //5 记得要关闭相关的连接
    channel.close();
    connection.close();
  }
}
package com.bfxy.rabbitmq.api.message;
import java.util.Map;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
  public static void main(String[] args) throws Exception {
    //1 创建一个ConnectionFactory, 并进行配置
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.254.129");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 通过连接工厂创建连接
    Connection connection = connectionFactory.newConnection();
    //3 通过connection创建一个Channel
    Channel channel = connection.createChannel();
    //4 声明(创建)一个队列
    String queueName = "test001";
    channel.queueDeclare(queueName, true, false, false, null);
    //5 创建消费者
    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    //6 设置Channel
    channel.basicConsume(queueName, true, queueingConsumer);
    while(true){
      //7 获取消息
      Delivery delivery = queueingConsumer.nextDelivery();
      String msg = new String(delivery.getBody());
      System.err.println("消费端: " + msg);
      Map<String, Object> headers = delivery.getProperties().getHeaders();
      System.err.println("headers get my1 value: " + headers.get("my1"));
      //Envelope envelope = delivery.getEnvelope();
    }
  }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
6月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
6月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
4月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
277 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
5月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
188 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
4月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
7月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
7月前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
7月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
8月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决

相关产品

  • 云消息队列 MQ