RabbitMQ实例教程:用Java搞定工作队列

简介:

 在上一节中,我们学会了使用编程的方式发送和接收一个命名好的队列。本节中我们将会使用工作队列在多个工作者之间分发任务。


  工作队列的核心思想是避免立即处理高密集度必须等待完成的任务。它采用了安排任务的方式,将一个任务封装成一个消息把它放进队列。在后台运行的工作进程到时候会将它弹出并执行,这样任务队列中的任务就会被工作进程共享执行。


  工作队列适用于Web应用中在一个短的HTTP请求中处理复杂任务的场景。


  在上节中,我们发送了一个“Hello World!”字符串消息。现在发送多个字符串消息表示复杂任务。我们现在像图片重置大小,渲染PDF文件这样的真实任务,但我们使用 Thread.sleep() 假装正在我们忙。我们将字符串中的点的数量作为其复杂性;每个点都占1秒钟“工作”。例如,一个包含“...”这样的假任务就会需要三秒钟。


wKioL1YXaKyzqoSBAAHnTX0wnj0610.jpg


  NewTask.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package  com.favccxx.favrabbit;
 
import  com.rabbitmq.client.Channel;
import  com.rabbitmq.client.Connection;
import  com.rabbitmq.client.ConnectionFactory;
import  com.rabbitmq.client.MessageProperties;
 
public  class  NewTask {
 
 
       private  static  final  String TASK_QUEUE_NAME =  "task_queue" ;
 
       public  static  void  main(String[] argv)  throws  Exception {
         ConnectionFactory factory =  new  ConnectionFactory();
         factory.setHost( "localhost" );
         Connection connection = factory.newConnection();
         Channel channel = connection.createChannel();
 
         channel.queueDeclare(TASK_QUEUE_NAME,  true false false null );
 
         String[] args = { "Shuai Ge" , "ai" , "MeiNv" , "..." };
         String message = getMessage(args);
 
         channel.basicPublish( "" , TASK_QUEUE_NAME,
             MessageProperties.PERSISTENT_TEXT_PLAIN,
             message.getBytes( "UTF-8" ));
         System.out.println( " [x] Sent '"  + message +  "'" );
         
         for ( int  i= 0 ;i< 10 ;i++){
             channel.basicPublish( "" , TASK_QUEUE_NAME,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     (message+i).getBytes( "UTF-8" ));
             System.out.println( "Sent Message:"  + message+i);
         }
 
         channel.close();
         connection.close();
       }
 
       private  static  String getMessage(String[] strings) {
         if  (strings.length <  1 )
           return  "Hello World!" ;
         return  joinStrings(strings,  " " );
       }
 
       private  static  String joinStrings(String[] strings, String delimiter) {
         int  length = strings.length;
         if  (length ==  0 return  "" ;
         StringBuilder words =  new  StringBuilder(strings[ 0 ]);
         for  ( int  i =  1 ; i < length; i++) {
           words.append(delimiter).append(strings[i]);
         }
         return  words.toString();
       }
}


  控制台输出


 [x] Sent 'Shuai Ge ai MeiNv ...'

Sent Message:Shuai Ge ai MeiNv ...0

Sent Message:Shuai Ge ai MeiNv ...1

Sent Message:Shuai Ge ai MeiNv ...2

Sent Message:Shuai Ge ai MeiNv ...3

Sent Message:Shuai Ge ai MeiNv ...4

Sent Message:Shuai Ge ai MeiNv ...5

Sent Message:Shuai Ge ai MeiNv ...6

Sent Message:Shuai Ge ai MeiNv ...7

Sent Message:Shuai Ge ai MeiNv ...8

Sent Message:Shuai Ge ai MeiNv ...9


