定时任务能力进击!Quartz框架的使用

简介: 定时任务能力进击!Quartz框架的使用

前言

之前项目集成了 quartz 但是遇到了点问题,就是项目使用的是nocas,然后之前quartz的配置是写在quartz.properties文件里面的,而且里面有quartz对应的数据库连接信息,而公司的要求是测试生产的数据库连接信息要配置到nocas中,且由测试和运维分别管理,基于此,数据库连接就不能放在quartz.properties文件里面了,于是我将quartz.properties中的数据库配置信息放到nocas中,结果quartz也能使用,但是却暴露了一些问题,下面就来看看他的问题

梳理

  1. quartz配置

    # 实例化ThreadPool时,使用的线程类为SimpleThreadPool org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool 
    # threadCount和threadPriority将以setter的形式注入ThreadPool实例
    # 并发个数 
    org.quartz.threadPool.threadCount=10
    # 优先级
    org.quartz.threadPool.threadPriority=5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true org.quartz.jobStore.misfireThreshold=5000
    #持久化使用的类 org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX 
    #数据库中表的前缀 
    org.quartz.jobStore.tablePrefix=QRTZ_ 
    #数据源命名 org.quartz.jobStore.dataSource=qzDS
    #qzDS 数据源,我们使用hikaricp,默认的是c3p0 
    org.quartz.dataSource.qzDS.provider=hikaricp 
    org.quartz.dataSource.qzDS.driver=com.mysql.cj.jdbc.Driver 
    org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/dst_db_message?useUnicode=true&characterEncoding=utf-
    8&useSSL=false&allowMultiQueries=true&serverTimezone=Asia/Shanghai 
    org.quartz.dataSource.qzDS.user=xxx
    org.quartz.dataSource.qzDS.password=xxx org.quartz.dataSource.qzDS.maxConnections=10
  2. 源码分析

    把里面的信息配置到nocas中,会发现定时任务也能执行,但是他是基于内存的,那我是怎么找到的呢,因为我看了下他的源码部分

    quartz执行任务的核心类是QuartzSchedulerThread,我们看下代码执行逻辑,

    /**
     * <p>
     * The main processing loop of the <code>QuartzSchedulerThread</code>.
     * </p>
     */
    @Override
    public void run() {
        int acquiresFailed = 0;
        // halted(停止)默认为false,当QuartzScheduler执行shutdown()时才会更新为true
        while (!halted.get()) {
            try {
                // paused(暂停) 默认是true 当QuartzScheduler执行start()时 更新为false;
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }

    是通过这个线程调用方法来执行的,由于次方法比较长,直接到核心的地方

    try {
        // 查询一段时间内将要被调度的triggers
        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
        acquiresFailed = 0;
        if (log.isDebugEnabled())
            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
    } catch (JobPersistenceException jpe) {
        if (acquiresFailed == 0) {
            qs.notifySchedulerListenersError(
                "An error occurred while scanning for the next triggers to fire.",
                jpe);
        }
        if (acquiresFailed < Integer.MAX_VALUE)
            acquiresFailed++;
        continue;
    } catch (RuntimeException e) {

    之前文章有提到,quarz框架是通过三个能力来处理的,schedule是具体执行的类,triggers是什么时候去执行,这个方式就是获取到trigger来进行调用具体的执行, qsRsrcs.getJobStore()是获取具体的执行JobStore,我们看下JobStore

    public interface JobStore {
/**
 * Called by the QuartzScheduler before the <code>JobStore</code> is
 * used, in order to give the it a chance to initialize.
 */
void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler) 
    throws SchedulerConfigException;
```

他是一个接口,有很多实现,

![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/7b12a215cac84f3bb1d18a31d3e32e7c~tplv-k3u1fbpfcp-zoom-1.image)

而具体那个实现是由我们自己配置的,就是上面quartz.properties文件中配置的参数

```
 org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
```

JobStoreTX就是具体的实现,但是在源码调试过程中,发现了一个问题,项目运行时获取的实现并不是JobStoreTX,而是RAMJobStore,这是为什么呢,为什么我的配置没有生效呢,后来发现是因为我们不在项目根目录中配置quartz.properties,那么quartz框架会读取自己的quartz.properties文件,我们看下他的位置

![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/e98a44a11dc144c991db61d29397bf57~tplv-k3u1fbpfcp-zoom-1.image)

他是在quartz自己的jar包中,再看下他的配置

```
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#


org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false


org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true


org.quartz.jobStore.misfireThreshold: 60000


org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
```

这时候就真像了,因为配置文件里面配置的就是RAMJobStore,那他的能力是什么,查阅到,其实他是基于内存的定时任务处理,也就是新建的任务不会放到数据库,而是放到内存中,不会持久化,当程序重启之后,之前设立的定时任务就不会执行了,显然是有问题的

如何解决呢?

不能在项目中新建quartz.properties去覆盖,经过一番思考忠厚,发现其实还有另外一种方法,就是在代码中取设置quartz的配置

```
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
    SchedulerFactoryBean factory = new SchedulerFactoryBean();
    Properties props = new Properties();
    props.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, "org.quartz.simpl.SimpleThreadPool");
    props.put(StdSchedulerFactory.PROP_JOB_STORE_CLASS, "org.quartz.impl.jdbcjobstore.JobStoreTX");
    props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".tablePrefix", "QRTZ_");
    props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".dataSource", "qzDS");
    // 启用分布式锁 虽然源码中 属性是boolean 但是传入要传字符串true 否则不生效
    props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".acquireTriggersWithinLock", "true");
    props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".misfireThreshold", "5000");
    props.put(StdSchedulerFactory.PROP_DATASOURCE_PREFIX + ".qzDS.provider", provider);
    props.put(StdSchedulerFactory.PROP_DATASOURCE_PREFIX + ".qzDS.driver", driver);
    props.put(StdSchedulerFactory.PROP_DATASOURCE_PREFIX + ".qzDS.URL", URL);
    props.put(StdSchedulerFactory.PROP_DATASOURCE_PREFIX + ".qzDS.user", user);
    props.put(StdSchedulerFactory.PROP_DATASOURCE_PREFIX + ".qzDS.password", password);
    props.put(StdSchedulerFactory.PROP_DATASOURCE_PREFIX + ".qzDS.maxConnections", maxConnections);
    props.put(StdSchedulerFactory.PROP_THREAD_POOL_PREFIX + ".threadCount", "10");
    props.put(StdSchedulerFactory.PROP_THREAD_POOL_PREFIX + ".threadPriority", "5");
    props.put(StdSchedulerFactory.PROP_THREAD_POOL_PREFIX + ".threadsInheritContextClassLoaderOfInitializingThread", "true");
    factory.setQuartzProperties(props);
    factory.setJobFactory(jobFactory);
    return factory;
}
```

通过这种方式就可以设置quartz的配置文件,看来当一条路走不下去,肯定有另一条路。

解决了这个问题之后,还有一个问题,quartz分布式定时任务调用
  1. quartz分布式定时任务调用

    之前的文章有写,自己实现的定时任务调用处理分布式问题,是使用的redis锁,但是quartz要怎么处理呢,在网上查quartz是支持分布式的,那是不是要配置什么呢,我们继续看看源码,还是这段

    try {
        // 查询一段时间内将要被调度的triggers
        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
        acquiresFailed = 0;
        if (log.isDebugEnabled())
            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
    } catch (JobPersistenceException jpe) {
        if (acquiresFailed == 0) {
            qs.notifySchedulerListenersError(
                "An error occurred while scanning for the next triggers to fire.",
                jpe);
        }
        if (acquiresFailed < Integer.MAX_VALUE)
            acquiresFailed++;
        continue;
    } catch (RuntimeException e) {

    我们看下acquireNextTriggers方法的具体实现

    @SuppressWarnings("unchecked")
    public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
    
        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) { 
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName, 

    我们看到这里面设置了lockName,当isAcquireTriggersWithinLock()方法返回为true时,就会加锁,再看看isAcquireTriggersWithinLock()方法

    private boolean acquireTriggersWithinLock = false;
    public boolean isAcquireTriggersWithinLock() {
        return acquireTriggersWithinLock;
    }

    我们看到acquireTriggersWithinLock属性,而他的默认值是false,也就是说默认情况下,是不会开启分布式锁的,也就是说如果不配置,是不会开启分布式锁的,于是查阅资料,需要加上

org.quartz.jobStore.acquireTriggersWithinLock=true 
这样就可以开启分布式锁,对应的代码设置就是

```
    props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".acquireTriggersWithinLock", "true");
