微服务 分片 运维管理(下)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 微服务 分片 运维管理(下)

Dataflow类型调度

Dataflow类型的定时任务需要实现Dataflowjob接口,该接口提供2个方法供覆盖,分别用于抓取(fetchData)和处理( processData)数据,我们继续对例子进行改造。

Dataflow类型用于处理数据流,他和SimpleJob不同,它以数据流的方式执行,调用fetchData抓取数据,知道抓取不到数据才停止作业。

定时任务开始的时候,先抓取数据,判断数据是否为空,若不为空则进行处理数据

代码示例

第一步:创建任务类

@Component
    public class FileDataflowJob implements DataflowJob<FileCustom> {
        @Autowired
        private FileCustomMapper fileCustomMapper;
        //抓取数据
        @Override
        public List<FileCustom> fetchData(ShardingContext shardingContext) {
            System.out.println("开始抓取数据......");
            List<FileCustom> fileCustoms = fileCustomMapper.selectLimit(2);
            return fileCustoms;
        }
        //处理数据
        @Override
        public void processData(ShardingContext shardingContext, List<FileCustom> data) {
            for(FileCustom custom:data){
                backUp(custom);
            }
        }
        private void backUp(FileCustom custom){
            System.out.println("备份的方法名:"+custom.getName()+"备份的类型:"+custom.getType());
            System.out.println("=======================");
            //模拟进行备份操作
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            fileCustomMapper.changeState(custom.getId(),1);
        }
    }

第二步:创建任务配置类

@Configuration
    public class JobConfig {
        @Bean
        public static CoordinatorRegistryCenter registryCenter(@Value("${zookeeper.url}") String url, @Value("${zookeeper.groupName}") String groupName) {
            ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(url, groupName);
            //设置节点超时时间
            zookeeperConfiguration.setSessionTimeoutMilliseconds(100);
            //zookeeperConfiguration("zookeeper地址","项目名")
            CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
            regCenter.init();
            return regCenter;
        }
        //功能的方法
        private static LiteJobConfiguration createJobConfiguration(Class clazz, String corn, int shardingCount,String shardingParam,boolean isDateFlowJob) {
            JobCoreConfiguration.Builder jobBuilder = JobCoreConfiguration.newBuilder(clazz.getSimpleName(), corn, shardingCount);
            if(!StringUtils.isEmpty(shardingParam)){
                jobBuilder.shardingItemParameters(shardingParam);
            }
            //定义作业核心配置newBuilder("任务名称","corn表达式","分片数量")
            JobCoreConfiguration simpleCoreConfig = jobBuilder.build();
            // 定义SIMPLE类型配置 cn.wolfcode.MyElasticJob
            JobTypeConfiguration jobConfiguration;
            if(isDateFlowJob){
                jobConfiguration = new DataflowJobConfiguration(simpleCoreConfig,clazz.getCanonicalName(),true);
            }else{
                jobConfiguration = new SimpleJobConfiguration(simpleCoreConfig,clazz.getCanonicalName());
            }
            //定义Lite作业根配置
            LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobConfiguration).overwrite(true).build();
            return simpleJobRootConfig;
        }
        @Bean(initMethod = "init")
        public SpringJobScheduler fileDatFlowaScheduler(FileDataflowJob job, CoordinatorRegistryCenter registryCenter){
            LiteJobConfiguration jobConfiguration = createJobConfiguration(job.getClass(),"0/10 * * * * ?",1,null,true);
            return new SpringJobScheduler(job,registryCenter,jobConfiguration);
        }
    }

第三步:创建Mapper映射文件

@Mapper
    public interface FileCustomMapper {
        @Update("update t_file_custom set backedUp = #{state} where id = #{id}")
        int changeState(@Param("id") Long id, @Param("state")int state);
        @Select("select * from t_file_custom where backedUp = 0 limit #{count}")
        List<FileCustom> selectLimit(int count);
    }

运维管理

事件追踪

Elastic-Job-Lite在配置中提供了JobEventConfiguration,支持数据库方式配置,会在数据库中自动创建JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG两张表以及若干索引来近路作业的相关信息。
修改Elastic-job配置类
第一步:在ElasticJobConfig配置类中注入DataSource

第二步:在任务配置中增加事件追踪配置


运行结果
该表记录每次作业的执行历史,分为两个步骤:

1.作业开始执行时间想数据库插入数据

2.作业完成执行时向数据库更新数据,更新is_success,complete_time和failure_cause(如果任务执行失败)

该表记录作业状态变更痕迹表,可通过每次作业运行的task_id查询作业状态变化的生命轨迹和运行轨迹


运维平台

搭建步骤

1.解压缩


2.进入bin目录,并执行
bin\start.bat
3.打开浏览器访问http://localhost:8899

用户名:root 密码:root


使用步骤

第一步:注册中心配置


第二步:事件追踪数据源配置


之后就可以使用了


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
6月前
|
存储 运维 数据安全/隐私保护
微服务应用运维入门
微服务应用运维入门
|
1月前
|
人工智能 运维 监控
构建高性能微服务架构:现代后端开发的挑战与策略构建高效自动化运维系统的关键策略
【2月更文挑战第30天】 随着企业应用的复杂性增加,传统的单体应用架构已经难以满足快速迭代和高可用性的需求。微服务架构作为解决方案,以其服务的细粒度、独立性和弹性而受到青睐。本文将深入探讨如何构建一个高性能的微服务系统,包括关键的设计原则、常用的技术栈选择以及性能优化的最佳实践。我们将分析微服务在处理分布式事务、数据一致性以及服务发现等方面的挑战,并提出相应的解决策略。通过实例分析和案例研究,我们的目标是为后端开发人员提供一套实用的指南,帮助他们构建出既能快速响应市场变化,又能保持高效率和稳定性的微服务系统。 【2月更文挑战第30天】随着信息技术的飞速发展,企业对于信息系统的稳定性和效率要求
|
1月前
|
Kubernetes 安全 Java
运维人少,如何批量管理上百个微服务、上千条流水线?
云效 AppStack 平台针对微服务和云原生环境下的应用管理难题,提供了以应用为中心的资源、流水线和权限管理解决方案。
|
1月前
|
运维 应用服务中间件 调度
微服务容器化的运维
【2月更文挑战第27天】
|
6月前
|
运维 监控 数据安全/隐私保护
微服务应用运维的注意事项
微服务应用运维的注意事项
|
6月前
|
运维 负载均衡 监控
微服务应用运维
微服务应用运维
|
8月前
|
运维 调度 数据库
微服务 分片 运维管理(上)
微服务 分片 运维管理(上)
52 0
|
10月前
|
运维 SpringCloudAlibaba 安全
SpringCloud Alibaba微服务运维二 - 集成ELK日志
SpringCloud Alibaba微服务运维二 - 集成ELK日志
499 0
|
10月前
|
存储 运维 SpringCloudAlibaba
SpringCloud Alibaba微服务运维一 - 集成SkyWalking
SpringCloud Alibaba微服务运维一 - 集成SkyWalking
497 0
|
11月前
|
自然语言处理 运维 监控
《云原生架构容器&微服务优秀案例集》——01 互联网——站酷 基于 ASM 解决多语言技术栈下服务管理难题,实现运维提效
《云原生架构容器&微服务优秀案例集》——01 互联网——站酷 基于 ASM 解决多语言技术栈下服务管理难题,实现运维提效
203 0