问题1:Apache大佬们,有没有基于RocketMQTemplate实现同步调用的例子:生产端发消息,消费端同步返回? 问题2:跑起来报这个错:CODE: 10007 DESC: create reply message fail, requestMessage error, property[CLUSTER] is null是不是版本问题,我安装4.3.2? 确认是版本问题。改到5.0.0就OK了
问题1:基于RocketMQTemplate实现生产端发消息,消费端同步返回的例子:
生产端代码示例:
@Autowired
private RocketMQTemplate rocketMQTemplate;
public String sendMessageSync(String topic, String message) {
Message<String> msg = MessageBuilder.withPayload(message).build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
return sendResult.getSendStatus().toString();
}
消费端代码示例:
@RocketMQMessageListener(topic = "your-topic", consumerGroup = "your-consumer-group")
public class MyConsumer implements RocketMQListener<MessageEntity> {
@Override
public void onMessage(MessageEntity message) {
// 消费处理逻辑
// 同步返回结果可以通过返回值或者回调方式实现
}
}
在生产端,使用RocketMQTemplate的syncSend
方法同步发送消息,并通过SendResult
获取发送结果。
在消费端,通过实现RocketMQListener接口的onMessage
方法来处理接收到的消息。可以在该方法中编写消费逻辑,并根据需要返回结果。
问题2:报错"CODE: 10007 DESC: create reply message fail, requestMessage error, property[CLUSTER] is null"
这个错误通常是由于版本不兼容导致的。在RocketMQ 4.3.2版本中,该错误可能会出现。你已经确认将版本升级到5.0.0后问题解决了,这表明确实是版本问题。
建议在使用RocketMQ时,尽量使用最新稳定版本,以获得更好的稳定性和功能支持。在升级版本之前,可以查看RocketMQ的官方文档或升级指南,了解新版本的变更和注意事项。
在 Apache RocketMQ 中,可以使用 RocketMQTemplate
实现同步发送消息和消费消息的功能。下面是一个基本的示例代码:
// 生产者端发送消息
@Service
public class ProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public String sendSyncMessage(String topic, String message) {
// 同步发送消息
SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
// 返回消息 ID
return sendResult.getMsgId();
}
}
// 消费者端消费消息
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group", topic = "test_topic")
public class ConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 处理消息
System.out.println("Received message: " + message);
}
}
在上述示例中,生产者通过 RocketMQTemplate
的 syncSend
方法同步发送消息到指定 Topic,同时返回消息 ID。消费者通过实现 RocketMQListener
接口并定义消费者 Group 和 Topic,来监听并处理消息。
需要注意的是,在使用 RocketMQTemplate
进行同步调用时,建议合理设置超时时间和重试机制,以避免长时间等待或者因网络等原因导致的异常情况。另外,在消费者处理消息时也需要注意消息确认和消费过滤等问题,以提高系统稳定性和可靠性。
问题1:https://github.com/apache/rocketmq-spring/wiki/%E8%AF%B7%E6%B1%82-%E5%BA%94%E7%AD%94%E8%AF%AD%E4%B9%89%E6%94%AF%E6%8C%81 问题2:这里示例应该写错了吧 ,此回答整理自钉群“群1-Apache RocketMQ 中国开发者钉钉群”
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/