```

启用分布式锁 虽然源码中 属性是boolean 但是传入要传字符串true 否则不生效,这一点要铭记

了解了这个参数后,我们再看一下分布式锁的执行逻辑

我们看到方法后面调用executeInNonManagedTXLock方法,他的实现是



```
protected <T> T executeInNonManagedTXLock(
        String lockName, 
        TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
    boolean transOwner = false;
    Connection conn = null;
    try {
        if (lockName != null) {
            // If we aren't using db locks, then delay getting DB connection 
            // until after acquiring the lock since it isn't needed.
            if (getLockHandler().requiresConnection()) {
                conn = getNonManagedTXConnection();
            }
            transOwner = getLockHandler().obtainLock(conn, lockName);
        }        
        if (conn == null) {
            conn = getNonManagedTXConnection();
        }        
        final T result = txCallback.execute(conn);
        try {
            commitConnection(conn);
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
                @Override
                public Boolean execute(Connection conn) throws JobPersistenceException {
                    return txValidator.validate(conn, result);
                }
            })) {
                throw e;
            }
        }
        Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
        if(sigTime != null && sigTime >= 0) {
            signalSchedulingChangeImmediately(sigTime);
        }        
        return result;
    } catch (JobPersistenceException e) {
        rollbackConnection(conn);
        throw e;
    } catch (RuntimeException e) {
        rollbackConnection(conn);
        throw new JobPersistenceException("Unexpected runtime exception: "
                + e.getMessage(), e);
    } finally {
        try {
            releaseLock(lockName, transOwner);
        } finally {
            cleanupConnection(conn);
        }
    }
}
```

他的过程就是获取锁,执行逻辑,释放锁的过程,我们看看获取锁的过程

```
transOwner = getLockHandler().obtainLock(conn, lockName)