  Worker.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package  com.favccxx.favrabbit;
 
import  java.io.IOException;
import  java.text.DateFormat;
import  java.text.SimpleDateFormat;
import  java.util.Date;
 
import  com.rabbitmq.client.AMQP;
import  com.rabbitmq.client.Channel;
import  com.rabbitmq.client.Connection;
import  com.rabbitmq.client.ConnectionFactory;
import  com.rabbitmq.client.Consumer;
import  com.rabbitmq.client.DefaultConsumer;
import  com.rabbitmq.client.Envelope;
 
public  class  Worker {
 
     private  static  final  String TASK_QUEUE_NAME =  "task_queue" ;
     
     private  static  DateFormat df =  new  SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
 
     public  static  void  main(String[] argv)  throws  Exception {
         ConnectionFactory factory =  new  ConnectionFactory();
         factory.setHost( "localhost" );
         final  Connection connection = factory.newConnection();
         final  Channel channel = connection.createChannel();
 
         channel.queueDeclare(TASK_QUEUE_NAME,  true false false null );
         System.out.println( " [*] Waiting for messages. To exit press CTRL+C" );
 
         channel.basicQos( 1 );
 
         final  Consumer consumer =  new  DefaultConsumer(channel) {
             @Override
             public  void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                     byte [] body)  throws  IOException {
                 String message =  new  String(body,  "UTF-8" );
 
                 System.out.println(df.format( new  Date()) +  " [x] Received '"  + message +  "'" );
                 try  {
                     doWork(message);
                 finally  {
                     System.out.println( " [x] Done" );
                     channel.basicAck(envelope.getDeliveryTag(),  false );
                 }
             }
         };
         channel.basicConsume(TASK_QUEUE_NAME,  false , consumer);
     }
 
     private  static  void  doWork(String task) {
         for  ( char  ch : task.toCharArray()) {
             if  (ch ==  '.' ) {
                 try  {
                     Thread.sleep( 1000 );
                 catch  (InterruptedException _ignored) {
                     Thread.currentThread().interrupt();
                 }
             }
         }
     }
 
}


  控制台输出


 [*] Waiting for messages. To exit press CTRL+C

2015-10-08 15:41:36 [x] Received 'Shuai Ge ai MeiNv ...'

 [x] Done

2015-10-08 15:41:39 [x] Received 'Shuai Ge ai MeiNv ...0'

 [x] Done

2015-10-08 15:41:42 [x] Received 'Shuai Ge ai MeiNv ...1'

 [x] Done

2015-10-08 15:41:45 [x] Received 'Shuai Ge ai MeiNv ...2'

 [x] Done

2015-10-08 15:41:48 [x] Received 'Shuai Ge ai MeiNv ...3'

 [x] Done

2015-10-08 15:41:51 [x] Received 'Shuai Ge ai MeiNv ...4'

 [x] Done

2015-10-08 15:41:54 [x] Received 'Shuai Ge ai MeiNv ...5'

 [x] Done

2015-10-08 15:41:57 [x] Received 'Shuai Ge ai MeiNv ...6'

 [x] Done

2015-10-08 15:42:00 [x] Received 'Shuai Ge ai MeiNv ...7'

 [x] Done

2015-10-08 15:42:03 [x] Received 'Shuai Ge ai MeiNv ...8'

 [x] Done

2015-10-08 15:42:06 [x] Received 'Shuai Ge ai MeiNv ...9'

 [x] Done

2015-10-08 15:42:46 [x] Received 'Shuai Ge ai MeiNv ...'

 [x] Done

2015-10-08 15:42:49 [x] Received 'Shuai Ge ai MeiNv ...0'

 [x] Done

2015-10-08 15:42:52 [x] Received 'Shuai Ge ai MeiNv ...1'

 [x] Done

2015-10-08 15:42:55 [x] Received 'Shuai Ge ai MeiNv ...2'

 [x] Done

2015-10-08 15:42:58 [x] Received 'Shuai Ge ai MeiNv ...3'

 [x] Done

2015-10-08 15:43:01 [x] Received 'Shuai Ge ai MeiNv ...4'

