rabbitmq高并发RPC调用,你Get到了吗?

本文涉及的产品
性能测试 PTS,5000VUM额度
简介: rabbitmq高并发RPC调用,你Get到了吗?

今天给大家介绍下rabbitmq中很重要的一个功能,RPC调用。


RPC,即Remote Procedure Call的简称,也就是远程过程调用,是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术。比如两台服务器上的A和B两个应用,需要进行服务接口的相互调用,我们就可以使用RPC实现。比如常见的Java RMI、WebService、Dubbo都可以

实现RPC调用。


rabbitmq实现的RPC调用主要是简单,不用管各种复杂的网络协议,客户端发送消息,消费者消费消息,反馈消息到回复队列Reply中,然后客户端获取反馈的结果。


一、原理



1.png


流程说明:

1、对于一个RPC请求,客户端发送一条消息,该消息具有两个重要属性:replyTo(设置为仅为该请求创建的匿名互斥队列,答复队列)和correlationId(设置为每个请求的唯一值)。


2、该请求被发送到rpc_queue队列。


3、RPC工作程序(消息消费者)会监听该队列的消息。监听到有新的消息后,会根据消息执行响应的逻辑,然后将结果返回到消息中携带的replyTo指定的答复队列中。


4、客户端(消息生产者)等待答复队列中的数据,出现出现后,它会检查correlationId属性是否一致,如果匹配,则将响应结果返回给应用程序。


二、rpc的三种调用方式


之后官网就针对使用Spring AMQP实现RPC调用给出了一个简单的 Tut6Server.java示例,但真心太简单,只能作为入门的参考demo。

之后分析通过查看rabbitTemplate.sendAndReceive()方法的源码,Spring AMQP支持3中RPC调用实现。

分别是:


1、doSendAndReceiveWithDirect 直接反馈

2、doSendAndReceiveWithFixed 使用固定队列答复

3、doSendAndReceiveWithTemporary 使用临时队列答复


根据源码,对着三种方式的排序不难看出,对三者的推荐顺序为:

doSendAndReceiveWithDirect 》 doSendAndReceiveWithFixed》doSendAndReceiveWithTemporary

直接反馈无疑是最快最资源消耗最少的,固定队列会声明指定的的队列用来接收答复,

而使用临时队列来接收答复是最消耗资源,性能也是最差的,因为队列的声明,建立,销毁会消耗大。

@Nullable
    protected Message doSendAndReceive(String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData) {
        if (!this.evaluatedFastReplyTo) {
            synchronized(this) {
                if (!this.evaluatedFastReplyTo) {
                    this.evaluateFastReplyTo();
                }
            }
        }
        if (this.usingFastReplyTo && this.useDirectReplyToContainer) {
            return this.doSendAndReceiveWithDirect(exchange, routingKey, message, correlationData);
        } else {
            return this.replyAddress != null && !this.usingFastReplyTo ? this.doSendAndReceiveWithFixed(exchange, routingKey, message, correlationData) : this.doSendAndReceiveWithTemporary(exchange, routingKey, message, correlationData);
        }
    }


三、代码实战


添加依赖:


<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

生产者代码:

/**
 * @program: rabbitmq
 * @description: 交换器常量
 * @author: laowan
 * @create: 2019-06-13 17:36
 **/
@Getter
public enum ExchangeEnum {
    DIRECT_EXCHANGE("direct"),
    FIXED_EXCHANGE("fixed"),
    TMP_EXCHANGE("tmp");
    private String value;
    ExchangeEnum(String value) {
        this.value = value;
    }
}
/**
 * @program: rabbitmq
 * @description: 队列枚举
 * @author: laowan
 * @create: 2019-06-13 17:37
 **/
