RabbitMQ的四种消息传递模式与演示代码

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RabbitMQ的四种消息传递模式与演示代码

RabbitMQ的四种消息传递模式与演示代码


RabbitMQ是一个功能强大的消息代理,提供了多种消息传递模式来满足不同场景下的需求。本文将介绍RabbitMQ的四种常用消息传递模式:Work、Fanout、Direct、Topic,并给出相应的Java示例代码。


1. Work模式


Work模式也被称为任务队列模式,它将任务分发给多个消费者,并由消费者竞争性地消费任务。每个任务只会被一个消费者处理。


Java示例代码:

import com.rabbitmq.client.*;

public class Worker {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                System.out.println(" [x] Received '" + message + "'");
                try {
                    doWork(message);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }

    private static void doWork(String task) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

应用场景


一个常见的应用场景是在Web应用中实现异步任务处理。例如,用户在网站上提交了一个长时间处理的任务(如生成报表、发送邮件等),为了提高用户体验,可以将任务提交到RabbitMQ的任务队列中,然后由后台的消费者进行异步处理。这样一来,用户在网站上提交任务后即可立即得到响应,而不必等待任务处理完成,提高了系统的响应速度和并发处理能力。


下面是一个使用Work模式的简单示例代码,包括生产者和消费者部分,用Java编写,并添加了详细的注释。


生产者(Producer)代码:

import com.rabbitmq.client.*;

public class Producer {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        // 创建连接和频道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            
            // 构造消息
            String message = String.join(" ", args);
            
            // 发送消息
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者(Consumer)代码:

import com.rabbitmq.client.*;

public class Consumer {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        // 创建连接和频道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 设置消息处理的回调函数
        channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws java.io.IOException {
                String message = new String(body, "UTF-8");
                
                // 模拟任务处理过程
                try {
                    doWork(message);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [x] Done");
                    // 手动发送应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        });
    }

    private static void doWork(String task) throws InterruptedException {
        // 模拟任务处理过程,这里休眠1秒
        Thread.sleep(1000);
    }
}

在上述代码中,生产者(Producer)将消息发布到名为task_queue的队列中,而消费者(Consumer)则从该队列中获取任务并进行处理。每个任务都会被一个消费者处理,通过模拟任务处理过程,演示了Work模式的基本使用方式。


2. Fanout模式


Fanout模式是RabbitMQ中的一种消息传递模式,它将消息广播到所有绑定到Exchange的队列中,即使在消息发布之后才创建的队列,也能接收到消息。


原理解析

在Fanout模式中,生产者将消息发布到一个名为Exchange的交换机中,而不是直接发送到队列中。交换机会将收到的消息广播到与其绑定的所有队列中。因此,无论在消息发布之前还是之后创建的队列,只要它们与交换机进行了绑定,就能接收到交换机广播的消息。


在示例代码中,首先声明一个名为logs的Fanout类型的Exchange,并将消息发布到该Exchange中。消息发布时,并没有指定具体的队列,而是将消息发送到了Exchange中。Exchange会将消息广播到所有与其绑定的队列中,这就是Fanout模式的工作原理。


应用场景

一个常见的应用场景是日志处理系统。在一个分布式的日志系统中,通常会有多个日志消费者,它们分别负责处理不同级别(如info、error等)的日志。通过使用Fanout模式,可以将日志消息广播到所有相关的队列中,每个消费者只需要关注自己负责处理的日志级别,从而实现了日志的分发和处理。


通过Fanout模式,我们可以实现消息的广播传递,适用于多个消费者需要同时接收同一份消息的场景,例如日志处理系统、实时广播等。


Java示例代码:
import com.rabbitmq.client.*;

public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 1) {
            return "info: Hello World!";
        }
        return String.join(" ", strings);
    }
}


3. Direct模式


Direct模式将消息路由到与消息的RoutingKey完全匹配的队列中。


应用场景


一个常见的应用场景是日志级别过滤。在一个分布式的日志系统中,可能有多个消费者负责处理不同级别的日志(如info、error、warning等)。通过使用Direct模式,生产者可以根据日志的级别指定不同的RoutingKey,并将日志消息发布到Exchange中。Exchange会将日志消息路由到与其RoutingKey完全匹配的队列中,从而实现了日志的级别过滤和分发。


Java示例代码:
import com.rabbitmq.client.*;

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            String severity = getSeverity(argv);
            String message = getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }
    }

    private static String getSeverity(String[] strings) {
        if (strings.length < 1) {
            return "info";
        }
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2) {
            return "Hello World!";
        }
        return String.join(" ", strings);
    }
}


