如何结合spring和redis进行优雅的轮询

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 如何结合spring和redis进行优雅的轮询

背景


当业务结束后需要轮训获取内部接口或者第三方接口执行结果信息,可以使用一些轮训方式,如mq、项目自己异步线程轮训、队列等方式,下面的方式是以redis和spring定时执行相结合实现的


配置


application.xml和application.properties配置redis相关配置


application.xml


    <!-- Redis start -->
    <!-- 连接池工厂配置 -->
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
          p:use-pool="true" p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.password}"/>
    <!-- redis请求实例配置 -->
    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"
          p:connection-factory-ref="jedisConnectionFactory"/>

application.properties

redis.host=xxx.net
redis.port=7100
redis.password=xxx
redis.app=xxx

业务实现

轮询component

@Slf4j
@Component
public class RoundRobinComponent implements InitializingBean , DisposableBean {
    @Override
    public void afterPropertiesSet() {
        open();
    }
    @Override
    public void destroy() throws Exception {
        stop();
    }
    private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinComponent.class);
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    /** 任务队列 key */
    private static final String task_queue_flag = "TASK:BUF:QUEUE";
    /** 全局开关 - 非空运行,空不运行 */
    private static final String task_queue_switch = "LOOP:SWITCH";
    private static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000),
            new ThreadFactory() {
                @Override
                public Thread newThread(@NotNull Runnable r) {
                    return new Thread(r, "loop-task");
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        LOGGER.error("re put task error !" , e);
                    }
                }
            });
    //每1s执行任务,按机器数量和需求预估数量设置
    @Scheduled(cron = "0/1 * * * * ?")
    private void loop(){
        for(int i = 0 ; i < 10 ; i++){
            String context_json = redisTemplate.opsForList().leftPop(task_queue_flag);
            loop_exec(context_json);
        }
    }
    private void loop_exec(String context_json){
        POOL.execute(new Runnable() {
            @Override
            public void run() {
                TaskContext context = null;
                try {
                    context = JSONObject.parseObject(context_json, TaskContext.class);
                    if (
                            StringUtils.isNotBlank(context_json) &&
                                    context != null
                    ){
                        Integer delaySecond = context.getDelaySecond();
                        Long contextCurrentTimeMills = context.getCurrentTimeMills();
                        long currentTimeMillis = System.currentTimeMillis();
                        //不满足延迟执行时间重新入队
                        if (contextCurrentTimeMills + delaySecond * 1000 > currentTimeMillis ) {
                            push(context);
                            return;
                        }
                        //ProcessResult result = doGetResult;
                        //执行请求任务获取结果【伪代码】
                        ProcessResult result = new ProcessResult();
                        result.setCode(500);
                        //请求响应不成功且当前任务没超过最大次数继续入队,并设置当前重试次数
                        if (result.getCode() != 200 && context.getCurrentTryCount() < context.getMaxTryCount()){
                            Integer count = context.getCurrentTryCount() + 1;
                            context.setCurrentTryCount(count);
                            context.setCode(result.getCode());
                            push(context);
                        }
                        //到这一步,到达任务轮训上限且氢气都失败处理重试逻辑
                        if (result.getCode() != 200) {
                            //doFaildMethod();
                        }
                        if (result.getCode() == 200) {
                            //执行响应成功后的任务
                            //doSuccessMethod();
                        }
                    }
                    Thread.sleep(ThreadLocalRandom.current().nextInt(500) + 1);
                } catch (InterruptedException e) {
                    LOGGER.error("random Interrupted error ! context : {} " , JSON.toJSONString(context) , e);
                }
            }
        });
    }
    public Long push(TaskContext context) throws RuntimeException {
        if(!checkParam(context)){
            throw new RuntimeException("参数错误!");
        }
        if(conditionSwitch()){
            return redisTemplate.opsForList().rightPush(task_queue_flag , JSON.toJSONString(context));
        }
        return -1L;
    }
    private void stop() {
        redisTemplate.delete(task_queue_switch);
    }
    private void open() {
        redisTemplate.opsForValue().set(task_queue_switch , "1");
    }
    //######## ###########
    private boolean conditionSwitch(){
        //非空 true,空 false
        return StringUtils.isNotBlank(redisTemplate.opsForValue().get(task_queue_switch));
    }
    private boolean checkParam(TaskContext param){
        if(param == null){
            return false;
        }
        return true;
    }
}

相关class

ProcessResult
@Data
@NoArgsConstructor
public class ProcessResult<T> {
    /**
     * 响应code
     *
     */
    private Integer code;
    private T resultData;
}