 [x] Done

2015-10-08 15:43:04 [x] Received 'Shuai Ge ai MeiNv ...5'

 [x] Done

2015-10-08 15:43:07 [x] Received 'Shuai Ge ai MeiNv ...6'

 [x] Done

2015-10-08 15:43:10 [x] Received 'Shuai Ge ai MeiNv ...7'

 [x] Done

2015-10-08 15:43:13 [x] Received 'Shuai Ge ai MeiNv ...8'

 [x] Done

2015-10-08 15:43:16 [x] Received 'Shuai Ge ai MeiNv ...9'

 [x] Done


  循环分发消息(Round-robin dispatching)


  使用任务队列的一个好处是轻松处理并行工作,如果我们有一个积压的工作,通过添加更多的工人就可以完成。


  首先,现在有两个worker实例在同时工作,他们都从队列中读取消息。接下来这么做:


  (1)运行NewTask类,发送10个消息队列,控制台输出如下内容:


1
2
3
4
5
6
7
8
9
10
11
  [x] Sent  'Shuai Ge ai MeiNv ...'
Sent Message:Shuai Ge ai MeiNv ...0
Sent Message:Shuai Ge ai MeiNv ...1
Sent Message:Shuai Ge ai MeiNv ...2
Sent Message:Shuai Ge ai MeiNv ...3
Sent Message:Shuai Ge ai MeiNv ...4
Sent Message:Shuai Ge ai MeiNv ...5
Sent Message:Shuai Ge ai MeiNv ...6
Sent Message:Shuai Ge ai MeiNv ...7
Sent Message:Shuai Ge ai MeiNv ...8
Sent Message:Shuai Ge ai MeiNv ...9


   (2)启动一个worker实例,其输出内容如下:


1
2
3
4
5
6
7
8
9
10
11
12
2015-10-08 15:53:45 [x] Received  'Shuai Ge ai MeiNv ...'
  [x] Done
2015-10-08 15:53:48 [x] Received  'Shuai Ge ai MeiNv ...1'
  [x] Done
2015-10-08 15:53:51 [x] Received  'Shuai Ge ai MeiNv ...3'
  [x] Done
2015-10-08 15:53:54 [x] Received  'Shuai Ge ai MeiNv ...5'
  [x] Done
2015-10-08 15:53:57 [x] Received  'Shuai Ge ai MeiNv ...7'
  [x] Done
2015-10-08 15:54:00 [x] Received  'Shuai Ge ai MeiNv ...9'
  [x] Done


  (3)启动另外一个worker实例,其输出内容如下:


1
2
3
4
5
6
7
8
9
10
2015-10-08 15:53:45 [x] Received  'Shuai Ge ai MeiNv ...0'
  [x] Done
2015-10-08 15:53:48 [x] Received  'Shuai Ge ai MeiNv ...2'
  [x] Done
2015-10-08 15:53:51 [x] Received  'Shuai Ge ai MeiNv ...4'
  [x] Done
2015-10-08 15:53:54 [x] Received  'Shuai Ge ai MeiNv ...6'
  [x] Done
2015-10-08 15:53:57 [x] Received  'Shuai Ge ai MeiNv ...8'
  [x] Done


  RabbitMQ可能会出现下述所示的队列变化图  


wKiom1YXcj3zEONoAAHSUcEuN5s109.jpg


  默认情况下,RabbitMQ会按顺序将消息发送给下一个消费者,每个消费者都有相同数量的信息,跟消息的持续时长没有关系。这种分发消息的模式就是循环分发(round-robin)。


  消息应答模式(Message acknowledgment)


  每个任务执行都会占用几秒钟时间,如果一个任务启动用了很长时间后因为某种原因死掉了,但只完成了部分任务,该怎么办呢?在上面的round-robin模式下,一旦RabbitMQ将消息分发给一个消费者就会立即将其从内存中移除。在这种情况下,如果杀掉worker进程就会丢失正在处理的消息,当然也会丢失分发给该worker的未处理的消息。


