rabbitMq 直连模式实现多消费者轮询消费(一对多)

简介: 今天通过rabbitMq的直连模式(direct)来实现多个消费者对消息的轮询读取前提是已经安装了rabbitMq!话不多说,步入正题:首先创建一个连接mq的服务器:

今天通过rabbitMq的直连模式(direct)来实现多个消费者对消息的轮询读取

前提是已经安装了rabbitMq!

话不多说,步入正题:

首先创建一个连接mq的服务器:

ConnectionUtil :

public class ConnectionUtil {

    public static Connection getConnection()throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();

        //配置参数
        connectionFactory.setHost("localhost");//主机IP地址
        connectionFactory.setPort(5672);//端口
        connectionFactory.setVirtualHost("/");//虚拟机,默认/
        connectionFactory.setUsername("test");//用户名
        connectionFactory.setPassword("test");//密码

        //通过工厂创建连接
        Connection connection = connectionFactory.newConnection();
        return connection;
    }
}

生产者:Producer

public class Producer {

    static final String QUEUE_NAME = "work_queue";//队列名


    public static void main(String[] args) throws Exception {

        //获取连接
        Connection connection = ConnectionUtil.getConnection();

        //创建频道
        Channel channel = connection.createChannel();
        channel.queueBind(QUEUE_NAME, "amq.direct", "message_ttl_routingKey");


        for (int i = 0; i < 10; i++) {
            String message = i + "队列消息~~~";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        }

        //关闭频道
        channel.close();
        //关闭连接
        connection.close();
    }
}

消费者:Consumer

public class Consumer {

    static final String QUEUE_NAME="work_queue";


    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();

        //使用连接创建频道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        //接收消息(消费端监听)
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("deliverTag: "+deliveryTag);
                System.out.println("consumerTag:"+consumerTag);

                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        };

        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

消费者2:Consumer2

public class Consumer2 {
    static final String QUEUE_NAME="work_queue";


    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();

        //使用连接创建频道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        //接收消息(消费端监听)
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("deliverTag: "+deliveryTag);

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        };

        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

然后启动消费者,再启动生产者:结果如下:

消费者1:

在这里插入图片描述

消费者2:

在这里插入图片描述

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3天前
|
消息中间件
RabbitMQ配置单活模式队列
RabbitMQ配置单活模式队列
9 0
|
7天前
|
消息中间件 缓存 数据库
rabbitmq系列(二)几种常见模式的应用场景及实现
rabbitmq系列(二)几种常见模式的应用场景及实现
|
18天前
|
消息中间件 Apache C语言
消息队列 MQ产品使用合集之在Cluster部署模式下,使用dashboard无法查询到消费组信息,一般是什么导致的
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
19天前
|
消息中间件 负载均衡 Apache
消息队列 MQ产品使用合集之是否支持Master/Slave模式进行部署?
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
19天前
|
消息中间件 存储 负载均衡
消息队列 MQ产品使用合集之POP消费模式是否可以保证消息顺序性
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
1月前
|
消息中间件 Java RocketMQ
MQ产品使用合集之在同一个 Java 进程内建立三个消费对象并设置三个消费者组订阅同一主题和标签的情况下,是否会发生其中一个消费者组无法接收到消息的现象
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 Java API
MQ产品使用合集之RocketMQ dledger集群模式的dledgerpeers端口是集群之间通讯吗
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
79 0
RabbitMQ入门指南(九):消费者可靠性
|
1月前
|
传感器 监控 网络协议
MQTT 发布、订阅模式介绍
【2月更文挑战第17天】
189 6
MQTT 发布、订阅模式介绍
|
1月前
|
消息中间件 Java 调度
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
27 1

热门文章

最新文章