3.4.2.代码
案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种
通知类型都接收的则两种通知都有效。
1、生产者
声明交换机,指定topic类型:
/** * 声明交换机 * param1:交换机名称 * param2:交换机类型 四种交换机类型:direct、fanout、topic、headers */ channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); //Email通知 channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes()); //sms通知 channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes()); //两种都通知 channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null, message.getBytes());
完整代码:完整代码:
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer04_topics { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { //创建一个与MQ的连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器 //创建一个连接 connection = factory.newConnection(); //创建与交换机的通道,每个通道代表一个会话 channel = connection.createChannel(); //声明交换机 String exchange, BuiltinExchangeType type /** * 参数明细* 1、交换机名称 * 2、交换机类型,fanout、topic、direct、headers */ channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); //声明队列 /** * 参数明细: * 1、队列名称 * 2、是否持久化 * 3、是否独占此队列 * 4、队列不用是否自动删除 * 5、参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); //发送邮件消息 for (int i=0;i<10;i++){ String message = "email inform to user"+i; //向交换机发送消息 String exchange, String routingKey, BasicProperties props,byte[] body /** * 参数明细 * 1、交换机名称,不指令使用默认交换机名称 Default Exchange * 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消 息将发到此队列 * 3、消息属性 * 4、消息内容 */ channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null,message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } //发送短信消息 for (int i=0;i<10;i++){ String message = "sms inform to user"+i; channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null,message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } //发送短信和邮件消息 for (int i=0;i<10;i++){ String message = "sms and email inform to user"+i; channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null,message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally{ if(channel!=null){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if(connection!=null){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
2、消费端
队列绑定交换机指定通配符:
统配符规则:
中间以“.”分隔。
符号#可以匹配多个词,符号*可以匹配一个词语。
//声明队列 channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); //声明交换机 channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); //绑定email通知队列 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,"inform.#.email.#"); //绑定sms通知队列 channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,"inform.#.sms.#");
3.4.3.测试
使用生产者发送若干条消息,交换机根据routingkey统配符匹配并转发消息到指定的队列。
3.4.4.思考
本案例的需求使用Routing工作模式能否实现?
使用Routing模式也可以实现本案例,共设置三个 routingkey,分别是email、sms、all,email队列绑定email和
all,sms队列绑定sms和all,这样就可以实现上边案例的功能,实现过程比topics复杂。
Topic模式更多加强大,它可以实现Routing、publish/subscirbe模式的功能。
3.5.Header 模式
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。
案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。
代码:
1)生产者
队列与交换机绑定的代码与之前不同,如下:
Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_type", "email"); Map<String, Object> headers_sms = new Hashtable<String, Object>(); headers_sms.put("inform_type", "sms"); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
通知:
String message = "email inform to user"+i; Map<String,Object> headers = new Hashtable<String, Object>(); headers.put("inform_type", "email");//匹配email通知消费者绑定的header //headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(headers); //Email通知 channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
2)发送邮件消费者
channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS); Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_email", "email"); //交换机和队列绑定 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); //指定消费队列 channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
3)测试
3.6.RPC
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
3、服务端将RPC方法 的结果发送到RPC响应队列
4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。
4.Spring 整合RibbitMQ
4.1.搭建SpringBoot环境
我们选择基于Spring-Rabbit去操作RabbitMQ
https://github.com/spring-projects/spring-amqp
使用 spring-boot-starter-amqp会自动添加spring-rabbit依赖,如下:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐logging</artifactId> </dependency>
4.2.配置
1、配置application.yml
配置连接rabbitmq的参数
server: port: 8081 spring: application: name: test‐rabbitmq‐producer rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtualHost: /
2、定义RabbitConfig类,配置Exchange、Queue、及绑定交换机。
本例配置Topic交换机。
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqConfig { public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; /** * 交换机配置 * ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置 * @return the exchange */ @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM() { //durable(true)持久化,消息队列重启后交换机仍然存在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } //声明队列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS() { Queue queue = new Queue(QUEUE_INFORM_SMS); return queue; } //声明队列 @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL() { Queue queue = new Queue(QUEUE_INFORM_EMAIL); return queue; } /** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#"); * 绑定队列到交换机 . * * @param queue the queue * @param exchange the exchange * @return the binding */ @Bean public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs(); } @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs(); } }
4.3.生产端
使用RarbbitTemplate发送消息
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest @RunWith(SpringRunner.class) public class Producer05_topics_springboot { @Autowired RabbitTemplate rabbitTemplate; @Test public void testSendByTopics(){ for (int i=0;i<5;i++){ String message = "sms email inform to user"+i; rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message); System.out.println("Send Message is:'" + message + "'"); } } }
4.4.消费端
创建消费端工程,添加依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐logging</artifactId> </dependency>
使用@RabbitListener注解监听队列。
import com.rabbitmq.client.Channel; import com.xuecheng.test.rabbitmq.config.RabbitmqConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class ReceiveHandler { //监听email队列 @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL}) public void receive_email(String msg,Message message,Channel channel){ System.out.println(msg); } //监听sms队列 @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS}) public void receive_sms(String msg,Message message,Channel channel){ System.out.println(msg); } }
4.5.测试
5.RabbitMQ三大应用场景(异步提速、解耦、削峰填谷)
5.1.异步提速
5.2.解耦
5.3.削峰填谷