RabbitMQ消息队列学习笔记

简介: 初次使用AMQP的过程中,总是容易被AMQP支持的消息模型绕晕,这里结合官方的教程,对AMQP的消息模型做一个简要总结,供参考

作者:俏巴

概述

初次使用AMQP的过程中,总是容易被AMQP支持的消息模型绕晕,这里结合官方的教程,对AMQP的消息模型做一个简要总结,供参考。目前官方给出了六种消息发送/接收模型,这里主要介绍前五种消息模型。

消息模型

1、Hello World

简单模式就是生产者将消息发送到队列、消费者从队列中获取消息。一条消息对应一个消费者。

image.png

示例代码说明:

测试使用的是阿里云的AMQP消息队列服务,具体的代码配置过程可以参考阿里云官方链接

工具类

import AMQP.AliyunCredentialsProvider;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

  public static Connection getConnection() throws Exception{
      // 初始化参数设置
      String AccessKey= "********";
      String SecretKey = "********";
      Long Uid = ********16617278L;
      String VhostName = "********";
      String host = "********16617278.mq-amqp.cn-hangzhou-a.aliyuncs.com";

      // 定义连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      // 设置服务地址
      connectionFactory.setHost(host);
      // 端口
      connectionFactory.setPort(5672);
      // 设置用户名、密码、vhost
      connectionFactory.setCredentialsProvider(new AliyunCredentialsProvider(AccessKey,SecretKey,Uid));
      connectionFactory.setAutomaticRecoveryEnabled(true);
      connectionFactory.setNetworkRecoveryInterval(5000);
      connectionFactory.setVirtualHost(VhostName);

      // 通过工厂获取连接对象
      Connection connection = connectionFactory.newConnection();
      return connection;
  }
}

发送端示例代码

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

// hello world 单个消费者和接收者
public class Send {

    private final static String Queue_name = "helloDemo";
    public static void main(String[] args) throws Exception {
        // 获取连接及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(Queue_name,false,false,false,null);
        //消息内容
        String message = "Hello World!";
        // 1、交换机,此处无   2、发送到那个队列 3、属性  4、消息内容
        channel.basicPublish("",Queue_name,null,message.getBytes());

        System.out.println("发送数据:" + message);

        // 关闭连接
        channel.close();
        connection.close();
    }
}

消费端示例代码

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Receiver {
    private final static String Queue_name = "helloDemo";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel =  connection.createChannel();
        
        // 开始消费消息
        channel.basicConsume(Queue_name, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理
                System.out.println("message receive: ");
                System.out.println("Received: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        Thread.sleep(100000);
        channel.close();
        connection.close();
    }
}

2、Work Queues

一条消息可以被多个消费者尝试接收,最终只有一个消费者能够获取到消息。

image.png

发送端示例代码

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

// 1:N  消费者各自接收消息
public class Sender {

    private final static String queueName = "workQueue";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(queueName,false,false,false,null);
        for (int i = 0; i < 100; i++) {

            String message = "workqueues message " + i;
            channel.basicPublish("",queueName,null,message.getBytes());
            System.out.println("发送消息: " + message);

            Thread.sleep(10);//休眠
        }
        // 关闭连接
        channel.close();
        connection.close();
    }
}

消费端示例代码1

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Receiver1 {

    private final static String queueName = "workQueue";
    public static void main(String[] args) throws Exception{

        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(queueName,false,false,false,null);
        channel.basicQos(1);//告诉服务器,在没有确认当前消息完成之前,不要给我发新的消息。

        DefaultConsumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理
                System.out.println("message receive1: ");
                System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);// 参数2 false为确认收到消息, true为拒绝收到消息
            }
        };
        channel.basicConsume(queueName,false,consumer);// 参数2 手动确认,代表我们收到消息后需要手动确认告诉服务器我们收到消息了
    }
}

消费端示例代码2

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Receiver2 {

    private final static String queueName = "workQueue";
    public static void main(String[] args) throws Exception{

        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(queueName,false,false,false,null);
        channel.basicQos(1);//告诉服务器,在没有确认当前消息完成之前,不要给我发新的消息。

        DefaultConsumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理
                System.out.println("message receive2: ");
                System.out.println("Received2: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);// 参数2 false为确认收到消息, true为拒绝收到消息
            }
        };
        channel.basicConsume(queueName,false,consumer);// 参数2 手动确认,代表我们收到消息后需要手动确认告诉服务器我们收到消息了
    }
}

3、Publish/Subscribe

一条消息可以被多个消费者同时获取,生产者将消息发送给交换机,消费者将自己对应的队列注册到交换机,当发送消息后,所有注册的队列的消费者都可以收到消息。

image.png

发送端示例代码

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {

    private static String Exchange_Name = "ExchangeDemo";//声明交换机
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        // Producer 将消息发送到 Exchange ,由 Exchange 将消息路由到一个或多个 Queue 中(或者丢弃),Exchange 按照相应的 Binding 逻辑将消息路由到 Queue。
        channel.exchangeDeclare(Exchange_Name,"fanout");
        String message = "Exchange message demo";

        // 消息发送端交换机,如果此时没有队列绑定,则消息会丢失,因为交换机没有存储消息的能力
        channel.basicPublish(Exchange_Name,"",null,message.getBytes());
        System.out.println("发送消息: " + message);
    }
}

