rabbitmq系列(二)几种常见模式的应用场景及实现

简介: rabbitmq系列(二)几种常见模式的应用场景及实现

一、简单模式

原理:生产者将消息交给默认的交换机,交换机获取消息后交给绑定这个生产者的队列(投递规则为队列名称和routing key 相同的队列),监听当前队列的消费者获取信息并执行消费逻辑。

场景:有一个oa系统,用户通过接收手机验证码进行注册,页面上点击获取验证码后,将验证码放到消息队列,然后短信服务从队列中获取到验证码,并发送给用户。

实现

生产者:

public class Producter {
 
    public static void main(String[] args) throws Exception {
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
        // 4. 通过channel发布消息
        /**
         * 四个参数:
         * 第一个参数是交换机的名称
         * 第二个参数是路由键
         * 第三个参数标识消息的一些额外的属性
         * 第四个是消息的具体的内容
         */
        String message = "字节";
        for(int i = 0;i < 5;i ++){
 
            channel.basicPublish("","byte001",null,message.getBytes());
        }
        // 5. 释放资源,释放channel 和 链接对象
 
        channel.close();
        connection.close();
    }
}

消费者:

public class Consumer {
 
    public static void main(String[] args) throws Exception {
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
 
        // 4. 创建出消息队列
        /**
         * 第一个参数是消息队列的名称
         * 第二个参数表示消息是否持久化
         * 第三个参数标识消息队列是否被channel独占
         * 第四个参数标识是否自动删除消息队列,当消息队列没有绑定交换机后是否自动删除
         * 第五个是消息队列扩展参数
         */
        String queueName = "byte001";
        channel.queueDeclare(queueName, true, false, false, null);
 
        // 5. 创建消费者,对消息进行处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用来标识.可以再监听队列时候设置
             * envelope 信封,通过envelope可以通过这个获取到很多东西
             * properties 额外的消息属性
             * body:消息体
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
                String s = new String(body, "UTF-8");
                System.out.println("获取到的消息:"+s);
            }
        };
        // 6. 通过channel消费者和消息队列关联
        /**
         * 第一个参数是消息队列的名字
         * 第二个参数是否自动签收(即消费消息后告知服务器已被消费)
         * 第三个参数是消费者
         */
        channel.basicConsume(queueName, true, consumer);
    }
}

二、工作模式

原理:生产者将消息交给交换机,交换机交给绑定的队列,队列有多个消费者监听,一条消息只能由一个消费者消费,这样就形成了资源竞争,谁的资源空闲大,争抢到的可能性就大。

场景:有一个电商平台,有两个订单服务,用户下单的时候,任意一个订单服务消费用户的下单请求生成订单即可。不用两个订单服务同时消费用户的下单请求。

实现

生产者:

public class Producter {
 
    public static final String QUEUE_NAME = "byte002";
    public static void main(String[] args) throws Exception {
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
 
        // 申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 4. 通过channel发布消息
        /**
         * 四个参数:
         * 第一个参数是交换机的名称
         * 第二个参数是路由键
         * 第三个参数标识消息的一些额外的属性
         * 第四个是消息的具体的内容
         */
        String message = "字节";
        for(int i = 0;i < 100;i ++){
 
            channel.basicPublish("",QUEUE_NAME,null,(message+i).getBytes());
        }
        // 5. 释放资源,释放channel 和 链接对象
 
        channel.close();
        connection.close();
    }
}

消费者1:

public class Consumer {
 
