大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)

接上篇:https://developer.aliyun.com/article/1622489?spm=a2c6h.13148508.setting.18.49764f0e90XaKU

KafkaService

package icu.wzk.service;


import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;

@Service
public class KafkaService {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public String sendMessage(ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {
        SendResult<String, String> result = kafkaTemplate.send(record).get();
        RecordMetadata metadata = result.getRecordMetadata();
        String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset();
        System.out.println("发送消息: " + returnResult);
        return returnResult;
    }

}

ConsumerService

package icu.wzk.service;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class ConsumerListener {

    @Resource
    private KafkaRetryService kafkaRetryService;

    private static int index = 0;

    @KafkaListener(topics = "tp_demo_retry_01", groupId = "wzkicu")
    public void consumer(ConsumerRecord<String, String> record) {
        try {
            // 业务处理
            System.out.println("消费的消息: " + record);
            index ++;
            if (index % 2 == 0) {
                throw new Exception("重发消息");
            }
        } catch (Exception e) {
            // 消息重试
            kafkaRetryService.consumerLater(record);
        }
    }

}

KafkaRetryService

package icu.wzk.service;

import com.alibaba.fastjson.JSON;
import icu.wzk.model.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.Date;


@Service
public class KafkaRetryService {

    /**
     * 消费失败后下一次消息的延迟时间(秒)
     */
    private static final int[] RETRY_INTERVAL_SECONDS = {
            10, 30, 1 * 60, 2 * 60, 5 * 60, 10 * 60, 30 * 60,
            1* 60 * 60, 2 * 60 * 60
    };

    private static final String RETRY_TOPIC = "tp_demo_retry_02";

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void consumerLater(ConsumerRecord<String, String> record) {
        // 获取消息的已重试次数
        int retryTimes = getRetryTimes(record);
        Date nextConsumerTime = getNextConsumerTime(retryTimes);
        if (null == nextConsumerTime) {
            return;
        }
        // 组织消息
        RetryRecord retryRecord = new RetryRecord();
        retryRecord.setNextTime(nextConsumerTime.getTime());
        retryRecord.setTopic(record.topic());
        retryRecord.setRetryTimes(retryTimes);
        retryRecord.setKey(record.key());
        retryRecord.setValue(record.value());
        // 转换字符串
        String value = JSON.toJSONString(retryRecord);
        // 发到重试队列
        kafkaTemplate.send(RETRY_TOPIC, null, value);
    }

    /**
     * 获取消息已经重试的次数
     */
    private int getRetryTimes(ConsumerRecord record) {
        int retryTimes = -1;
        for (Header header :record.headers()) {
            if (RetryRecord.KEY_RETRY_TIMES.equals(header.key())) {
                ByteBuffer byteBuffer = ByteBuffer.wrap(header.value());
                retryTimes = byteBuffer.getInt();
            }
        }
        retryTimes ++;
        return retryTimes;
    }

    /**
     * 获取等待重试的下一次消息时间
     */
    private Date getNextConsumerTime(int retryTimes) {
        // 重试次数超过上限 不再重试
        if (RETRY_INTERVAL_SECONDS.length < retryTimes) {
            return null;
        }
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);
        return calendar.getTime();
    }

}

RetryListener

package icu.wzk.service;


import com.alibaba.fastjson.JSON;
import icu.wzk.model.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Set;
import java.util.UUID;

@Component
@EnableScheduling
public class RetryListener {

    private static final String RETRY_KEY_ZSET = "_retry_key";
    private static final String RETRY_VALUE_MAP = "_retry_value";

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    private String topic = "tp_demo_retry_01";

