服务器的异步通信——RabbitMQ2

简介: 服务器的异步通信——RabbitMQ

服务器的异步通信——RabbitMQ1:https://developer.aliyun.com/article/1521829

工作消息队列(WorkQueue)

下面场景中如果queue中有50条请求消息,但是consumer1只能处理40条,剩余的10条就可以由consumer进行处理,所以说工作消息队列可以提高消息的处理速度,避免队列消息堆积


模拟Workqueue,实现一个队列绑定多个消费者,基本实现思路如下:

  1. 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue中
  2. 在consumer服务中定义两个消息监听者,都监听simple.queue队列
  1. 消费者1每秒处理50条消息,消费者2每秒处理10条消息

代码实现:

在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue中

    public void testSendMessage2WorkQueue() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello, message__";
        for (int i = 1; i <= 50; i++) {
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }

在consumer服务中定义两个消息监听者,都监听simple.queue队列,设置消费者1每秒处理50条消息,消费者2每秒处理10条消息

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }
 
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }

修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限,确保消费者2取消息时只能取一条,提高效率(“能者多劳”):

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1

运行结果:

发布、订阅(Publish、Subscribe)

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。

常见exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

exchange负责消息路由,而不是存储,路由失败则消息丢失

Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue中,如下:

基本实现思路如下:

  1. 在consumer中,利用代码声明队列、交换机,将二者进行绑定
  2. 在consumer中,编写两个消费方法,分别监听fanout.queue1和fanout.queue2
  1. 在publisher中编写测试方法,向fanout发送消息

代码实现:

在consumer中,利用代码声明队列、交换机,将二者进行绑定

@Configuration
public class FanoutConfig {
    // itcast.fanout
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
 
    // fanout.queue1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
 
    // 绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
 
    // fanout.queue2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
 
    // 绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

在consumer中,编写两个消费方法,分别监听fanout.queue1和fanout.queue2

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
    }

在publisher中编写测试方法,向fanout发送消息

    @Test
    public void testSendFanoutExchange() {
        // 交换机名称
        String exchangeName = "itcast.fanout";
        // 消息
        String message = "hello, every one!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }

运行结果:

Direct Exchange

Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此被称为路由模式

  • l每一个Queue都与Exchange设置一个BindingKey
  • l发布者发送消息时,指定消息的RoutingKey
  • lExchange将消息路由到BindingKey与消息RoutingKey一致的队列

基本实现思路如下:

  1. 利用@RabbitListener声明ExchangeQueueRoutingKey
  2. consumer服务中,编写两个消费者方法,分别监听direct.queue1direct.queue2
  3. publisher中编写测试方法,向itcast. direct发送消息


代码实现:

在consumer服务中,编写两个消费者方法,分别监听direct.queue1direct.queue2,并利用@RabbitListener声明ExchangeQueueRoutingKey

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }
 
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }

在publisher服务发送消息到DirectExchange

    @Test
    public void testSendDirectExchange() {
        // 交换机名称
        String exchangeName = "itcast.direct";
        // 消息
        String message = "hello, red!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }

运行结果:

Topic Exchange

Topic Exchange与Direct Exchange类似,区别在于Topic Exchange的routingKey必须是多个单词的列表,并且以.分割

QueueExchange指定BindingKey时可以使用通配符:

#:代指0个或多个单词

*:代指一个单词

基本实现思路如下:

  1. 利用@RabbitListener声明ExchangeQueueRoutingKey
  2. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1topic.queue2
  1. 在publisher中编写测试方法,向itcast. topic发送消息

代码实现:

利用@RabbitListener声明ExchangeQueueRoutingKey,在consumer服务中,编写两个消费者方法,分别监听topic.queue1topic.queue2

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }
 
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }

在publisher中编写测试方法,向itcast. topic发送消息

    @Test
    public void testSendTopicExchange() {
        // 交换机名称
        String exchangeName = "itcast.topic";
        // 消息
        String message = "今天天气不错,我的心情好极了!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
    }

运行结果:

SpringAMQP-消息转换器

SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。


如果要修改只需要定义一个MessageConverter 类型的Bean即可。


推荐用JSON方式序列化,实现步骤如下:

在父工程中引入依赖

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

