@Slf4j
@RestController
@RequestMapping("/testRocketMq")
public class TestRocketMqController {
private static final String ADDR = "xxxx:9876";
private static final String ACCESS_KEY = "xxx";
private static final String SECRET_KEY = "xxx";
private static final String INSTANCE_ID = "xxx";
private static final String TOPIC = "xx";
private static final String GROUP = "xdx";
@GetMapping("/test")
public Result<?> test() {
Properties properties = new Properties();
properties.put(PropertyKeyConst.AccessKey,ACCESS_KEY);
properties.put(PropertyKeyConst.SecretKey, SECRET_KEY);
properties.put(PropertyKeyConst.GROUP_ID, GROUP);
properties.put(PropertyKeyConst.INSTANCE_ID, INSTANCE_ID);
// 设置发送超时时间,单位:毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, ADDR);
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
producer.start();
// 循环发送消息。
Message msg = new Message(TOPIC, "*", "HELLO EFREIGHT NOW.....".getBytes());
try {
SendResult sendResult = producer.send(msg);
// 同步发送消息,只要不抛异常就是成功。
if (sendResult != null) {
log.info("Send mq message success. Topic is: {}, msgId is : {}", msg.getTopic(), sendResult.getMessageId());
}
}
catch (Exception e) {
log.error(">>>>>>>>>>>>>>>>>>>>发送消息失败", e);
}
// 在应用退出前,销毁Producer对象。
// 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
producer.shutdown();
return Result.ok();
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系列产品 Serverless 化。RocketMQ 中文社区:https://rocketmq-learning.com/