springboot整合阿里云的消息队列MQ

简介: springboot整合阿里云的消息队列MQ

1、概述


公司使用的阿里云消息队列MQ服务,框架是springboot。做了一个demo,记录整合过程。


2、步骤


第一步:配置

配置工作基本上就是按照阿里云消息队列MQ的文档所述那样,在控制台的消息队列MQ里面进行配置.快速入门概述

这里有一点注意的点:我们的topic在授权的时候,可以授权给子账号的。(阿里的rocketmq 支持子账号)


第二步:编码

这里涉及到两个知识点:


配置类:

public class BusMqConfig {
    @Value("${mq.topic.business}")
    private String topic;
    @Value("${mq.producerId.business}")
    private String producerId;
    @Value("${mq.consumerId.business}")
    private String consumerId;
    @Value("${mq.accesskey}")
    private String accesskey;
    @Value("${mq.secretkey}")
    private String secretkey;
    @Value("${mq.onsaddr}")
    private String onsaddr;
    @Value("${mq.subExpression}")
    private String subExpression;
    public String getSubExpression() {
        return subExpression;
    }
    public void setSubExpression(String subExpression) {
        this.subExpression = subExpression;
    }
    //提供消费者的配置
    public Properties getConsumerProperties() {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty(PropertyKeyConst.ConsumerId, consumerId);
        consumerProperties.setProperty(PropertyKeyConst.AccessKey, accesskey);
        consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretkey);
        consumerProperties.setProperty(PropertyKeyConst.ONSAddr, onsaddr);
        return consumerProperties;
    }
    //提供生产者的配置
    public Properties getProducerProperties() {
        Properties producerProperties = new Properties();
        producerProperties.setProperty(PropertyKeyConst.ProducerId, producerId);
        producerProperties.setProperty(PropertyKeyConst.AccessKey, accesskey);
        producerProperties.setProperty(PropertyKeyConst.SecretKey, secretkey);
        producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsaddr);
        return producerProperties;
    }
    public String getTopic() {
        return topic;
    }
    public void setTopic(String topic) {
        this.topic = topic;
    }
}
复制代码


消费者:在容器启动后立马开始消费

public class MqConsumer implements InitializingBean, DisposableBean{
    @Autowired
    BusMqConfig busMqConfig;
    private Consumer busConsumer;
    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("消费者初始化");
        busConsumer = ONSFactory.createConsumer(busMqConfig.getConsumerProperties());
       // busConsumer.start();
        System.out.println("消费者初始化完成");
    }
    public void start(){
        busConsumer.start();
    }
    public void onMessage(){
        busConsumer.subscribe(busMqConfig.getTopic(), busMqConfig.getSubExpression(), new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
               // System.out.println(JSON.toJSONString(message));
                System.out.println("Receive: " + message);
                System.out.println(new String(message.getBody()));
                return Action.CommitMessage;
            }
        });
    }
    @Override
    public void destroy() throws Exception {
        busConsumer.shutdown();
        System.out.println("停止");
    }
}
复制代码


配置容器启动开始消费:

@Autowired
    MqConsumer mqConsumer;
    @Override
    public void run(String... strings) throws Exception {
        System.out.println("开始消费");
        mqConsumer.start();
        mqConsumer.onMessage();
    }

此时我们先在控制台上,用阿里提供的界面化的窗口,模拟发送几条消息。此时的消息的状态是未消费状态。启动项目后,就会在控制台看到消费记录了。


生产者

private final static Logger LOGGER = LoggerFactory.getLogger(MqProducer.class);
    @Autowired
    BusMqConfig busMqConfig;
    private Producer producer;
    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("生产者初始化");
        producer = ONSFactory.createProducer(busMqConfig.getProducerProperties());
        producer.start();
    }
    public void sentMessage(Message message){
            producer.sendAsync(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    LOGGER.info(sendResult.getTopic()+"-----"+sendResult.getMessageId());
                }
                @Override
                public void onException(OnExceptionContext context) {
                    LOGGER.error(context.getTopic()+"-----"+context.getMessageId()+":error="+context.getException());
                }
            });
    }
    @Override
    public void destroy() throws Exception {
            producer.shutdown();
    }


创建一个controller,通过postman向接口发送消息体。

@PostMapping(value = "/send")
    public Message msg(@RequestBody String msgbody, HttpServletRequest request){
        System.out.println(msgbody);
        Message msg = new Message(
                // Message 所属的 Topic
                busMqConfig.getTopic(),
                // Message Tag,
                // 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
                "TagA",
                // Message Body
                // 任何二进制形式的数据,MQ 不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                msgbody.getBytes());
        // 设置代表消息的业务关键属性,请尽可能全局唯一。
        // 以方便您在无法正常收到消息情况下,可通过阿里云服务器管理控制台查询消息并补发。
        // 注意:不设置也不会影响消息正常收发
        msg.setKey("ORDERID_" + Math.round(Math.random()*8999+1000));
        mqProducer.sentMessage(msg);
        return msg;
    }

至此demo就结束。其实过程很简单


3、总结


此次整合的过程。看似简单。我感觉还有深层的东西在里面

  • spring中的bean生命周期的各个阶段,我们是否能记住,并运用了。我们整天吵着读源码,是否真的读了?读了是否有会运用?我想这就是研究spring架构的意义吧。
  • springboot 虽然使得开发人员的工作量减少了。但是那些关键点,我们是否了解,并掌握。
  • 当我们项目中要整合新的东西时,头绪在哪? 答:官方文档+源码+别人的理解=自己的理解


4、扩展


此demo某些部分还是有其他替代方案的

  • 应用启动后立即开始消费消息:可以创建一个监听实现ApplicationListener
  • 消费者生产者:我是自定义类,把阿里提供的custemer producer 做了封装。其实我们还可以直接将阿里的custemer producer作为bean配置。但我觉得封装下还是比较灵活的

demo地址

网格学习法:由一个点,或者一个项目将各个知识点串起来,锚点连接,由点到面,串成网格。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2天前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2天前
|
消息中间件 监控 Oracle
消息队列 MQ产品使用合集之启动Namesrv节点时,遇到报错,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2天前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2天前
|
消息中间件 网络协议 JavaScript
消息队列 MQ产品使用合集之报错提示是"the internal error!",是什么原因导致的”
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1天前
|
消息中间件 Java Apache
使用Spring Boot实现与ActiveMQ的消息队列集成
使用Spring Boot实现与ActiveMQ的消息队列集成
|
2天前
|
消息中间件 Java API
消息队列 MQ产品使用合集之遇到"No topic route info in name server for the topic"错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2天前
|
消息中间件 Java Shell
消息队列 MQ产品使用合集之启动broker&proxy的时候会报错,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2天前
|
消息中间件 Linux 开发工具
消息队列 MQ产品使用合集之重复消费一般是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2天前
|
消息中间件 网络协议 RocketMQ
消息队列 MQ产品使用合集之broker开启proxy,启动之后producer生产消息始终都只到一个broker,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2天前
|
消息中间件 网络安全 开发工具
消息队列 MQ产品使用合集之使用grpc proxy,生产者心跳并没有发送至Default中,如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

热门文章

最新文章

相关产品

  • 云消息队列 MQ