@Getter
public enum QueueEnum {
    //direct模式
    DIRECT_REQUEST("direct.request", "direct"),
    //固定队列应答模式
    FIXED_REQUEST("fixed.request", "fixed"),
    FIXED_RESPONSE("fixed.response", ""),
    //临时模式  消息发送到的队列
    TMP_REQUEST("tmp.request", "tmp")
   ;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 队列路由键
     */
    private String routingKey;
    QueueEnum(String name, String routingKey) {
        this.name = name;
        this.routingKey = routingKey;
    }
}
/**
 * @program: rpc-parent
 * @description: direct   rpc请求模式
 * @author: laowan
 * @create: 2020-04-09 18:05
 **/
@Configuration
@Slf4j
public class DirectReplyConfig {
    /**
     * 注意bean的名称是由方法名决定的,所以不能重复
     * @return
     */
    @Bean
    public Queue directRequest() {
        return new Queue(QueueEnum.DIRECT_REQUEST.getName(), true);
    }
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(ExchangeEnum.DIRECT_EXCHANGE.getValue());
    }
    @Bean
    public Binding directBinding() {
        return BindingBuilder.bind(directRequest()).to(directExchange()).with(QueueEnum.DIRECT_REQUEST.getRoutingKey());
    }
    /**
     * 当进行多个主题队列消费时,最好对每个单独定义RabbitTemplate,以便将各自的参数分别控制
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate directRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        //这一步非常关键
        template.setUseTemporaryReplyQueues(false);
        template.setReplyAddress("amq.rabbitmq.reply-to");
       // template.expectedQueueNames();
        template.setUserCorrelationId(true);
        //设置请求超时时间为10s
        template.setReplyTimeout(10000);
        return template;
    }
}


DirectProducer 生产者代码


@Component
@Slf4j
public class DirectProducer {
    @Autowired
    private RabbitTemplate directRabbitTemplate;
    public String sendAndReceive(String request) throws TimeoutException {
        log.info("请求报文:{}" , request);
        //请求结果
        String result = null;
        //设置消息唯一id
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //直接发送message对象
        MessageProperties messageProperties = new MessageProperties();
        //过期时间10秒,也是为了减少消息挤压的可能
        messageProperties.setExpiration("10000");
        messageProperties.setCorrelationId(correlationId.getId());
        Message message = new Message(request.getBytes(), messageProperties);
        StopWatch stopWatch = new StopWatch();
        stopWatch.start("direct模式下rpc请求耗时");
        Message response = directRabbitTemplate.sendAndReceive(ExchangeEnum.DIRECT_EXCHANGE.getValue(), QueueEnum.DIRECT_REQUEST.getRoutingKey(), message, correlationId);
        stopWatch.stop();
        log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis());
        if (response != null) {
            result = new String(response.getBody());
            log.info("请求成功,返回的结果为:{}" , result);
        }else{
            log.error("请求超时");
            //为了方便jmeter测试,这里抛出异常
            throw  new TimeoutException("请求超时");
        }
        return result;
    }
}


四、Fixed reply-to模式


Fixed 配置类

/**
 * @program: rpc-parent
 * @description: Fixed   rpc请求模式
 * @author: wanli
 * @create: 2020-04-09 18:05
 **/