在publisher和consumer服务中声明MessageConverter:

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5天前
|
缓存 监控 Java
Java Socket编程最佳实践:优化客户端-服务器通信性能
【6月更文挑战第21天】Java Socket编程优化涉及识别性能瓶颈,如网络延迟和CPU计算。使用非阻塞I/O(NIO)和多路复用技术提升并发处理能力,减少线程上下文切换。缓存利用可减少I/O操作,异步I/O(AIO)进一步提高效率。持续监控系统性能是关键。通过实践这些策略,开发者能构建高效稳定的通信系统。
|
5天前
|
Java 应用服务中间件 开发者
【实战指南】Java Socket编程:构建高效的客户端-服务器通信
【6月更文挑战第21天】Java Socket编程用于构建客户端-服务器通信。`Socket`和`ServerSocket`类分别处理两端的连接。实战案例展示了一个简单的聊天应用,服务器监听端口,接收客户端连接,并使用多线程处理每个客户端消息。客户端连接服务器,发送并接收消息。了解这些基础,加上错误处理和优化,能帮你开始构建高效网络应用。
|
5天前
|
IDE Java 开发工具
从零开始学Java Socket编程:客户端与服务器通信实战
【6月更文挑战第21天】Java Socket编程教程带你从零开始构建简单的客户端-服务器通信。安装JDK后,在命令行分别运行`SimpleServer`和`SimpleClient`。服务器监听端口,接收并回显客户端消息;客户端连接服务器,发送“Hello, Server!”并显示服务器响应。这是网络通信基础,为更复杂的网络应用打下基础。开始你的Socket编程之旅吧!
|
5天前
|
Java
Java Socket编程与多线程:提升客户端-服务器通信的并发性能
【6月更文挑战第21天】Java网络编程中,Socket结合多线程提升并发性能,服务器对每个客户端连接启动新线程处理,如示例所示,实现每个客户端的独立操作。多线程利用多核处理器能力,避免串行等待,提升响应速度。防止死锁需减少共享资源,统一锁定顺序,使用超时和重试策略。使用synchronized、ReentrantLock等维持数据一致性。多线程带来性能提升的同时,也伴随复杂性和挑战。
|
5天前
|
安全 Java 网络安全
Java Socket编程教程:构建安全可靠的客户端-服务器通信
【6月更文挑战第21天】构建安全的Java Socket通信涉及SSL/TLS加密、异常处理和重连策略。示例中,`SecureServer`使用SSLServerSocketFactory创建加密连接,而`ReliableClient`展示异常捕获与自动重连。理解安全意识,如防数据截获和中间人攻击,是首要步骤。通过良好的编程实践,确保网络应用在复杂环境中稳定且安全。
|
5天前
|
Java 数据安全/隐私保护
深入剖析:Java Socket编程原理及客户端-服务器通信机制
【6月更文挑战第21天】Java Socket编程用于构建网络通信,如在线聊天室。服务器通过`ServerSocket`监听,接收客户端`Socket`连接请求。客户端使用`Socket`连接服务器,双方通过`PrintWriter`和`BufferedReader`交换数据。案例展示了服务器如何处理每个新连接并广播消息,以及客户端如何发送和接收消息。此基础为理解更复杂的网络应用奠定了基础。
|
5天前
|
网络协议 Java Linux
探索Java Socket编程:实现跨平台客户端-服务器通信的奥秘
【6月更文挑战第21天】Java Socket编程示例展示了如何构建跨平台聊天应用。服务器端使用`ServerSocket`监听客户端连接,每个连接启动新线程处理。客户端连接服务器,发送并接收消息。Java的跨平台能力确保代码在不同操作系统上无需修改即可运行,简化开发与维护。
|
9天前
|
消息中间件 Java 双11
RocketMQ:揭秘电商巨头背后的消息队列秘密
**RocketMQ概览:**高性能分布式消息队列,适用于有序消息、事务处理、流计算、消息推送、日志处理及Binlog分发。在双11等高流量场景下证明了其性能、稳定性和低延迟。Java开发,利于扩展,性能超RabbitMQ,支持死信队列,但可能有集成兼容性问题。适合Java开发者,为电商等场景优化,每秒处理大量消息。
30 3
RocketMQ:揭秘电商巨头背后的消息队列秘密
|
16天前
|
消息中间件 监控 应用服务中间件
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
|
16天前
|
消息中间件 Java 测试技术
消息队列 MQ操作报错合集之设置了setKeepAliveInterval(1)但仍然出现客户端未连接,该怎么解决
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。

热门文章

最新文章