3.工作模式
RabbitMQ有以下几种工作模式 :
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC
3.1.Work queues
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
测试:
1、使用入门程序,启动多个消费者。
2、生产者发送多个消息。
结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
3.2.Publish/subscribe
3.2.1.工作模式
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息
3.2.2.代码
案例:
用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法 。
1、生产者
声明Exchange_fanout_inform交换机。
声明两个队列并且绑定到此交换机,绑定时不需要指定routingkey
发送消息时不需要指定routingkey
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer02_publish { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { //创建一个与MQ的连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务 器 //创建一个连接 connection = factory.newConnection(); //创建与交换机的通道,每个通道代表一个会话 channel = connection.createChannel(); //声明交换机 String exchange, BuiltinExchangeType type /** * 参数明细 * 1、交换机名称 * 2、交换机类型,fanout、topic、direct、headers */ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); //声明队列 // (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments) /** * 参数明细: * 1、队列名称 * 2、是否持久化 * 3、是否独占此队列 * 4、队列不用是否自动删除 * 5、参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); //交换机和队列绑定String queue, String exchange, String routingKey /** * 参数明细 * 1、队列名称 * 2、交换机名称 * 3、路由key */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,""); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,""); //发送消息 for (int i=0;i<10;i++){ String message = "inform to user"+i; //向交换机发送消息 String exchange, String routingKey, BasicProperties props,byte[] body /** * 参数明细 * 1、交换机名称,不指令使用默认交换机名称 Default Exchange * 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消 息将发到此队列 * 3、消息属性 * 4、消息内容 */ channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally{ if(channel!=null){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if(connection!=null){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
2、邮件发送消费者
/** * @author Administrator * @version 1.0 * @create 2018‐06‐14 10:32 **/ public class Consumer02_subscribe_email { //队列名称 private static final String QUEUE_INFORM_EMAIL = "inform_queue_email"; private static final String EXCHANGE_FANOUT_INFORM="inform_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { //创建一个与MQ的连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器 //创建一个连接 Connection connection = factory.newConnection(); //创建与交换机的通道,每个通道代表一个会话 Channel channel = connection.createChannel(); //声明交换机 String exchange, BuiltinExchangeType type /** * 参数明细 * 1、交换机名称 * 2、交换机类型,fanout、topic、direct、headers */ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); //声明队列 // channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) /** * 参数明细: * 1、队列名称 * 2、是否持久化 * 3、是否独占此队列 * 4、队列不用是否自动删除 * 5、参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); //交换机和队列绑定String queue, String exchange, String routingKey /** * 参数明细 * 1、队列名称 * 2、交换机名称 * 3、路由key */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,""); //定义消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); String exchange = envelope.getExchange(); //消息内容 String message = new String(body, "utf‐8"); System.out.println(message); } }; /** * 监听队列String queue, boolean autoAck,Consumer callback * 参数明细 * 1、队列名称 * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复 * 3、消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer); } }
按照上边的代码,编写邮件通知的消费代码。
3、短信发送消费者
可参考上边的邮件发送消费者代码编写。
3.2.3.测试
打开RabbitMQ的管理界面,观察交换机绑定情况:
使用生产者发送若干条消息,每条消息都转发到各各队列,每消费者都接收到了消息。
3.2.3.思考
1、publish/subscribe与work queues有什么区别。
区别:
1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认
交换机)。
3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑定到默认的交换机 。
相同点:
所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
2、实质工作用什么 publish/subscribe还是work queues。
建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大,并且发布订阅模式可以指定自己专用的交换
机。
3.3.Routing
3.3.1.工作模式
路由模式:
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
3.3.2.代码
1、生产者
声明exchange_routing_inform交换机。
声明两个队列并且绑定到此交换机,绑定时需要指定routingkey
发送消息时需要指定routingkey
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer03_routing { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { //创建一个与MQ的连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务 器 //创建一个连接 connection = factory.newConnection(); //创建与交换机的通道,每个通道代表一个会话 channel = connection.createChannel(); //声明交换机 String exchange, BuiltinExchangeType type /** * 参数明细 * 1、交换机名称 * 2、交换机类型,fanout、topic、direct、headers */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //声明队列 // channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) /** * 参数明细: * 1、队列名称 * 2、是否持久化 * 3、是否独占此队列 * 4、队列不用是否自动删除 * 5、参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); //交换机和队列绑定String queue, String exchange, String routingKey /** * 参数明细 * 1、队列名称 * 2、交换机名称 * 3、路由key */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS); //发送邮件消息 for (int i=0;i<10;i++){ String message = "email inform to user"+i; //向交换机发送消息 String exchange, String routingKey, BasicProperties props,byte[] body /** * 参数明细 * 1、交换机名称,不指令使用默认交换机名称 Default Exchange * 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消 息将发到此队列 * 3、消息属性 * 4、消息内容 */ channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL,null,message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } //发送短信消息 for (int i=0;i<10;i++){ String message = "sms inform to user"+i; //向交换机发送消息 String exchange, String routingKey, BasicProperties props,byte[] body channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null,message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally{ if(channel!=null){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if(connection!=null){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
2、邮件发送消费者
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer03_routing_email { //队列名称 private static final String QUEUE_INFORM_EMAIL = "inform_queue_email"; private static final String EXCHANGE_ROUTING_INFORM="inform_exchange_routing"; public static void main(String[] args) throws IOException, TimeoutException { //创建一个与MQ的连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器 //创建一个连接 Connection connection = factory.newConnection(); //创建与交换机的通道,每个通道代表一个会话 Channel channel = connection.createChannel(); //声明交换机 String exchange, BuiltinExchangeType type /** * 参数明细 * 1、交换机名称 * 2、交换机类型,fanout、topic、direct、headers */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //声明队列 // channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) /** * 参数明细: * 1、队列名称 * 2、是否持久化 * 3、是否独占此队列 * 4、队列不用是否自动删除 * 5、参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); //交换机和队列绑定String queue, String exchange, String routingKey /** * 参数明细 * 1、队列名称 * 2、交换机名称 * 3、路由key */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); //定义消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); String exchange = envelope.getExchange(); //消息内容 String message = new String(body, "utf‐8"); System.out.println(message); } }; /** * 监听队列String queue, boolean autoAck,Consumer callback * 参数明细 * 1、队列名称 * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复 * 3、消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer); } }
3 、短信发送消费者
可以参考邮件发送消费者的代码流程,编写短信通知的代码。
3.3.3.测试
打开RabbitMQ的管理界面,观察交换机绑定情况:
使用生产者发送若干条消息,交换机根据routingkey转发消息到指定的队列。
3.3.4.思考
Routing模式和Publish/subscibe有啥区别?
Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。
3.4.Topics
3.4.1.工作模式
路由模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。





