消息发送分类
1、同步发送消息
生产者发出一条消息后,会在收到MQ返回的ACK之后才发下一条消息。此类消息可靠性最高,但消息发送效率低
2、异步发送消息
生产者发出消息后无需等待MQ返回ACK,直接发送下一条消息。此类消息可靠性可以得到保障,消息发送效率也可
3、单向发送消息
生产者仅负责发送消息,不等待、不处理MQ的ACK,此类方式MQ也不返回ACK,消息发送效率最高,但消息可靠性较差
配置信息
生产者
rocketmq:
name-server: xx.xx.xx.xx:9876
producer:
group: simple-producer-group
send-message-timeout: 3000 #发送超时时间毫秒 默认3000
retry-times-when-send-failed: 2 #同步发送失败时重试次数 默认2
消费者
rocketmq:
name-server: xx.xx.xx.xx:9876
consumer:
group: simple-consumer-group
生产者业务接口
public interface SimpleMessageService {
/**
* 发送消息
* @param message
*/
void sendMessage(String message);
/**
* 发送同步消息
* @param id
* @param message
*/
void sendSyncMessage(String id, String message);
/**
* 发送异步消息
* @param id
* @param message
*/
void sendAsyncMessage(String id, String message);
/**
* 发送单向消息
* @param id
* @param message
*/
void sendOnewayMessage(String id, String message);
}
生产者业务接口实现类
@Service
public class SimpleMessageServiceImpl implements SimpleMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private static final Logger logger = LoggerFactory.getLogger(SimpleMessageServiceImpl.class);
@Override
public void sendMessage(String message) {
rocketMQTemplate.convertAndSend("simple-message-topic", message);
logger.info("发生消息成功!");
}
@Override
public void sendSyncMessage(String id, String message) {
Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
SendResult result = rocketMQTemplate.syncSend("simple-message-topic:sync-tags", strMessage);
logger.info("发送简单同步消息成功!返回信息为:{}", JSON.toJSONString(result));
}
@Override
public void sendAsyncMessage(String id, String message) {
Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
rocketMQTemplate.asyncSend("simple-message-topic:async-tags", strMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
logger.info("发送简单异步消息成功!返回信息为:{}", JSON.toJSONString(sendResult));
}
}
@Override
public void onException(Throwable throwable) {
logger.error("发送简单异步消息失败!异常信息为:{}", throwable.getMessage());
}
});
}
@Override
public void sendOnewayMessage(String id, String message) {
Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
rocketMQTemplate.sendOneWay("simple-message-topic:oneway-tags", strMessage);
}
}
消费者类
@Component
@RocketMQMessageListener(topic = "simple-message-topic", consumerGroup = "${rocketmq.consumer.group}")
public class SimpleMessageListener implements RocketMQListener<String> {
private static final Logger logger = LoggerFactory.getLogger(SimpleMessageListener.class);
@Override
public void onMessage(String message) {
logger.info("接收到消息:{}", message);
}
}
测试
//测试同步发送消息
@Test
void simpleMessage() {
String uuid = UUID.randomUUID().toString();
simpleMessageService.sendSyncMessage(uuid, "hello world");
}
//测试异步发送消息
@Test
void simpleMessage() {
String uuid = UUID.randomUUID().toString();
simpleMessageService.sendAsyncMessage(uuid, "hello world");
Thread.sleep(5000);
}