版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
详细解答可以参考官方帮助文档
本文介绍如何在 Spring 框架下用 MQ 收发消息。 主要包括三部分内容:普通消息生产者和 Spring 集成,事务消息生产者和 Spring 集成,消息消费者和 Spring 集成。
请确保同一个 Consumer ID 下所有 Consumer 实例的订阅关系保持一致,具体请参考订阅关系一致文档。
Spring 框架下支持的配置参数和 TCP Java 一致,具体可以参考Java SDK 使用说明。
在 producer.xml 中定义生产者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"><bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown"><!-- Spring 接入方式支持 Java SDK 支持的所有配置项 --><property name="properties" > <!--生产者配置信息--><props><prop key="ProducerId">PID_DEMO</prop> <!--请替换 XXX--><prop key="AccessKey">XXX</prop><prop key="SecretKey">XXX</prop></props></property></bean></beans>
通过已经与 Spring 集成好的生产者生产消息。
package demo;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.Producer;import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.exception.ONSClientException;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ProduceWithSpring {public static void main(String[] args) {/*** 生产者 Bean 配置在 producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中*/ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");Producer producer = (Producer) context.getBean("producer");//循环发送消息for (int i = 0; i < 100; i++) {Message msg = new Message( //// Message 所属的 Topic"TopicTestMQ",// Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤"TagA",// Message Body 可以是任何二进制形式的数据, MQ 不做任何干预// 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式"Hello MQ".getBytes());// 设置代表消息的业务关键属性,请尽可能全局唯一// 以方便您在无法正常收到消息情况下,可通过 MQ 控制台查询消息并补发// 注意:不设置也不会影响消息正常收发msg.setKey("ORDERID_100");// 发送消息,只要不抛异常就是成功try {SendResult sendResult = producer.send(msg);assert sendResult != null;System.out.println("send success: " + sendResult.getMessageId());}catch (ONSClientException e) {System.out.println("发送失败");}}}}
有关事务消息的概念,请参见收发事务消息。
首先需要实现一个 LocalTransactionChecker,如下所示。 一个消息生产者只能有一个 LocalTransactionChecker。
package demo;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;import com.aliyun.openservices.ons.api.transaction.TransactionStatus;public class DemoLocalTransactionChecker implements LocalTransactionChecker {public TransactionStatus check(Message msg) {System.out.println("开始回查本地事务状态");return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的 TransactionStatus}}
在 transactionProducer.xml 中定义事务消息生产者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"><bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"></bean><bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown"><property name="properties" > <!--事务消息生产者配置信息--><props><prop key="ProducerId">PID_DEMO</prop> <!--请替换 XXX--><prop key="AccessKey">AKDEMO</prop><prop key="SecretKey">SKDEMO</prop></props></property><property name="localTransactionChecker" ref="localTransactionChecker"></property></bean></beans>
通过已经与 Spring 集成好的生产者生产事务消息。
package demo;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;import com.aliyun.openservices.ons.api.transaction.TransactionProducer;import com.aliyun.openservices.ons.api.transaction.TransactionStatus;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ProduceTransMsgWithSpring {public static void main(String[] args) {/*** 事务消息生产者 Bean 配置在 transactionProducer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中.* 请结合例子"发送事务消息"*/ApplicationContext context = new ClassPathXmlApplicationContext("transactionProducer.xml");TransactionProducer transactionProducer = (TransactionProducer) context.getBean("transactionProducer");Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes());SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {@Overridepublic TransactionStatus execute(Message msg, Object arg) {System.out.println("执行本地事务");return TransactionStatus.CommitTransaction; //根据本地事务执行结果来返回不同的 TransactionStatus}}, null);}}
创建 MessageListener,如下所示。
package demo;import com.aliyun.openservices.ons.api.Action;import com.aliyun.openservices.ons.api.ConsumeContext;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.MessageListener;public class DemoMessageListener implements MessageListener {public Action consume(Message message, ConsumeContext context) {System.out.println("Receive: " + message.getMsgID());try {//do something..return Action.CommitMessage;}catch (Exception e) {//消费失败return Action.ReconsumeLater;}}}
在 consumer.xml 中定义消费者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"><bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener 配置--><!-- 多 CID 订阅同一个 Topic,可以创建多个 ConsumerBean--><bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown"><property name="properties" > <!--消费者配置信息--><props><prop key="ConsumerId">CID_DEMO</prop> <!--请替换 XXX--><prop key="AccessKey">AKDEMO</prop><prop key="SecretKey">SKDEMO</prop><!--将消费者线程数固定为50个<prop key="ConsumeThreadNums">50</prop>--></props></property><property name="subscriptionTable"><map><entry value-ref="msgListener"><key><bean class="com.aliyun.openservices.ons.api.bean.Subscription"><property name="topic" value="TopicTestMQ"/><property name="expression" value="*"/><!--expression 即 Tag,可以设置成具体的 Tag,如 taga||tagb||tagc,也可设置成*。 *仅代表订阅所有 Tag,不支持通配--></bean></key></entry><!--更多的订阅添加 entry 节点即可,如下所示--><entry value-ref="msgListener"><key><bean class="com.aliyun.openservices.ons.api.bean.Subscription"><property name="topic" value="TopicTestMQ-Other"/> <!--订阅另外一个 Topic --><property name="expression" value="taga||tagb"/> <!-- 订阅多个 Tag --></bean></key></entry></map></property></bean></beans>
运行已经与 Spring 集成好的消费者,如下所示。
package demo;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ConsumeWithSpring {public static void main(String[] args) {/*** 消费者 Bean 配置在 consumer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中*/ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");System.out.println("Consumer Started");}}