RocketMQ SimpleConsumer异步订阅消息demo
根据您的问题,您希望了解如何使用RocketMQ的SimpleConsumer实现异步订阅消息的示例代码。虽然提供的知识内容没有直接包含异步订阅的Demo代码,但我们可以基于RocketMQ的API文档和常规实践来构建一个简单的异步订阅消息的示例。下面是一个简化的Java示例代码,展示如何使用SimpleConsumer(更准确地说,应使用org.apache.rocketmq.client.consumer.DefaultMQPushConsumer,因为SimpleConsumer更多指的是低级API,而异步订阅通常推荐使用PushConsumer模式)实现异步消费消息的功能。
异步订阅消息的示例代码
首先,确保您已添加了Apache RocketMQ客户端依赖到您的项目中。如果您使用Maven,可以在pom.xml中添加如下依赖:
org.apache.rocketmq
rocketmq-client
4.9.3
接下来是异步消费消息的Java代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class AsyncConsumerExample {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_consumer_group");
// 设置NameServer地址
consumer.setNamesrvAddr("your_nameserver_addr");
// 订阅主题和Tag
consumer.subscribe("your_topic", "*");
// 设置消费模式,如从第一条开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
// 打印每个消息的属性和内容
for (MessageExt msg : msgs) {
System.out.printf("Received message: %s %n", new String(msg.getBody()));
}
// 返回消费状态,这里简单示例为成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Async Consumer Started.");
}
}
此回答整理自钉群“群2-Apache RocketMQ 中国开发者钉钉群”
import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.consumer.MessageListener;
public class SimpleConsumerDemo {
public static void main(String[] args) {
//消费示例:使用SimpleConsumer消费普通消息,主动获取消息处理并提交。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "Your Topic";
FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
//设置消费者分组。
.setConsumerGroup("Your ConsumerGroup")
//设置接入点。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
//设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
List<MessageView> messageViewList = null;
try {
//SimpleConsumer需要主动获取消息,并处理。
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
}
}
SimpleConsumer
SimpleConsumer是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。
使用方式
SimpleConsumer的使用涉及多个接口调用,由业务逻辑按需调用接口获取消息,然后分发给业务线程处理消息,最后按照处理的结果调用提交接口,返回服务端当前消息的处理结果。示例如下:
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "Your Topic";
FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
//设置消费者分组。
.setConsumerGroup("Your ConsumerGroup")
//设置接入点。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
//设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
List<MessageView> messageViewList = null;
try {
//SimpleConsumer需要主动获取消息,并处理。
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/