public boolean obtainLock(Connection conn, String lockName)
    throws LockException {


    if(log.isDebugEnabled()) {
        log.debug(
            "Lock '" + lockName + "' is desired by: "
                    + Thread.currentThread().getName());
    }
    if (!isLockOwner(lockName)) {


        executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);

        if(log.isDebugEnabled()) {
            log.debug(
                "Lock '" + lockName + "' given to: "
                        + Thread.currentThread().getName());
        }
        getThreadLocks().add(lockName);
        //getThreadLocksObtainer().put(lockName, new
        // Exception("Obtainer..."));
    } else if(log.isDebugEnabled()) {
        log.debug(
            "Lock '" + lockName + "' Is already owned by: "
                    + Thread.currentThread().getName());
    }


    return true;
}
```


秘密就在expandedSQL里面,我们看下这个sql

```
public static final String SELECT_FOR_LOCK = "SELECT * FROM "
        + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
        + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
```

重点就是FOR UPDATE,这个是mysql的行锁,当多个节点共同执行定时任务时,最先到的节点会通过for update在数据库行程行锁,其他节点进入等待状态,获取到锁的节点继续执行逻辑,也就是

```
final T result = txCallback.execute(conn);


return executeInNonManagedTXLock(lockName, 
        new TransactionCallback<List<OperableTrigger>>() {
            public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
            }
        },
```

对应的是方法是acquireNextTrigger


```
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
    throws JobPersistenceException {
    if (timeWindow < 0) {
      throw new IllegalArgumentException();
    }

    List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
    Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
    final int MAX_DO_LOOP_RETRY = 3;
    int currentLoopCount = 0;
    do {
        currentLoopCount ++;
        try {
            List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount)

public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
    throws SQLException {
    PreparedStatement ps = null;
    ResultSet rs = null;
    List<TriggerKey> nextTriggers = new LinkedList<TriggerKey>();
    try {
        ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE));
```

selectTriggerToAcquire查询tigger为wait状态的数据,执行之后会把tigger的状态改为acquire状态,都执行之后,释放锁,这样下一个节点获取到锁之后,通过 selectTriggerToAcquire查询就不会查询到相同的数据,保证了定时任务的分布式执行。

相关文章
|
存储 负载均衡 监控
分布式定时任务,你了解多少?基于Quartz实现分布式定时任务解决方案!
定时任务系统在应用平台中的重要性不言而喻,特别是互联网电商、金融等行业更是离不开定时任务。在任务数量不多、执行频率不高时,单台服务器完全能够满足。但是随着业务逐渐增加,定时任务系统必须具备高可用和水平扩展的能力,单台服务器已经不能满足需求。因此需要把定时任务系统部署到集群中,实现分布式定时任务系统集群。
4696 1
分布式定时任务,你了解多少?基于Quartz实现分布式定时任务解决方案!
|
监控 Dubbo Java
分布式定时任务调度框架实践
分布式定时任务调度框架实践
758 1
|
调度
90分布式电商项目 - SpringTask任务调度框架
90分布式电商项目 - SpringTask任务调度框架
56 0
|
存储 开发框架 Java
分布式定时任务框架Quartz总结和实践(1)
Quartz是OpenSymphony开源组织在Job scheduling领域又一个开源项目,它可以与J2EE与J2SE应用程序相结合也可以单独使用。Quartz可以用来创建简单或为运行十个,百个,甚至是好几万个Jobs这样复杂的程序。Jobs可以做成标准的Java组件或 EJBs。
189 0
|
分布式计算 前端开发 数据可视化
你只会用 xxl-job?一款更强大、新一代分布式任务调度框架来了,太强大了!
你只会用 xxl-job?一款更强大、新一代分布式任务调度框架来了,太强大了!
900 0
你只会用 xxl-job?一款更强大、新一代分布式任务调度框架来了,太强大了!
|
调度
【解决方案 二十】作业调度系统cron表达式详解
【解决方案 二十】作业调度系统cron表达式详解
107 0
|
消息中间件 监控 算法
分布式定时任务框架选型,写的太好了 !
分布式定时任务框架选型,写的太好了 !
|
消息中间件 监控 算法
分布式定时任务框架选型
分布式定时任务框架选型
|
Java 测试技术 调度
【优化技术专题】「温故而知新」基于Quartz系列的任务调度框架的动态化任务实现分析
【优化技术专题】「温故而知新」基于Quartz系列的任务调度框架的动态化任务实现分析
155 0
【优化技术专题】「温故而知新」基于Quartz系列的任务调度框架的动态化任务实现分析
|
XML druid 前端开发
【SpringBoot技术指南】「开发实战系列」动态化Quartz任务调度机制+实时推送任务数据到前端
【SpringBoot技术指南】「开发实战系列」动态化Quartz任务调度机制+实时推送任务数据到前端
290 0
【SpringBoot技术指南】「开发实战系列」动态化Quartz任务调度机制+实时推送任务数据到前端