接上篇:https://developer.aliyun.com/article/1622489?spm=a2c6h.13148508.setting.18.49764f0e90XaKU
KafkaService
package icu.wzk.service; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.concurrent.ExecutionException; @Service public class KafkaService { @Resource private KafkaTemplate<String, String> kafkaTemplate; public String sendMessage(ProducerRecord<String, String> record) throws ExecutionException, InterruptedException { SendResult<String, String> result = kafkaTemplate.send(record).get(); RecordMetadata metadata = result.getRecordMetadata(); String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset(); System.out.println("发送消息: " + returnResult); return returnResult; } }
ConsumerService
package icu.wzk.service; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Component public class ConsumerListener { @Resource private KafkaRetryService kafkaRetryService; private static int index = 0; @KafkaListener(topics = "tp_demo_retry_01", groupId = "wzkicu") public void consumer(ConsumerRecord<String, String> record) { try { // 业务处理 System.out.println("消费的消息: " + record); index ++; if (index % 2 == 0) { throw new Exception("重发消息"); } } catch (Exception e) { // 消息重试 kafkaRetryService.consumerLater(record); } } }
KafkaRetryService
package icu.wzk.service; import com.alibaba.fastjson.JSON; import icu.wzk.model.RetryRecord; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.nio.ByteBuffer; import java.util.Calendar; import java.util.Date; @Service public class KafkaRetryService { /** * 消费失败后下一次消息的延迟时间(秒) */ private static final int[] RETRY_INTERVAL_SECONDS = { 10, 30, 1 * 60, 2 * 60, 5 * 60, 10 * 60, 30 * 60, 1* 60 * 60, 2 * 60 * 60 }; private static final String RETRY_TOPIC = "tp_demo_retry_02"; @Resource private KafkaTemplate<String, String> kafkaTemplate; public void consumerLater(ConsumerRecord<String, String> record) { // 获取消息的已重试次数 int retryTimes = getRetryTimes(record); Date nextConsumerTime = getNextConsumerTime(retryTimes); if (null == nextConsumerTime) { return; } // 组织消息 RetryRecord retryRecord = new RetryRecord(); retryRecord.setNextTime(nextConsumerTime.getTime()); retryRecord.setTopic(record.topic()); retryRecord.setRetryTimes(retryTimes); retryRecord.setKey(record.key()); retryRecord.setValue(record.value()); // 转换字符串 String value = JSON.toJSONString(retryRecord); // 发到重试队列 kafkaTemplate.send(RETRY_TOPIC, null, value); } /** * 获取消息已经重试的次数 */ private int getRetryTimes(ConsumerRecord record) { int retryTimes = -1; for (Header header :record.headers()) { if (RetryRecord.KEY_RETRY_TIMES.equals(header.key())) { ByteBuffer byteBuffer = ByteBuffer.wrap(header.value()); retryTimes = byteBuffer.getInt(); } } retryTimes ++; return retryTimes; } /** * 获取等待重试的下一次消息时间 */ private Date getNextConsumerTime(int retryTimes) { // 重试次数超过上限 不再重试 if (RETRY_INTERVAL_SECONDS.length < retryTimes) { return null; } Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]); return calendar.getTime(); } }
RetryListener
package icu.wzk.service; import com.alibaba.fastjson.JSON; import icu.wzk.model.RetryRecord; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ZSetOperations; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Set; import java.util.UUID; @Component @EnableScheduling public class RetryListener { private static final String RETRY_KEY_ZSET = "_retry_key"; private static final String RETRY_VALUE_MAP = "_retry_value"; @Resource private RedisTemplate<String, Object> redisTemplate; @Resource private KafkaTemplate<String, String> kafkaTemplate; private String topic = "tp_demo_retry_01"; @KafkaListener(topics = "tp_demo_retry_02", groupId = "wzkicu") public void consumer(ConsumerRecord<String, String> record) { System.out.println("需要重试的消息: " + record); RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class); // 防止待重试消息太多撑爆redis 可以将重试消息按下一次重试时间分开存储到不同介质中 String key = UUID.randomUUID().toString(); redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value()); redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime()); } @Scheduled(fixedDelay = 2000) public void retryFromRedis() { System.out.println("retry redis"); long currentTime = System.currentTimeMillis(); // 时间倒序获取 Set<ZSetOperations.TypedTuple<Object>> typeTuples = redisTemplate .opsForZSet() .reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime); // 移除取出的消息 redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime); for (ZSetOperations.TypedTuple<Object> tuple : typeTuples) { String key = tuple.getValue().toString(); String value = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key).toString(); redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key); RetryRecord retryRecord = JSON.parseObject(value, RetryRecord.class); ProducerRecord record = retryRecord.parse(); ProducerRecord recordReal = new ProducerRecord( topic, record.partition(), record.timestamp(), record.key(), record.value(), record.headers() ); kafkaTemplate.send(recordReal); } } }
RetryRecord
package icu.wzk.model; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; public class RetryRecord { public static final String KEY_RETRY_TIMES = "retryTimes"; private String key; private String value; private Integer retryTimes; private String topic; private Long nextTime; public ProducerRecord parse() { Integer partition = null; Long timestamp = System.currentTimeMillis(); List<Header> headers = new ArrayList<>(); ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4); retryTimesBuffer.putInt(retryTimes); retryTimesBuffer.flip(); headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer)); ProducerRecord sendRecord = new ProducerRecord(topic, partition, timestamp, key, value, headers); return sendRecord; } public String getKey() { return key; } public void setKey(String key) { this.key = key; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } public Integer getRetryTimes() { return retryTimes; } public void setRetryTimes(Integer retryTimes) { this.retryTimes = retryTimes; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public Long getNextTime() { return nextTime; } public void setNextTime(Long nextTime) { this.nextTime = nextTime; } }
测试结果
Postman
控制台