本文介绍如何在 Spring 框架下用 MQ 收发消息。主要包括三部分内容:普通消息生产者和 Spring 集成,事务消息生产者和 Spring 集成, 消息消费者与 Spring 集成。
请确保同一个 Consumer ID 下所有 Consumer 实例的订阅关系保持一致,具体请参考
订阅关系一致文档。
Spring 框架下支持的配置参数和 TCP Java 一致,具体可以参考
Java SDK 使用说明。
生产者与 Spring 集成
在 producer.xml 中定义生产者 Bean 等信息。 <?xml version="1.0" encoding="UTF-8"?>- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
- <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
- <!-- Spring接入方式支持Java SDK支持的所有配置项 -->
- <property name="properties" > <!--生产者配置信息-->
- <props>
- <prop key="ProducerId">PID_DEMO</prop> <!--请替换XXX-->
- <prop key="AccessKey">XXX</prop>
- <prop key="SecretKey">XXX</prop>
- </props>
- </property>
- </bean>
- </beans>
通过已经与 Spring 集成好的生产者生产消息。
- package demo;
- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.Producer;
- import com.aliyun.openservices.ons.api.SendResult;
- import com.aliyun.openservices.ons.api.exception.ONSClientException;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.support.ClassPathXmlApplicationContext;
- public class ProduceWithSpring {
- public static void main(String[] args) {
- /**
- * 生产者Bean配置在producer.xml中,可通过ApplicationContext获取或者直接注入到其他类(比如具体的Controller)中.
- */
- ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
- Producer producer = (Producer) context.getBean("producer");
- //循环发送消息
- for (int i = 0; i < 100; i++) {
- Message msg = new Message( //
- // Message所属的Topic
- "TopicTestMQ",
- // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
- "TagA",
- // Message Body 可以是任何二进制形式的数据, MQ不做任何干预
- // 需要Producer与Consumer协商好一致的序列化和反序列化方式
- "Hello MQ".getBytes());
- // 设置代表消息的业务关键属性,请尽可能全局唯一
- // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
- // 注意:不设置也不会影响消息正常收发
- msg.setKey("ORDERID_100");
- // 发送消息,只要不抛异常就是成功
- try {
- SendResult sendResult = producer.send(msg);
- assert sendResult != null;
- System.out.println("send success: " + sendResult.getMessageId());
- }catch (ONSClientException e) {
- System.out.println("发送失败");
- }
- }
- }
- }
事务消息生产者与 Spring 集成
有关事务消息的概念请查看
发送事务消息。
首先需要实现一个 LocalTransactionChecker,如下所示。一个消息生产者只能有一个 LocalTransactionChecker。 package demo;- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
- import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
- public class DemoLocalTransactionChecker implements LocalTransactionChecker {
- public TransactionStatus check(Message msg) {
- System.out.println("开始回查本地事务状态");
- return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的TransactionStatus
- }
- }
其次,在 transactionProducer.xml 中定义事务消息生产者 Bean 等信息。
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
- <bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"></bean>
- <bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown">
- <property name="properties" > <!--事务消息生产者配置信息-->
- <props>
- <prop key="ProducerId">PID_DEMO</prop> <!--请替换XXX-->
- <prop key="AccessKey">AKDEMO</prop>
- <prop key="SecretKey">SKDEMO</prop>
- </props>
- </property>
- <property name="localTransactionChecker" ref="localTransactionChecker"></property>
- </bean>
- </beans>
通过已经与 Spring 集成好的生产者生产事务消息。
- package demo;
- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.SendResult;
- import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
- import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
- import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.support.ClassPathXmlApplicationContext;
- public class ProduceTransMsgWithSpring {
- public static void main(String[] args) {
- /**
- * 事务消息生产者Bean配置在transactionProducer.xml中,可通过ApplicationContext获取或者直接注入到其他类(比如具体的Controller)中.
- * 请结合例子"发送事务消息"
- */
- ApplicationContext context = new ClassPathXmlApplicationContext("transactionProducer.xml");
- TransactionProducer transactionProducer = (TransactionProducer) context.getBean("transactionProducer");
- Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes());
- SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
- @Override
- public TransactionStatus execute(Message msg, Object arg) {
- System.out.println("执行本地事务");
- return TransactionStatus.CommitTransaction; //根据本地事务执行结果来返回不同的TransactionStatus
- }
- }, null);
- }
- }
消费者与 Spring 集成
创建 MessageListener,如下所示。 package demo;- import com.aliyun.openservices.ons.api.Action;
- import com.aliyun.openservices.ons.api.ConsumeContext;
- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.MessageListener;
- public class DemoMessageListener implements MessageListener {
- public Action consume(Message message, ConsumeContext context) {
- System.out.println("Receive: " + message.getMsgID());
- try {
- //do something..
- return Action.CommitMessage;
- }catch (Exception e) {
- //消费失败
- return Action.ReconsumeLater;
- }
- }
- }
在 consumer.xml 中定义消费者 Bean 等信息。
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
- <bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener配置-->
- <!-- 多CID订阅同一个Topic,可以创建多个ConsumerBean-->
- <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
- <property name="properties" > <!--消费者配置信息-->
- <props>
- <prop key="ConsumerId">CID_DEMO</prop> <!--请替换XXX-->
- <prop key="AccessKey">AKDEMO</prop>
- <prop key="SecretKey">SKDEMO</prop>
- <!--将消费者线程数固定为50个
- <prop key="ConsumeThreadNums">50</prop>
- -->
- </props>
- </property>
- <property name="subscriptionTable">
- <map>
- <entry value-ref="msgListener">
- <key>
- <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
- <property name="topic" value="TopicTestMQ"/>
- <property name="expression" value="*"/><!--expression即Tag,可以设置成具体的Tag,如 taga||tagb||tagc,也可设置成*。 *仅代表订阅所有Tag,不支持通配-->
- </bean>
- </key>
- </entry>
- <!--更多的订阅添加entry节点即可,如下所示-->
- <entry value-ref="msgListener">
- <key>
- <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
- <property name="topic" value="TopicTestMQ-Other"/> <!--订阅另外一个Topic -->
- <property name="expression" value="taga||tagb"/> <!-- 订阅多个Tag -->
- </bean>
- </key>
- </entry>
- </map>
- </property>
- </bean>
- </beans>
运行已经与 Spring 集成好的消费者,如下所示。
- package demo;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.support.ClassPathXmlApplicationContext;
- public class ConsumeWithSpring {
- public static void main(String[] args) {
- /**
- * 消费者Bean配置在consumer.xml中,可通过ApplicationContext获取或者直接注入到其他类(比如具体的Controller)中
- */
- ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
- System.out.println("Consumer Started");
- }
- }