@Configuration
@Slf4j
public class FixedReplyConfig {
    @Bean
    public Queue fixedRequest() {
        return new Queue(QueueEnum.FIXED_REQUEST.getName(), true);
    }
    @Bean
    public DirectExchange fixedExchange() {
        return new DirectExchange(ExchangeEnum.FIXED_EXCHANGE.getValue());
    }
    @Bean
    public Binding fixedBinding() {
        return BindingBuilder.bind(fixedRequest()).to(fixedExchange()).with(QueueEnum.FIXED_REQUEST.getRoutingKey());
    }
    /**
     * 注意,固定模式指定的应答队列  exclusive排他属性设置为true,且能自动删除
     * @return
     */
    @Bean
    public Queue fixedResponseQueue() {
        return new Queue(QueueEnum.FIXED_RESPONSE.getName(),false,true,true,new HashMap<>());
    }
    @Bean
    public RabbitTemplate fixedRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置固定的Reply 地址
        template.setUseTemporaryReplyQueues(false);
        template.setReplyAddress(QueueEnum.FIXED_RESPONSE.getName());
        template.expectedQueueNames();
        template.setUserCorrelationId(true);
        //设置请求超时时间为10s
        template.setReplyTimeout(10000);
        return template;
    }
    @Bean
    public SimpleMessageListenerContainer fixedListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //这一步非常重要,固定队列模式要,一定要主动设置  SimpleMessageListenerContainer监听容器,监听应答队列
        container.setQueueNames(QueueEnum.FIXED_RESPONSE.getName());
        container.setMessageListener(fixedRabbitTemplate(connectionFactory));
        container.setConcurrentConsumers(100);
        container.setConcurrentConsumers(100);
        container.setPrefetchCount(250);
        return container;
    }
}


FixedProducer生产者

@Component
@Slf4j
public class FixedProducer {
    @Autowired
    private RabbitTemplate fixedRabbitTemplate;
    public String sendAndReceive(String request) throws TimeoutException {
        log.info("请求报文:{}" , request);
        //请求结果
        String result = null;
        //设置消息唯一id
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //直接发送message对象
        MessageProperties messageProperties = new MessageProperties();
        //过期时间10秒
        messageProperties.setExpiration("10000");
        messageProperties.setCorrelationId(correlationId.getId());
        Message message = new Message(request.getBytes(), messageProperties);
        StopWatch stopWatch = new StopWatch();
        stopWatch.start("fixed模式下rpc请求耗时");
        Message response = fixedRabbitTemplate.sendAndReceive(ExchangeEnum.FIXED_EXCHANGE.getValue(), QueueEnum.FIXED_REQUEST.getRoutingKey(), message, correlationId);
        stopWatch.stop();
        log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis());
        if (response != null) {
            result = new String(response.getBody());
            log.info("请求成功,返回的结果为:{}" , result);
        }else{
            //为了方便jmeter测试,这里抛出异常
            throw  new TimeoutException("请求超时");
        }
        return result;
    }
}


五、Temporary reply-to模式


/**
 * @program: rpc-parent
 * @description: Temporary应答模式
 * @author: laowan
 * @create: 2020-04-09 18:05
 **/
@Configuration
@Slf4j
public class TmpReplyConfig {
    @Bean
    public Queue tmpRequest() {
        return new Queue(QueueEnum.TMP_REQUEST.getName(), true);
    }
    @Bean
    public DirectExchange tmpExchange() {
        return new DirectExchange(ExchangeEnum.TMP_EXCHANGE.getValue());
    }
    @Bean
    public Binding tmpBinding() {
        return BindingBuilder.bind(tmpRequest()).to(tmpExchange()).with(QueueEnum.TMP_REQUEST.getRoutingKey());
    }
    @Bean
    public RabbitTemplate tmpRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        template.setUseTemporaryReplyQueues(true);
        template.setUserCorrelationId(true);
        //设置请求超时时间为10s
        template.setReplyTimeout(10000);
        return template;
    }
}

TmpProducer生产者代码

@Component
@Slf4j
public class TmpProducer {
    @Autowired
    private RabbitTemplate tmpRabbitTemplate;
    public String sendAndReceive(String request) throws TimeoutException {
        log.info("请求报文:{}" , request);
        //请求结果
        String result = null;
        //设置消息唯一id
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //直接发送message对象
        MessageProperties messageProperties = new MessageProperties();
        //过期时间10秒
        messageProperties.setExpiration("10000");
        messageProperties.setCorrelationId(correlationId.getId());
        Message message = new Message(request.getBytes(), messageProperties);
        StopWatch stopWatch = new StopWatch();
        stopWatch.start("tmp模式下rpc请求耗时");
        Message response = tmpRabbitTemplate.sendAndReceive(ExchangeEnum.TMP_EXCHANGE.getValue(), QueueEnum.TMP_REQUEST.getRoutingKey(), message, correlationId);
        stopWatch.stop();
        log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis());
        if (response != null) {
            result = new String(response.getBody());
            log.info("请求成功,返回的结果为:{}" , result);
        }else{
            log.error("请求超时");
            //为了方便jmeter测试,这里抛出异常
            throw  new TimeoutException("请求超时");
        }
        return result;
    }
}


