nodejs这个除了使用setInterval类似的方法持续消费消息RocketMQ还有其它方法吗?
```async function main() {
const simpleConsumer = new SimpleConsumer({
consumerGroup: 'nodejs-demo-group',
endpoints: '127.0.0.1:8081',
subscriptions: new Map().set('TopicTest', 'nodejs-demo'),
});
await simpleConsumer.startup();
const messages = await simpleConsumer.receive(20);
console.log('got %d messages', messages.length);
for (const message of messages) {
console.log(message);
console.log('body=%o', message.body.toString());
await simpleConsumer.ack(message);
}
await simpleConsumer.shutdown();
}
main();
```
除了使用setInterval类似的方法持续消费消息RocketMQ,还可以使用事件驱动的方式。在Node.js中,可以使用EventEmitter类来实现事件驱动的消费者。以下是一个简单的示例:
const EventEmitter = require('events');
class SimpleConsumer extends EventEmitter {
constructor(options) {
super();
this.consumerGroup = options.consumerGroup;
this.endpoints = options.endpoints;
this.subscriptions = options.subscriptions;
}
async startup() {
// 启动消费者的代码
}
async receive(maxMessages) {
// 接收消息的代码
}
async ack(message) {
// 确认消息的代码
}
async shutdown() {
// 关闭消费者的代码
}
}
async function main() {
const simpleConsumer = new SimpleConsumer({
consumerGroup: 'nodejs-demo-group',
endpoints: '127.0.0.1:8081',
subscriptions: new Map().set('TopicTest', 'nodejs-demo'),
});
simpleConsumer.on('message', async (message) => {
console.log('got message');
console.log('body=%o', message.body.toString());
await simpleConsumer.ack(message);
});
await simpleConsumer.startup();
// 在这里添加一个循环,以便持续监听事件
while (true) {
// 等待事件触发
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}
main();
在这个示例中,我们创建了一个SimpleConsumer类,它继承自EventEmitter类。我们在SimpleConsumer类中定义了startup、receive、ack和shutdown方法,并在main函数中实例化了这个类。然后,我们使用simpleConsumer.on('message', callback)
来监听消息事件,并在回调函数中处理消息。最后,我们使用一个无限循环来持续监听事件。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/