正文
五、RabitMQ消息类型
第一种:点对点 生产者将消息发送到队列,然后消费者从队列中取消息依次消费,消费之后,消息出队列,本次消费结束
第二种:工作队列。又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。消息在多个消费者共享,但是一个消息只能被一个消费者消费。
总之:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消
费,就会消失,因此任务是不会被重复执行的 。
第三种:发布订阅、Routing(路由键)、Topics(主题)
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的。
X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型direct(直连交换机,把消息交给符合指定routing key 的队列)、topic(通配符,把消息交给符合routing pattern(路由模式) 的队列)、headers 和fanout(扇形交换机或者广播,将消息交给所有绑定到交换机的队列)。Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。
(1)订阅模型-Fanout
Fanout,也称为广播。
在广播模式下,消息发送流程是这样的:
1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
(2)订阅模型-Direct:
在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key),消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
(3)订阅模型-Topic
Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符
通配符规则:#:匹配一个或多个词
*:匹配不多不少恰好 1 个词
六、代码
连接工具
package com.xiaojie.rabbitmq; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author xiaojie * @version 1.0 * @description: 创建mq连接 * @date 2021/9/24 22:54 */ public class MyConnection { public static Connection getConnection() throws IOException, TimeoutException { // 1.创建连接 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2.设置连接地址 connectionFactory.setHost("192.168.139.154"); // 3.设置端口号 connectionFactory.setPort(5672); // 4.设置账号和密码 connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); // 5.设置VirtualHost connectionFactory.setVirtualHost("/xiaojie"); return connectionFactory.newConnection(); } }
点对点和工作队列模式
package com.xiaojie.rabbitmq.p2p; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.xiaojie.rabbitmq.MyConnection; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @author xiaojie * @version 1.0 * @description: 点对点生产者 * 生产者生产消息时如果没有对应的队列,则直接遗弃消息,并不会报错。 * @date 2021/9/24 22:53 */ public class PProvider { //定义队列 private static final String QUEUE_NAME = "myqueue"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("生产者启动成功.."); // 1.创建连接 Connection connection = MyConnection.getConnection(); // 2.创建通道 Channel channel = connection.createChannel(); //创建队列,如果队列存在则使用这个队列,不存在则创建 //第一个参数,对列名称 myqueue //第二个参数,是否持久话,false表示不持久化数据,MQ停掉后数据就会丢失 //第三个参数,是否队列私有化,false表示所有的消费者都可以访问,true表示只有第一次拥有它的消费者才可以一直使用,其他消费者不能访问 //第四个参数,是否自动删除,false连接停掉后不自动删除掉这个队列 //第五个参数,其他额外的参数 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 10; i++) { String msg = "测试点对点消息" + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发送消息成功:" + msg); } channel.close(); connection.close(); } }
package com.xiaojie.rabbitmq.p2p; import com.rabbitmq.client.*; import com.xiaojie.rabbitmq.MyConnection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author xiaojie * @version 1.0 * @description: TODO * @date 2021/9/24 23:15 */ public class PConsumer { private static final String QUEUE_NAME = "myqueue"; public static void main(String[] args) throws IOException, TimeoutException { // 1.创建我们的连接 Connection connection = MyConnection.getConnection(); // 2.创建我们通道 Channel channel = connection.createChannel(); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费消息msg:" + msg); } }; // 3.创建我们的监听的消息 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
package com.xiaojie.rabbitmq.p2p; import com.rabbitmq.client.*; import com.xiaojie.rabbitmq.MyConnection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author xiaojie * @version 1.0 * @description: 模拟工作队列,即多个消费者消费消息, * 结果是消息均等消费,就是在工作队列模式下,默认情况下消息是均摊到每个消费者的。 * @date 2021/9/24 23:15 */ public class PConsumer2 { private static final String QUEUE_NAME = "myqueue"; public static void main(String[] args) throws IOException, TimeoutException { // 1.创建我们的连接 Connection connection = MyConnection.getConnection(); // 2.创建我们通道 Channel channel = connection.createChannel(); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费消息msg:" + msg); } }; // 3.创建我们的监听的消息 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
Fanout模式
package com.xiaojie.rabbitmq.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.xiaojie.rabbitmq.MyConnection; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @author xiaojie * @version 1.0 * @description: fanout模式生产者 * @date 2021/9/24 23:45 */ public class Provider { public static final String EXCHANGE="my_fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接 Connection connection = MyConnection.getConnection(); //创建通道 Channel channel = connection.createChannel(); //生产者绑定交换机 参数1 交换机的名称。2,交换机的类型(扇形交换机) channel.exchangeDeclare(EXCHANGE, "fanout"); //创建消息 String msg="fanout交换机消息。。。。"; //发送消息 channel.basicPublish(EXCHANGE, "", null, msg.getBytes(StandardCharsets.UTF_8)); System.out.println("消息发送成功。。。。。。"); //关闭通道,关闭连接 channel.close(); connection.close(); } }
package com.xiaojie.rabbitmq.fanout; import com.rabbitmq.client.*; import com.xiaojie.rabbitmq.MyConnection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author xiaojie * @version 1.0 * @description: 短信消费者 * 扇形交换机是通过同一个交换机,将消息处理到不同的队列,不同的队列对应不同的消费者 * @date 2021/9/24 23:47 */ public class SmsConsumer { //交换机 private static final String EXCHANGE="my_fanout_exchange"; //队列 private static final String SMS_QUEUE="sms_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MyConnection.getConnection(); Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare(SMS_QUEUE, false, false, false, null); //绑定队列到交换机 channel.queueBind(SMS_QUEUE, EXCHANGE, ""); System.out.println("短信消费者开启。。。。"); //开启生产者监听 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg= new String(body,"utf-8"); System.out.println("接收到的短信消息时msg:"+msg); } }; //设置应答模式 channel.basicConsume(SMS_QUEUE,true, consumer); } }
package com.xiaojie.rabbitmq.fanout; import com.rabbitmq.client.*; import com.xiaojie.rabbitmq.MyConnection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author xiaojie * @version 1.0 * @description: 微信消费者 * 扇形交换机是通过同一个交换机,将消息处理到不同的队列,不同的队列对应不同的消费者 * @date 2021/9/24 23:47 */ public class WxConsumer { //交换机 private static final String EXCHANGE="my_fanout_exchange"; //队列 private static final String WX_QUEUE="wx_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MyConnection.getConnection(); Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare(WX_QUEUE, false, false, false, null); //绑定队列到交换机 channel.queueBind(WX_QUEUE, EXCHANGE, ""); System.out.println("微信消费者开启。。。。"); //开启生产者监听 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg= new String(body,"utf-8"); System.out.println("接收到的短信消息时msg:"+msg); } }; //设置应答模式 channel.basicConsume(WX_QUEUE,true, consumer); } }
Direct模式
package com.xiaojie.rabbitmq.direct; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.xiaojie.rabbitmq.MyConnection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author xiaojie * @version 1.0 * @description: direct生产者 * @date 2021/9/24 23:39 */ public class Provider { public static final String EXCHANGE="my_direct_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接 Connection connection = MyConnection.getConnection(); //创建通道 Channel channel = connection.createChannel(); //生产者绑定交换机 参数1 交换机的名称。2,交换机的类型 channel.exchangeDeclare(EXCHANGE, "direct"); //路由键 String routingKey="email"; //创建消息 String msg="direct---交换机的消息。。。。。"; //发送消息 channel.basicPublish(EXCHANGE, routingKey, null, msg.getBytes()); System.out.println("生产者启动成功。。。。。"); //关闭连接,管道 channel.close(); connection.close(); } }
package com.xiaojie.rabbitmq.direct; import com.rabbitmq.client.*; import com.xiaojie.rabbitmq.MyConnection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author xiaojie * @version 1.0 * @description: 路由键交换机 * @date 2021/9/25 0:01 */ public class Consumer { public static String EMAIL_QUEUE_FANOUT="email_queue"; public static final String EXCHANGE="my_direct_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //创建mq连接 Connection connection = MyConnection.getConnection(); //创建通道 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare(EMAIL_QUEUE_FANOUT, false, false, false, null); //消费者队列绑定交换机 参数分别是 队列、交换机、routingkey channel.queueBind(EMAIL_QUEUE_FANOUT, EXCHANGE, "email"); System.out.println("邮件消费者开启。。。。"); //开启生产者监听 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg= new String(body,"utf-8"); System.out.println("接收到的消息时msg:"+msg); } }; //设置应答模式 channel.basicConsume(EMAIL_QUEUE_FANOUT,true, consumer); } }
Topic模式
package com.xiaojie.rabbitmq.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.xiaojie.rabbitmq.MyConnection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author xiaojie * @version 1.0 * @description: topic生产者 通配符模式 * @date 2021/9/24 23:39 */ public class Provider { public static final String EXCHANGE="my_topic_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接 Connection connection = MyConnection.getConnection(); //创建通道 Channel channel = connection.createChannel(); //生产者绑定交换机 参数1 交换机的名称。2,交换机的类型 channel.exchangeDeclare(EXCHANGE, "topic"); //路由键 String routingKey="email.send"; //创建消息 String msg="topic---交换机的消息。。。。。"; //发送消息 channel.basicPublish(EXCHANGE, routingKey, null, msg.getBytes()); System.out.println("生产者启动成功。。。。。"); //关闭连接,管道 channel.close(); connection.close(); } }
package com.xiaojie.rabbitmq.topic; import com.rabbitmq.client.*; import com.xiaojie.rabbitmq.MyConnection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author xiaojie * @version 1.0 * @description: 路由键交换机 * 通配符 #:匹配一个或多个词;*:匹配不多不少恰好 1 个词 * @date 2021/9/25 0:01 */ public class Consumer { public static String EMAIL_QUEUE_FANOUT="email_queue"; public static final String EXCHANGE="my_topic_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //创建mq连接 Connection connection = MyConnection.getConnection(); //创建通道 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare(EMAIL_QUEUE_FANOUT, false, false, false, null); //消费者队列绑定交换机 参数分别是 队列、交换机、routingkey //通配符 #:匹配一个或多个词;*:匹配不多不少恰好 1 个词 channel.queueBind(EMAIL_QUEUE_FANOUT, EXCHANGE, "email.#"); System.out.println("邮件消费者开启。。。。"); //开启生产者监听 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg= new String(body,"utf-8"); System.out.println("接收到的消息时msg:"+msg); } }; //设置应答模式 channel.basicConsume(EMAIL_QUEUE_FANOUT,true, consumer); } }
完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码
参考:docker部署RabbitMQ集群 - Alan6 - 博客园