4. Topic模式


Topic模式是RabbitMQ中的一种消息传递模式,它将消息发送到与匹配通配符的RoutingKey相匹配的队列中.


原理解析


在Topic模式中,生产者将消息发布到一个名为Exchange的交换机中,并且在发送消息时需要指定一个RoutingKey。RoutingKey可以使用通配符(和#)来匹配多个队列,其中表示匹配一个单词,#表示匹配零个或多个单词。交换机会将收到的消息根据RoutingKey和通配符匹配规则将消息路由到与之匹配的队列中。


在示例代码中,我们首先声明一个名为topic_logs的Topic类型的Exchange,并指定消息的RoutingKey。然后将消息发布到Exchange中,Exchange会根据消息的RoutingKey和通配符匹配规则将消息路由到与之匹配的队列中。


应用场景


一个常见的应用场景是日志过滤器。在一个分布式的日志系统中,可能有多个消费者负责处理不同模块或不同级别的日志。通过使用Topic模式,生产者可以根据日志的模块或级别指定特定的RoutingKey,并将日志消息发布到Exchange中。消费者可以使用通配符来订阅感兴趣的日志模块或级别,Exchange会将日志消息路由到与之匹配的队列中,从而实现了日志的模块化过滤和分发。


Java示例代码:

import com.rabbitmq.client.*;

public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String routingKey = getRouting(argv);
            String message = getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }

    private static String getRouting(String[] strings) {
        if (strings.length < 1) {
            return "anonymous.info";
        }
        return strings[0];
    }

    private static String

 getMessage(String[] strings) {
        if (strings.length < 2) {
            return "Hello World!";
        }
        return String.join(" ", strings);
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
4月前
|
消息中间件 Java Kafka
消息传递新纪元:探索RabbitMQ、RocketMQ和Kafka的魅力所在
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。其中,RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型;RocketMQ 则是由阿里巴巴开源的具备高性能、高可用性和高可靠性的分布式消息队列,支持事务消息等多种特性;而 Kafka 作为一个由 LinkedIn 开源的分布式流处理平台,以高吞吐量和良好的可扩展性著称。此外,还提供了使用这三种消息队列发送和接收消息的代码示例。总之,这三种消息队列各有优势,适用于不同的业务场景。
75 3
|
4月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
81 2
|
4月前
|
消息中间件 存储 负载均衡
"RabbitMQ集群大揭秘!让你的消息传递系统秒变超级英雄,轻松应对亿级并发挑战!"
【8月更文挑战第24天】RabbitMQ是一款基于AMQP的开源消息中间件,以其高可靠性、扩展性和易用性闻名。面对高并发和大数据挑战时,可通过构建集群提升性能。本文深入探讨RabbitMQ集群配置、工作原理,并提供示例代码。集群由多个通过网络连接的节点组成,共享消息队列,确保高可用性和负载均衡。搭建集群需准备多台服务器,安装Erlang和RabbitMQ,并确保节点间通信顺畅。核心步骤包括配置.erlang.cookie文件、使用rabbitmqctl命令加入集群。消息发布至任一节点时,通过集群机制同步至其他节点;消费者可从任一节点获取消息。
55 2
|
4月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
78 1
|
4月前
|
物联网 C# Windows
看看如何使用 C# 代码让 MQTT 进行完美通信
看看如何使用 C# 代码让 MQTT 进行完美通信
650 0
|
4月前
|
Java
MQTT(EMQX) - Java 调用 MQTT Demo 代码
MQTT(EMQX) - Java 调用 MQTT Demo 代码
194 0
MQTT(EMQX) - Java 调用 MQTT Demo 代码
|
4月前
|
存储 C# 关系型数据库
“云端融合:WPF应用无缝对接Azure与AWS——从Blob存储到RDS数据库,全面解析跨平台云服务集成的最佳实践”
【8月更文挑战第31天】本文探讨了如何将Windows Presentation Foundation(WPF)应用与Microsoft Azure和Amazon Web Services(AWS)两大主流云平台无缝集成。通过具体示例代码展示了如何利用Azure Blob Storage存储非结构化数据、Azure Cosmos DB进行分布式数据库操作;同时介绍了如何借助Amazon S3实现大规模数据存储及通过Amazon RDS简化数据库管理。这不仅提升了WPF应用的可扩展性和可用性,还降低了基础设施成本。
95 0
|
4月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
5月前
|
消息中间件 安全 PHP
消息队列 MQ使用问题之如何获取PHP客户端代码
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
下一篇
DataWorks