【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(上)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
云原生网关 MSE Higress,422元/月
简介: 主要介绍了延迟任务的概念及不同技术实现延迟任务的区别,随后搭建了相关环境为文章定时发布打基础。

前言:

最近在做一个基于SpringCloud+Springboot+Docker的新闻头条微服务项目,用的是黑马的教程,现在项目开发进入了尾声,我打算通过写文章的形式进行梳理一遍,并且会将梳理过程中发现的Bug进行修复,有需要改进的地方我也会继续做出改进。这一系列的文章我将会放入微服务项目专栏中,这个项目适合刚接触微服务的人作为练手项目,假如你对这个项目感兴趣你可以订阅我的专栏进行查看,需要资料可以私信我,当然要是能给我点个小小的关注就更好了,你们的支持是我最大的动力。

目录

一:前期准备

1.需求分析

2. 延迟任务概述

3.技术对比

DelayQueue

RabbitMQ实现延迟任务

Redis实现

4.实现思路

二:环境搭建

1.搭建模块

2.数据库准备

编辑 3.在docker中安装redis

4.项目中集成redis

三:代码编写

1.实体类导入

2.创建taskService

3.功能实现


一:前期准备

1.需求分析

       当创作者创作好文章之后可以选择立马发布,还能选择定时发布(见下图),这个相信大家在CSDN创作时候都知道,我们需要使用延迟任务来实现文章的定时发布。

image.gif编辑

2. 延迟任务概述

