如何结合spring和redis进行优雅的轮询

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云原生内存数据库 Tair,内存型 2GB
云数据库 Redis 版,经济版 1GB 1个月
简介: 如何结合spring和redis进行优雅的轮询

背景


当业务结束后需要轮训获取内部接口或者第三方接口执行结果信息,可以使用一些轮训方式,如mq、项目自己异步线程轮训、队列等方式,下面的方式是以redis和spring定时执行相结合实现的


配置


application.xml和application.properties配置redis相关配置


application.xml


    <!-- Redis start -->
    <!-- 连接池工厂配置 -->
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
          p:use-pool="true" p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.password}"/>
    <!-- redis请求实例配置 -->
    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"
          p:connection-factory-ref="jedisConnectionFactory"/>

application.properties

redis.host=xxx.net
redis.port=7100
redis.password=xxx
redis.app=xxx

业务实现

轮询component

@Slf4j
@Component
public class RoundRobinComponent implements InitializingBean , DisposableBean {
    @Override
    public void afterPropertiesSet() {
        open();
    }
    @Override
    public void destroy() throws Exception {
        stop();
    }
    private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinComponent.class);
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    /** 任务队列 key */
    private static final String task_queue_flag = "TASK:BUF:QUEUE";
    /** 全局开关 - 非空运行,空不运行 */
    private static final String task_queue_switch = "LOOP:SWITCH";
    private static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000),
            new ThreadFactory() {
                @Override
                public Thread newThread(@NotNull Runnable r) {
                    return new Thread(r, "loop-task");
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        LOGGER.error("re put task error !" , e);
                    }
                }
            });
    //每1s执行任务,按机器数量和需求预估数量设置
    @Scheduled(cron = "0/1 * * * * ?")
    private void loop(){
        for(int i = 0 ; i < 10 ; i++){
            String context_json = redisTemplate.opsForList().leftPop(task_queue_flag);
            loop_exec(context_json);
        }
    }
    private void loop_exec(String context_json){
        POOL.execute(new Runnable() {
            @Override
            public void run() {
                TaskContext context = null;
                try {
                    context = JSONObject.parseObject(context_json, TaskContext.class);
                    if (
                            StringUtils.isNotBlank(context_json) &&
                                    context != null
                    ){
                        Integer delaySecond = context.getDelaySecond();
                        Long contextCurrentTimeMills = context.getCurrentTimeMills();
                        long currentTimeMillis = System.currentTimeMillis();
                        //不满足延迟执行时间重新入队
                        if (contextCurrentTimeMills + delaySecond * 1000 > currentTimeMillis ) {
                            push(context);
                            return;
                        }
                        //ProcessResult result = doGetResult;
                        //执行请求任务获取结果【伪代码】
                        ProcessResult result = new ProcessResult();
                        result.setCode(500);
                        //请求响应不成功且当前任务没超过最大次数继续入队,并设置当前重试次数
                        if (result.getCode() != 200 && context.getCurrentTryCount() < context.getMaxTryCount()){
                            Integer count = context.getCurrentTryCount() + 1;
                            context.setCurrentTryCount(count);
                            context.setCode(result.getCode());
                            push(context);
                        }
                        //到这一步,到达任务轮训上限且氢气都失败处理重试逻辑
                        if (result.getCode() != 200) {
                            //doFaildMethod();
                        }
                        if (result.getCode() == 200) {
                            //执行响应成功后的任务
                            //doSuccessMethod();
                        }
                    }
                    Thread.sleep(ThreadLocalRandom.current().nextInt(500) + 1);
                } catch (InterruptedException e) {
                    LOGGER.error("random Interrupted error ! context : {} " , JSON.toJSONString(context) , e);
                }
            }
        });
    }
    public Long push(TaskContext context) throws RuntimeException {
        if(!checkParam(context)){
            throw new RuntimeException("参数错误!");
        }
        if(conditionSwitch()){
            return redisTemplate.opsForList().rightPush(task_queue_flag , JSON.toJSONString(context));
        }
        return -1L;
    }
    private void stop() {
        redisTemplate.delete(task_queue_switch);
    }
    private void open() {
        redisTemplate.opsForValue().set(task_queue_switch , "1");
    }
    //######## ###########
    private boolean conditionSwitch(){
        //非空 true,空 false
        return StringUtils.isNotBlank(redisTemplate.opsForValue().get(task_queue_switch));
    }
    private boolean checkParam(TaskContext param){
        if(param == null){
            return false;
        }
        return true;
    }
}