生产者启动类:

@SpringBootApplication
@RestController
public class ProducerApplication {
    @Autowired
    DirectProducer directProducer;
    @Autowired
    FixedProducer fixedProducer;
    @Autowired
    TmpProducer tmpProducer;
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
    @GetMapping("/direct")
    public String direct(String message) throws Exception {
        return directProducer.sendAndReceive(message);
    }
    @GetMapping("/fixed")
    public String fixed(String message) throws Exception {
        return fixedProducer.sendAndReceive(message);
    }
    @GetMapping("/tmp")
    public String tmp(String message) throws Exception {
        return tmpProducer.sendAndReceive(message);
    }
}


消费者基本类似,就附上DirectConsumer类的代码:

/**
 * @program: rabbitmq
 * @description: direct消费者
 * @author: wanli
 * @create: 2019-06-13 18:01
 **/
@Component
@RabbitListener(queues = "direct.request")
@Slf4j
public class DirectConsumer {
    @RabbitHandler
    public String onMessage(byte[] message,
                            @Headers Map<String, Object> headers,
                            Channel channel) {
        StopWatch stopWatch = new StopWatch("调用计时");
        stopWatch.start("rpc调用消费者耗时");
        String request = new String(message);
        String response = null;
        log.info("接收到的消息为:" + request);
        //模拟请求耗时3s
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        response= this.sayHello(request);
        log.info("返回的消息为:" + response);
        stopWatch.stop();
        log.info(stopWatch.getLastTaskName()+stopWatch.getTotalTimeMillis()+"ms");
        return response;
    }
    public String sayHello(String name){
        return "hello " + name;
    }
}

六、压测


通过对/direct,/fixed,/tmp三个接口使用JMeter压测,线程数1000,时间1s,

多次执行,比较发现:

direct和fixed的rpc方式调用的性能基本一致,差别不大,每分钟3500左右的并发

而tmp方式并发能力会弱会弱很多,大概3000并发左右。

并发请求时可以通过rabbitmq的管理界面明显看到tmp方式高并发时生成了非常多的临时队列。

性能:direct>=fixed>tmp,与之前根据源码和各自执行原理预期的执行性能基本一致


七、参数优化


生产者这边,在fix模式下,需要配置对应的SimpleMessageListenerContainer监听答复队列,可以适当增加消费者的并发数,并且提高每次抓取的消息数。

并且设置acknowledge-mode=auto自动ack。

@Bean
    public SimpleMessageListenerContainer fixedListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //这一步非常重要,固定队列模式要,一定要主动设置  SimpleMessageListenerContainer监听容器,监听应答队列
        container.setQueueNames(QueueEnum.FIXED_RESPONSE.getName());
        container.setMessageListener(fixedRabbitTemplate(connectionFactory));
        container.setConcurrentConsumers(100);
        container.setConcurrentConsumers(100);
        container.setPrefetchCount(250);
        return container;
    }

消费者这边,一定要注意设置消费者每次抓取的数量,如果每个消息消费比较耗时,一次抓取太多,就容易导致抓取的这一批消息被这个消费者串行消费的时候出现超时情况。这里我设置的是10,经过压测发现在高并发下,rpc响应出现延长,说明消费能力基本能满足。

#消费者的并发参数
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.concurrency=200
spring.rabbitmq.listener.simple.max-concurrency=500
#抓取参数非常关键,一次抓取的消息多了,消费速度一慢,就会造成响应延迟,抓取少了又会导致并发量低
spring.rabbitmq.listener.simple.prefetch=10
#可以不需要反馈
spring.rabbitmq.listener.simple.acknowledge-mode=none