在介绍延迟任务之前,我们先了解一下定时任务,定时任务就是有固定的的执行频率,每隔一段时间就执行一次,定时任务与延迟任务区别如下:

    • 定时任务:有固定周期的,有明确的触发时间
    • 延迟任务:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟。
    • 延迟任务使用场景:  
      1. 商品下单30分钟之内没有付款则将订单取消
      2. 接口对接出现网络问题,1分钟之后重试,如果再失败则2分钟之后重试,直至达到阈值      

      3.技术对比

        • DelayQueue
          JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。 DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法
          getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。
          compareTo方法:用于排序,确定元素出队列的顺序。

               需要注意的是,使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,要保证数据不丢失,就需要持久化(磁盘)。

          • RabbitMQ实现延迟任务

                   RabbitMQ实现延迟任务有两种形式,一种是传统的TTL+DLX(死信交换机)来实现,另一种是直接使用插件。TTL+DLX的实现原理是给每个队列设置过期时间,当消息过期之后变成Dead message,就将死信消息发送到另一个交换机,这个交换机叫做死信交换机。

                 不过更方便的还是直接使用RabbitMQ提供的延迟队列插件比较方便,实践过程可以翻看我前面关于微信支付的文章,里面就是使用了RabbitMQ提供的延迟插件来实现订单的管理。

            • Redis实现

                     由于项目后面还需要用到Redis缓存热度靠前的文章,所以这里我选择了使用Redis来实现延迟队列,也正好可以了解其实现原理。

                   我们都知道Redis中的ZSet数据类型可以实现对数据的排序,那么我们就可以利用这个特点来实现延迟队列,使用时间戳作为score来进行排序。

            4.实现思路

            image.gif编辑

            1.为什么任务需要存储在数据库中?

            延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。

            2.为什么redis中使用两种数据类型,list和zset?

            效率问题,算法的时间复杂度

            3.在添加zset数据的时候,为什么不需要预加载?

            任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。

            二:环境搭建

            1.搭建模块

            ①在service模块下创建一个tbug-headlines-schedule模块

            image.gif编辑

            ②添加bootstrap.yml

            server:
              port: 51701
            spring:
              application:
                name: headlines-schedule
              cloud:
                nacos:
                  discovery:
                    server-addr: 49.234.52.192:8848
                  config:
                    server-addr: 49.234.52.192:8848
                    file-extension: yml

            image.gif

            ③在nacos添加相关配置

            spring:
              datasource:
                driver-class-name: com.mysql.jdbc.Driver
                url: jdbc:mysql://localhost:3306/headlines_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false
                username: root
                password: 440983
            # 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
            mybatis-plus:
              mapper-locations: classpath*:mapper/*.xml
              # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
              type-aliases-package: com.my.model.schedule.pojos

            image.gif

            2.数据库准备

            taskinfo任务表

            image.gif编辑

            taksinfo_log任务日志表

            image.gif编辑 3.在docker中安装redis

            4.项目中集成redis

            ①在项目中导入redis依赖

            <!--spring data redis & cache-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <!-- redis依赖commons-pool 这个依赖一定要添加 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-cache</artifactId>
            </dependency>

            image.gif

            ②在nacos添加redis配置信息

            spring:
              redis:
                host: 49.234.52.192
                password: 440983
                port: 6379

            image.gif

            ③拷贝CacheService类到tbug-headlines-common模块下,并添加自动配置

            org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
              com.my.common.exception.ExceptionCatch,\
              com.my.common.swagger.SwaggerConfiguration,\
              com.my.common.swagger.Swagger2Configuration,\
              com.my.common.tencentcloud.TextDetection,\
              com.my.common.tencentcloud.ImageDetection,\
              com.my.common.tess4j.Tess4jClient,\
              com.my.common.redis.CacheService,\

            image.gif

            三:代码编写

            1.实体类导入

            ①创建task类,用于接收添加任务的参数

            package com.my.model.schedule.dtos;
            import lombok.Data;
            import java.io.Serializable;
            @Data
            public class Task implements Serializable {
                /**
                 * 任务id
                 */
                private Long taskId;
                /**
                 * 类型
                 */
                private Integer taskType;
                /**
                 * 优先级
                 */
                private Integer priority;
                /**
                 * 执行id
                 */
                private long executeTime;
                /**
                 * task参数
                 */
                private byte[] parameters;
            }

            image.gif

            ②创建mapper

            package com.my.schedule.mapper;
            import com.baomidou.mybatisplus.core.mapper.BaseMapper;
            import com.my.model.schedule.pojos.Taskinfo;
            import org.apache.ibatis.annotations.Mapper;
            import org.apache.ibatis.annotations.Param;
            import java.util.Date;
            import java.util.List;
            /**
             * <p>
             *  Mapper 接口
             * </p>
             *
             * @author itheima
             */
            @Mapper
            public interface TaskInfoMapper extends BaseMapper<Taskinfo> {
                List<Taskinfo> queryFutureTime(@Param("taskType")int type, @Param("priority")int priority, @Param("future")Date future);
            }

            image.gif

            <?xml version="1.0" encoding="UTF-8"?>
            <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
            <mapper namespace="com.my.schedule.mapper.TaskInfoMapper">
                <select id="queryFutureTime" resultType="com.my.model.schedule.pojos.Taskinfo">
                    select *
                    from taskinfo
                    where task_type = #{taskType}
                      and priority = #{priority}
                      and execute_time <![CDATA[<]]> #{future,javaType=java.util.Date}
                </select>
            </mapper>

            image.gif

            package com.my.schedule.mapper;
            import com.baomidou.mybatisplus.core.mapper.BaseMapper;
            import com.my.model.schedule.pojos.TaskinfoLogs;
            import org.apache.ibatis.annotations.Mapper;
            /**
             * <p>
             *  Mapper 接口
             * </p>
             *
             * @author itheima
             */
            @Mapper
            public interface TaskInfoLogsMapper extends BaseMapper<TaskinfoLogs> {
            }

            image.gif

            2.创建taskService

            package com.my.schedule.service;
            import com.my.model.schedule.dtos.Task;
            /**
             * 对外访问接口
             */
            public interface TaskService {
                /**
                 * 添加任务接口
                 * @param task  任务对象
                 * @return  任务ID
                 */
                long addTask(Task task);
                /**
                 * 取消任务
                 * @param taskId        任务id
                 * @return              取消结果
                 */
                boolean cancelTask(long taskId);
                /**
                 * 按照类型和优先级来拉取任务
                 * @param type
                 * @param priority
                 * @return
                 */
                Task poll(int type,int priority);
            }

            image.gif

            3.功能实现

            ScheduleConstants常量类

            package com.my.model.common.enums;
            import lombok.AllArgsConstructor;
            import lombok.Getter;
            @Getter
            @AllArgsConstructor
            public enum TaskTypeEnum {
                NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
                REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
                private final int taskType; //对应具体业务
                private final int priority; //业务不同级别
                private final String desc; //描述信息
            }

            image.gif

            TaskServiceImpl

            package com.my.schedule.service.serviceImpl;
            import com.alibaba.fastjson.JSON;
            import com.baomidou.mybatisplus.core.toolkit.Wrappers;
            import com.my.common.constans.ScheduleConstants;
            import com.my.common.redis.CacheService;
            import com.my.model.schedule.dtos.Task;
            import com.my.model.schedule.pojos.Taskinfo;
            import com.my.model.schedule.pojos.TaskinfoLogs;
            import com.my.schedule.mapper.TaskInfoLogsMapper;
            import com.my.schedule.mapper.TaskInfoMapper;
            import com.my.schedule.service.TaskService;
            import lombok.extern.slf4j.Slf4j;
            import org.apache.commons.lang.StringUtils;
            import org.springframework.beans.BeanUtils;
            import org.springframework.beans.BeansException;
            import org.springframework.beans.factory.annotation.Autowired;
            import org.springframework.scheduling.annotation.Scheduled;
            import org.springframework.stereotype.Service;
            import org.springframework.transaction.annotation.Transactional;
            import javax.annotation.PostConstruct;
            import java.util.Calendar;
            import java.util.Date;
            import java.util.List;
            import java.util.Set;
            @Slf4j
            @Service
            @Transactional
            public class TaskServiceImpl implements TaskService {
                /**
                 * 添加延迟任务
                 * @param task  任务对象
                 * @return
                 */
                @Override
                public long addTask(Task task) {
                    //1.将任务添加到数据库中
                    boolean success = addTaskToDb(task);
                    if(success) {
                        //2.将任务添加到redis
                        addTaskToCache(task);
                    }
                    return task.getTaskId();
                }
                @Autowired
                private TaskInfoMapper taskInfoMapper;
                @Autowired
                private TaskInfoLogsMapper taskInfoLogsMapper;
                /**
                 * 将任务添加到数据库中
                 * @param task
                 * @return
                 */
                private boolean addTaskToDb(Task task) {
                    boolean flag = false;
                    try {
                        //1.保存任务表
                        Taskinfo taskinfo = new Taskinfo();
                        BeanUtils.copyProperties(task,taskinfo);
                        taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
                        taskInfoMapper.insert(taskinfo);
                        //2.设置任务id
                        task.setTaskId(taskinfo.getTaskId());
                        //3.保存任务日志数据
                        TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
                        BeanUtils.copyProperties(taskinfo,taskinfoLogs);
                        taskinfoLogs.setVersion(1);
                        taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
                        taskInfoLogsMapper.insert(taskinfoLogs);
                        flag = true;
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return flag;
                }
                @Autowired
                private CacheService cacheService;
                /**
                 * 将任务添加到redis
                 * @param task
                 */
                private void addTaskToCache(Task task) {
                    //1.构造key
                    String key = task.getTaskType()+"_"+task.getPriority();
                    //2.获取5分钟之后的时间 毫秒级
                    Calendar calendar = Calendar.getInstance();
                    calendar.add(Calendar.MINUTE,5);
                    long nextScheduleTime = calendar.getTimeInMillis();
                    //3.如果任务执行时间小于等于当前时间,存入list
                    if(task.getExecuteTime() <= System.currentTimeMillis()) {
                        cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
                    } else if(task.getExecuteTime() <= nextScheduleTime) {
                        //4.如果任务执行时间在5分钟之内,存入zSet
                        cacheService.zAdd(ScheduleConstants.FUTURE + key,JSON.toJSONString(task),task.getExecuteTime());
                    }
                }
                /**
                 * 删除任务
                 * @param taskId      任务id
                 * @return
                 */
                @Override
                public boolean cancelTask(long taskId) {
                    boolean flag = false;
                    //1.删除任务并更新日志
                    Task task = UpdateDb(taskId,ScheduleConstants.EXECUTED);
                    //2.从Redis中删除任务
                    if(task != null) {
                        removeFromCache(task);
                        log.info("删除Redis中的任务成功:{}",taskId);
                        flag = true;
                    }
                    return flag;
                }
                /**
                 * 从redis中删除任务
                 * @param task
                 */
                private void removeFromCache(Task task) {
                    String key = task.getTaskType() + "_" + task.getPriority();
                    if(task.getExecuteTime() <= System.currentTimeMillis()) {
                        log.info("删除正要执行的任务...");
                        cacheService.lRemove(ScheduleConstants.TOPIC + key,0,JSON.toJSONString(task));
                    } else {
                        log.info("删除将要执行的任务...");
                        cacheService.zRemove(ScheduleConstants.FUTURE + key,JSON.toJSONString(task));
                    }
                }
                /**
                 * 删除任务,更新日志
                 * @param taskId
                 * @param status
                 * @return
                 */
                private Task UpdateDb(long taskId, int status) {
                    Task task = null;
                    try {
                        //1.删除任务
                        log.info("删除数据库中的任务...");
                        taskInfoMapper.deleteById(taskId);
                        //2.更新日志
                        log.info("更新任务日志...");
                        TaskinfoLogs taskinfoLogs = taskInfoLogsMapper.selectById(taskId);
                        taskinfoLogs.setStatus(status);
                        taskInfoLogsMapper.updateById(taskinfoLogs);
                        //3.设置返回值
                        task = new Task();
                        BeanUtils.copyProperties(taskinfoLogs,task);
                        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
                    } catch (BeansException e) {
                        throw new RuntimeException(e);
                    }
                    return task;
                }
                /**
                 * 消费任务
                 * @param type  任务类型
                 * @param priority 任务优先级
                 * @return  Task
                 */
                @Override
                public Task poll(int type, int priority) {
                    Task task = null;
                    try {
                        String key = type + "_" + priority;
                        String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
                        if(StringUtils.isNotBlank(task_json)) {
                            task = JSON.parseObject(task_json,Task.class);
                            //更新数据库
                            UpdateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        log.error("poll task exception");
                    }
                    return task;
                }
            }

            image.gif

            主要包括添加任务、取消任务、消费任务三个功能。

            下篇预告:实现数据定时刷新

            相关实践学习
            基于Redis实现在线游戏积分排行榜
            本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
            云数据库 Redis 版使用教程
            云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
            目录
            打赏
            0
            0
            0
            0
            22
            分享
            相关文章
            |
            3月前
            |
            利用Spring Cloud Gateway Predicate优化微服务路由策略
            Spring Cloud Gateway 的路由配置中,`predicates`​(断言)用于定义哪些请求应该匹配特定的路由规则。 断言是Gateway在进行路由时,根据具体的请求信息如请求路径、请求方法、请求参数等进行匹配的规则。当一个请求的信息符合断言设置的条件时,Gateway就会将该请求路由到对应的服务上。
            202 69
            利用Spring Cloud Gateway Predicate优化微服务路由策略
            Spring Cloud Dubbo:微服务通信的高效解决方案
            【10月更文挑战第15天】随着信息技术的发展,微服务架构成为企业应用开发的主流。Spring Cloud Dubbo结合了Dubbo的高性能RPC和Spring Cloud的生态系统,提供高效、稳定的微服务通信解决方案。它支持多种通信协议,具备服务注册与发现、负载均衡及容错机制,简化了服务调用的复杂性,使开发者能更专注于业务逻辑的实现。
            110 2
            从单体到微服务:如何借助 Spring Cloud 实现架构转型
            **Spring Cloud** 是一套基于 Spring 框架的**微服务架构解决方案**,它提供了一系列的工具和组件,帮助开发者快速构建分布式系统,尤其是微服务架构。
            324 69
            从单体到微服务:如何借助 Spring Cloud 实现架构转型
            微服务架构设计与实践:用Spring Cloud实现抖音的推荐系统
            本文基于Spring Cloud实现了一个简化的抖音推荐系统,涵盖用户行为管理、视频资源管理、个性化推荐和实时数据处理四大核心功能。通过Eureka进行服务注册与发现,使用Feign实现服务间调用,并借助Redis缓存用户画像,Kafka传递用户行为数据。文章详细介绍了项目搭建、服务创建及配置过程,包括用户服务、视频服务、推荐服务和数据处理服务的开发步骤。最后,通过业务测试验证了系统的功能,并引入Resilience4j实现服务降级,确保系统在部分服务故障时仍能正常运行。此示例旨在帮助读者理解微服务架构的设计思路与实践方法。
            113 17
            智慧工地云平台的技术架构解析:微服务+Spring Cloud如何支撑海量数据?
            慧工地解决方案依托AI、物联网和BIM技术,实现对施工现场的全方位、立体化管理。通过规范施工、减少安全隐患、节省人力、降低运营成本,提升工地管理的安全性、效率和精益度。该方案适用于大型建筑、基础设施、房地产开发等场景,具备微服务架构、大数据与AI分析、物联网设备联网、多端协同等创新点,推动建筑行业向数字化、智能化转型。未来将融合5G、区块链等技术,助力智慧城市建设。
            建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
            智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
            101 7
            |
            3月前
            |
            Spring Cloud Alibaba:一站式微服务解决方案
            Spring Cloud Alibaba(简称SCA) 是一个基于 Spring Cloud 构建的开源微服务框架,专为解决分布式系统中的服务治理、配置管理、服务发现、消息总线等问题而设计。
            529 13
            Spring Cloud Alibaba:一站式微服务解决方案
            微服务SpringCloud链路追踪之Micrometer+Zipkin
            SpringCloud+Openfeign远程调用,并用Mircrometer+Zipkin进行链路追踪
            382 20
            微服务SpringCloud分布式事务之Seata
            SpringCloud+SpringCloudAlibaba的Seata实现分布式事务,步骤超详细,附带视频教程
            96 1
            深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
            深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
            216 5
            AI助理

            你好,我是AI助理

            可以解答问题、推荐解决方案等