引言
在rabbitmq中大概有五种这种消费模式,简单来说是三种,因为后面三种都是基于路由的模式,在这小编就暂且分开来介绍吧。
第一种、简单队列
首先我们来看一下这种模式的图解
P:消息的生产者
C:消息的消费者
红色:队列
生产者发送消息到队列中,消费中从队列中获取消息。每个消息只能被一个消费着消费,即:当消息被消费中获取后,该消息将会在队列中移除。这中模式也可以成为“阅后即焚”。
示例代码:
获得连接的工具类:
package cn.itcast.rabbitmq.util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static Connection getConnection() throws Exception { // 定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置服务地址 factory.setHost("192.168.92.9"); // 端口 factory.setPort(5672); // 设置账号信息,用户名、密码、vhost factory.setVirtualHost("/taotao"); factory.setUsername("taotao"); factory.setPassword("taotao"); // 通过工程获取连接 Connection connection = factory.newConnection(); return connection; } }
生产者代码:
package cn.itcast.rabbitmq.simple; import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "test_queue"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息内容 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //关闭通道和连接 channel.close(); connection.close(); } }
当生产者代码成功运行后,我们通过管理工具查看会发现一个队列,并且队列中有一条信息。
我们点击队列名称可以查看消息的详细信息。
消费者代码:
package cn.itcast.rabbitmq.simple; import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 因为消费者事先并不知道该队列的存在,所以在获取消息之前,自己先定义一个队列 // 如果消费者事先非常确定,该队列的存在,则不需要在创建队列,创建已经存在的队列,盖伊队列会被忽略 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列 channel.basicConsume(QUEUE_NAME, true, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
当消费者代码成功运行后,我们通过管理工具会发现队列中消息的数量从1变为0,这就说明消息被消费者获取以后,会被队列删除。
第二种、work模式
一个生产者、两个消费者;一个消息只能被一个消费者获取。
在这种模式中又可以分为两种模式,一种是两个消费中平均消费队列中的消息。也就说无论两个消费者的消费能力如何,都会平均获取消息。另一种方式则是,能者多劳模式,处理消息能力强的消费者会获取更多的消息,这种模式似乎更符合一些。
生产者向队列中发送50条消息。
package cn.itcast.rabbitmq.work; import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { // 消息内容 String message = "" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(i * 10); } channel.close(); connection.close(); } }
我们看到上面的代码中,生产者每生产一条消息后都会休眠一段时间,并且越往后面休眠的时间越长。
消费者1代码:
package cn.itcast.rabbitmq.work; import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一时刻服务器只会发一条消息给消费者 //channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); //休眠 Thread.sleep(10); // 返回确认状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消费者2和消费者1的代码一样,只是消费者2休眠的时间更长一些为1000ms.
上面的代码也就说,消费者1的处理能力比消费者2更强,但是当我们看到结果的时候会发现,两个消费者获得消息的数量却是一样的。这就是work模式中的平均消费模式。
在消费中的代码中红色部分是被注释掉了,现在我们将其打开,我们再次运行会发现,这时候消费者2仅仅获得了8条消息,而消费者1却获得了42条消息。这也就是我们说的work模式中“能者多劳”模式。
这句代码的意思大概是,服务器同一时刻只会发送一条消息给你消费者,但是并不指定发送给那个消费者,这时候那个消费者有能力获得谁就获取。
两种模式对比
通过上面的介绍我们会发现,这两种工作模式是有几处不相同的地方。在消费者数量上面:简单模式只有一个消费者,而work模式可以有多个消费者。认真的读者会发现,work模式中需要手动返回完成状态,而简单模式shiite自动完成的,这两种模式是通过监听队列中的一个参数设定的.false说明需要手动返回,true说明自动返回。
// 监听队列,手动返回完成状态 channel.basicConsume(QUEUE_NAME, false, consumer);
小结
上面介绍了比较简单的两种,这两种模式比较简单,还没有用到exchange的概念,在下面的博客中将介绍三种路由模式。