TaskContext

@Data
public class TaskContext implements Serializable {
    /**
     * 延迟执行轮训任务时间执行时间,默认1s
     */
    private Integer delaySecond = 1;
    /**
     * 额外上下文信息
     */
    private String exData;
    /**
     * 提交轮训任务时间
     */
    private Long currentTimeMills;
    /**
     * 响应结果code
     *
     */
    private Integer code;
    /**
     * 最高失败可重试次数
     */
    private Integer maxTryCount;
    /**
     * 当前重试次数
     */
    private Integer currentTryCount;
    public TaskContext() {
    }
    public TaskContext(String exData, Integer maxTryCount, Integer delaySecond){
        this.exData = exData;
        this.maxTryCount = maxTryCount;
        if (delaySecond != null && delaySecond >= 0){
            this.delaySecond = delaySecond;
        }
        this.currentTryCount = 0;
        this.currentTimeMills = System.currentTimeMillis();
    }
}

测试类

@RunWith(Spring4JUnitParamsRunner.class)
@ContextConfiguration(locations = {"classpath*:application.xml"})
public class ServiceTest {
    @Autowired
    private RoundRobinComponent roundRobinComponent;
    @Test
    public void testRoundRobinComponent(){
        roundRobinComponent.push(new TaskContext(
                "my test text",
                5,
                1
                ));
    }
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
打赏
0
0
0
0
165
分享
相关文章
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
115 32
|
3月前
|
深入理解 RedisConnectionFactory:Spring Data Redis 的核心组件
在 Spring Data Redis 中,`RedisConnectionFactory` 是核心组件,负责创建和管理与 Redis 的连接。它支持单机、集群及哨兵等多种模式,为上层组件(如 `RedisTemplate`)提供连接抽象。Spring 提供了 Lettuce 和 Jedis 两种主要实现,其中 Lettuce 因其线程安全和高性能特性被广泛推荐。通过手动配置或 Spring Boot 自动化配置,开发者可轻松集成 Redis,提升应用性能与扩展性。本文深入解析其作用、实现方式及常见问题解决方法,助你高效使用 Redis。
291 4
|
4月前
|
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Spring Boot 集成 Redis
本文介绍了在Spring Boot中集成Redis的方法,包括依赖导入、Redis配置及常用API的使用。通过导入`spring-boot-starter-data-redis`依赖和配置`application.yml`文件,可轻松实现Redis集成。文中详细讲解了StringRedisTemplate的使用,适用于字符串操作,并结合FastJSON将实体类转换为JSON存储。还展示了Redis的string、hash和list类型的操作示例。最后总结了Redis在缓存和高并发场景中的应用价值,并提供课程源代码下载链接。
312 0
|
4月前
|
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 安装
本教程介绍在 VMware 虚拟机(CentOS 7)或阿里云服务器中安装 Redis 的过程,包括安装 gcc 编译环境、下载 Redis(官网或 wget)、解压安装、修改配置文件(如 bind、daemonize、requirepass 等设置)、启动 Redis 服务及测试客户端连接。通过 set 和 get 命令验证安装是否成功。适用于初学者快速上手 Redis 部署。
72 0
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 介绍
本文介绍在 Spring Boot 中集成 Redis 的方法。Redis 是一种支持多种数据结构的非关系型数据库(NoSQL),具备高并发、高性能和灵活扩展的特点,适用于缓存、实时数据分析等场景。其数据以键值对形式存储,支持字符串、哈希、列表、集合等类型。通过将 Redis 与 Mysql 集群结合使用,可实现数据同步,提升系统稳定性。例如,在网站架构中优先从 Redis 获取数据,故障时回退至 Mysql,确保服务不中断。
148 0
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 介绍
|
7月前
|
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
459 5
Spring Boot整合Redis
通过Spring Boot整合Redis,可以显著提升应用的性能和响应速度。在本文中,我们详细介绍了如何配置和使用Redis,包括基本的CRUD操作和具有过期时间的值设置方法。希望本文能帮助你在实际项目中高效地整合和使用Redis。
441 2
|
2月前
|
Redis+Caffeine构建高性能二级缓存
大家好,我是摘星。今天为大家带来的是Redis+Caffeine构建高性能二级缓存,废话不多说直接开始~
319 0
|
2月前
|
Redis:现代服务端开发的缓存基石与电商实践-优雅草卓伊凡
Redis:现代服务端开发的缓存基石与电商实践-优雅草卓伊凡
69 5
Redis:现代服务端开发的缓存基石与电商实践-优雅草卓伊凡
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问