开发者社区 问答 正文

一个maven项目中,spring项目,想配置多个producer,怎么配置呢

已解决

展开
收起
2018-05-30 07:38:06 1669 分享 版权
1 条回答
写回答
取消 提交回答
  • 用户已注销
    采纳回答

    详细解答可以参考官方帮助文档

    本文介绍如何在 Spring 框架下用 MQ 收发消息。 主要包括三部分内容:普通消息生产者和 Spring 集成,事务消息生产者和 Spring 集成,消息消费者和 Spring 集成。

    请确保同一个 Consumer ID 下所有 Consumer 实例的订阅关系保持一致,具体请参考订阅关系一致文档。

    Spring 框架下支持的配置参数和 TCP Java 一致,具体可以参考Java SDK 使用说明

    生产者与 Spring 集成

    1. 在 producer.xml 中定义生产者 Bean 等信息。

      1. <?xml version="1.0" encoding="UTF-8"?>
      2. <beans xmlns="http://www.springframework.org/schema/beans"
      3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
      5. <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
      6. <!-- Spring 接入方式支持 Java SDK 支持的所有配置项 -->
      7. <property name="properties" > <!--生产者配置信息-->
      8. <props>
      9. <prop key="ProducerId">PID_DEMO</prop> <!--请替换 XXX-->
      10. <prop key="AccessKey">XXX</prop>
      11. <prop key="SecretKey">XXX</prop>
      12. </props>
      13. </property>
      14. </bean>
      15. </beans>
    2. 通过已经与 Spring 集成好的生产者生产消息。

      1. package demo;
      2. import com.aliyun.openservices.ons.api.Message;
      3. import com.aliyun.openservices.ons.api.Producer;
      4. import com.aliyun.openservices.ons.api.SendResult;
      5. import com.aliyun.openservices.ons.api.exception.ONSClientException;
      6. import org.springframework.context.ApplicationContext;
      7. import org.springframework.context.support.ClassPathXmlApplicationContext;
      8. public class ProduceWithSpring {
      9. public static void main(String[] args) {
      10. /**
      11. * 生产者 Bean 配置在 producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中
      12. */
      13. ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
      14. Producer producer = (Producer) context.getBean("producer");
      15. //循环发送消息
      16. for (int i = 0; i < 100; i++) {
      17. Message msg = new Message( //
      18. // Message 所属的 Topic
      19. "TopicTestMQ",
      20. // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
      21. "TagA",
      22. // Message Body 可以是任何二进制形式的数据, MQ 不做任何干预
      23. // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
      24. "Hello MQ".getBytes());
      25. // 设置代表消息的业务关键属性,请尽可能全局唯一
      26. // 以方便您在无法正常收到消息情况下,可通过 MQ 控制台查询消息并补发
      27. // 注意:不设置也不会影响消息正常收发
      28. msg.setKey("ORDERID_100");
      29. // 发送消息,只要不抛异常就是成功
      30. try {
      31. SendResult sendResult = producer.send(msg);
      32. assert sendResult != null;
      33. System.out.println("send success: " + sendResult.getMessageId());
      34. }catch (ONSClientException e) {
      35. System.out.println("发送失败");
      36. }
      37. }
      38. }
      39. }

    事务消息生产者与 Spring 集成

    有关事务消息的概念,请参见收发事务消息

    1. 首先需要实现一个 LocalTransactionChecker,如下所示。 一个消息生产者只能有一个 LocalTransactionChecker。

      1. package demo;
      2. import com.aliyun.openservices.ons.api.Message;
      3. import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
      4. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
      5. public class DemoLocalTransactionChecker implements LocalTransactionChecker {
      6. public TransactionStatus check(Message msg) {
      7. System.out.println("开始回查本地事务状态");
      8. return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的 TransactionStatus
      9. }
      10. }
    2. 在 transactionProducer.xml 中定义事务消息生产者 Bean 等信息。

      1. <?xml version="1.0" encoding="UTF-8"?>
      2. <beans xmlns="http://www.springframework.org/schema/beans"
      3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
      5. <bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"></bean>
      6. <bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown">
      7. <property name="properties" > <!--事务消息生产者配置信息-->
      8. <props>
      9. <prop key="ProducerId">PID_DEMO</prop> <!--请替换 XXX-->
      10. <prop key="AccessKey">AKDEMO</prop>
      11. <prop key="SecretKey">SKDEMO</prop>
      12. </props>
      13. </property>
      14. <property name="localTransactionChecker" ref="localTransactionChecker"></property>
      15. </bean>
      16. </beans>
    3. 通过已经与 Spring 集成好的生产者生产事务消息。

      1. package demo;
      2. import com.aliyun.openservices.ons.api.Message;
      3. import com.aliyun.openservices.ons.api.SendResult;
      4. import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
      5. import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
      6. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
      7. import org.springframework.context.ApplicationContext;
      8. import org.springframework.context.support.ClassPathXmlApplicationContext;
      9. public class ProduceTransMsgWithSpring {
      10. public static void main(String[] args) {
      11. /**
      12. * 事务消息生产者 Bean 配置在 transactionProducer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中.
      13. * 请结合例子"发送事务消息"
      14. */
      15. ApplicationContext context = new ClassPathXmlApplicationContext("transactionProducer.xml");
      16. TransactionProducer transactionProducer = (TransactionProducer) context.getBean("transactionProducer");
      17. Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes());
      18. SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
      19. @Override
      20. public TransactionStatus execute(Message msg, Object arg) {
      21. System.out.println("执行本地事务");
      22. return TransactionStatus.CommitTransaction; //根据本地事务执行结果来返回不同的 TransactionStatus
      23. }
      24. }, null);
      25. }
      26. }

    消费者与 Spring 集成

    1. 创建 MessageListener,如下所示。

      1. package demo;
      2. import com.aliyun.openservices.ons.api.Action;
      3. import com.aliyun.openservices.ons.api.ConsumeContext;
      4. import com.aliyun.openservices.ons.api.Message;
      5. import com.aliyun.openservices.ons.api.MessageListener;
      6. public class DemoMessageListener implements MessageListener {
      7. public Action consume(Message message, ConsumeContext context) {
      8. System.out.println("Receive: " + message.getMsgID());
      9. try {
      10. //do something..
      11. return Action.CommitMessage;
      12. }catch (Exception e) {
      13. //消费失败
      14. return Action.ReconsumeLater;
      15. }
      16. }
      17. }
    2. 在 consumer.xml 中定义消费者 Bean 等信息。

      1. <?xml version="1.0" encoding="UTF-8"?>
      2. <beans xmlns="http://www.springframework.org/schema/beans"
      3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
      5. <bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener 配置-->
      6. <!-- 多 CID 订阅同一个 Topic,可以创建多个 ConsumerBean-->
      7. <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      8. <property name="properties" > <!--消费者配置信息-->
      9. <props>
      10. <prop key="ConsumerId">CID_DEMO</prop> <!--请替换 XXX-->
      11. <prop key="AccessKey">AKDEMO</prop>
      12. <prop key="SecretKey">SKDEMO</prop>
      13. <!--将消费者线程数固定为50个
      14. <prop key="ConsumeThreadNums">50</prop>
      15. -->
      16. </props>
      17. </property>
      18. <property name="subscriptionTable">
      19. <map>
      20. <entry value-ref="msgListener">
      21. <key>
      22. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      23. <property name="topic" value="TopicTestMQ"/>
      24. <property name="expression" value="*"/><!--expression 即 Tag,可以设置成具体的 Tag,如 taga||tagb||tagc,也可设置成*。 *仅代表订阅所有 Tag,不支持通配-->
      25. </bean>
      26. </key>
      27. </entry>
      28. <!--更多的订阅添加 entry 节点即可,如下所示-->
      29. <entry value-ref="msgListener">
      30. <key>
      31. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      32. <property name="topic" value="TopicTestMQ-Other"/> <!--订阅另外一个 Topic -->
      33. <property name="expression" value="taga||tagb"/> <!-- 订阅多个 Tag -->
      34. </bean>
      35. </key>
      36. </entry>
      37. </map>
      38. </property>
      39. </bean>
      40. </beans>
    3. 运行已经与 Spring 集成好的消费者,如下所示。

      1. package demo;
      2. import org.springframework.context.ApplicationContext;
      3. import org.springframework.context.support.ClassPathXmlApplicationContext;
      4. public class ConsumeWithSpring {
      5. public static void main(String[] args) {
      6. /**
      7. * 消费者 Bean 配置在 consumer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中
      8. */
      9. ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
      10. System.out.println("Consumer Started");
      11. }
      12. }
    2018-06-02 03:43:49
    赞同 展开评论