开发者社区> 问答> 正文

怎么开始Spring 集成


本文介绍如何在 Spring 框架下用 MQ 收发消息。主要包括三部分内容:普通消息生产者和 Spring 集成,事务消息生产者和 Spring 集成, 消息消费者与 Spring 集成。
请确保同一个 Consumer ID 下所有 Consumer 实例的订阅关系保持一致,具体请参考 订阅关系一致文档。
Spring 框架下支持的配置参数和 TCP Java 一致,具体可以参考 Java SDK 使用说明

生产者与 Spring 集成


  1. 在 producer.xml 中定义生产者 Bean 等信息。 <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
  5.      <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
  6.      <!-- Spring接入方式支持Java SDK支持的所有配置项 -->
  7.          <property name="properties" > <!--生产者配置信息-->
  8.              <props>
  9.                  <prop key="ProducerId">PID_DEMO</prop> <!--请替换XXX-->
  10.                  <prop key="AccessKey">XXX</prop>
  11.                  <prop key="SecretKey">XXX</prop>
  12.              </props>
  13.          </property>
  14.      </bean>
  15. </beans>

通过已经与 Spring 集成好的生产者生产消息。
  1. package demo;
  2. import com.aliyun.openservices.ons.api.Message;
  3. import com.aliyun.openservices.ons.api.Producer;
  4. import com.aliyun.openservices.ons.api.SendResult;
  5. import com.aliyun.openservices.ons.api.exception.ONSClientException;
  6. import org.springframework.context.ApplicationContext;
  7. import org.springframework.context.support.ClassPathXmlApplicationContext;
  8. public class ProduceWithSpring {
  9.      public static void main(String[] args) {
  10.          /**
  11.           * 生产者Bean配置在producer.xml中,可通过ApplicationContext获取或者直接注入到其他类(比如具体的Controller)中.
  12.           */
  13.          ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
  14.          Producer producer = (Producer) context.getBean("producer");
  15.          //循环发送消息
  16.          for (int i = 0; i < 100; i++) {
  17.              Message msg = new Message( //
  18.                      // Message所属的Topic
  19.                      "TopicTestMQ",
  20.                      // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
  21.                      "TagA",
  22.                      // Message Body 可以是任何二进制形式的数据, MQ不做任何干预
  23.                      // 需要Producer与Consumer协商好一致的序列化和反序列化方式
  24.                      "Hello MQ".getBytes());
  25.              // 设置代表消息的业务关键属性,请尽可能全局唯一
  26.              // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
  27.              // 注意:不设置也不会影响消息正常收发
  28.              msg.setKey("ORDERID_100");
  29.              // 发送消息,只要不抛异常就是成功
  30.              try {
  31.                  SendResult sendResult = producer.send(msg);
  32.                  assert sendResult != null;
  33.                  System.out.println("send success: " + sendResult.getMessageId());
  34.              }catch (ONSClientException e) {
  35.                  System.out.println("发送失败");
  36.              }
  37.          }
  38.      }
  39. }


事务消息生产者与 Spring 集成


有关事务消息的概念请查看 发送事务消息

  1. 首先需要实现一个 LocalTransactionChecker,如下所示。一个消息生产者只能有一个 LocalTransactionChecker。 package demo;
  2. import com.aliyun.openservices.ons.api.Message;
  3. import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
  4. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
  5. public class DemoLocalTransactionChecker implements LocalTransactionChecker {
  6.      public TransactionStatus check(Message msg) {
  7.          System.out.println("开始回查本地事务状态");
  8.          return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的TransactionStatus
  9.      }
  10. }

其次,在 transactionProducer.xml 中定义事务消息生产者 Bean 等信息。
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
  5.      <bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"></bean>
  6.      <bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown">
  7.          <property name="properties" > <!--事务消息生产者配置信息-->
  8.              <props>
  9.                  <prop key="ProducerId">PID_DEMO</prop> <!--请替换XXX-->
  10.                  <prop key="AccessKey">AKDEMO</prop>
  11.                  <prop key="SecretKey">SKDEMO</prop>
  12.              </props>
  13.          </property>
  14.          <property name="localTransactionChecker" ref="localTransactionChecker"></property>
  15.      </bean>
  16. </beans>