    public static final String QUEUE_NAME = "byte002";
    public static void main(String[] args) throws Exception {
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
 
        // 4. 创建出消息队列
        /**
         * 第一个参数是消息队列的名称
         * 第二个参数表示消息是否持久化
         * 第三个参数标识消息队列是否被channel独占
         * 第四个参数标识是否自动删除消息队列,当消息队列没有绑定交换机后是否自动删除
         * 第五个是消息队列扩展参数
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1); // 告诉服务器,在我们没有确认当前消息时不要给我们发送新的消息
        // 5. 创建消费者,对消息进行处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用来标识.可以再监听队列时候设置
             * envelope 信封,通过envelope可以通过这个获取到很多东西
             * properties 额外的消息属性
             * body:消息体
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
                String s = new String(body, "UTF-8");
                System.out.println("消费者1收到的内容:"+s);
                try {
                    Thread.sleep(10); // 模拟消费耗时
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false); // 参数2false为确认收到消息,true为拒绝收到消息
            }
        };
        // 6. 通过channel消费者和消息队列关联
        /**
         * 第一个参数是消息队列的名字
         * 第二个参数是否自动签收(即消费消息后告知服务器已被消费)
         * 第三个参数是消费者
         */
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

消费者2:

public class Consumer2 {
 
    public static final String QUEUE_NAME = "byte002";
    public static void main(String[] args) throws Exception {
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
 
        // 4. 创建出消息队列
        /**
         * 第一个参数是消息队列的名称
         * 第二个参数表示消息是否持久化
         * 第三个参数标识消息队列是否被channel独占
         * 第四个参数标识是否自动删除消息队列,当消息队列没有绑定交换机后是否自动删除
         * 第五个是消息队列扩展参数
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1); // 告诉服务器,在我们没有确认当前消息时不要给我们发送新的消息
        // 5. 创建消费者,对消息进行处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用来标识.可以再监听队列时候设置
             * envelope 信封,通过envelope可以通过这个获取到很多东西
             * properties 额外的消息属性
             * body:消息体
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
                String s = new String(body, "UTF-8");
                System.out.println("消费者2收到的内容:"+s);
                try {
                    Thread.sleep(500); // 模拟消费耗时
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false); // 参数2false为确认收到消息,true为拒绝收到消息
            }
        };
        // 6. 通过channel消费者和消息队列关联
        /**
         * 第一个参数是消息队列的名字
         * 第二个参数是否自动签收(即消费消息后告知服务器已被消费)
         * 第三个参数是消费者
         */
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

保证资源竞争的代码就是这一行channel.basicQos(1);如果不加这一行,我们会发现两个消费者是轮询消费消息的。

三、发布订阅模式

原理:生产者将消息扔给交换机,交换机类型是fanout,不同的队列注册到交换机上,不同的消费注册在不同的队列上。所有消费者都会收到消息。

场景:有一个商城,我们新添加一个商品后,可能同时需要去更新缓存和数据库

实现:

生产者:

public class Producter {
 
    // 定义交换机的名字
    public static final String EXCHANGE_NAME="byte003";
    public static void main(String[] args) throws Exception {
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
        // 定义一个交换机,类型是fanout
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
 
        // 因为消息先发到交换机,交换机没有保存功能,所以如果没有消费者,消息会丢失
        channel.basicPublish(EXCHANGE_NAME,"",null,"发布订阅模式的消息".getBytes());
 
        channel.close();
        connection.close();
    }
}

消费者1:

public class Consumer1 {
    public static final String EXCHANGE_NAME="byte003";
 
    public static void main(String[] args) throws Exception {
 
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
 
        String queueName = "queue003";
        channel.queueDeclare(queueName,false,false,false,null);
 
        // 绑定队列到交换机
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用来标识.可以再监听队列时候设置
             * envelope 信封,通过envelope可以通过这个获取到很多东西
             * properties 额外的消息属性
             * body:消息体
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
                String s = new String(body, "UTF-8");
                System.out.println("消费者1:"+s);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(queueName,false,consumer);
    }
}

消费者2:

public class Consumer2 {
    public static final String EXCHANGE_NAME="byte003";
 
    public static void main(String[] args) throws Exception {
 
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
 
        String queueName = "queue004";
        channel.queueDeclare(queueName,false,false,false,null);
 
        // 绑定队列到交换机
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用来标识.可以再监听队列时候设置
             * envelope 信封,通过envelope可以通过这个获取到很多东西
             * properties 额外的消息属性
             * body:消息体
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
                String s = new String(body, "UTF-8");
                System.out.println("消费者2:"+s);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(queueName,false,consumer);
    }
}

需要注意的一点就是交换机没有保存功能,如果没有消费者,则消息会丢失。

四、路由模式

原理:生产者将消息发送给交换机,消息携带具体的routingkey。交换机类型是direct,接收到消息中的routingkey,比对与之绑定的队列的routingkey,分发到不同的队列上。

场景:还是一样,有一个商城,新添加了一个商品,实时性不是很高,只需要添加到数据库即可,不用刷新缓存。

实现

生产者:

public class Producter {
 
    // 定义交换机的名字
    public static final String EXCHANGE_NAME="byte004";
    public static void main(String[] args) throws Exception {
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
        // 定义一个交换机,类型是direct
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
 
        // 因为消息先发到交换机,交换机没有保存功能,所以如果没有消费者,消息会丢失
        channel.basicPublish(EXCHANGE_NAME,"key1",null,"发布路由模式的消息".getBytes());
 
        channel.close();
        connection.close();
    }
}

消费者1:

public class Consumer1 {
    public static final String EXCHANGE_NAME="byte004";
 
    public static void main(String[] args) throws Exception {
 
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
 
        String queueName = "queue005";
        channel.queueDeclare(queueName,false,false,false,null);
 
        // 绑定队列到交换机
        /**
         * 参数3是routingkey,只有和它一样的routingkey的消息才会被当前消费者收到
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"key1");
        // 如果要接收多个routingkey的消息,在执行一次上面的代码即可,如下
        channel.queueBind(queueName,EXCHANGE_NAME,"key2");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用来标识.可以再监听队列时候设置
             * envelope 信封,通过envelope可以通过这个获取到很多东西
             * properties 额外的消息属性
             * body:消息体
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
                String s = new String(body, "UTF-8");
                System.out.println("消费者1:"+s);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(queueName,false,consumer);
    }
}

消费者2:

public class Consumer2 {
    public static final String EXCHANGE_NAME="byte004";
 
    public static void main(String[] args) throws Exception {
 
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
 
        String queueName = "queue006";
        channel.queueDeclare(queueName,false,false,false,null);
 
        // 绑定队列到交换机
        channel.queueBind(queueName,EXCHANGE_NAME,"key2");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用来标识.可以再监听队列时候设置
             * envelope 信封,通过envelope可以通过这个获取到很多东西
             * properties 额外的消息属性
             * body:消息体
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
                String s = new String(body, "UTF-8");
                System.out.println("消费者2:"+s);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(queueName,false,consumer);
    }
}

五、主题模式

原理:路由模式的一种,路由功能添加了模糊匹配。星号(*)代表1个单词,#号(#)代表一个或多个单词。具体可参考路由模式。

场景:还是一样,有一个商城,新添加了一个商品,实时性不是很高,只需要添加到数据库即可,数据库包含了主数据库mysql1和从数据库mysql2的内容,不用刷新缓存。

实现

生产者:

public class Producter {
 
    // 定义交换机的名字
    public static final String EXCHANGE_NAME="byte004";
    public static void main(String[] args) throws Exception {
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
        // 定义一个交换机,类型是topic
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
 
        // 因为消息先发到交换机,交换机没有保存功能,所以如果没有消费者,消息会丢失
        channel.basicPublish(EXCHANGE_NAME,"key.1.2",null,"发布路由模式的消息".getBytes());
 
        channel.close();
        connection.close();
    }
}

消费者1:

public class Consumer1 {
    public static final String EXCHANGE_NAME="byte004";
 
    public static void main(String[] args) throws Exception {
 
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
 
        String queueName = "queue005";
        channel.queueDeclare(queueName,false,false,false,null);
 
        // 绑定队列到交换机
        /**
         * 参数3是routingkey,只有和它一样的routingkey的消息才会被当前消费者收到
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"key.*");
        // 如果要接收多个routingkey的消息,在执行一次上面的代码即可,如下
        channel.queueBind(queueName,EXCHANGE_NAME,"abc.#");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用来标识.可以再监听队列时候设置
             * envelope 信封,通过envelope可以通过这个获取到很多东西
             * properties 额外的消息属性
             * body:消息体
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
                String s = new String(body, "UTF-8");
                System.out.println("消费者1:"+s);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(queueName,false,consumer);
    }
}

消费者2:

public class Consumer2 {
    public static final String EXCHANGE_NAME="byte004";
 
    public static void main(String[] args) throws Exception {
 
 
        // 1. 创建出链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
 
        // 2. 通过链接工厂创建链接对象
        Connection connection = factory.newConnection();
 
        // 3. 通过链接对象创建出channel
        Channel channel = connection.createChannel();
 
        String queueName = "queue006";
        channel.queueDeclare(queueName,false,false,false,null);
 
        // 绑定队列到交换机
        channel.queueBind(queueName,EXCHANGE_NAME,"key.#");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用来标识.可以再监听队列时候设置
             * envelope 信封,通过envelope可以通过这个获取到很多东西
             * properties 额外的消息属性
             * body:消息体
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
                String s = new String(body, "UTF-8");
                System.out.println("消费者2:"+s);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(queueName,false,consumer);
    }
}

代码已上传:

github地址: https://github.com/binzh303/zhixie-code-example

gitee地址: https://gitee.com/javaXiaoCaiJi/zhixie-code-example

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
12天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
43 3
|
18天前
|
消息中间件 存储 监控
说说MQ在你项目中的应用(一)
本文总结了消息队列(MQ)在项目中的应用,主要围绕异步处理、系统解耦和流量削峰三大功能展开。通过分析短信通知和业务日志两个典型场景,介绍了MQ的实现方式及其优势。短信通知中,MQ用于异步发送短信并处理状态更新;业务日志中,Kafka作为高吞吐量的消息系统,负责收集和传输系统及用户行为日志,确保数据的可靠性和高效处理。MQ不仅提高了系统的灵活性和响应速度,还提供了重试机制和状态追踪等功能,保障了业务的稳定运行。
58 6
|
18天前
|
消息中间件 存储 中间件
说说MQ在你项目中的应用(二)商品支付
本文总结了消息队列(MQ)在支付订单业务中的应用,重点分析了RabbitMQ的优势。通过异步处理、系统解耦和流量削峰等功能,RabbitMQ确保了支付流程的高效与稳定。具体场景包括用户下单、支付请求、商品生产和物流配送等环节。相比Kafka,RabbitMQ在低吞吐量、高实时性需求下表现更优,提供了更低延迟和更高的可靠性。
30 0
|
2月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
121 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
88 2
|
5月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
88 1
|
5月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
5月前
|
消息中间件 开发工具
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
|
6月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式