大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)

接上篇:https://developer.aliyun.com/article/1622489?spm=a2c6h.13148508.setting.18.49764f0e90XaKU

KafkaService

package icu.wzk.service;


import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;

@Service
public class KafkaService {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public String sendMessage(ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {
        SendResult<String, String> result = kafkaTemplate.send(record).get();
        RecordMetadata metadata = result.getRecordMetadata();
        String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset();
        System.out.println("发送消息: " + returnResult);
        return returnResult;
    }

}

ConsumerService

package icu.wzk.service;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class ConsumerListener {

    @Resource
    private KafkaRetryService kafkaRetryService;

    private static int index = 0;

    @KafkaListener(topics = "tp_demo_retry_01", groupId = "wzkicu")
    public void consumer(ConsumerRecord<String, String> record) {
        try {
            // 业务处理
            System.out.println("消费的消息: " + record);
            index ++;
            if (index % 2 == 0) {
                throw new Exception("重发消息");
            }
        } catch (Exception e) {
            // 消息重试
            kafkaRetryService.consumerLater(record);
        }
    }

}

KafkaRetryService

package icu.wzk.service;

import com.alibaba.fastjson.JSON;
import icu.wzk.model.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.Date;


@Service
public class KafkaRetryService {

    /**
     * 消费失败后下一次消息的延迟时间(秒)
     */
    private static final int[] RETRY_INTERVAL_SECONDS = {
            10, 30, 1 * 60, 2 * 60, 5 * 60, 10 * 60, 30 * 60,
            1* 60 * 60, 2 * 60 * 60
    };

    private static final String RETRY_TOPIC = "tp_demo_retry_02";

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void consumerLater(ConsumerRecord<String, String> record) {
        // 获取消息的已重试次数
        int retryTimes = getRetryTimes(record);
        Date nextConsumerTime = getNextConsumerTime(retryTimes);
        if (null == nextConsumerTime) {
            return;
        }
        // 组织消息
        RetryRecord retryRecord = new RetryRecord();
        retryRecord.setNextTime(nextConsumerTime.getTime());
        retryRecord.setTopic(record.topic());
        retryRecord.setRetryTimes(retryTimes);
        retryRecord.setKey(record.key());
        retryRecord.setValue(record.value());
        // 转换字符串
        String value = JSON.toJSONString(retryRecord);
        // 发到重试队列
        kafkaTemplate.send(RETRY_TOPIC, null, value);
    }

    /**
     * 获取消息已经重试的次数
     */
    private int getRetryTimes(ConsumerRecord record) {
        int retryTimes = -1;
        for (Header header :record.headers()) {
            if (RetryRecord.KEY_RETRY_TIMES.equals(header.key())) {
                ByteBuffer byteBuffer = ByteBuffer.wrap(header.value());
                retryTimes = byteBuffer.getInt();
            }
        }
        retryTimes ++;
        return retryTimes;
    }

    /**
     * 获取等待重试的下一次消息时间
     */
    private Date getNextConsumerTime(int retryTimes) {
        // 重试次数超过上限 不再重试
        if (RETRY_INTERVAL_SECONDS.length < retryTimes) {
            return null;
        }
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);
        return calendar.getTime();
    }

}

RetryListener

package icu.wzk.service;


import com.alibaba.fastjson.JSON;
import icu.wzk.model.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Set;
import java.util.UUID;

@Component
@EnableScheduling
public class RetryListener {

    private static final String RETRY_KEY_ZSET = "_retry_key";
    private static final String RETRY_VALUE_MAP = "_retry_value";

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    private String topic = "tp_demo_retry_01";

    @KafkaListener(topics = "tp_demo_retry_02", groupId = "wzkicu")
    public void consumer(ConsumerRecord<String, String> record) {
        System.out.println("需要重试的消息: " + record);
        RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class);
        // 防止待重试消息太多撑爆redis 可以将重试消息按下一次重试时间分开存储到不同介质中
        String key = UUID.randomUUID().toString();
        redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value());
        redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime());
    }

    @Scheduled(fixedDelay = 2000)
    public void retryFromRedis() {
        System.out.println("retry redis");
        long currentTime = System.currentTimeMillis();
        // 时间倒序获取
        Set<ZSetOperations.TypedTuple<Object>> typeTuples = redisTemplate
                .opsForZSet()
                .reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime);
        // 移除取出的消息
        redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime);
        for (ZSetOperations.TypedTuple<Object> tuple : typeTuples) {
            String key = tuple.getValue().toString();
            String value = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key).toString();
            redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);
            RetryRecord retryRecord = JSON.parseObject(value, RetryRecord.class);
            ProducerRecord record = retryRecord.parse();
            ProducerRecord recordReal = new ProducerRecord(
                    topic, record.partition(), record.timestamp(),
                    record.key(), record.value(), record.headers()
            );
            kafkaTemplate.send(recordReal);
        }
    }

}

RetryRecord

package icu.wzk.model;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;


public class RetryRecord {

    public static final String KEY_RETRY_TIMES = "retryTimes";

    private String key;
    private String value;
    private Integer retryTimes;
    private String topic;
    private Long nextTime;

    public ProducerRecord parse() {
        Integer partition = null;
        Long timestamp = System.currentTimeMillis();
        List<Header> headers = new ArrayList<>();
        ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);
        retryTimesBuffer.putInt(retryTimes);
        retryTimesBuffer.flip();
        headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer));
        ProducerRecord sendRecord = new ProducerRecord(topic, partition, timestamp, key, value, headers);
        return sendRecord;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public Integer getRetryTimes() {
        return retryTimes;
    }

    public void setRetryTimes(Integer retryTimes) {
        this.retryTimes = retryTimes;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public Long getNextTime() {
        return nextTime;
    }

    public void setNextTime(Long nextTime) {
        this.nextTime = nextTime;
    }
}

测试结果

Postman

控制台

武子康
+关注
目录
打赏
0
2
2
0
107
分享
相关文章
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
145 3
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
391 0
大数据体系知识学习(三):数据清洗_箱线图的概念以及代码实现
这篇文章介绍了如何使用Python中的matplotlib和numpy库来创建箱线图,以检测和处理数据集中的异常值。
203 1
大数据体系知识学习(三):数据清洗_箱线图的概念以及代码实现
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
208 2
大数据的概念
【10月更文挑战第16天】
323 4
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
175 1
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
521 0
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
149 0
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
146 0
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问