通过已经与 Spring 集成好的生产者生产事务消息。
  1. package demo;
  2. import com.aliyun.openservices.ons.api.Message;
  3. import com.aliyun.openservices.ons.api.SendResult;
  4. import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
  5. import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
  6. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
  7. import org.springframework.context.ApplicationContext;
  8. import org.springframework.context.support.ClassPathXmlApplicationContext;
  9. public class ProduceTransMsgWithSpring {
  10.      public static void main(String[] args) {
  11.          /**
  12.           * 事务消息生产者Bean配置在transactionProducer.xml中,可通过ApplicationContext获取或者直接注入到其他类(比如具体的Controller)中.
  13.           * 请结合例子"发送事务消息"
  14.           */
  15.          ApplicationContext context = new ClassPathXmlApplicationContext("transactionProducer.xml");
  16.          TransactionProducer transactionProducer = (TransactionProducer) context.getBean("transactionProducer");
  17.          Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes());
  18.          SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
  19.              @Override
  20.              public TransactionStatus execute(Message msg, Object arg) {
  21.                  System.out.println("执行本地事务");
  22.                  return TransactionStatus.CommitTransaction; //根据本地事务执行结果来返回不同的TransactionStatus
  23.              }
  24.          }, null);
  25.      }
  26. }


消费者与 Spring 集成


  1. 创建 MessageListener,如下所示。 package demo;
  2. import com.aliyun.openservices.ons.api.Action;
  3. import com.aliyun.openservices.ons.api.ConsumeContext;
  4. import com.aliyun.openservices.ons.api.Message;
  5. import com.aliyun.openservices.ons.api.MessageListener;
  6. public class DemoMessageListener implements MessageListener {
  7.      public Action consume(Message message, ConsumeContext context) {
  8.          System.out.println("Receive: " + message.getMsgID());
  9.          try {
  10.              //do something..
  11.              return Action.CommitMessage;
  12.          }catch (Exception e) {
  13.              //消费失败
  14.              return Action.ReconsumeLater;
  15.          }
  16.      }
  17. }

在 consumer.xml 中定义消费者 Bean 等信息。
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
  5.      <bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener配置-->
  6. <!-- 多CID订阅同一个Topic,可以创建多个ConsumerBean-->
  7.      <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
  8.          <property name="properties" > <!--消费者配置信息-->
  9.              <props>
  10.                  <prop key="ConsumerId">CID_DEMO</prop> <!--请替换XXX-->
  11.                  <prop key="AccessKey">AKDEMO</prop>
  12.                  <prop key="SecretKey">SKDEMO</prop>
  13.                  <!--将消费者线程数固定为50个
  14.                  <prop key="ConsumeThreadNums">50</prop>
  15.                  -->
  16.              </props>
  17.          </property>
  18.          <property name="subscriptionTable">
  19.              <map>
  20.                  <entry value-ref="msgListener">
  21.                      <key>
  22.                          <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
  23.                              <property name="topic" value="TopicTestMQ"/>
  24.                              <property name="expression" value="*"/><!--expression即Tag,可以设置成具体的Tag,如 taga||tagb||tagc,也可设置成*。 *仅代表订阅所有Tag,不支持通配-->
  25.                          </bean>
  26.                      </key>
  27.                  </entry>
  28.                  <!--更多的订阅添加entry节点即可,如下所示-->
  29.                  <entry value-ref="msgListener">
  30.                      <key>
  31.                          <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
  32.                              <property name="topic" value="TopicTestMQ-Other"/> <!--订阅另外一个Topic -->
  33.                              <property name="expression" value="taga||tagb"/> <!-- 订阅多个Tag -->
  34.                          </bean>
  35.                      </key>
  36.                  </entry>
  37.              </map>
  38.          </property>
  39.      </bean>
  40. </beans>

运行已经与 Spring 集成好的消费者,如下所示。
  1. package demo;
  2. import org.springframework.context.ApplicationContext;
  3. import org.springframework.context.support.ClassPathXmlApplicationContext;
  4. public class ConsumeWithSpring {
  5.      public static void main(String[] args) {
  6.          /**
  7.           * 消费者Bean配置在consumer.xml中,可通过ApplicationContext获取或者直接注入到其他类(比如具体的Controller)中
  8.           */
  9.          ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
  10.          System.out.println("Consumer Started");
  11.      }
  12. }

展开
收起
猫饭先生 2017-10-26 14:02:22 3960 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
云栖社区特邀专家徐雷Java Spring Boot开发实战系列课程(第20讲):经典面试题与阿里等名企内部招聘求职面试技巧 立即下载
微服务架构模式与原理Spring Cloud开发实战 立即下载
阿里特邀专家徐雷Java Spring Boot开发实战系列课程(第18讲):制作Java Docker镜像与推送到DockerHub和阿里云Docker仓库 立即下载