背景
当业务结束后需要轮训获取内部接口或者第三方接口执行结果信息,可以使用一些轮训方式,如mq、项目自己异步线程轮训、队列等方式,下面的方式是以redis和spring定时执行相结合实现的
配置
application.xml和application.properties配置redis相关配置
application.xml
<!-- Redis start --> <!-- 连接池工厂配置 --> <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" p:use-pool="true" p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.password}"/> <!-- redis请求实例配置 --> <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate" p:connection-factory-ref="jedisConnectionFactory"/>
application.properties
redis.host=xxx.net redis.port=7100 redis.password=xxx redis.app=xxx
业务实现
轮询component
@Slf4j @Component public class RoundRobinComponent implements InitializingBean , DisposableBean { @Override public void afterPropertiesSet() { open(); } @Override public void destroy() throws Exception { stop(); } private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinComponent.class); @Autowired private RedisTemplate<String, String> redisTemplate; /** 任务队列 key */ private static final String task_queue_flag = "TASK:BUF:QUEUE"; /** 全局开关 - 非空运行,空不运行 */ private static final String task_queue_switch = "LOOP:SWITCH"; private static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000), new ThreadFactory() { @Override public Thread newThread(@NotNull Runnable r) { return new Thread(r, "loop-task"); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { LOGGER.error("re put task error !" , e); } } }); //每1s执行任务,按机器数量和需求预估数量设置 @Scheduled(cron = "0/1 * * * * ?") private void loop(){ for(int i = 0 ; i < 10 ; i++){ String context_json = redisTemplate.opsForList().leftPop(task_queue_flag); loop_exec(context_json); } } private void loop_exec(String context_json){ POOL.execute(new Runnable() { @Override public void run() { TaskContext context = null; try { context = JSONObject.parseObject(context_json, TaskContext.class); if ( StringUtils.isNotBlank(context_json) && context != null ){ Integer delaySecond = context.getDelaySecond(); Long contextCurrentTimeMills = context.getCurrentTimeMills(); long currentTimeMillis = System.currentTimeMillis(); //不满足延迟执行时间重新入队 if (contextCurrentTimeMills + delaySecond * 1000 > currentTimeMillis ) { push(context); return; } //ProcessResult result = doGetResult; //执行请求任务获取结果【伪代码】 ProcessResult result = new ProcessResult(); result.setCode(500); //请求响应不成功且当前任务没超过最大次数继续入队,并设置当前重试次数 if (result.getCode() != 200 && context.getCurrentTryCount() < context.getMaxTryCount()){ Integer count = context.getCurrentTryCount() + 1; context.setCurrentTryCount(count); context.setCode(result.getCode()); push(context); } //到这一步,到达任务轮训上限且氢气都失败处理重试逻辑 if (result.getCode() != 200) { //doFaildMethod(); } if (result.getCode() == 200) { //执行响应成功后的任务 //doSuccessMethod(); } } Thread.sleep(ThreadLocalRandom.current().nextInt(500) + 1); } catch (InterruptedException e) { LOGGER.error("random Interrupted error ! context : {} " , JSON.toJSONString(context) , e); } } }); } public Long push(TaskContext context) throws RuntimeException { if(!checkParam(context)){ throw new RuntimeException("参数错误!"); } if(conditionSwitch()){ return redisTemplate.opsForList().rightPush(task_queue_flag , JSON.toJSONString(context)); } return -1L; } private void stop() { redisTemplate.delete(task_queue_switch); } private void open() { redisTemplate.opsForValue().set(task_queue_switch , "1"); } //######## ########### private boolean conditionSwitch(){ //非空 true,空 false return StringUtils.isNotBlank(redisTemplate.opsForValue().get(task_queue_switch)); } private boolean checkParam(TaskContext param){ if(param == null){ return false; } return true; } }
相关class
ProcessResult
@Data @NoArgsConstructor public class ProcessResult<T> { /** * 响应code * */ private Integer code; private T resultData; }
TaskContext
@Data public class TaskContext implements Serializable { /** * 延迟执行轮训任务时间执行时间,默认1s */ private Integer delaySecond = 1; /** * 额外上下文信息 */ private String exData; /** * 提交轮训任务时间 */ private Long currentTimeMills; /** * 响应结果code * */ private Integer code; /** * 最高失败可重试次数 */ private Integer maxTryCount; /** * 当前重试次数 */ private Integer currentTryCount; public TaskContext() { } public TaskContext(String exData, Integer maxTryCount, Integer delaySecond){ this.exData = exData; this.maxTryCount = maxTryCount; if (delaySecond != null && delaySecond >= 0){ this.delaySecond = delaySecond; } this.currentTryCount = 0; this.currentTimeMills = System.currentTimeMillis(); } }
测试类
@RunWith(Spring4JUnitParamsRunner.class) @ContextConfiguration(locations = {"classpath*:application.xml"}) public class ServiceTest { @Autowired private RoundRobinComponent roundRobinComponent; @Test public void testRoundRobinComponent(){ roundRobinComponent.push(new TaskContext( "my test text", 5, 1 )); } }