elastic-job的原理简介和使用

简介:


转自:http://blog.csdn.net/fanfan_v5/article/details/61310045


elastic-job是当当开源的一款非常好用的作业框架,在这之前,我们开发定时任务一般都是使用quartz或者spring-task(ScheduledExecutorService),无论是使用quartz还是spring-task,我们都会至少遇到两个痛点:
1.不敢轻易跟着应用服务多节点部署,可能会重复多次执行而引发系统逻辑的错误。
2.quartz的集群仅仅只是用来HA,节点数量的增加并不能给我们的每次执行效率带来提升,即不能实现水平扩展。

本篇博文将会自顶向下地介绍elastic-job,让大家认识了解并且快速搭建起环境。


elastic-job产品线说明


elastic-job2.x之后,出了两个产品线:Elastic-Job-LiteElastic-Job-Cloud。我们一般使用Elastic-Job-Lite就能够满足需求,本文也是以Elastic-Job-Lite为主。1.x系列对应的就只有Elastic-Job-Lite,并且在2.x里修改了一些核心类名,差别虽大,原理类似,建议使用2.x系列。写此博文,最新release版本为2.0.5


elastic-job-lite原理

举个典型的job场景,比如余额宝里的昨日收益,系统需要job在每天某个时间点开始,给所有余额宝用户计算收益。如果用户数量不多,我们可以轻易使用quartz来完成,我们让计息job在某个时间点开始执行,循环遍历所有用户计算利息,这没问题。可是,如果用户体量特别大,我们可能会面临着在第二天之前处理不完这么多用户。另外,我们部署job的时候也得注意,我们可能会把job直接放在我们的webapp里,webapp通常是多节点部署的,这样,我们的job也就是多节点,多个job同时执行,很容易造成重复执行,比如用户重复计息,为了避免这种情况,我们可能会对job的执行加锁,保证始终只有一个节点能执行,或者干脆让job从webapp里剥离出来,独自部署一个节点。
elastic-job就可以帮助我们解决上面的问题,elastic底层的任务调度还是使用的quartz,通过zookeeper来动态给job节点分片。
我们来看:
很大体量的用户需要在特定的时间段内计息完成
我们肯定是希望我们的任务可以通过集群达到水平扩展,集群里的每个节点都处理部分用户,不管用户数量有多庞大,我们只要增加机器就可以了,比如单台机器特定时间能处理n个用户,2台机器处理2n个用户,3台3n,4台4n...,再多的用户也不怕了。
使用elastic-job开发的作业都是zookeeper的客户端,比如我希望3台机器跑job,我们将任务分成3片,框架通过zk的协调,最终会让3台机器分别分配到0,1,2的任务片,比如server0-->0,server1-->1,server2-->2,当server0执行时,可以只查询id%3==0的用户,server1执行时,只查询id%3==1的用户,server2执行时,只查询id%3==2的用户。
任务部署多节点引发重复执行
在上面的基础上,我们再增加server3,此时,server3分不到任务分片,因为只有3片,已经分完了。没有分到任务分片的作业程序将不执行。
如果此时server2挂了,那么server2的分片项会分配给server3,server3有了分片,就会替代server2执行。
如果此时server3也挂了,只剩下server0和server1了,框架也会自动把server3的分片随机分配给server0或者server1,可能会这样,server0-->0,server1-->1,2。
这种特性称之为弹性扩容,即elastic-job名称的由来。

代码演示

我们搭建环境通过示例代码来演示上面的例子,elastic-job是不支持单机多实例的,通过zk的协调分片是以ip为单元的。很多同学上来可能就是通过单机多实例来学习,结果导致分片和预期不一致。这里没办法,只能通过多机器或者虚拟机,我们这里使用虚拟机,另外,由于资源有限,我们这里仅仅只模拟两台机器。

节点说明:
本地宿主机器
zookeeper、job
192.168.241.1

虚拟机
job
192.168.241.128

环境说明:
Java
请使用JDK1.7及其以上版本。
Zookeeper
请使用Zookeeper3.4.6及其以上版本
Elastic-Job-Lite
2.0.5(2.x系列即可,最好是2.0.4及其以上,因为2.0.4版本有本人提交的少许代码,(*^__^*) 嘻嘻……)

