【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 主要介绍了如何使用redis的zset及list数据类型实现延迟队列完成文章的定时发布功能。

目录

一:未来数据定时刷新

1.redis key值匹配

方案一:keys模糊匹配

方案二:scan

2.redis管道

3.定时刷新功能实现

二:分布式锁解决集群下的方法抢占执行

1.问题描述

2.分布式锁

3.redis分布式锁

4.实现

(1)方法添加

(2) 代码修改

5.数据库同步

三:延迟队列实现定时发布

1.提供对外接口

2.具体实现

(1)前期准备

(2)添加任务到延迟队列

(3)修改发布文章代码

(4)消费任务进行文章审核


一:未来数据定时刷新

1.redis key值匹配

       将未来5分钟之内要发布的文章加入到redis之后,我们需要定时对这部分数据(也就是zset中的数据)进行扫描,以便将zset中时间到了文章存入list中准备发布,但是这时候扫描zset中的数据有两种选择,见下面分析:

方案一:keys模糊匹配

keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞

image.gif编辑

方案二:scan

SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。  

image.gif编辑

2.redis管道

普通redis客户端和服务器交互模式

image.gif编辑

Pipeline请求模型

image.gif编辑

       两者的区别从图中就可以看出来,第一种方式对每一个命令都需要向服务端发送一次请求,假如命令过多会不断创建连接,降低执行效率;而第二种方式则是将一批命令积攒到一起再开启通道一次性执行,大大减少了连接数。  

3.定时刷新功能实现

在taskserviceImpl中添加如下方法,并且引导类中开启任务调度注解@EnableScheduling

@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
    // 获取所有未来数据集合的key值
    Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
    for (String futureKey : futureKeys) { // future_250_250
        String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
        //获取该组key下当前需要消费的任务数据
        Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
        if (!tasks.isEmpty()) {
            //将这些任务数据添加到消费者队列中
            cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
            log.info("成功将{}下的当前需要执行的任务添加到{}",futureKey,topicKey);
        }
    }
}

image.gif

二:分布式锁解决集群下的方法抢占执行

1.问题描述

       假如启动两台tbug-headlines-schedule服务,这时候两者都会去执行refresh方法,但是我们只需要其中一台去执行扫描任务即可,这时候就需要加入分布式锁来进行控制。

2.分布式锁

分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。

解决方案:

image.gif编辑

3.redis分布式锁

setnx (Set if Not Exists) 命令在指定的 key 不存在时,为 key 设置指定的值。

image.gif编辑

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

    • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功
    • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败
    • 客户端A执行代码完成,删除锁
    • 客户端B在等待一段时间后再去请求设置key的值,设置成功
    • 客户端B执行代码完成,删除锁

    4.实现

    (1)方法添加

    在工具类CacheService中添加如下方法:

    /**
     * 加锁
     *
     * @param name
     * @param expire
     * @return
     */
    public String tryLock(String name, long expire) {
        name = name + "_lock";
        String token = UUID.randomUUID().toString();
        RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
        RedisConnection conn = factory.getConnection();
        try {
            //参考redis命令:
            //set key value [EX seconds] [PX milliseconds] [NX|XX]
            Boolean result = conn.set(
                    name.getBytes(),
                    token.getBytes(),
                    Expiration.from(expire, TimeUnit.MILLISECONDS),
                    RedisStringCommands.SetOption.SET_IF_ABSENT //NX
            );
            if (result != null && result)
                return token;
        } finally {
            RedisConnectionUtils.releaseConnection(conn, factory,false);
        }
        return null;
    }

    image.gif

    参数name表示锁的名称,expire表示锁的过期时间,最重要的是set方法中最后一个参数RedisStringCommands.SetOption.SET_IF_ABSENT,这表示当有一个请求过来之后就会设置key值进行加锁,这样再有请求过来就获取不到了。

    (2) 代码修改

    /**
     * 定时器任务,每分钟扫描redis一次
     */
    @Scheduled(cron = "0 */1 * * * ?")
    public void refresh() {
        String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);//加锁,30秒过期
        if(StringUtils.isNotBlank(token)) {
            log.info("开始执行定时扫描redis任务...");
            //获取所有未来数据集合的key值
            Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
            for (String futureKey : futureKeys) {
                //截取后半部分 future_200_100 --> _200_100
                String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
                //获取该组key下当前需要清理的任务数据
                Set<String> tasks = cacheService.zRangeByScore(futureKey,0,System.currentTimeMillis());
                if(!tasks.isEmpty()) {
                    //将这些数据添加到消费队列中
                    cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
                    log.info("成功将{}下的当前需要执行的任务添加到{}",futureKey,topicKey);
                }
            }
        }
    }

    image.gif

    5.数据库同步

    在完成上述操作之后,我们需要知道的是redis中只是存放现在就需要发布和5分钟之内需要发布的文章,而那些超过5分钟之后才需要发布的文章(比如一天之后发布)我们是不将其存入redis中的,它们只是存放在数据库中,这时候就需要定时去扫描数据库查看哪些文章需要被放入redis进行处理,流程图还可以参考上一篇文章的:

    image.gif编辑

    /**
     * 数据库同步任务,每五分钟执行一次
     */
    @PostConstruct  //表示服务一启动便执行一次
    @Scheduled(cron = "0 */5 * * * ?")
    public void reloadData() {
        log.info("开始同步数据库任务到redis...");
        //1.清理缓存任务,避免重复
        clearCache();
        //2.获取5分钟之后的时间
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE,5);
        //3.查看未来所有小于5分钟的任务
        List<Taskinfo> tasks = taskInfoMapper.selectList
                (Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));
        if(tasks != null && tasks.size() > 0) {
            for (Taskinfo taskinfo : tasks) {
                Task task = new Task();
                BeanUtils.copyProperties(taskinfo,task);
                task.setExecuteTime(taskinfo.getExecuteTime().getTime());
                addTaskToCache(task);
            }
        }
    }
    /**
     * 清理缓存任务
     */
    private void clearCache() {
        log.info("开始清理缓存任务...");
        //获取任务集
        Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
        Set<String> topicKeys = cacheService.scan(ScheduleConstants.TOPIC + "*");
        cacheService.delete(futureKeys);
        cacheService.delete(topicKeys);
    }

    image.gif

    三:延迟队列实现定时发布

    1.提供对外接口

    提供远程的feign接口,在tbug-headlines-feign-api编写类如下:  

    package com.my.apis.schedule;
    import com.my.model.common.dtos.ResponseResult;
    import com.my.model.schedule.dtos.Task;
    import org.springframework.cloud.openfeign.FeignClient;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    @FeignClient(value = "headlines-schedule")
    public interface IScheduleClient {
        /**
         * 添加任务
         * @param task   任务对象
         * @return       任务id
         */
        @PostMapping("/api/v1/task/add")
        ResponseResult addTask(@RequestBody Task task);
        /**
         * 取消任务
         * @param taskId        任务id
         * @return              取消结果
         */
        @GetMapping("/api/v1/task/cancel/{taskId}")
        ResponseResult cancelTask(@PathVariable("taskId") long taskId);
        /**
         * 按照类型和优先级来拉取任务
         * @param type
         * @param priority
         * @return
         */
        @GetMapping("/api/v1/task/poll/{type}/{priority}")
        ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority")  int priority);
    }

    image.gif

    在tbug-headlines-schedule微服务下提供对应的实现

    package com.my.schedule.feign;
    import com.my.apis.schedule.IScheduleClient;
    import com.my.model.common.dtos.ResponseResult;
    import com.my.model.schedule.dtos.Task;
    import com.my.schedule.service.TaskService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    @RestController
    public class ScheduleClient implements IScheduleClient {
        @Autowired
        private TaskService taskService;
        /**
         * 添加任务
         * @param task   任务对象
         * @return       任务id
         */
        @Override
        @PostMapping("/api/v1/task/add")
        public ResponseResult addTask(@RequestBody Task task) {
            return ResponseResult.okResult(taskService.addTask(task));
        }
        /**
         * 取消任务
         * @param taskId        任务id
         * @return              取消结果
         */
        @Override
        @GetMapping("/api/v1/task/cancel/{taskId}")
        public ResponseResult cancelTask(@PathVariable long taskId) {
            return ResponseResult.okResult(taskService.cancelTask(taskId));
        }
        /**
         * 按照类型和优先级来拉取任务
         * @param type
         * @param priority
         * @return
         */
        @Override
        @GetMapping("/api/v1/task/poll/{type}/{priority}")
        public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority) {
            return ResponseResult.okResult(taskService.poll(type,priority));
        }
    }

    image.gif

    2.具体实现

    (1)前期准备

    ①枚举类

    package com.my.model.common.enums;
    import lombok.AllArgsConstructor;
    import lombok.Getter;
    @Getter
    @AllArgsConstructor
    public enum TaskTypeEnum {
        NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
        REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
        private final int taskType; //对应具体业务
        private final int priority; //业务不同级别
        private final String desc; //描述信息
    }

    image.gif

    ②序列化工具

           在添加任务到延迟队列的方法中,我们需要用到序列化工具进行序列化操作,而在任务消费时候又需要进行反序列化操作。java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化,但是这里我选用的是效率更高的Protostuff。Protostuff是google开源的,其采用更为紧凑的二进制数组,表现更加优异。

    将ProtostuffUtil拷贝到tbug-headlines-utils中,然后导入如下依赖

    <dependency>
        <groupId>io.protostuff</groupId>
        <artifactId>protostuff-core</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>io.protostuff</groupId>
        <artifactId>protostuff-runtime</artifactId>
        <version>1.6.0</version>
    </dependency>

    image.gif

    (2)添加任务到延迟队列

    创建WmNewsTaskService

    package com.my.wemedia.service;
    import java.util.Date;
    public interface WmNewsTaskService {
        /**
         * 添加任务到延迟队列中
         * @param id 文章id
         * @param publishTime  文章发布时间
         */
        void addNewsToTask(Integer id, Date publishTime);
        /**
         * 消费延迟队列数据
         */
        void scanNewsByTask();
    }

    image.gif

    实现类

    package com.my.wemedia.service.impl;
    import com.alibaba.fastjson.JSON;
    import com.my.apis.schedule.IScheduleClient;
    import com.my.common.constans.ScheduleConstants;
    import com.my.common.redis.CacheService;
    import com.my.model.common.dtos.ResponseResult;
    import com.my.model.common.enums.TaskTypeEnum;
    import com.my.model.schedule.dtos.Task;
    import com.my.model.wemedia.pojos.WmNews;
    import com.my.utils.common.ProtostuffUtil;
    import com.my.wemedia.service.WmAutoScanService;
    import com.my.wemedia.service.WmNewsTaskService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Service;
    import java.util.Calendar;
    import java.util.Date;
    @Slf4j
    @Service
    public class WmNewsTaskServiceImpl implements WmNewsTaskService {
        @Autowired
        private IScheduleClient scheduleClient;
        /**
         * 添加任务到延迟队列
         * @param id 文章id
         * @param publishTime  文章发布时间
         */
        @Override
        @Async
        public void addNewsToTask(Integer id, Date publishTime) {
            log.info("添加任务到延迟服务中---begin");
            Task task = new Task();
            task.setExecuteTime(publishTime.getTime());
            task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
            task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
            WmNews wmNews = new WmNews();
            wmNews.setId(id);
            task.setParameters(ProtostuffUtil.serialize(wmNews));
            scheduleClient.addTask(task);
            log.info("添加任务到延迟服务中---end");
        }
    }

    image.gif

    (3)修改发布文章代码

    将之前的异步调用审核文章修改为将文章数据加入延迟队列

    @Autowired
        private WmAutoScanService wmAutoScanService;
        @Autowired
        private WmNewsTaskService wmNewsTaskService;
        /**
         * 提交文章
         * @param dto
         * @return
         */
        @Override
        public ResponseResult submitNews(WmNewsDto dto) throws Exception {
            //1.参数校验
            if(dto == null || dto.getContent().length() == 0) {
                return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
            }
            //2.保存或修改文章
            //2.1属性拷贝
            WmNews wmNews = new WmNews();
            BeanUtils.copyProperties(dto,wmNews);
            //2.2设置封面图片
            if(dto.getImages() != null && dto.getImages().size() != 0) {
                String images = StringUtils.join(dto.getImages(), ",");
                wmNews.setImages(images);
            }
            //2.3封面类型为自动
            if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)) {
                wmNews.setType(null);
            }
            saveOrUpdateWmNews(wmNews);
            //3.判断是否为草稿
            if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())) {
                //直接保存结束
                return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
            }
            //4.不是草稿
            //4.1保存文章图片素材与文章关系
            //4.1.1提取图片素材列表
            List<String> imagesList = getImagesList(dto);
            //4.1.2保存
            saveRelatedImages(imagesList,wmNews.getId(),WemediaConstants.WM_CONTENT_REFERENCE);
            //4.2保存封面图片和文章关系
            saveRelatedCover(dto,imagesList,wmNews);
            //5.审核文章(异步调用)
            // wmAutoScanService.AutoScanTextAndImage(wmNews.getId());
            //5.将任务添加到延迟服务
            wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());
            return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
        }

    image.gif

    (4)消费任务进行文章审核

    在WmNewsTaskServiceImpl中添加如下方法:

    @Autowired
    private WmNewsAutoScanServiceImpl wmNewsAutoScanService;
    /**
         * 消费延迟队列数据
         */
    @Scheduled(fixedRate = 1000)
    @Override
    @SneakyThrows
    public void scanNewsByTask() {
        ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
        if(responseResult.getCode().equals(200) && responseResult.getData() != null){
            log.info("文章审核---消费任务执行---begin---");
            String json_str = JSON.toJSONString(responseResult.getData());
            Task task = JSON.parseObject(json_str, Task.class);
            byte[] parameters = task.getParameters();
            WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
            System.out.println(wmNews.getId()+"-----------");
            wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
            log.info("文章审核---消费任务执行---end---");
        }
    }

    image.gif

    下篇预告:定时发布文章优化策略

    相关实践学习
    基于Redis实现在线游戏积分排行榜
    本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
    云数据库 Redis 版使用教程
    云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
    相关文章
    |
    1天前
    |
    JSON Java API
    利用Spring Cloud Gateway Predicate优化微服务路由策略
    Spring Cloud Gateway 的路由配置中,`predicates`​(断言)用于定义哪些请求应该匹配特定的路由规则。 断言是Gateway在进行路由时,根据具体的请求信息如请求路径、请求方法、请求参数等进行匹配的规则。当一个请求的信息符合断言设置的条件时,Gateway就会将该请求路由到对应的服务上。
    93 69
    利用Spring Cloud Gateway Predicate优化微服务路由策略
    |
    20天前
    |
    Java 开发者 微服务
    从单体到微服务:如何借助 Spring Cloud 实现架构转型
    **Spring Cloud** 是一套基于 Spring 框架的**微服务架构解决方案**,它提供了一系列的工具和组件,帮助开发者快速构建分布式系统,尤其是微服务架构。
    141 68
    从单体到微服务:如何借助 Spring Cloud 实现架构转型
    |
    17天前
    |
    Java Nacos Sentinel
    Spring Cloud Alibaba:一站式微服务解决方案
    Spring Cloud Alibaba(简称SCA) 是一个基于 Spring Cloud 构建的开源微服务框架,专为解决分布式系统中的服务治理、配置管理、服务发现、消息总线等问题而设计。
    169 13
    Spring Cloud Alibaba:一站式微服务解决方案
    |
    4天前
    |
    Java 关系型数据库 Nacos
    微服务SpringCloud链路追踪之Micrometer+Zipkin
    SpringCloud+Openfeign远程调用,并用Mircrometer+Zipkin进行链路追踪
    60 20
    |
    3天前
    |
    运维 监控 Java
    为何内存不够用?微服务改造启动多个Spring Boot的陷阱与解决方案
    本文记录并复盘了生产环境中Spring Boot应用内存占用过高的问题及解决过程。系统上线初期运行正常,但随着业务量上升,多个Spring Boot应用共占用了64G内存中的大部分,导致应用假死。通过jps和jmap工具排查发现,原因是运维人员未设置JVM参数,导致默认配置下每个应用占用近12G内存。最终通过调整JVM参数、优化堆内存大小等措施解决了问题。建议在生产环境中合理设置JVM参数,避免资源浪费和性能问题。
    19 3
    |
    25天前
    |
    负载均衡 Java 开发者
    深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
    深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
    83 5
    |
    2月前
    |
    JSON Java 数据格式
    【微服务】SpringCloud之Feign远程调用
    本文介绍了使用Feign作为HTTP客户端替代RestTemplate进行远程调用的优势及具体使用方法。Feign通过声明式接口简化了HTTP请求的发送,提高了代码的可读性和维护性。文章详细描述了Feign的搭建步骤,包括引入依赖、添加注解、编写FeignClient接口和调用代码,并提供了自定义配置的示例,如修改日志级别等。
    139 1
    |
    Dubbo Java 应用服务中间件
    深入了解Spring Cloud Alibaba Dubbo
    在现代分布式系统开发中,构建高性能、可伸缩性和弹性的微服务架构变得越来越重要。Spring Cloud Alibaba Dubbo(简称Dubbo)是一个开源的分布式服务框架,可以帮助开发者构建强大的微服务架构,具备负载均衡、服务治理、远程调用等强大功能。本文将深入介绍Spring Cloud Alibaba Dubbo,帮助你理解它的核心概念、工作原理以及如何在你的项目中使用它。
    |
    Kubernetes Java 微服务
    Spring Boot 单体应用一键升级成 Spring Cloud Alibaba(1)
    Spring Boot 单体应用一键升级成 Spring Cloud Alibaba(1)
    149 0
    Spring Boot 单体应用一键升级成 Spring Cloud Alibaba(1)
    |
    7月前
    |
    Java 中间件 开发者
    Spring Cloud Alibaba
    【1月更文挑战第27天】【1月更文挑战第127篇】Spring Cloud Alibaba
    148 1