同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类

简介: 同步调用优点:时效性强,打电话、直播,很快可以得到结果同步调用的问题:

同步调用优点:

时效性强,打电话、直播,很快可以得到结果

同步调用的问题:

耦合度高

性能和吞吐能力差

有额外的资源消耗

有级联失败的问题

异步:

对高并发有要求的功能使用异步

优点:

耦合度低

性能和吞吐能力高

流量削峰(对秒杀来说很重要)

故障隔离

缺点:

异步时效性没有同步好

依赖于Broker可靠性、安全性等要求高·1—————》Broker—>mq消息队列

架构明显变复杂了

场景:

双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下但成功。

库存系统:订阅订单的消息,获取下单信息,进行库操作。就算库存系统出现故障,消息列队也能保证消息的可靠投递,不会导致消息丢失


流量削峰

 场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列


作用:


1,可以控制活动人数,超过此阈值的订单直接丢弃


2,可以缓解短时间的高流量压垮应用(应用程序安自己的最大处理能力获取订单)


消息队列mq

消息-----》以队列的形式存储

先进先出

异步解耦

场景:注册邮件和短信通知

在秒杀团队—》高并发 流量削峰


总的来说,不管是rocketmq还是rabbitmq,他们应用的场景是

1、流量削峰

2、日志处理

3、应用解耦

4、异步处理

RocketMQ发送消息和消费消息测试类

====================二期实现

导入依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>

测试代码

package com.example.springbootproject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;
@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
public class mqtest {
    /**
     * 生产者
     * @throws Exception
     */
    @Test
    public void test() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("GROUP_DEMO");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setSendLatencyFaultEnable(true);
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TOPIC_TEST",
                    "TAGE_TEST",
                    ("ROCKETMQ000" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送核心方法
            //同步
            SendResult send = producer.send(msg);
            //异步
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("===" + sendResult);
                }
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("e:" + throwable);
                }
            });
            //单向
            producer.sendOneway(msg);
            System.out.println("send:%s%n" + send);
        }
        //producer.shutdown();
    }
    /**
     * 消费者
     * @throws Exception
     */
    @Test
    public void consume() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //订阅Topic,去消费生产者产生的消息。
        consumer.subscribe("TOPIC_TEST", "*");//tag tagA|tagB|tagC
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt item : list) {
                        String topic = item.getTopic();
                        String tags = item.getTags();
                        String msgBody = new String(item.getBody(), "utf-8");
                        System.out.println("收到消息:topic:" + topic + ",tags:" + tags + ",msg:" + msgBody);
                    }
                } catch (Exception e) {
                    log.error("e:", e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
        System.out.println("consumer start");
    }
}

====================一期实现

public class RockMQSendMessageTest {
  //发送消息
  public static void main(String[] args) throws Exception {
    //1创建消息生产者,并且设置生成组名
    DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
    //2为生产者设置NameServer的地址 mq ip:port port可以在本地跑起来mq
    producer.setNamesrvAddr(".......");
    //3 启动生产者
    producer.start();
    //4 构建消息对象,主要设置消息的主题 标签 内容 topic tags 内容
    Message message = new Message("myTopic", "myTag", ("Test jfdlfd").getBytes());
    //5发送消息
    SendResult result = producer.send(message, 1000);
    System.out.println(result);
    //6 关闭生产者
    producer.shutdown();
  }
}
public class RocketMQReceviceMessageTest {
  //接收消息
  public static void main(String[] args) throws Exception {
    //1创建消费者,并且为其制定消费者组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
    //2为消费者设置NameServer的地址ip:port可以在本地跑起来mq
    consumer.setNamesrvAddr(".......");
    //3制定消费者订阅的主题和标签
    consumer.subscribe("myTopic", "*");
    //4设置一个回调函数,并在函数编写接收到消息之后的处理方法
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      //j处理获取到的消息
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        //消费逻辑
        System.out.println("Message====>" + list);
        //消息成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    //5启动消费者
    consumer.start();
    System.out.println("启动消费成功");
  }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 监控 Java
RocketMQ 同步发送、异步发送和单向发送,如何选择?
本文详细分析了 RocketMQ 中同步发送、异步发送和单向发送三种消息发送方式的原理、优缺点及适用场景。同步发送可靠性高但延迟较大,适合订单系统等场景;异步发送非阻塞且延迟低,适用于实时数据处理等场景;单向发送高效但可靠性低,适用于日志收集等场景。文章还提供了示例代码和核心源码分析,帮助读者更好地理解每种发送方式的特点。
318 4
|
5月前
|
消息中间件 Java 测试技术
消息队列 MQ使用问题之数据流出规则是否支持平台的云RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
4月前
|
消息中间件 存储 Java
【揭秘】RocketMQ内部运作大揭秘:一探究竟,原来消息队列是这样工作的!
【8月更文挑战第19天】RocketMQ是一款高性能、高可用的消息中间件,在分布式系统中至关重要。它采用发布/订阅模式,支持高吞吐量的消息传递。核心组件包括管理元数据的NameServer、存储消息的Broker以及Producer和Consumer。RocketMQ支持发布/订阅与点对点两种模型,并具备复杂的消息持久化和路由机制。通过Java API示例,可轻松实现消息的发送与接收。RocketMQ凭借其出色的特性和可靠性,成为大型分布式系统首选的消息解决方案。
77 5
|
5月前
|
消息中间件 API 开发工具
消息队列 MQ使用问题之如何开启RabbitMQ的MQTT功能
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
5月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
5月前
|
消息中间件 运维 RocketMQ
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
|
5月前
|
消息中间件 小程序 RocketMQ
消息队列 MQ使用问题之如何在小程序中引用paho-mqtt
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

相关产品

  • 云消息队列 MQ