相关class

ProcessResult
@Data
@NoArgsConstructor
public class ProcessResult<T> {
    /**
     * 响应code
     *
     */
    private Integer code;
    private T resultData;
}

TaskContext

@Data
public class TaskContext implements Serializable {
    /**
     * 延迟执行轮训任务时间执行时间,默认1s
     */
    private Integer delaySecond = 1;
    /**
     * 额外上下文信息
     */
    private String exData;
    /**
     * 提交轮训任务时间
     */
    private Long currentTimeMills;
    /**
     * 响应结果code
     *
     */
    private Integer code;
    /**
     * 最高失败可重试次数
     */
    private Integer maxTryCount;
    /**
     * 当前重试次数
     */
    private Integer currentTryCount;
    public TaskContext() {
    }
    public TaskContext(String exData, Integer maxTryCount, Integer delaySecond){
        this.exData = exData;
        this.maxTryCount = maxTryCount;
        if (delaySecond != null && delaySecond >= 0){
            this.delaySecond = delaySecond;
        }
        this.currentTryCount = 0;
        this.currentTimeMills = System.currentTimeMillis();
    }
}

测试类

@RunWith(Spring4JUnitParamsRunner.class)
@ContextConfiguration(locations = {"classpath*:application.xml"})
public class ServiceTest {
    @Autowired
    private RoundRobinComponent roundRobinComponent;
    @Test
    public void testRoundRobinComponent(){
        roundRobinComponent.push(new TaskContext(
                "my test text",
                5,
                1
                ));
    }
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
8天前
|
缓存 NoSQL Java
Redis Spring配置集群
【7月更文挑战第5天】
36 10
|
18天前
|
NoSQL Java 应用服务中间件
蓝易云 - Spring redis使用报错Read timed out排查解决
以上都是可能的解决方案,具体的解决方案可能会因具体情况而异。
18 1
|
20天前
|
NoSQL Java 应用服务中间件
蓝易云 - Spring redis使用报错Read timed out排查解决
以上都是可能的解决方案,具体的解决方案可能会因具体情况而异。
13 2
|
14天前
|
缓存 NoSQL Java
Spring Boot整合Redis缓存的最佳实践
Spring Boot整合Redis缓存的最佳实践
|
17天前
|
缓存 NoSQL Java
Spring Boot与Redis的缓存一致性问题
Spring Boot与Redis的缓存一致性问题
|
17天前
|
消息中间件 NoSQL Java
Spring Boot中使用Redis和Lua脚本实现延时队列
Spring Boot中使用Redis和Lua脚本实现延时队列
|
25天前
|
NoSQL Java Redis
Spring Boot2 系列教程(二十六)Spring Boot 整合 Redis
Spring Boot2 系列教程(二十六)Spring Boot 整合 Redis
|
2月前
|
NoSQL Java Redis
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
431 0
|
2月前
|
NoSQL Java Redis
SpringBoot集成Redis
SpringBoot集成Redis
477 0
|
2月前
|
缓存 NoSQL Java
springboot业务开发--springboot集成redis解决缓存雪崩穿透问题
该文介绍了缓存使用中可能出现的三个问题及解决方案:缓存穿透、缓存击穿和缓存雪崩。为防止缓存穿透,可校验请求数据并缓存空值;缓存击穿可采用限流、热点数据预加载或加锁策略;缓存雪崩则需避免同一时间大量缓存失效,可设置随机过期时间。文章还提及了Spring Boot中Redis缓存的配置,包括缓存null值、使用前缀和自定义过期时间,并提供了改造代码以实现缓存到期时间的个性化设置。