消费端示例代码1

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Sub1 {
    private static String Exchange_Name = "ExchangeDemo";//声明交换机
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare("testqueue1",false,false,false,null);
        // 绑定到交换机
        channel.queueBind("testqueue1",Exchange_Name,"");
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException{

                System.out.println("sub1: " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testqueue1",false,consumer);
    }
}

消费端示例代码2

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Sub2 {
    private static String Exchange_Name = "ExchangeDemo";//声明交换机

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testqueue2",false,false,false,null);
        // 绑定到交换机
        channel.queueBind("testqueue2",Exchange_Name,"");
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException{
                System.out.println("sub2: " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testqueue2",false,consumer);
    }
}

4、Routing

生产者将消息发送到type为direct模式的交换机,消费者的队列将自己绑定到路由的时候给自己绑定一个key,只有生产者发送的消息key和绑定的key一致时,消费者才能收到对应的消息。

image.png

发送端示例代码

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private static final String ExchangeName = "Rout_Change";//路由消息交换机

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(ExchangeName,"direct");
        channel.basicPublish(ExchangeName,"key3",null,"route 消息".getBytes());

        channel.close();
        connection.close();
    }
}

消费端示例代码

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Sub {
    private static final String ExchangeName = "Rout_Change";//路由消息交换机
    public static void main(String[] args) throws Exception{

        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare("testroutequeue1",false,false,false,null);

        // 绑定交换机
        // 参数3 标记 绑定到交换机的时候会有一个标记,只有和它一样标记的消息才会别消费到
        channel.queueBind("testroutequeue1",ExchangeName,"key1");
        channel.queueBind("testroutequeue1",ExchangeName,"key2");//绑定多个标记
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理
                System.out.println("message route receive1: ");
                System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);// 参数2 false为确认收到消息, true为拒绝收到消息
            }
        };
        channel.basicConsume("testroutequeue1",false,consumer);// 参数2 手动确认,代表我们收到消息后需要手动确认告诉服务器我们收到消息了
    }
}

5、Topics

该类型与 Direct 类型相似,只是规则没有那么严格,可以模糊匹配和多条件匹配,即该类型 Exchange 使用 Routing key 模式匹配和字符串比较的方式将消息路由至绑定的 Queue。

示例:

Routing key 为 use.stock 的消息会转发给绑定匹配模式为 .stock, use.stock, . 和 #.use.stock.# 的 Queue; 表是匹配一个任意词组,# 表示匹配 0 个或多个词组。

image.png

发送端示例代码

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private static String Exchange_Name = "Exchange_Topic";
    public static void main(String[] args) throws Exception{

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange类型为Topic 也就是通配符模式
        channel.exchangeDeclare(Exchange_Name,"topic");
        channel.basicPublish(Exchange_Name,"abc.1.2",null,"Topic 模式消息".getBytes());

        // 关闭通道和连接
        channel.close();
        connection.close();
    }
}

接收端示例代码

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Sub {
    private static String ExchangeName = "Exchange_Topic";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare("topicqueue",false,false,false,null);
        // 绑定交换机
        // 参数3 标记 绑定到交换机的时候会有一个标记,只有和它一样标记的消息才会别消费到
        channel.queueBind("topicqueue",ExchangeName,"key.*");
        channel.queueBind("topicqueue",ExchangeName,"abc.#");//绑定多个标记
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理
                System.out.println("message route receive1: ");
                System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);// 参数2 false为确认收到消息, true为拒绝收到消息
            }
        };
        channel.basicConsume("topicqueue",false,consumer);// 参数2 手动确认,代表我们收到消息后需要手动确认告诉服务器我们收到消息了
    }
}

参考链接
RabbitMQ Tutorials

Rabbitmq企业级消息队列视频课程

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 网络协议 JavaScript
MQTT常见问题之微消息队列mqtt支持ipv6失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 物联网 Java
MQTT常见问题之微消息队列配置失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
76 0
|
2天前
|
消息中间件 存储 运维
深入理解MQ消息队列的高可用与可靠性策略
深入理解MQ消息队列的高可用与可靠性策略
16 3
|
18天前
|
消息中间件 大数据 Java
消息队列 MQ
消息队列 MQ
25 3
|
22天前
|
消息中间件 数据安全/隐私保护
MQTT微消息队列服务器连接报错:Error: Connection refused: Not authorized
使用MQTTX工具进行测试时,通过AccessKey创建了Client ID的用户名和密码。配置了公网接入点及端口1883,但尝试连接时出现错误。已附上工具截图:![](https://ucc.alicdn.com/pic/developer-ecology/3byii5uar64gg_36327474e991439da422f38c450ef153.png)。确认过用户名、密码和Client ID无误,问题仍未解决,期待回复!
|
1月前
|
消息中间件 存储 监控
解析RocketMQ:高性能分布式消息队列的原理与应用
RocketMQ是阿里开源的高性能分布式消息队列,具备低延迟、高吞吐和高可靠性,广泛应用于电商、金融等领域。其核心概念包括Topic、Producer、Consumer、Message和Name Server/Broker。RocketMQ支持异步通信、系统解耦、异步处理和流量削峰。关键特性有分布式架构、顺序消息、高可用性设计和消息事务。提供发布/订阅和点对点模型,以及消息过滤功能。通过集群模式、存储方式、发送和消费方式的选择进行性能优化。RocketMQ易于部署,可与Spring集成,并与Kafka等系统对比各有优势,拥有丰富的生态系统。
152 4
|
1月前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
20 0
|
2月前
|
消息中间件 存储 中间件
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
59 0
|
2月前
|
消息中间件 缓存 API

热门文章

最新文章

相关产品

  • 云消息队列 MQ