    @KafkaListener(topics = "tp_demo_retry_02", groupId = "wzkicu")
    public void consumer(ConsumerRecord<String, String> record) {
        System.out.println("需要重试的消息: " + record);
        RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class);
        // 防止待重试消息太多撑爆redis 可以将重试消息按下一次重试时间分开存储到不同介质中
        String key = UUID.randomUUID().toString();
        redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value());
        redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime());
    }

    @Scheduled(fixedDelay = 2000)
    public void retryFromRedis() {
        System.out.println("retry redis");
        long currentTime = System.currentTimeMillis();
        // 时间倒序获取
        Set<ZSetOperations.TypedTuple<Object>> typeTuples = redisTemplate
                .opsForZSet()
                .reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime);
        // 移除取出的消息
        redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime);
        for (ZSetOperations.TypedTuple<Object> tuple : typeTuples) {
            String key = tuple.getValue().toString();
            String value = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key).toString();
            redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);
            RetryRecord retryRecord = JSON.parseObject(value, RetryRecord.class);
            ProducerRecord record = retryRecord.parse();
            ProducerRecord recordReal = new ProducerRecord(
                    topic, record.partition(), record.timestamp(),
                    record.key(), record.value(), record.headers()
            );
            kafkaTemplate.send(recordReal);
        }
    }

}

RetryRecord

package icu.wzk.model;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;


public class RetryRecord {

    public static final String KEY_RETRY_TIMES = "retryTimes";

    private String key;
    private String value;
    private Integer retryTimes;
    private String topic;
    private Long nextTime;

    public ProducerRecord parse() {
        Integer partition = null;
        Long timestamp = System.currentTimeMillis();
        List<Header> headers = new ArrayList<>();
        ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);
        retryTimesBuffer.putInt(retryTimes);
        retryTimesBuffer.flip();
        headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer));
        ProducerRecord sendRecord = new ProducerRecord(topic, partition, timestamp, key, value, headers);
        return sendRecord;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public Integer getRetryTimes() {
        return retryTimes;
    }

    public void setRetryTimes(Integer retryTimes) {
        this.retryTimes = retryTimes;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public Long getNextTime() {
        return nextTime;
    }

    public void setNextTime(Long nextTime) {
        this.nextTime = nextTime;
    }
}

测试结果

Postman

控制台

目录
相关文章
|
5月前
|
SQL 大数据 API
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
100 0
|
5月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
98 1
|
2月前
|
SQL 数据可视化 大数据
从数据小白到大数据达人:一步步成为数据分析专家
从数据小白到大数据达人:一步步成为数据分析专家
255 92
|
4月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
943 7
|
8天前
|
SQL 分布式计算 数据挖掘
从湖仓分离到湖仓一体,四川航空基于 SelectDB 的多源数据联邦分析实践
川航选择引入 SelectDB 建设湖仓一体大数据分析引擎,取得了数据导入效率提升 3-6 倍,查询分析性能提升 10-18 倍、实时性提升至 5 秒内等收益。
从湖仓分离到湖仓一体,四川航空基于 SelectDB 的多源数据联邦分析实践
|
4月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
138 2
|
12天前
|
人工智能 算法 大数据
数据的“潘多拉魔盒”:大数据伦理的深度思考
数据的“潘多拉魔盒”:大数据伦理的深度思考
51 25
|
20天前
|
存储 SQL 数据挖掘
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
湖仓一体架构融合了数据湖的低成本、高扩展性,以及数据仓库的高性能、强数据治理能力,高效应对大数据时代的挑战。为助力企业实现湖仓一体的建设,Apache Doris 提出了数据无界和湖仓无界核心理念,并结合自身特性,助力企业加速从 0 到 1 构建湖仓体系,降低转型过程中的风险和成本。本文将对湖仓一体演进及 Apache Doris 湖仓一体方案进行介绍。
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
|
2月前
|
分布式计算 Shell MaxCompute
odps测试表及大量数据构建测试
odps测试表及大量数据构建测试
|
22天前
|
存储 分布式计算 大数据
大数据与云计算:无缝结合,开启数据新纪元
大数据与云计算:无缝结合,开启数据新纪元
136 11