开发者社区 > 云原生 > 云消息队列 > 正文

RocketMQ5.0以上的版本给出SimpleConsumer异步订阅消息demo

RocketMQ5.0以上的版本给出SimpleConsumer异步订阅消息demo

展开
收起
嘟嘟嘟嘟嘟嘟 2024-08-28 08:12:41 110 0
2 条回答
写回答
取消 提交回答
  • 实现步骤
    步骤1:配置与初始化SimpleConsumer
    由于直接示例未给出,我们需依据RocketMQ API手册或现有文档构造一个异步消费的逻辑框架。首先,确保您的环境中已安装并配置好Apache RocketMQ客户端。接着,初始化SimpleConsumer实例,配置必要的参数,比如Namesrv地址、消费组名称等。
    Properties consumerProps = new Properties();
    consumerProps.put(PropertyKeyConst.NAMESRV_ADDR, "");
    consumerProps.put(PropertyKeyConst.GROUP_ID, "");
    SimpleConsumer consumer = new SimpleConsumer(consumerProps);
    步骤2:订阅主题与Tag
    接下来,使用SimpleConsumer订阅您感兴趣的特定主题及Tag(如果需要)。
    consumer.subscribe("", "*");
    步骤3:实现消息处理逻辑
    不同于PushConsumer自动推送消息,SimpleConsumer需要您主动调用接收方法来获取消息。因此,您需要实现一个循环或定时任务来不断检查是否有新消息到达,并处理这些消息。
    while (true) {
    List messageViewList = consumer.receive(10, Duration.ofSeconds(30));
    for (MessageView messageView : messageViewList) {
    System.out.println("Received message: " + new String(messageView.getBody()));
    // 您的业务逻辑处理
    // 消费成功后确认
    consumer.ack(messageView);
    }
    }
    此回答整理自钉群“群2-Apache RocketMQ 中国开发者钉钉群”

    2024-08-28 10:48:43
    赞同 46 展开评论 打赏
  • 使用阿里云RocketMQ 5.x的Java客户端SimpleConsumer异步消费Topic消息的简单示例:

    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
    import org.apache.rocketmq.spring.support.RocketMQMessageListener;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class SimpleConsumerConfig {
    
        @Bean
        public DefaultRocketMQListenerContainer simpleConsumer(RocketMQTemplate rocketMQTemplate) {
            DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
            container.setRocketMQTemplate(rocketMQTemplate);
            container.setConsumerGroup("Your ConsumerGroup");
            container.setTopics("Your Topic");
            container.setMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageView> msgs, ConsumeConcurrentlyContext context) {
                    // 消费消息的逻辑
                    for (MessageView msg : msgs) {
                        System.out.println("Received message: " + msg);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            container.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            return container;
        }
    }
    

    确保替换Your ConsumerGroup和Your Topic为实际的消费者组和主题名。此代码片段展示了如何设置一个简单的异步消息监听器,消息会并发地被处理。更多信息可参考阿里云RocketMQ文档

    2024-08-28 10:32:12
    赞同 47 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关产品

  • 云消息队列 MQ
  • 相关电子书

    更多
    RocketMQ Client-GO 介绍 立即下载
    RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
    基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载