需求说明:
通过两台机器演示动态分片

step1. 引入框架的jar包

1
2
3
4
5
6
7
8
9
10
11
12
<!-- 引入elastic-job-lite核心模块 -->  
<dependency>  
     <groupId>com.dangdang</groupId>  
     <artifactId>elastic-job-lite-core</artifactId>  
     <version> 2.0 . 5 </version>  
</dependency>  
<!-- 使用springframework自定义命名空间时引入 -->  
<dependency>  
     <groupId>com.dangdang</groupId>  
     <artifactId>elastic-job-lite-spring</artifactId>  
     <version> 2.0 . 5 </version>  
</dependency>

step2. 编写job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package  com.fanfan.sample001;  
   
import  com.dangdang.ddframe.job.api.ShardingContext;  
import  com.dangdang.ddframe.job.api.simple.SimpleJob;  
   
import  java.util.Date;  
   
/** 
  * Created by fanfan on 2016/12/20. 
  */  
public  class  MySimpleJob  implements  SimpleJob {  
     @Override  
     public  void  execute(ShardingContext shardingContext) {  
         System.out.println(String.format( "------Thread ID: %s, 任务总片数: %s, 当前分片项: %s" ,  
                 Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));  
         /** 
          * 实际开发中,有了任务总片数和当前分片项,就可以对任务进行分片执行了 
          * 比如 SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem 
          */  
     }  
}

Step3. Spring配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<?xml version= "1.0"  encoding= "UTF-8" ?>  
<beans xmlns= "http://www.springframework.org/schema/beans"  
        xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"  
        xmlns:reg= "http://www.dangdang.com/schema/ddframe/reg"  
        xmlns:job= "http://www.dangdang.com/schema/ddframe/job"  
        xsi:schemaLocation="http: //www.springframework.org/schema/beans  
                         http: //www.springframework.org/schema/beans/spring-beans.xsd  
                         http: //www.dangdang.com/schema/ddframe/reg  
                         http: //www.dangdang.com/schema/ddframe/reg/reg.xsd  
                         http: //www.dangdang.com/schema/ddframe/job  
                         http: //www.dangdang.com/schema/ddframe/job/job.xsd">  
     <!--配置作业注册中心 -->  
     <reg:zookeeper id= "regCenter"  server-lists= "192.168.241.1:2181"  namespace= "dd-job"  
                    base-sleep-time-milliseconds= "1000"  max-sleep-time-milliseconds= "3000"  max-retries= "3"  />  
   
     <!-- 配置作业-->  
     <job:simple id= "mySimpleJob"  class = "com.fanfan.sample001.MySimpleJob"  registry-center-ref= "regCenter"  
                 sharding-total-count= "2"  cron= "0/2 * * * * ?"  overwrite= "true"  />  
   
</beans>

  


Case1. 单节点






Case2. 增加一个节点







Case3. 断开一个节点




作业类型

elastic-job提供了三种类型的作业:Simple类型作业、Dataflow类型作业、Script类型作业。这里主要讲解前两者。Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本,使用不多,可以参见github文档。

SimpleJob需要实现SimpleJob接口,意为简单实现,未经过任何封装,与quartz原生接口相似,比如示例代码中所使用的job。

Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
可通过DataflowJobConfiguration配置是否流式处理。
流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。
实际开发中,Dataflow类型的job还是很有好用的。

比如拿余额宝计息来说:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package  com.fanfan.sample001;  
   
import  com.dangdang.ddframe.job.api.ShardingContext;  
import  com.dangdang.ddframe.job.api.dataflow.DataflowJob;  
   
import  java.util.ArrayList;  
import  java.util.List;  
   
/** 
  * Created by fanfan on 2016/12/23. 
  */  
public  class  MyDataFlowJob  implements  DataflowJob<User> {  
   
     /* 
         status 
         0:待处理 
         1:已处理 
      */  
   
     @Override  
     public List<User> fetchData(ShardingContext shardingContext) {  
         List<User> users = null;  
         /** 
          * users = SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem Limit 0, 30 
          */  
         return users;  
     }  
   
     @Override  
     public void processData(ShardingContext shardingContext, List<User> data) {  
         for (User user: data) {  
             System.out.println(String.format("用户 %s 开始计息", user.getUserId()));  
             user.setStatus(1);  
             /** 
              * update user 
              */  
         }  
     }  
}
1
2
<job:dataflow id= "myDataFlowJob"  class = "com.fanfan.sample001.MyDataFlowJob"  registry-center-ref= "regCenter"  
               sharding-total-count= "2"  cron= "0 0 02 * * ?"  streaming-process= "true"  overwrite= "true"  />

其它功能


上述介绍的是最精简常用的功能。elastic-job的功能集还不止这些,比如像作业事件追踪、任务监听等,另外,elastic-job-lite-console作为一个独立的运维平台还提供了用来查询和操作任务的web页面。
这些增强的功能读者可以在github/elastic-job上自行学习,相信有了本篇博文的基础,再阅读那些文档就特别简单了。




      本文转自布拉君君 51CTO博客,原文链接:http://blog.51cto.com/5148737/1972500,如需转载请自行联系原作者





相关文章
|
Linux
如何看懂火焰图
如何看懂火焰图
1780 0
如何看懂火焰图
|
存储 Nacos 数据库
在 Docker 中部署 Nacos 并挂载配置文件
在 Docker 中部署 Nacos 并挂载配置文件
|
9月前
|
消息中间件 存储 设计模式
RocketMQ原理—5.高可用+高并发+高性能架构
本文主要从高可用架构、高并发架构、高性能架构三个方面来介绍RocketMQ的原理。
3078 21
RocketMQ原理—5.高可用+高并发+高性能架构
|
存储 消息中间件 缓存
面试的系统设计题,给我整懵了。。。
先赞后看,Java进阶一大半小明(化名)坐在密不透风的会议室里,手握着笔,放在桌面上的是满满的两页面试题。其中一道系统设计题是这样。。。微博或者短信都有单条发送字数的限制,如果需要分享一个长网址,很容易越出限制,短链服务可以将长网址变成短网址,方便传播。请设计一个短链服务,要求短网址尽可能短,且保证系统安全和并发能力。各位hao,我是南哥,相信对你通关面试、拿下Offer有所帮助。
296 14
面试的系统设计题,给我整懵了。。。
|
Ubuntu Linux pouch
Docker容器管理工具
文章介绍了Docker容器管理工具,以及早期使用的LXC容器管理工具,包括它们的安装、使用和相关技术特点。
333 10
Docker容器管理工具
|
数据采集 存储 监控
Java爬虫:数据采集的强大工具
在数据驱动的时代,Java爬虫技术凭借其强大的功能和灵活性,成为企业获取市场信息、用户行为及竞争情报的关键工具。本文详细介绍了Java爬虫的工作原理、应用场景、构建方法及其重要性,强调了在合法合规的前提下,如何有效利用Java爬虫技术为企业决策提供支持。
|
机器学习/深度学习 人工智能 算法
主流AI服务——大专生新就业之路
主流AI服务——大专生新就业之路
759 0
|
数据采集 监控 大数据
大数据中的ETL过程详解
【8月更文挑战第25天】ETL过程在大数据中扮演着至关重要的角色。通过合理设计和优化ETL过程,企业可以高效地整合和利用海量数据资源,为数据分析和决策提供坚实的基础。同时,随着技术的不断进步和发展,ETL过程也将不断演进和创新,以更好地满足企业的数据需求。
1162 3
|
SQL Java 关系型数据库
MyBatis-Plus 分页魅力绽放!紧跟技术热点,带你领略数据分页的高效与便捷
【8月更文挑战第29天】在 Java 开发中,数据处理至关重要,尤其在大量数据查询与展示时,分页功能尤为重要。MyBatis-Plus 作为一款强大的持久层框架,提供了便捷高效的分页解决方案。通过封装数据库分页查询语句,开发者能轻松实现分页功能。在实际应用中,只需创建 `Page` 对象并设置页码和每页条数,再通过 `QueryWrapper` 构建查询条件,调用 `selectPage` 方法即可完成分页查询。MyBatis-Plus 不仅生成分页 SQL 语句,还自动处理参数合法性检查,并支持条件查询和排序等功能,极大地提升了系统性能和稳定性。
524 0