开发者社区> 问答> 正文

关于开源框架高可用 Elastic-Job的实践说明(该如何操作)?

关于开源框架高可用 Elastic-Job的实践说明(该如何操作)?

展开
收起
kun坤 2020-04-23 16:30:54 595 0
1 条回答
写回答
取消 提交回答
  • (1)安装zookeeper,配置注册中心config,配置文件加入注册中心zk的配置。

    @Configuration
    @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
    public class JobRegistryCenterConfig {
      
        @Bean(initMethod = "init")
        public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
                                                 @Value("${regCenter.namespace}") final String namespace) {
            return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
        }
    }
    
    
    
    spring.application.name=demo_elasticjob
      
    regCenter.serverList=localhost:2181
    regCenter.namespace=demo_elasticjob
      
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
    spring.datasource.username=user
    spring.datasource.password=pwd
    

    (2)配置数据源config,并配置文件中加入数据源配置。

    @Getter
    @Setter
    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    @Configuration
    @ConfigurationProperties(prefix = "spring.datasource")
    public class DataSourceProperties {
        private String url;
        private String username;
        private String password;
      
        @Bean
        @Primary
        public DataSource getDataSource() {
            DruidDataSource dataSource = new DruidDataSource();
            dataSource.setUrl(url);
            dataSource.setUsername(username);
            dataSource.setPassword(password);
            return dataSource;
        }
    }
    
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
    spring.datasource.username=user
    spring.datasource.password=pwd
    

    (3)配置事件config。

    
    @Configuration
    public class JobEventConfig {
        @Autowired
        private DataSource dataSource;
      
        @Bean
        public JobEventConfiguration jobEventConfiguration() {
            return new JobEventRdbConfiguration(dataSource);
        }
    }
    

    (4)为了便于灵活配置不同的任务触发事件,加入ElasticSimpleJob注解。

    
    
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface ElasticSimpleJob {
      
        @AliasFor("cron")
        String value() default "";
      
        @AliasFor("value")
        String cron() default "";
      
        String jobName() default "";
      
        int shardingTotalCount() default 1;
      
        String shardingItemParameters() default "";
      
        String jobParameter() default "";
    }
    
    

    (5)对配置进行初始化。

    
    @Configuration
    @ConditionalOnExpression("'${elaticjob.zookeeper.server-lists}'.length() > 0")
    public class ElasticJobAutoConfiguration {
      
        @Value("${regCenter.serverList}")
        private String serverList;
      
        @Value("${regCenter.namespace}")
        private String namespace;
      
        @Autowired
        private ApplicationContext applicationContext;
        @Autowired
        private DataSource dataSource;
      
        @PostConstruct
        public void initElasticJob() {
            ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
            regCenter.init();
            Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class);
      
            for (Map.Entry<String, SimpleJob> entry : map.entrySet()) {
                SimpleJob simpleJob = entry.getValue();
                ElasticSimpleJob elasticSimpleJobAnnotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class);
      
                String cron = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value());
                SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(simpleJob.getClass().getName(), cron, elasticSimpleJobAnnotation.shardingTotalCount()).shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters()).build(), simpleJob.getClass().getCanonicalName());
                LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
      
                JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource);
                SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobEventRdbConfiguration);
                jobScheduler.init();
            }
        }
    }
    
    

    (6)实现 SimpleJob接口,按上文中方法整合dubbo, 完成业务逻辑。

    @ElasticSimpleJob(
            cron = "*/10 * * * * ?",
            jobName = "OfflineTaskJob",
            shardingTotalCount = 2,
            jobParameter = "测试参数",
            shardingItemParameters = "0=A,1=B")
    @Component
    public class MySimpleJob implements SimpleJob {
        Logger logger = LoggerFactory.getLogger(OfflineTaskJob.class);
      
        @Reference(check = false, version = "cms-dev", group = "cms-service")
        private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
      
      
        @Override
        public void execute(ShardingContext shardingContext) {
      
            offlineTaskExecutorFacade.executeOfflineTask();
      
            logger.info(String.format("Thread ID: %s, 作业分片总数: %s, " +
                            "当前分片项: %s.当前参数: %s," +
                            "作业名称: %s.作业自定义参数: %s"
                    ,
                    Thread.currentThread().getId(),
                    shardingContext.getShardingTotalCount(),
                    shardingContext.getShardingItem(),
                    shardingContext.getShardingParameter(),
                    shardingContext.getJobName(),
                    shardingContext.getJobParameter()
            ));
        }
    }
    
    
    2020-04-23 16:32:44
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
阿里云Elasticsearch体系架构与特性解析 立即下载
基于Kubernetes搭建HBase在知乎的实践 立即下载
基于Kubernates的流处理平台实践 ——Flink为例 立即下载