  但我们不想丢失任何任务。如果一个worker进程死掉了,我们希望将该任务分发给其它工作进程。


  为了解决上面的问题,RabbitMQ支持应答模式让消费者告诉RabbitMQ特定的消息是否已经收到并处理,如果处理了就从内存中移除。


  如果一个消息消费者没有应答的话,RabbitMQ会假设该消息没有处理并将它转发给其它消费者。这样就能确保消息不会丢失,即便工作进程意外死掉。


  消息没有超时一说,RabbitMQ只有在工作进程连接死掉的时候才会重新投递消息。即便一个消息需要很长很长的时间处理也是不会出问题。


  消息应答模式默认是开启的,在前面的例子我们通过autoAck=true显式的关闭了。现在将该属性设置为true即可。


  消息持久化(Message durability)


  上面我们知道了如何处理消息消费者死机的问题,但是如果RabbitMQ服务器宕机呢?


  当RabbitMQ退出或崩溃时,除非你提醒它,否则它会忘记队列和消息。若想消息不丢失的话,就必须让队列和消息都设为持久化。


  若想RabbitMQ不会丢失队列的话,可以通过下面的方式将其声明为持久化:


1
2
boolean durable =  true ;
channel.queueDeclare( "hello" , durable,  false false , null);


  尽管上面的代码是正确的,但是它不会起作用的,因为我们已经定义了非持久化的“hello”队列。RabbitMQ不允许使用不同的参数重新定义已存在的队列,那样的话会返回错误。我们可以采用将其声明为不同的队列名字作为解决方案,如:


1
2
boolean durable =  true ;
channel.queueDeclare( "task_queue" , durable,  false false , null);


  队列声明改变后需要同时应用到消息生产者和消息消费者身上。


  这时,我们就能确保RabbitMQ重启后task_queue队列不会丢失。现在需要通过设置 MessageProperties 属性值为 PERSISTENT_TEXT_PLAIN 将消息标记为持久化。


1
2
import  com.rabbitmq.client.MessageProperties;
channel.basicPublish( "" "task_queue" ,  MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());


  公平分发消息(Fair dispatch)


  你可能注意到分发有时候并不像我们想象的那样,比如,有两个消息消费者时有一个一边的消息是复杂耗时的,而另一边消息是简单快速的,这样一个队列经常是繁忙的,而另一个队列非常轻松。RabbitMQ并不知道这些仍然是平均分发消息。


  造成这样的原因是RabbitMQ仅仅是当消息到达队列的出口时才转发消息,它并不在乎未到达消息消费者的消息数量。它只是盲目的将奇数消息发送给一个消费者,偶数消息发送给另一个消费者。


