今天给大家介绍下rabbitmq中很重要的一个功能,RPC调用。
RPC,即Remote Procedure Call的简称,也就是远程过程调用,是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术。比如两台服务器上的A和B两个应用,需要进行服务接口的相互调用,我们就可以使用RPC实现。比如常见的Java RMI、WebService、Dubbo都可以
实现RPC调用。
rabbitmq实现的RPC调用主要是简单,不用管各种复杂的网络协议,客户端发送消息,消费者消费消息,反馈消息到回复队列Reply中,然后客户端获取反馈的结果。
一、原理
流程说明:
1、对于一个RPC请求,客户端发送一条消息,该消息具有两个重要属性:replyTo(设置为仅为该请求创建的匿名互斥队列,答复队列)和correlationId(设置为每个请求的唯一值)。
2、该请求被发送到rpc_queue队列。
3、RPC工作程序(消息消费者)会监听该队列的消息。监听到有新的消息后,会根据消息执行响应的逻辑,然后将结果返回到消息中携带的replyTo指定的答复队列中。
4、客户端(消息生产者)等待答复队列中的数据,出现出现后,它会检查correlationId属性是否一致,如果匹配,则将响应结果返回给应用程序。
二、rpc的三种调用方式
之后官网就针对使用Spring AMQP实现RPC调用给出了一个简单的 Tut6Server.java示例,但真心太简单,只能作为入门的参考demo。
之后分析通过查看rabbitTemplate.sendAndReceive()方法的源码,Spring AMQP支持3中RPC调用实现。
分别是:
1、doSendAndReceiveWithDirect 直接反馈
2、doSendAndReceiveWithFixed 使用固定队列答复
3、doSendAndReceiveWithTemporary 使用临时队列答复
根据源码,对着三种方式的排序不难看出,对三者的推荐顺序为:
doSendAndReceiveWithDirect 》 doSendAndReceiveWithFixed》doSendAndReceiveWithTemporary
直接反馈无疑是最快最资源消耗最少的,固定队列会声明指定的的队列用来接收答复,
而使用临时队列来接收答复是最消耗资源,性能也是最差的,因为队列的声明,建立,销毁会消耗大。
@Nullable protected Message doSendAndReceive(String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData) { if (!this.evaluatedFastReplyTo) { synchronized(this) { if (!this.evaluatedFastReplyTo) { this.evaluateFastReplyTo(); } } } if (this.usingFastReplyTo && this.useDirectReplyToContainer) { return this.doSendAndReceiveWithDirect(exchange, routingKey, message, correlationData); } else { return this.replyAddress != null && !this.usingFastReplyTo ? this.doSendAndReceiveWithFixed(exchange, routingKey, message, correlationData) : this.doSendAndReceiveWithTemporary(exchange, routingKey, message, correlationData); } }
三、代码实战
添加依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
生产者代码:
/** * @program: rabbitmq * @description: 交换器常量 * @author: laowan * @create: 2019-06-13 17:36 **/ @Getter public enum ExchangeEnum { DIRECT_EXCHANGE("direct"), FIXED_EXCHANGE("fixed"), TMP_EXCHANGE("tmp"); private String value; ExchangeEnum(String value) { this.value = value; } }
/** * @program: rabbitmq * @description: 队列枚举 * @author: laowan * @create: 2019-06-13 17:37 **/ @Getter public enum QueueEnum { //direct模式 DIRECT_REQUEST("direct.request", "direct"), //固定队列应答模式 FIXED_REQUEST("fixed.request", "fixed"), FIXED_RESPONSE("fixed.response", ""), //临时模式 消息发送到的队列 TMP_REQUEST("tmp.request", "tmp") ; /** * 队列名称 */ private String name; /** * 队列路由键 */ private String routingKey; QueueEnum(String name, String routingKey) { this.name = name; this.routingKey = routingKey; } }
/** * @program: rpc-parent * @description: direct rpc请求模式 * @author: laowan * @create: 2020-04-09 18:05 **/ @Configuration @Slf4j public class DirectReplyConfig { /** * 注意bean的名称是由方法名决定的,所以不能重复 * @return */ @Bean public Queue directRequest() { return new Queue(QueueEnum.DIRECT_REQUEST.getName(), true); } @Bean public DirectExchange directExchange() { return new DirectExchange(ExchangeEnum.DIRECT_EXCHANGE.getValue()); } @Bean public Binding directBinding() { return BindingBuilder.bind(directRequest()).to(directExchange()).with(QueueEnum.DIRECT_REQUEST.getRoutingKey()); } /** * 当进行多个主题队列消费时,最好对每个单独定义RabbitTemplate,以便将各自的参数分别控制 * @param connectionFactory * @return */ @Bean public RabbitTemplate directRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); //这一步非常关键 template.setUseTemporaryReplyQueues(false); template.setReplyAddress("amq.rabbitmq.reply-to"); // template.expectedQueueNames(); template.setUserCorrelationId(true); //设置请求超时时间为10s template.setReplyTimeout(10000); return template; } }
DirectProducer 生产者代码
@Component @Slf4j public class DirectProducer { @Autowired private RabbitTemplate directRabbitTemplate; public String sendAndReceive(String request) throws TimeoutException { log.info("请求报文:{}" , request); //请求结果 String result = null; //设置消息唯一id CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); //直接发送message对象 MessageProperties messageProperties = new MessageProperties(); //过期时间10秒,也是为了减少消息挤压的可能 messageProperties.setExpiration("10000"); messageProperties.setCorrelationId(correlationId.getId()); Message message = new Message(request.getBytes(), messageProperties); StopWatch stopWatch = new StopWatch(); stopWatch.start("direct模式下rpc请求耗时"); Message response = directRabbitTemplate.sendAndReceive(ExchangeEnum.DIRECT_EXCHANGE.getValue(), QueueEnum.DIRECT_REQUEST.getRoutingKey(), message, correlationId); stopWatch.stop(); log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis()); if (response != null) { result = new String(response.getBody()); log.info("请求成功,返回的结果为:{}" , result); }else{ log.error("请求超时"); //为了方便jmeter测试,这里抛出异常 throw new TimeoutException("请求超时"); } return result; } }
四、Fixed reply-to模式
Fixed 配置类
/** * @program: rpc-parent * @description: Fixed rpc请求模式 * @author: wanli * @create: 2020-04-09 18:05 **/ @Configuration @Slf4j public class FixedReplyConfig { @Bean public Queue fixedRequest() { return new Queue(QueueEnum.FIXED_REQUEST.getName(), true); } @Bean public DirectExchange fixedExchange() { return new DirectExchange(ExchangeEnum.FIXED_EXCHANGE.getValue()); } @Bean public Binding fixedBinding() { return BindingBuilder.bind(fixedRequest()).to(fixedExchange()).with(QueueEnum.FIXED_REQUEST.getRoutingKey()); } /** * 注意,固定模式指定的应答队列 exclusive排他属性设置为true,且能自动删除 * @return */ @Bean public Queue fixedResponseQueue() { return new Queue(QueueEnum.FIXED_RESPONSE.getName(),false,true,true,new HashMap<>()); } @Bean public RabbitTemplate fixedRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); //设置固定的Reply 地址 template.setUseTemporaryReplyQueues(false); template.setReplyAddress(QueueEnum.FIXED_RESPONSE.getName()); template.expectedQueueNames(); template.setUserCorrelationId(true); //设置请求超时时间为10s template.setReplyTimeout(10000); return template; } @Bean public SimpleMessageListenerContainer fixedListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //这一步非常重要,固定队列模式要,一定要主动设置 SimpleMessageListenerContainer监听容器,监听应答队列 container.setQueueNames(QueueEnum.FIXED_RESPONSE.getName()); container.setMessageListener(fixedRabbitTemplate(connectionFactory)); container.setConcurrentConsumers(100); container.setConcurrentConsumers(100); container.setPrefetchCount(250); return container; } }
FixedProducer生产者
@Component @Slf4j public class FixedProducer { @Autowired private RabbitTemplate fixedRabbitTemplate; public String sendAndReceive(String request) throws TimeoutException { log.info("请求报文:{}" , request); //请求结果 String result = null; //设置消息唯一id CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); //直接发送message对象 MessageProperties messageProperties = new MessageProperties(); //过期时间10秒 messageProperties.setExpiration("10000"); messageProperties.setCorrelationId(correlationId.getId()); Message message = new Message(request.getBytes(), messageProperties); StopWatch stopWatch = new StopWatch(); stopWatch.start("fixed模式下rpc请求耗时"); Message response = fixedRabbitTemplate.sendAndReceive(ExchangeEnum.FIXED_EXCHANGE.getValue(), QueueEnum.FIXED_REQUEST.getRoutingKey(), message, correlationId); stopWatch.stop(); log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis()); if (response != null) { result = new String(response.getBody()); log.info("请求成功,返回的结果为:{}" , result); }else{ //为了方便jmeter测试,这里抛出异常 throw new TimeoutException("请求超时"); } return result; } }
五、Temporary reply-to模式
/** * @program: rpc-parent * @description: Temporary应答模式 * @author: laowan * @create: 2020-04-09 18:05 **/ @Configuration @Slf4j public class TmpReplyConfig { @Bean public Queue tmpRequest() { return new Queue(QueueEnum.TMP_REQUEST.getName(), true); } @Bean public DirectExchange tmpExchange() { return new DirectExchange(ExchangeEnum.TMP_EXCHANGE.getValue()); } @Bean public Binding tmpBinding() { return BindingBuilder.bind(tmpRequest()).to(tmpExchange()).with(QueueEnum.TMP_REQUEST.getRoutingKey()); } @Bean public RabbitTemplate tmpRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); template.setUseTemporaryReplyQueues(true); template.setUserCorrelationId(true); //设置请求超时时间为10s template.setReplyTimeout(10000); return template; } }
TmpProducer生产者代码
@Component @Slf4j public class TmpProducer { @Autowired private RabbitTemplate tmpRabbitTemplate; public String sendAndReceive(String request) throws TimeoutException { log.info("请求报文:{}" , request); //请求结果 String result = null; //设置消息唯一id CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); //直接发送message对象 MessageProperties messageProperties = new MessageProperties(); //过期时间10秒 messageProperties.setExpiration("10000"); messageProperties.setCorrelationId(correlationId.getId()); Message message = new Message(request.getBytes(), messageProperties); StopWatch stopWatch = new StopWatch(); stopWatch.start("tmp模式下rpc请求耗时"); Message response = tmpRabbitTemplate.sendAndReceive(ExchangeEnum.TMP_EXCHANGE.getValue(), QueueEnum.TMP_REQUEST.getRoutingKey(), message, correlationId); stopWatch.stop(); log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis()); if (response != null) { result = new String(response.getBody()); log.info("请求成功,返回的结果为:{}" , result); }else{ log.error("请求超时"); //为了方便jmeter测试,这里抛出异常 throw new TimeoutException("请求超时"); } return result; } }
生产者启动类:
@SpringBootApplication @RestController public class ProducerApplication { @Autowired DirectProducer directProducer; @Autowired FixedProducer fixedProducer; @Autowired TmpProducer tmpProducer; public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } @GetMapping("/direct") public String direct(String message) throws Exception { return directProducer.sendAndReceive(message); } @GetMapping("/fixed") public String fixed(String message) throws Exception { return fixedProducer.sendAndReceive(message); } @GetMapping("/tmp") public String tmp(String message) throws Exception { return tmpProducer.sendAndReceive(message); } }
消费者基本类似,就附上DirectConsumer类的代码:
/** * @program: rabbitmq * @description: direct消费者 * @author: wanli * @create: 2019-06-13 18:01 **/ @Component @RabbitListener(queues = "direct.request") @Slf4j public class DirectConsumer { @RabbitHandler public String onMessage(byte[] message, @Headers Map<String, Object> headers, Channel channel) { StopWatch stopWatch = new StopWatch("调用计时"); stopWatch.start("rpc调用消费者耗时"); String request = new String(message); String response = null; log.info("接收到的消息为:" + request); //模拟请求耗时3s try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } response= this.sayHello(request); log.info("返回的消息为:" + response); stopWatch.stop(); log.info(stopWatch.getLastTaskName()+stopWatch.getTotalTimeMillis()+"ms"); return response; } public String sayHello(String name){ return "hello " + name; } }
六、压测
通过对/direct,/fixed,/tmp三个接口使用JMeter压测,线程数1000,时间1s,
多次执行,比较发现:
direct和fixed的rpc方式调用的性能基本一致,差别不大,每分钟3500左右的并发
而tmp方式并发能力会弱会弱很多,大概3000并发左右。
并发请求时可以通过rabbitmq的管理界面明显看到tmp方式高并发时生成了非常多的临时队列。
性能:direct>=fixed>tmp,与之前根据源码和各自执行原理预期的执行性能基本一致
七、参数优化
生产者这边,在fix模式下,需要配置对应的SimpleMessageListenerContainer监听答复队列,可以适当增加消费者的并发数,并且提高每次抓取的消息数。
并且设置acknowledge-mode=auto自动ack。
@Bean public SimpleMessageListenerContainer fixedListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //这一步非常重要,固定队列模式要,一定要主动设置 SimpleMessageListenerContainer监听容器,监听应答队列 container.setQueueNames(QueueEnum.FIXED_RESPONSE.getName()); container.setMessageListener(fixedRabbitTemplate(connectionFactory)); container.setConcurrentConsumers(100); container.setConcurrentConsumers(100); container.setPrefetchCount(250); return container; }
消费者这边,一定要注意设置消费者每次抓取的数量,如果每个消息消费比较耗时,一次抓取太多,就容易导致抓取的这一批消息被这个消费者串行消费的时候出现超时情况。这里我设置的是10,经过压测发现在高并发下,rpc响应出现延长,说明消费能力基本能满足。
#消费者的并发参数 spring.rabbitmq.listener.type=simple spring.rabbitmq.listener.simple.concurrency=200 spring.rabbitmq.listener.simple.max-concurrency=500 #抓取参数非常关键,一次抓取的消息多了,消费速度一慢,就会造成响应延迟,抓取少了又会导致并发量低 spring.rabbitmq.listener.simple.prefetch=10 #可以不需要反馈 spring.rabbitmq.listener.simple.acknowledge-mode=none
七、问题
这里要吐槽一下,关于rabbitmq的RPC调用,网上的资料真到太少了,踩了不少坑。
坑一:
CORRECTION: The RabbitTemplate does not currently support Direct reply-to for sendAndReceive() operations; you can, however, specify a fixed reply queue (with a reply-listener). Or you can use rabbitTemplate.execute() with a ChannelCallback to consume the reply from that "queue" (and publish). I have created a JIRA issue if you wish to track it. 1.4.1 and above now supports direct reply-to.
百度上找的资料太少,之后在google上找到上面的说明,大意是RabbitTemplate在sendAndReceive操作时不支持Direct reply-to调用
解决:
作为老鸟一枚,这里我就和他杠上了,偏偏不信这个邪,RabbitTemplate源码中明明可以搜索到'amq.rabbitmq.reply-to'相关判断以及doSendAndReceiveWithDirect的定义,怎么可能不支持?
坑二:
Broker does not support fast replies via 'amq.rabbitmq.reply-to'
Broker指的是我们的rabbitmq的服务节点,不支持通过'amq.rabbitmq.reply-to'进行快速返回。
解决:
当前版本rabbitmq的Broker不支持通过'amq.rabbitmq.reply-to'进行快速返回,那么就升级broker的版本。
3.3.5版本不支持创建amq.rabbitmq.reply-to虚拟队列,那就升级到3.7.8版本。
坑三:
Caused by: java.lang.IllegalStateException: A listener container must not be provided when using direct reply-to
解决:
指定名为“amq.rabbitmq.reply-to”的反馈地址后,不能再调用expectedQueueNames方法
template.setUseTemporaryReplyQueues(false); template.setReplyAddress("amq.rabbitmq.reply-to"); // template.expectedQueueNames(); template.setUserCorrelationId(true);
坑四:
压测过程中,并发一高,就容易出现rpc调用超时的问题。
解决:
增加消费者的并发数,减小消费者每次抓取的消息数。
总结
有些东西,百度不会告诉你,要看官网;
有些东西,官网不会告诉你,要看源码;
有些东西,源码不会告诉你,只能根据原理实践推敲;
最后,推敲不出来,可以找老万。
git源码地址: