如何结合spring和redis进行优雅的轮询

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 如何结合spring和redis进行优雅的轮询

背景


当业务结束后需要轮训获取内部接口或者第三方接口执行结果信息,可以使用一些轮训方式,如mq、项目自己异步线程轮训、队列等方式,下面的方式是以redis和spring定时执行相结合实现的


配置


application.xml和application.properties配置redis相关配置


application.xml


    <!-- Redis start -->
    <!-- 连接池工厂配置 -->
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
          p:use-pool="true" p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.password}"/>
    <!-- redis请求实例配置 -->
    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"
          p:connection-factory-ref="jedisConnectionFactory"/>

application.properties

redis.host=xxx.net
redis.port=7100
redis.password=xxx
redis.app=xxx

业务实现

轮询component

@Slf4j
@Component
public class RoundRobinComponent implements InitializingBean , DisposableBean {
    @Override
    public void afterPropertiesSet() {
        open();
    }
    @Override
    public void destroy() throws Exception {
        stop();
    }
    private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinComponent.class);
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    /** 任务队列 key */
    private static final String task_queue_flag = "TASK:BUF:QUEUE";
    /** 全局开关 - 非空运行,空不运行 */
    private static final String task_queue_switch = "LOOP:SWITCH";
    private static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000),
            new ThreadFactory() {
                @Override
                public Thread newThread(@NotNull Runnable r) {
                    return new Thread(r, "loop-task");
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        LOGGER.error("re put task error !" , e);
                    }
                }
            });
    //每1s执行任务,按机器数量和需求预估数量设置
    @Scheduled(cron = "0/1 * * * * ?")
    private void loop(){
        for(int i = 0 ; i < 10 ; i++){
            String context_json = redisTemplate.opsForList().leftPop(task_queue_flag);
            loop_exec(context_json);
        }
    }
    private void loop_exec(String context_json){
        POOL.execute(new Runnable() {
            @Override
            public void run() {
                TaskContext context = null;
                try {
                    context = JSONObject.parseObject(context_json, TaskContext.class);
                    if (
                            StringUtils.isNotBlank(context_json) &&
                                    context != null
                    ){
                        Integer delaySecond = context.getDelaySecond();
                        Long contextCurrentTimeMills = context.getCurrentTimeMills();
                        long currentTimeMillis = System.currentTimeMillis();
                        //不满足延迟执行时间重新入队
                        if (contextCurrentTimeMills + delaySecond * 1000 > currentTimeMillis ) {
                            push(context);
                            return;
                        }
                        //ProcessResult result = doGetResult;
                        //执行请求任务获取结果【伪代码】
                        ProcessResult result = new ProcessResult();
                        result.setCode(500);
                        //请求响应不成功且当前任务没超过最大次数继续入队,并设置当前重试次数
                        if (result.getCode() != 200 && context.getCurrentTryCount() < context.getMaxTryCount()){
                            Integer count = context.getCurrentTryCount() + 1;
                            context.setCurrentTryCount(count);
                            context.setCode(result.getCode());
                            push(context);
                        }
                        //到这一步,到达任务轮训上限且氢气都失败处理重试逻辑
                        if (result.getCode() != 200) {
                            //doFaildMethod();
                        }
                        if (result.getCode() == 200) {
                            //执行响应成功后的任务
                            //doSuccessMethod();
                        }
                    }
                    Thread.sleep(ThreadLocalRandom.current().nextInt(500) + 1);
                } catch (InterruptedException e) {
                    LOGGER.error("random Interrupted error ! context : {} " , JSON.toJSONString(context) , e);
                }
            }
        });
    }
    public Long push(TaskContext context) throws RuntimeException {
        if(!checkParam(context)){
            throw new RuntimeException("参数错误!");
        }
        if(conditionSwitch()){
            return redisTemplate.opsForList().rightPush(task_queue_flag , JSON.toJSONString(context));
        }
        return -1L;
    }
    private void stop() {
        redisTemplate.delete(task_queue_switch);
    }
    private void open() {
        redisTemplate.opsForValue().set(task_queue_switch , "1");
    }
    //######## ###########
    private boolean conditionSwitch(){
        //非空 true,空 false
        return StringUtils.isNotBlank(redisTemplate.opsForValue().get(task_queue_switch));
    }
    private boolean checkParam(TaskContext param){
        if(param == null){
            return false;
        }
        return true;
    }
}

相关class

ProcessResult
@Data
@NoArgsConstructor
public class ProcessResult<T> {
    /**
     * 响应code
     *
     */
    private Integer code;
    private T resultData;
}

TaskContext

@Data
public class TaskContext implements Serializable {
    /**
     * 延迟执行轮训任务时间执行时间,默认1s
     */
    private Integer delaySecond = 1;
    /**
     * 额外上下文信息
     */
    private String exData;
    /**
     * 提交轮训任务时间
     */
    private Long currentTimeMills;
    /**
     * 响应结果code
     *
     */
    private Integer code;
    /**
     * 最高失败可重试次数
     */
    private Integer maxTryCount;
    /**
     * 当前重试次数
     */
    private Integer currentTryCount;
    public TaskContext() {
    }
    public TaskContext(String exData, Integer maxTryCount, Integer delaySecond){
        this.exData = exData;
        this.maxTryCount = maxTryCount;
        if (delaySecond != null && delaySecond >= 0){
            this.delaySecond = delaySecond;
        }
        this.currentTryCount = 0;
        this.currentTimeMills = System.currentTimeMillis();
    }
}

测试类

@RunWith(Spring4JUnitParamsRunner.class)
@ContextConfiguration(locations = {"classpath*:application.xml"})
public class ServiceTest {
    @Autowired
    private RoundRobinComponent roundRobinComponent;
    @Test
    public void testRoundRobinComponent(){
        roundRobinComponent.push(new TaskContext(
                "my test text",
                5,
                1
                ));
    }
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
18天前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
47 5
|
1月前
|
消息中间件 NoSQL Java
Spring Boot整合Redis
通过Spring Boot整合Redis,可以显著提升应用的性能和响应速度。在本文中,我们详细介绍了如何配置和使用Redis,包括基本的CRUD操作和具有过期时间的值设置方法。希望本文能帮助你在实际项目中高效地整合和使用Redis。
57 2
|
2月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
67 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
2月前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
125 2
|
1月前
|
JavaScript NoSQL Java
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
45 0
|
2月前
|
NoSQL Java Redis
在 Spring 中操作 Redis
本文详细介绍了在Spring框架中如何通过引入依赖、配置文件、使用StringRedisTemplate类以及执行原生命令等方式来操作Redis数据库,并提供了对String、List、Set、Hash和ZSet数据类型的操作示例。
98 0
在 Spring 中操作 Redis
|
2月前
|
存储 NoSQL Java
Spring Boot项目中使用Redis实现接口幂等性的方案
通过上述方法,可以有效地在Spring Boot项目中利用Redis实现接口幂等性,既保证了接口操作的安全性,又提高了系统的可靠性。
58 0
|
2月前
|
人工智能 自然语言处理 前端开发
SpringBoot + 通义千问 + 自定义React组件:支持EventStream数据解析的技术实践
【10月更文挑战第7天】在现代Web开发中,集成多种技术栈以实现复杂的功能需求已成为常态。本文将详细介绍如何使用SpringBoot作为后端框架,结合阿里巴巴的通义千问(一个强大的自然语言处理服务),并通过自定义React组件来支持服务器发送事件(SSE, Server-Sent Events)的EventStream数据解析。这一组合不仅能够实现高效的实时通信,还能利用AI技术提升用户体验。
224 2
|
4天前
|
NoSQL Java Redis
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
42 14
|
27天前
|
缓存 IDE Java
SpringBoot入门(7)- 配置热部署devtools工具
SpringBoot入门(7)- 配置热部署devtools工具
42 1
SpringBoot入门(7)- 配置热部署devtools工具
下一篇
DataWorks