如何结合spring和redis进行优雅的轮询

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 如何结合spring和redis进行优雅的轮询

背景


当业务结束后需要轮训获取内部接口或者第三方接口执行结果信息,可以使用一些轮训方式,如mq、项目自己异步线程轮训、队列等方式,下面的方式是以redis和spring定时执行相结合实现的


配置


application.xml和application.properties配置redis相关配置


application.xml


    <!-- Redis start -->
    <!-- 连接池工厂配置 -->
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
          p:use-pool="true" p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.password}"/>
    <!-- redis请求实例配置 -->
    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"
          p:connection-factory-ref="jedisConnectionFactory"/>

application.properties

redis.host=xxx.net
redis.port=7100
redis.password=xxx
redis.app=xxx

业务实现

轮询component

@Slf4j
@Component
public class RoundRobinComponent implements InitializingBean , DisposableBean {
    @Override
    public void afterPropertiesSet() {
        open();
    }
    @Override
    public void destroy() throws Exception {
        stop();
    }
    private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinComponent.class);
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    /** 任务队列 key */
    private static final String task_queue_flag = "TASK:BUF:QUEUE";
    /** 全局开关 - 非空运行,空不运行 */
    private static final String task_queue_switch = "LOOP:SWITCH";
    private static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000),
            new ThreadFactory() {
                @Override
                public Thread newThread(@NotNull Runnable r) {
                    return new Thread(r, "loop-task");
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        LOGGER.error("re put task error !" , e);
                    }
                }
            });
    //每1s执行任务,按机器数量和需求预估数量设置
    @Scheduled(cron = "0/1 * * * * ?")
    private void loop(){
        for(int i = 0 ; i < 10 ; i++){
            String context_json = redisTemplate.opsForList().leftPop(task_queue_flag);
            loop_exec(context_json);
        }
    }
    private void loop_exec(String context_json){
        POOL.execute(new Runnable() {
            @Override
            public void run() {
                TaskContext context = null;
                try {
                    context = JSONObject.parseObject(context_json, TaskContext.class);
                    if (
                            StringUtils.isNotBlank(context_json) &&
                                    context != null
                    ){
                        Integer delaySecond = context.getDelaySecond();
                        Long contextCurrentTimeMills = context.getCurrentTimeMills();
                        long currentTimeMillis = System.currentTimeMillis();
                        //不满足延迟执行时间重新入队
                        if (contextCurrentTimeMills + delaySecond * 1000 > currentTimeMillis ) {
                            push(context);
                            return;
                        }
                        //ProcessResult result = doGetResult;
                        //执行请求任务获取结果【伪代码】
                        ProcessResult result = new ProcessResult();
                        result.setCode(500);
                        //请求响应不成功且当前任务没超过最大次数继续入队,并设置当前重试次数
                        if (result.getCode() != 200 && context.getCurrentTryCount() < context.getMaxTryCount()){
                            Integer count = context.getCurrentTryCount() + 1;
                            context.setCurrentTryCount(count);
                            context.setCode(result.getCode());
                            push(context);
                        }
                        //到这一步,到达任务轮训上限且氢气都失败处理重试逻辑
                        if (result.getCode() != 200) {
                            //doFaildMethod();
                        }
                        if (result.getCode() == 200) {
                            //执行响应成功后的任务
                            //doSuccessMethod();
                        }
                    }
                    Thread.sleep(ThreadLocalRandom.current().nextInt(500) + 1);
                } catch (InterruptedException e) {
                    LOGGER.error("random Interrupted error ! context : {} " , JSON.toJSONString(context) , e);
                }
            }
        });
    }
    public Long push(TaskContext context) throws RuntimeException {
        if(!checkParam(context)){
            throw new RuntimeException("参数错误!");
        }
        if(conditionSwitch()){
            return redisTemplate.opsForList().rightPush(task_queue_flag , JSON.toJSONString(context));
        }
        return -1L;
    }
    private void stop() {
        redisTemplate.delete(task_queue_switch);
    }
    private void open() {
        redisTemplate.opsForValue().set(task_queue_switch , "1");
    }
    //######## ###########
    private boolean conditionSwitch(){
        //非空 true,空 false
        return StringUtils.isNotBlank(redisTemplate.opsForValue().get(task_queue_switch));
    }
    private boolean checkParam(TaskContext param){
        if(param == null){
            return false;
        }
        return true;
    }
}

相关class

ProcessResult
@Data
@NoArgsConstructor
public class ProcessResult<T> {
    /**
     * 响应code
     *
     */
    private Integer code;
    private T resultData;
}

TaskContext

@Data
public class TaskContext implements Serializable {
    /**
     * 延迟执行轮训任务时间执行时间,默认1s
     */
    private Integer delaySecond = 1;
    /**
     * 额外上下文信息
     */
    private String exData;
    /**
     * 提交轮训任务时间
     */
    private Long currentTimeMills;
    /**
     * 响应结果code
     *
     */
    private Integer code;
    /**
     * 最高失败可重试次数
     */
    private Integer maxTryCount;
    /**
     * 当前重试次数
     */
    private Integer currentTryCount;
    public TaskContext() {
    }
    public TaskContext(String exData, Integer maxTryCount, Integer delaySecond){
        this.exData = exData;
        this.maxTryCount = maxTryCount;
        if (delaySecond != null && delaySecond >= 0){
            this.delaySecond = delaySecond;
        }
        this.currentTryCount = 0;
        this.currentTimeMills = System.currentTimeMillis();
    }
}

测试类

@RunWith(Spring4JUnitParamsRunner.class)
@ContextConfiguration(locations = {"classpath*:application.xml"})
public class ServiceTest {
    @Autowired
    private RoundRobinComponent roundRobinComponent;
    @Test
    public void testRoundRobinComponent(){
        roundRobinComponent.push(new TaskContext(
                "my test text",
                5,
                1
                ));
    }
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
NoSQL Java Redis
Spring Boot 监听 Redis Key 失效事件实现定时任务
Spring Boot 监听 Redis Key 失效事件实现定时任务
64 0
|
13天前
|
存储 NoSQL Java
Spring Boot与Redis:整合与实战
【4月更文挑战第29天】Redis,作为一个高性能的键值存储数据库,广泛应用于缓存、消息队列、会话存储等多种场景中。在Spring Boot应用中整合Redis可以显著提高数据处理的效率和应用的响应速度。
27 0
|
19天前
|
XML NoSQL Java
spring整合redis出错
spring整合redis出错
17 0
|
2月前
|
负载均衡 NoSQL Java
Spring Boot + Redis 处理 Session 共享
Spring Boot + Redis 处理 Session 共享
14 1
|
2月前
|
缓存 NoSQL Java
spring cache整合redis实现springboot项目中的缓存功能
spring cache整合redis实现springboot项目中的缓存功能
50 1
|
2月前
|
存储 NoSQL Java
[Redis]——Spring整合Redis(SpringDataRedis)
[Redis]——Spring整合Redis(SpringDataRedis)
|
2月前
|
监控 NoSQL Java
Spring Boot集成Redis启动失败【Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.G】
Spring Boot集成Redis启动失败【Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.G】
|
2月前
|
NoSQL Java 定位技术
|
2月前
|
Java 应用服务中间件 Maven
SpringBoot 项目瘦身指南
SpringBoot 项目瘦身指南
53 0
|
2月前
|
缓存 安全 Java
Spring Boot 面试题及答案整理,最新面试题
Spring Boot 面试题及答案整理,最新面试题
138 0