如何结合spring和redis进行优雅的轮询

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 如何结合spring和redis进行优雅的轮询

背景


当业务结束后需要轮训获取内部接口或者第三方接口执行结果信息,可以使用一些轮训方式,如mq、项目自己异步线程轮训、队列等方式,下面的方式是以redis和spring定时执行相结合实现的


配置


application.xml和application.properties配置redis相关配置


application.xml


    <!-- Redis start -->
    <!-- 连接池工厂配置 -->
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
          p:use-pool="true" p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.password}"/>
    <!-- redis请求实例配置 -->
    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"
          p:connection-factory-ref="jedisConnectionFactory"/>

application.properties

redis.host=xxx.net
redis.port=7100
redis.password=xxx
redis.app=xxx

业务实现

轮询component

@Slf4j
@Component
public class RoundRobinComponent implements InitializingBean , DisposableBean {
    @Override
    public void afterPropertiesSet() {
        open();
    }
    @Override
    public void destroy() throws Exception {
        stop();
    }
    private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinComponent.class);
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    /** 任务队列 key */
    private static final String task_queue_flag = "TASK:BUF:QUEUE";
    /** 全局开关 - 非空运行,空不运行 */
    private static final String task_queue_switch = "LOOP:SWITCH";
    private static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000),
            new ThreadFactory() {
                @Override
                public Thread newThread(@NotNull Runnable r) {
                    return new Thread(r, "loop-task");
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        LOGGER.error("re put task error !" , e);
                    }
                }
            });
    //每1s执行任务,按机器数量和需求预估数量设置
    @Scheduled(cron = "0/1 * * * * ?")
    private void loop(){
        for(int i = 0 ; i < 10 ; i++){
            String context_json = redisTemplate.opsForList().leftPop(task_queue_flag);
            loop_exec(context_json);
        }
    }
    private void loop_exec(String context_json){
        POOL.execute(new Runnable() {
            @Override
            public void run() {
                TaskContext context = null;
                try {
                    context = JSONObject.parseObject(context_json, TaskContext.class);
                    if (
                            StringUtils.isNotBlank(context_json) &&
                                    context != null
                    ){
                        Integer delaySecond = context.getDelaySecond();
                        Long contextCurrentTimeMills = context.getCurrentTimeMills();
                        long currentTimeMillis = System.currentTimeMillis();
                        //不满足延迟执行时间重新入队
                        if (contextCurrentTimeMills + delaySecond * 1000 > currentTimeMillis ) {
                            push(context);
                            return;
                        }
                        //ProcessResult result = doGetResult;
                        //执行请求任务获取结果【伪代码】
                        ProcessResult result = new ProcessResult();
                        result.setCode(500);
                        //请求响应不成功且当前任务没超过最大次数继续入队,并设置当前重试次数
                        if (result.getCode() != 200 && context.getCurrentTryCount() < context.getMaxTryCount()){
                            Integer count = context.getCurrentTryCount() + 1;
                            context.setCurrentTryCount(count);
                            context.setCode(result.getCode());
                            push(context);
                        }
                        //到这一步,到达任务轮训上限且氢气都失败处理重试逻辑
                        if (result.getCode() != 200) {
                            //doFaildMethod();
                        }
                        if (result.getCode() == 200) {
                            //执行响应成功后的任务
                            //doSuccessMethod();
                        }
                    }
                    Thread.sleep(ThreadLocalRandom.current().nextInt(500) + 1);
                } catch (InterruptedException e) {
                    LOGGER.error("random Interrupted error ! context : {} " , JSON.toJSONString(context) , e);
                }
            }
        });
    }
    public Long push(TaskContext context) throws RuntimeException {
        if(!checkParam(context)){
            throw new RuntimeException("参数错误!");
        }
        if(conditionSwitch()){
            return redisTemplate.opsForList().rightPush(task_queue_flag , JSON.toJSONString(context));
        }
        return -1L;
    }
    private void stop() {
        redisTemplate.delete(task_queue_switch);
    }
    private void open() {
        redisTemplate.opsForValue().set(task_queue_switch , "1");
    }
    //######## ###########
    private boolean conditionSwitch(){
        //非空 true,空 false
        return StringUtils.isNotBlank(redisTemplate.opsForValue().get(task_queue_switch));
    }
    private boolean checkParam(TaskContext param){
        if(param == null){
            return false;
        }
        return true;
    }
}

相关class

ProcessResult
@Data
@NoArgsConstructor
public class ProcessResult<T> {
    /**
     * 响应code
     *
     */
    private Integer code;
    private T resultData;
}

TaskContext

@Data
public class TaskContext implements Serializable {
    /**
     * 延迟执行轮训任务时间执行时间,默认1s
     */
    private Integer delaySecond = 1;
    /**
     * 额外上下文信息
     */
    private String exData;
    /**
     * 提交轮训任务时间
     */
    private Long currentTimeMills;
    /**
     * 响应结果code
     *
     */
    private Integer code;
    /**
     * 最高失败可重试次数
     */
    private Integer maxTryCount;
    /**
     * 当前重试次数
     */
    private Integer currentTryCount;
    public TaskContext() {
    }
    public TaskContext(String exData, Integer maxTryCount, Integer delaySecond){
        this.exData = exData;
        this.maxTryCount = maxTryCount;
        if (delaySecond != null && delaySecond >= 0){
            this.delaySecond = delaySecond;
        }
        this.currentTryCount = 0;
        this.currentTimeMills = System.currentTimeMillis();
    }
}

测试类

@RunWith(Spring4JUnitParamsRunner.class)
@ContextConfiguration(locations = {"classpath*:application.xml"})
public class ServiceTest {
    @Autowired
    private RoundRobinComponent roundRobinComponent;
    @Test
    public void testRoundRobinComponent(){
        roundRobinComponent.push(new TaskContext(
                "my test text",
                5,
                1
                ));
    }
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6天前
|
消息中间件 NoSQL Java
Spring Boot整合Redis
通过Spring Boot整合Redis,可以显著提升应用的性能和响应速度。在本文中,我们详细介绍了如何配置和使用Redis,包括基本的CRUD操作和具有过期时间的值设置方法。希望本文能帮助你在实际项目中高效地整合和使用Redis。
21 1
|
1月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
42 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
25天前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
63 2
|
12天前
|
JavaScript NoSQL Java
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
28 0
|
1月前
|
NoSQL Java Redis
在 Spring 中操作 Redis
本文详细介绍了在Spring框架中如何通过引入依赖、配置文件、使用StringRedisTemplate类以及执行原生命令等方式来操作Redis数据库,并提供了对String、List、Set、Hash和ZSet数据类型的操作示例。
64 0
在 Spring 中操作 Redis
|
2月前
|
NoSQL 网络协议 Java
[Redis] 渐进式遍历+使用jedis操作Redis+使用Spring操作Redis
[Redis] 渐进式遍历+使用jedis操作Redis+使用Spring操作Redis
41 7
|
1月前
|
存储 NoSQL Java
Spring Boot项目中使用Redis实现接口幂等性的方案
通过上述方法,可以有效地在Spring Boot项目中利用Redis实现接口幂等性,既保证了接口操作的安全性,又提高了系统的可靠性。
35 0
|
1月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
70 6
|
4天前
|
缓存 NoSQL 关系型数据库
大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题
本文详解缓存雪崩、缓存穿透、缓存并发及缓存预热等问题,提供高可用解决方案,帮助你在大厂面试和实际工作中应对这些常见并发场景。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题
|
5天前
|
存储 缓存 NoSQL
【赵渝强老师】基于Redis的旁路缓存架构
本文介绍了引入缓存后的系统架构,通过缓存可以提升访问性能、降低网络拥堵、减轻服务负载和增强可扩展性。文中提供了相关图片和视频讲解,并讨论了数据库读写分离、分库分表等方法来减轻数据库压力。同时,文章也指出了缓存可能带来的复杂度增加、成本提高和数据一致性问题。
【赵渝强老师】基于Redis的旁路缓存架构