七、问题


这里要吐槽一下,关于rabbitmq的RPC调用,网上的资料真到太少了,踩了不少坑。

坑一:

CORRECTION: The RabbitTemplate does not currently support Direct reply-to for sendAndReceive() operations; you can, however, specify a fixed reply queue (with a reply-listener). Or you can use rabbitTemplate.execute() with a ChannelCallback to consume the reply from that "queue" (and publish).
I have created a JIRA issue if you wish to track it.
1.4.1 and above now supports direct reply-to.

百度上找的资料太少,之后在google上找到上面的说明,大意是RabbitTemplate在sendAndReceive操作时不支持Direct reply-to调用

解决:

作为老鸟一枚,这里我就和他杠上了,偏偏不信这个邪,RabbitTemplate源码中明明可以搜索到'amq.rabbitmq.reply-to'相关判断以及doSendAndReceiveWithDirect的定义,怎么可能不支持?


坑二:

Broker does not support fast replies via 'amq.rabbitmq.reply-to'

Broker指的是我们的rabbitmq的服务节点,不支持通过'amq.rabbitmq.reply-to'进行快速返回。


解决:

当前版本rabbitmq的Broker不支持通过'amq.rabbitmq.reply-to'进行快速返回,那么就升级broker的版本。

3.3.5版本不支持创建amq.rabbitmq.reply-to虚拟队列,那就升级到3.7.8版本。


坑三:

Caused by: java.lang.IllegalStateException: A listener container must not be provided when using direct reply-to

解决:

指定名为“amq.rabbitmq.reply-to”的反馈地址后,不能再调用expectedQueueNames方法

template.setUseTemporaryReplyQueues(false);
        template.setReplyAddress("amq.rabbitmq.reply-to");
       // template.expectedQueueNames();
        template.setUserCorrelationId(true);

坑四:

压测过程中,并发一高,就容易出现rpc调用超时的问题。


解决:

增加消费者的并发数,减小消费者每次抓取的消息数。


总结


有些东西,百度不会告诉你,要看官网;

有些东西,官网不会告诉你,要看源码;

有些东西,源码不会告诉你,只能根据原理实践推敲;

最后,推敲不出来,可以找老万。


git源码地址:

https://github.com/StarlightWANLI/rabbitmq-rpc.git

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
1月前
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
77 0
|
6月前
|
消息中间件 缓存 API
|
6月前
|
消息中间件 存储 NoSQL
【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统
【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统
|
消息中间件 存储 缓存
远程调用RPC和消息MQ区别
远程调用RPC和消息MQ区别
118 0
|
Dubbo 应用服务中间件
Netty实现简单RPC调用
我们知道Dubbo是一个RPC框架,那RPC框架需要实现什么?需要实现的是调用远程服务和本地服务一样方便,同时提高调用远程服务的性能。而服务端和客户端之间的关系,其实就是一个生产和消费的关系。
115 0
Netty实现简单RPC调用
|
监控 前端开发 Java
Rpc 调用监控 | 学习笔记
快速学习 Rpc 调用监控
Rpc 调用监控 | 学习笔记
|
6月前
|
设计模式 负载均衡 网络协议
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
260 0
|
13天前
|
自然语言处理 负载均衡 API
gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架
gRPC 是一种现代开源高性能远程过程调用(RPC)框架,支持多种编程语言,可在任何环境中运行。它通过高效的连接方式,支持负载平衡、跟踪、健康检查和身份验证,适用于微服务架构、移动设备和浏览器客户端连接后端服务等场景。gRPC 使用 Protocol Buffers 作为接口定义语言,支持四种服务方法:一元 RPC、服务器流式处理、客户端流式处理和双向流式处理。
|
3月前
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC