RocketMQ5.0以上的版本给出SimpleConsumer异步订阅消息demo
实现步骤
步骤1:配置与初始化SimpleConsumer
由于直接示例未给出,我们需依据RocketMQ API手册或现有文档构造一个异步消费的逻辑框架。首先,确保您的环境中已安装并配置好Apache RocketMQ客户端。接着,初始化SimpleConsumer实例,配置必要的参数,比如Namesrv地址、消费组名称等。
Properties consumerProps = new Properties();
consumerProps.put(PropertyKeyConst.NAMESRV_ADDR, "");
consumerProps.put(PropertyKeyConst.GROUP_ID, "");
SimpleConsumer consumer = new SimpleConsumer(consumerProps);
步骤2:订阅主题与Tag
接下来,使用SimpleConsumer订阅您感兴趣的特定主题及Tag(如果需要)。
consumer.subscribe("", "*");
步骤3:实现消息处理逻辑
不同于PushConsumer自动推送消息,SimpleConsumer需要您主动调用接收方法来获取消息。因此,您需要实现一个循环或定时任务来不断检查是否有新消息到达,并处理这些消息。
while (true) {
List messageViewList = consumer.receive(10, Duration.ofSeconds(30));
for (MessageView messageView : messageViewList) {
System.out.println("Received message: " + new String(messageView.getBody()));
// 您的业务逻辑处理
// 消费成功后确认
consumer.ack(messageView);
}
}
此回答整理自钉群“群2-Apache RocketMQ 中国开发者钉钉群”
使用阿里云RocketMQ 5.x的Java客户端SimpleConsumer异步消费Topic消息的简单示例:
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.apache.rocketmq.spring.support.RocketMQMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SimpleConsumerConfig {
@Bean
public DefaultRocketMQListenerContainer simpleConsumer(RocketMQTemplate rocketMQTemplate) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
container.setRocketMQTemplate(rocketMQTemplate);
container.setConsumerGroup("Your ConsumerGroup");
container.setTopics("Your Topic");
container.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageView> msgs, ConsumeConcurrentlyContext context) {
// 消费消息的逻辑
for (MessageView msg : msgs) {
System.out.println("Received message: " + msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
container.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
return container;
}
}
确保替换Your ConsumerGroup和Your Topic为实际的消费者组和主题名。此代码片段展示了如何设置一个简单的异步消息监听器,消息会并发地被处理。更多信息可参考阿里云RocketMQ文档。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/