1、概述
公司使用的阿里云消息队列MQ服务,框架是springboot。做了一个demo,记录整合过程。
2、步骤
第一步:配置
配置工作基本上就是按照阿里云消息队列MQ的文档所述那样,在控制台的消息队列MQ里面进行配置.快速入门概述
这里有一点注意的点:我们的topic在授权的时候,可以授权给子账号的。(阿里的rocketmq 支持子账号)
第二步:编码
这里涉及到两个知识点:
配置类:
public class BusMqConfig { @Value("${mq.topic.business}") private String topic; @Value("${mq.producerId.business}") private String producerId; @Value("${mq.consumerId.business}") private String consumerId; @Value("${mq.accesskey}") private String accesskey; @Value("${mq.secretkey}") private String secretkey; @Value("${mq.onsaddr}") private String onsaddr; @Value("${mq.subExpression}") private String subExpression; public String getSubExpression() { return subExpression; } public void setSubExpression(String subExpression) { this.subExpression = subExpression; } //提供消费者的配置 public Properties getConsumerProperties() { Properties consumerProperties = new Properties(); consumerProperties.setProperty(PropertyKeyConst.ConsumerId, consumerId); consumerProperties.setProperty(PropertyKeyConst.AccessKey, accesskey); consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretkey); consumerProperties.setProperty(PropertyKeyConst.ONSAddr, onsaddr); return consumerProperties; } //提供生产者的配置 public Properties getProducerProperties() { Properties producerProperties = new Properties(); producerProperties.setProperty(PropertyKeyConst.ProducerId, producerId); producerProperties.setProperty(PropertyKeyConst.AccessKey, accesskey); producerProperties.setProperty(PropertyKeyConst.SecretKey, secretkey); producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsaddr); return producerProperties; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } } 复制代码
消费者:在容器启动后立马开始消费
public class MqConsumer implements InitializingBean, DisposableBean{ @Autowired BusMqConfig busMqConfig; private Consumer busConsumer; @Override public void afterPropertiesSet() throws Exception { System.out.println("消费者初始化"); busConsumer = ONSFactory.createConsumer(busMqConfig.getConsumerProperties()); // busConsumer.start(); System.out.println("消费者初始化完成"); } public void start(){ busConsumer.start(); } public void onMessage(){ busConsumer.subscribe(busMqConfig.getTopic(), busMqConfig.getSubExpression(), new MessageListener() { @Override public Action consume(Message message, ConsumeContext context) { // System.out.println(JSON.toJSONString(message)); System.out.println("Receive: " + message); System.out.println(new String(message.getBody())); return Action.CommitMessage; } }); } @Override public void destroy() throws Exception { busConsumer.shutdown(); System.out.println("停止"); } } 复制代码
配置容器启动开始消费:
@Autowired MqConsumer mqConsumer; @Override public void run(String... strings) throws Exception { System.out.println("开始消费"); mqConsumer.start(); mqConsumer.onMessage(); }
此时我们先在控制台上,用阿里提供的界面化的窗口,模拟发送几条消息。此时的消息的状态是未消费状态。启动项目后,就会在控制台看到消费记录了。
生产者
private final static Logger LOGGER = LoggerFactory.getLogger(MqProducer.class); @Autowired BusMqConfig busMqConfig; private Producer producer; @Override public void afterPropertiesSet() throws Exception { System.out.println("生产者初始化"); producer = ONSFactory.createProducer(busMqConfig.getProducerProperties()); producer.start(); } public void sentMessage(Message message){ producer.sendAsync(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { LOGGER.info(sendResult.getTopic()+"-----"+sendResult.getMessageId()); } @Override public void onException(OnExceptionContext context) { LOGGER.error(context.getTopic()+"-----"+context.getMessageId()+":error="+context.getException()); } }); } @Override public void destroy() throws Exception { producer.shutdown(); }
创建一个controller,通过postman向接口发送消息体。
@PostMapping(value = "/send") public Message msg(@RequestBody String msgbody, HttpServletRequest request){ System.out.println(msgbody); Message msg = new Message( // Message 所属的 Topic busMqConfig.getTopic(), // Message Tag, // 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤 "TagA", // Message Body // 任何二进制形式的数据,MQ 不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式 msgbody.getBytes()); // 设置代表消息的业务关键属性,请尽可能全局唯一。 // 以方便您在无法正常收到消息情况下,可通过阿里云服务器管理控制台查询消息并补发。 // 注意:不设置也不会影响消息正常收发 msg.setKey("ORDERID_" + Math.round(Math.random()*8999+1000)); mqProducer.sentMessage(msg); return msg; }
至此demo就结束。其实过程很简单
3、总结
此次整合的过程。看似简单。我感觉还有深层的东西在里面
- spring中的bean生命周期的各个阶段,我们是否能记住,并运用了。我们整天吵着读源码,是否真的读了?读了是否有会运用?我想这就是研究spring架构的意义吧。
- springboot 虽然使得开发人员的工作量减少了。但是那些关键点,我们是否了解,并掌握。
- 当我们项目中要整合新的东西时,头绪在哪? 答:官方文档+源码+别人的理解=自己的理解
4、扩展
此demo某些部分还是有其他替代方案的
- 应用启动后立即开始消费消息:可以创建一个监听实现ApplicationListener
- 消费者生产者:我是自定义类,把阿里提供的custemer producer 做了封装。其实我们还可以直接将阿里的custemer producer作为bean配置。但我觉得封装下还是比较灵活的
网格学习法:由一个点,或者一个项目将各个知识点串起来,锚点连接,由点到面,串成网格。