1. 延时等级
延时时长不支持随意时长的延迟,是通过特定的延迟等级来指定的,默认变量有1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,分别对应1~18等级。例如等级为3,对应于10s
如果需要自己定义延时等级,需要在broker加载的配置文件中配置messageDelayLevel
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
2. 延时消息实现原理
修改消息
- 修改消息的Topic为SCHEDULE_TOPIC_XXXX
- 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件
- 修改消息索引单元内容。索引单元中的Message Tag HashCode部分原本存放的是消息的Tag的Hash值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原Topic后再次被写入到commitlog中的时间。投递时间 = 消息存储时间 + 延时等级时间。消息存储时间指的是消息被发送到Broker时的时间戳
- 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中
投递延时消息
Broker内部有⼀个延迟消息服务类ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的投递时间,将延时消息投递到⽬标Topic中。不过,在投递之前会从commitlog中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消息。然后再次将消息投递到目标Topic中。
将消息重新写入commitlog
延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索引条目,分发到相应Queue。
3. 生产者业务接口
public interface ScheduleMessageService {
/**
* 发送同步定时消息
* @param id
* @param message 消息内容
* @param timeout 过期时间(毫秒)
* @param delayLevel 延时级别为(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)其下标,从1开始
*/
void sendSyncScheduleMessage(String id, String message, long timeout, int delayLevel);
}
4. 生产者业务接口实现类
@Service
public class ScheduleMessageServiceImpl implements ScheduleMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private static final Logger logger = LoggerFactory.getLogger(ScheduleMessageServiceImpl.class);
@Override
public void sendSyncScheduleMessage(String id, String message, long timeout, int delayLevel) {
Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
SendResult result = rocketMQTemplate.syncSend("schedule-message-topic:sync-tags", strMessage, timeout, delayLevel);
if (result.getSendStatus() == SendStatus.SEND_OK) {
logger.info("发送同步定时消息成功!消息ID为:{},当前时间为:{}", result.getMsgId(), LocalDateTime.now());
} else {
logger.info("发送同步定时消息失败!消息ID为:{}", result.getMsgId());
}
}
}
5. 消费者类
@Component
@RocketMQMessageListener(topic = "schedule-message-topic", consumerGroup = "schedule-consumer-group")
public class ScheduleMessageListener implements RocketMQListener<String> {
private static final Logger logger = LoggerFactory.getLogger(ScheduleMessageListener.class);
@Override
public void onMessage(String message) {
logger.info("接收到定时消息:{},当前时间为:{}", message, LocalDateTime.now());
}
}
5.6. 测试
@Test
void scheduleMessage() {
String uuid = UUID.randomUUID().toString();
scheduleMessageService.sendSyncScheduleMessage(uuid, "hello" + uuid, 100,3);
}