大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)

接上篇:https://developer.aliyun.com/article/1622489?spm=a2c6h.13148508.setting.18.49764f0e90XaKU

KafkaService

package icu.wzk.service;


import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;

@Service
public class KafkaService {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public String sendMessage(ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {
        SendResult<String, String> result = kafkaTemplate.send(record).get();
        RecordMetadata metadata = result.getRecordMetadata();
        String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset();
        System.out.println("发送消息: " + returnResult);
        return returnResult;
    }

}

ConsumerService

package icu.wzk.service;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class ConsumerListener {

    @Resource
    private KafkaRetryService kafkaRetryService;

    private static int index = 0;

    @KafkaListener(topics = "tp_demo_retry_01", groupId = "wzkicu")
    public void consumer(ConsumerRecord<String, String> record) {
        try {
            // 业务处理
            System.out.println("消费的消息: " + record);
            index ++;
            if (index % 2 == 0) {
                throw new Exception("重发消息");
            }
        } catch (Exception e) {
            // 消息重试
            kafkaRetryService.consumerLater(record);
        }
    }

}

KafkaRetryService

package icu.wzk.service;

import com.alibaba.fastjson.JSON;
import icu.wzk.model.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.Date;


@Service
public class KafkaRetryService {

    /**
     * 消费失败后下一次消息的延迟时间(秒)
     */
    private static final int[] RETRY_INTERVAL_SECONDS = {
            10, 30, 1 * 60, 2 * 60, 5 * 60, 10 * 60, 30 * 60,
            1* 60 * 60, 2 * 60 * 60
    };

    private static final String RETRY_TOPIC = "tp_demo_retry_02";

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void consumerLater(ConsumerRecord<String, String> record) {
        // 获取消息的已重试次数
        int retryTimes = getRetryTimes(record);
        Date nextConsumerTime = getNextConsumerTime(retryTimes);
        if (null == nextConsumerTime) {
            return;
        }
        // 组织消息
        RetryRecord retryRecord = new RetryRecord();
        retryRecord.setNextTime(nextConsumerTime.getTime());
        retryRecord.setTopic(record.topic());
        retryRecord.setRetryTimes(retryTimes);
        retryRecord.setKey(record.key());
        retryRecord.setValue(record.value());
        // 转换字符串
        String value = JSON.toJSONString(retryRecord);
        // 发到重试队列
        kafkaTemplate.send(RETRY_TOPIC, null, value);
    }

    /**
     * 获取消息已经重试的次数
     */
    private int getRetryTimes(ConsumerRecord record) {
        int retryTimes = -1;
        for (Header header :record.headers()) {
            if (RetryRecord.KEY_RETRY_TIMES.equals(header.key())) {
                ByteBuffer byteBuffer = ByteBuffer.wrap(header.value());
                retryTimes = byteBuffer.getInt();
            }
        }
        retryTimes ++;
        return retryTimes;
    }

    /**
     * 获取等待重试的下一次消息时间
     */
    private Date getNextConsumerTime(int retryTimes) {
        // 重试次数超过上限 不再重试
        if (RETRY_INTERVAL_SECONDS.length < retryTimes) {
            return null;
        }
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);
        return calendar.getTime();
    }

}

RetryListener

package icu.wzk.service;


import com.alibaba.fastjson.JSON;
import icu.wzk.model.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Set;
import java.util.UUID;

@Component
@EnableScheduling
public class RetryListener {

    private static final String RETRY_KEY_ZSET = "_retry_key";
    private static final String RETRY_VALUE_MAP = "_retry_value";

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    private String topic = "tp_demo_retry_01";

    @KafkaListener(topics = "tp_demo_retry_02", groupId = "wzkicu")
    public void consumer(ConsumerRecord<String, String> record) {
        System.out.println("需要重试的消息: " + record);
        RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class);
        // 防止待重试消息太多撑爆redis 可以将重试消息按下一次重试时间分开存储到不同介质中
        String key = UUID.randomUUID().toString();
        redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value());
        redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime());
    }

    @Scheduled(fixedDelay = 2000)
    public void retryFromRedis() {
        System.out.println("retry redis");
        long currentTime = System.currentTimeMillis();
        // 时间倒序获取
        Set<ZSetOperations.TypedTuple<Object>> typeTuples = redisTemplate
                .opsForZSet()
                .reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime);
        // 移除取出的消息
        redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime);
        for (ZSetOperations.TypedTuple<Object> tuple : typeTuples) {
            String key = tuple.getValue().toString();
            String value = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key).toString();
            redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);
            RetryRecord retryRecord = JSON.parseObject(value, RetryRecord.class);
            ProducerRecord record = retryRecord.parse();
            ProducerRecord recordReal = new ProducerRecord(
                    topic, record.partition(), record.timestamp(),
                    record.key(), record.value(), record.headers()
            );
            kafkaTemplate.send(recordReal);
        }
    }

}

RetryRecord

package icu.wzk.model;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;


public class RetryRecord {

    public static final String KEY_RETRY_TIMES = "retryTimes";

    private String key;
    private String value;
    private Integer retryTimes;
    private String topic;
    private Long nextTime;

    public ProducerRecord parse() {
        Integer partition = null;
        Long timestamp = System.currentTimeMillis();
        List<Header> headers = new ArrayList<>();
        ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);
        retryTimesBuffer.putInt(retryTimes);
        retryTimesBuffer.flip();
        headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer));
        ProducerRecord sendRecord = new ProducerRecord(topic, partition, timestamp, key, value, headers);
        return sendRecord;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public Integer getRetryTimes() {
        return retryTimes;
    }

    public void setRetryTimes(Integer retryTimes) {
        this.retryTimes = retryTimes;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public Long getNextTime() {
        return nextTime;
    }

    public void setNextTime(Long nextTime) {
        this.nextTime = nextTime;
    }
}

测试结果

Postman

控制台

目录
相关文章
消息中间件 Java Kafka
196 0
|
5月前
|
数据采集 搜索推荐 算法
Java 大视界 -- Java 大数据在智能教育学习社区用户互动分析与社区活跃度提升中的应用(274)
本文系统阐述 Java 大数据技术在智能教育学习社区中的深度应用,涵盖数据采集架构、核心分析算法、活跃度提升策略及前沿技术探索,为教育数字化转型提供完整技术解决方案。
|
机器学习/深度学习 XML 分布式计算
大数据的概念
【10月更文挑战第16天】
657 4
|
存储 SQL 分布式计算
大数据学习
【10月更文挑战第15天】
321 1
|
数据采集 数据可视化 大数据
大数据体系知识学习(三):数据清洗_箱线图的概念以及代码实现
这篇文章介绍了如何使用Python中的matplotlib和numpy库来创建箱线图,以检测和处理数据集中的异常值。
347 1
大数据体系知识学习(三):数据清洗_箱线图的概念以及代码实现
|
分布式计算 大数据 Linux
大数据体系知识学习(二):WordCount案例实现及错误总结
这篇文章介绍了如何使用PySpark进行WordCount操作,包括环境配置、代码实现、运行结果和遇到的错误。作者在运行过程中遇到了Py4JJavaError和JAVA_HOME未设置的问题,并通过导入findspark初始化和设置环境变量解决了这些问题。文章还讨论了groupByKey和reduceByKey的区别。
220 1
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
434 1
|
存储 分布式计算 大数据
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
203 3
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
206 3
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
210 1
下一篇
oss云网关配置