消息中间件系列教程(08) -RabbitMQ -案例代码(工作队列模式)

简介: 消息中间件系列教程(08) -RabbitMQ -案例代码(工作队列模式)

引言

代码已上传至Github,有兴趣的同学可以下载看看:https://github.com/ylw-github/RabbitMQ-Demo

前面博客讲解了RabbitMQ的五种队列形式《消息中间件系列教程(06) -RabbitMQ -五种队列形式》,主要讲解一下五种队列的代码实现。

主要分为:

  1. 点对点队列模式(简单)
  2. 工作队列模式(公平性)
  3. 发布订阅模式
  4. 路由模式Routing
  5. 通配符模式Topics

本文主要讲解工作队列模式。

工作队列模式

1.新建Maven项目RabbitMQ-Demo

2.添加Maven依赖:

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
  </dependency>
</dependencies>

3.连接工具类

package com.ylw.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQConnecUtils {
    public static Connection newConnection() throws IOException, TimeoutException {
        // 1.定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置服务器地址
        factory.setHost("127.0.0.1");
        // 3.设置协议端口号
        factory.setPort(5672);
        // 4.设置vhost
        factory.setVirtualHost("OrderHost");
        // 5.设置用户名称
        factory.setUsername("OrderAdmin");
        // 6.设置用户密码
        factory.setPassword("123456");
        // 7.创建新的连接
        Connection newConnection = factory.newConnection();
        return newConnection;
    }
}

1. 生产者

public class Producer {
    private static final String QUEUE_NAME = "add_order_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取连接
        Connection newConnection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = newConnection.createChannel();
        // 3.创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);// 保证一次只分发一次,限制发送给同一个消费者,不得超过一条消息
        for (int i = 1; i <= 10; i++) {
            String msg = "index=" + i;
            System.out.println("生产者发送消息 -> " + msg);
            // 4.发送消息
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        channel.close();
        newConnection.close();
    }
}

2. 消费者

消费者1:

public class Consumer1 {
    private static final String QUEUE_NAME = "add_order_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取连接
        Connection newConnection = RabbitMQConnecUtils.newConnection();
        // 2.获取通道
        final Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者1获取消息:" + msgString);
                // 手动回执消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3.监听队列
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

消费者2:

public class Consumer2 {
    private static final String QUEUE_NAME = "add_order_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取连接
        Connection newConnection = RabbitMQConnecUtils.newConnection();
        // 2.获取通道
        final Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msgString);
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 手动回执消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // 3.监听队列
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

3. 测试

启动消费者1和消费者2,在RabbitMQ控制台可以看到,消息已经注册到了队列了:

然后开始生产,启动生产者:

可以看到Consumer1比Comsumer2 消费多9个,因为Consumer延时了2:

这就是典型的工作队列模式,多劳多得。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
15天前
|
消息中间件 Java API
RabbitMQ入门指南(五):Java声明队列、交换机以及绑定
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了Java声明队列、交换机以及绑定队列和交换机等内容。
25 0
|
23天前
|
消息中间件 Java Maven
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
|
2月前
|
消息中间件 Java 调度
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
12 1
|
2月前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
12 1
|
2月前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
27 1
|
2月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
42 1
|
2月前
|
传感器 监控 网络协议
MQTT 发布、订阅模式介绍
【2月更文挑战第17天】
73 6
MQTT 发布、订阅模式介绍
|
9月前
|
NoSQL Java Redis
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因:
|
9月前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
9月前
|
算法 NoSQL Java
2021年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)