  解决上面问题的方法就是设置 prefetchCount = 1,这就好比告诉RabbitMQ每个只给工作进程一个消息。换句话说,就是在工作进程处理完并应答该消息前,不会发送给它新的消息,它会把它消息发送给其它的空闲工作进程。


1
2
int prefetchCount = 1;
channel.basicQos(prefetchCount);





本文转自 genuinecx 51CTO博客,原文链接:http://blog.51cto.com/favccxx/1701253,如需转载请自行联系原作者
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
Java 关系型数据库 数据库
Java 项目实战教程从基础到进阶实战案例分析详解
本文介绍了多个Java项目实战案例,涵盖企业级管理系统、电商平台、在线书店及新手小项目,结合Spring Boot、Spring Cloud、MyBatis等主流技术,通过实际应用场景帮助开发者掌握Java项目开发的核心技能,适合从基础到进阶的学习与实践。
909 3
|
4月前
|
安全 Java
Java之泛型使用教程
Java之泛型使用教程
377 10
|
9月前
|
前端开发 Java
java实现队列数据结构代码详解
本文详细解析了Java中队列数据结构的实现,包括队列的基本概念、应用场景及代码实现。队列是一种遵循“先进先出”原则的线性结构,支持在队尾插入和队头删除操作。文章介绍了顺序队列与链式队列,并重点分析了循环队列的实现方式以解决溢出问题。通过具体代码示例(如`enqueue`入队和`dequeue`出队),展示了队列的操作逻辑,帮助读者深入理解其工作机制。
306 1
|
3月前
|
Oracle Java 关系型数据库
Java 简单教程
Java是跨平台、面向对象的编程语言,广泛用于企业开发、Android应用等。本教程涵盖环境搭建、基础语法、流程控制、面向对象、集合与异常处理,助你快速入门并编写简单程序,为进一步深入学习打下坚实基础。
386 0
|
11月前
|
JavaScript NoSQL Java
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
572 96
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
|
6月前
|
缓存 安全 Java
Java 并发新特性实战教程之核心特性详解与项目实战
本教程深入解析Java 8至Java 19并发编程新特性,涵盖CompletableFuture异步编程、StampedLock读写锁、Flow API响应式流、VarHandle内存访问及结构化并发等核心技术。结合电商订单处理、缓存系统、实时数据流、高性能计数器与用户资料聚合等实战案例,帮助开发者高效构建高并发、低延迟、易维护的Java应用。适合中高级Java开发者提升并发编程能力。
229 0
|
7月前
|
Oracle Java 关系型数据库
java 编程基础入门级超级完整版教程详解
这份文档是针对Java编程入门学习者的超级完整版教程,涵盖了从环境搭建到实际项目应用的全方位内容。首先介绍了Java的基本概念与开发环境配置方法,随后深入讲解了基础语法、控制流程、面向对象编程的核心思想,并配以具体代码示例。接着探讨了常用类库与API的应用,如字符串操作、集合框架及文件处理等。最后通过一个学生成绩管理系统的实例,帮助读者将理论知识应用于实践。此外,还提供了进阶学习建议,引导学员逐步掌握更复杂的Java技术。适合初学者系统性学习Java编程。资源地址:[点击访问](https://pan.quark.cn/s/14fcf913bae6)。
979 2
|
消息中间件 Java 数据库
自研Java框架 Sunrays-Framework使用教程「博客之星」
### Sunrays-Framework:助力高效开发的Java微服务框架 **Sunrays-Framework** 是一款基于 Spring Boot 构建的高效微服务开发框架,深度融合了 Spring Cloud 生态中的核心技术组件。它旨在简化数据访问、缓存管理、消息队列、文件存储等常见开发任务,帮助开发者快速构建高质量的企业级应用。 #### 核心功能 - **MyBatis-Plus**:简化数据访问层开发,提供强大的 CRUD 操作和分页功能。 - **Redis**:实现高性能缓存和分布式锁,提升系统响应速度。 - **RabbitMQ**:可靠的消息队列支持,适用于异步
自研Java框架 Sunrays-Framework使用教程「博客之星」
|
存储 移动开发 算法
【潜意识Java】Java基础教程:从零开始的学习之旅
本文介绍了 Java 编程语言的基础知识,涵盖从简介、程序结构到面向对象编程的核心概念。首先,Java 是一种高级、跨平台的面向对象语言,支持“一次编写,到处运行”。接着,文章详细讲解了 Java 程序的基本结构,包括包声明、导入语句、类声明和 main 方法。随后,深入探讨了基础语法,如数据类型、变量、控制结构、方法和数组。此外,还介绍了面向对象编程的关键概念,例如类与对象、继承和多态。最后,针对常见的编程错误提供了调试技巧,并总结了学习 Java 的重要性和方法。适合初学者逐步掌握 Java 编程。
313 1
|
前端开发 Java 开发工具
Git使用教程-将idea本地Java等文件配置到gitte上【保姆级教程】
本内容详细介绍了使用Git进行版本控制的全过程,涵盖从本地仓库创建到远程仓库配置,以及最终推送代码至远程仓库的步骤。
1105 0

热门文章

最新文章