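Preface
An earlier project of ours integrated Quartz, and we ran into a problem. The project uses Nacos, but the Quartz settings lived in a quartz.properties file that also held Quartz's database connection details. Company policy requires the test and production database connection details to be configured in Nacos, where they are managed by the testing and operations teams respectively. The connection details therefore could not stay in quartz.properties, so I moved the database settings from quartz.properties into Nacos. Quartz still worked, but the move exposed some problems. Let's look at them.
Walkthrough
Quartz configuration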
```
# Thread pool class to instantiate: SimpleThreadPool
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
# threadCount and threadPriority are injected into the ThreadPool instance through setters
# Number of concurrent threads
org.quartz.threadPool.threadCount=10
# Thread priority
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
org.quartz.jobStore.misfireThreshold=5000
# Class used for persistence
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# Prefix of the Quartz tables in the database
org.quartz.jobStore.tablePrefix=QRTZ_
# Name of the data source
org.quartz.jobStore.dataSource=qzDS
# The qzDS data source; we use hikaricp, the default is c3p0
org.quartz.dataSource.qzDS.provider=hikaricp
org.quartz.dataSource.qzDS.driver=com.mysql.cj.jdbc.Driver
org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/dst_db_message?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowMultiQueries=true&serverTimezone=Asia/Shanghai
org.quartz.dataSource.qzDS.user=xxx
org.quartz.dataSource.qzDS.password=xxx
org.quartz.dataSource.qzDS.maxConnections=10
```
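Source code analysis
After moving these settings into Nacos, the scheduled jobs still ran, but the job store was now in-memory. How did I find that out? By reading the source.
The core class that executes jobs in Quartz is QuartzSchedulerThread. Let's look at its execution logic: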
```
/**
 * <p>
 * The main processing loop of the <code>QuartzSchedulerThread</code>.
 * </p>
 */
@Override
public void run() {
    int acquiresFailed = 0;

    // halted defaults to false; it becomes true only when QuartzScheduler runs shutdown()
    while (!halted.get()) {
        try {
            // paused defaults to true; it becomes false when QuartzScheduler runs start()
            synchronized (sigLock) {
                while (paused && !halted.get()) {
                    try {
                        // wait until togglePause(false) is called...
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                    }
```
Jobs are executed through this thread's run() method. The method is long, so let's jump straight to the core part:
```
try {
    // Query the triggers that are due to fire within the upcoming time window
    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
    acquiresFailed = 0;
    if (log.isDebugEnabled())
        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
    if (acquiresFailed == 0) {
        qs.notifySchedulerListenersError(
            "An error occurred while scanning for the next triggers to fire.",
            jpe);
    }
    if (acquiresFailed < Integer.MAX_VALUE)
        acquiresFailed++;
    continue;
} catch (RuntimeException e) {
```
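As mentioned in an earlier article, Quartz is built around three core components: the scheduler drives execution, triggers define when something runs, and jobs define what runs. The call above fetches the triggers that are due so that they can be fired. qsRsrcs.getJobStore() returns the concrete JobStore in use. Let's look at JobStore: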
```
public interface JobStore {
    /**
     * Called by the QuartzScheduler before the <code>JobStore</code> is
     * used, in order to give the it a chance to initialize.
     */
    void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler)
        throws SchedulerConfigException;
```
It is an interface with many implementations:
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/7b12a215cac84f3bb1d18a31d3e32e7c~tplv-k3u1fbpfcp-zoom-1.image)
Which implementation is used depends on our own configuration, namely this parameter from the quartz.properties file above:
```
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
```
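JobStoreTX is the implementation we configured. While debugging the source, however, I found that the implementation obtained at runtime was not JobStoreTX but RAMJobStore. Why was my configuration not taking effect? It turned out that when the project does not provide its own quartz.properties (Quartz looks for it in the working directory and on the classpath), Quartz falls back to the quartz.properties that ships with the framework itself. Here is where that file lives: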
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/e98a44a11dc144c991db61d29397bf57~tplv-k3u1fbpfcp-zoom-1.image)
It sits inside Quartz's own JAR. Here is its content:
```
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
```
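Now the truth comes out: the bundled file configures RAMJobStore. What does RAMJobStore do? It is an in-memory job store: newly created jobs are kept in memory instead of being written to the database, so nothing is persisted. After the application restarts, the previously scheduled jobs no longer run, which is clearly a problem.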
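The fallback described above can be pictured with a small JDK-only sketch. It is a simplified illustration of a "project file first, bundled default last" lookup, not Quartz's actual code: the class name, the `resolve` and `demo` helpers, and the single-key `bundledDefaults()` are all made up for this example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Hypothetical sketch of the lookup order; not part of Quartz. */
final class QuartzConfigLookupSketch {
    static final String JOB_STORE_KEY = "org.quartz.jobStore.class";

    /** Stand-in for the file bundled in the Quartz JAR (only the relevant key). */
    static Properties bundledDefaults() {
        Properties p = new Properties();
        p.setProperty(JOB_STORE_KEY, "org.quartz.simpl.RAMJobStore");
        return p;
    }

    /** Working directory first, then the classpath, then the bundled defaults. */
    static Properties resolve(Path workDir) {
        try {
            Path file = workDir.resolve("quartz.properties");
            if (Files.isRegularFile(file)) {
                try (InputStream in = Files.newInputStream(file)) {
                    return load(in);
                }
            }
            try (InputStream in = QuartzConfigLookupSketch.class.getClassLoader()
                    .getResourceAsStream("quartz.properties")) {
                if (in != null) {
                    return load(in);
                }
            }
            return bundledDefaults();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static Properties load(InputStream in) throws IOException {
        Properties p = new Properties();
        p.load(in);
        return p;
    }

    /** Resolves the job store class in a fresh temp directory, with or without a project file. */
    static String demo(boolean withProjectFile) {
        try {
            Path dir = Files.createTempDirectory("quartz-lookup");
            if (withProjectFile) {
                String content = JOB_STORE_KEY + "=org.quartz.impl.jdbcjobstore.JobStoreTX\n";
                Files.write(dir.resolve("quartz.properties"),
                        content.getBytes(StandardCharsets.ISO_8859_1));
            }
            return resolve(dir).getProperty(JOB_STORE_KEY);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("without project file: " + demo(false));
        System.out.println("with project file:    " + demo(true));
    }
}
```

Assuming no quartz.properties happens to be on the classpath, the first call ends up on the RAMJobStore default, which is the situation I hit once the properties file was gone.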
How to fix it?
We cannot add a quartz.properties file to the project to override the defaults. After some thought, I found another way: set the Quartz configuration in code.
```
// provider, driver, URL, user, password, maxConnections and jobFactory are defined
// elsewhere in the enclosing configuration class (not shown in this excerpt).
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
    SchedulerFactoryBean factory = new SchedulerFactoryBean();
    Properties props = new Properties();
    props.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, "org.quartz.simpl.SimpleThreadPool");
    props.put(StdSchedulerFactory.PROP_JOB_STORE_CLASS, "org.quartz.impl.jdbcjobstore.JobStoreTX");
    props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".tablePrefix", "QRTZ_");
    props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".dataSource", "qzDS");
    // Enable the lock for trigger acquisition. The field is a boolean in the source,
    // but the value must be passed as the string "true", otherwise it does not take effect.
    props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".acquireTriggersWithinLock", "true");
    props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".misfireThreshold", "5000");
    props.put(StdSchedulerFactory.PROP_DATASOURCE_PREFIX + ".qzDS.provider", provider);
    props.put(StdSchedulerFactory.PROP_DATASOURCE_PREFIX + ".qzDS.driver", driver);
    props.put(StdSchedulerFactory.PROP_DATASOURCE_PREFIX + ".qzDS.URL", URL);
    props.put(StdSchedulerFactory.PROP_DATASOURCE_PREFIX + ".qzDS.user", user);
    props.put(StdSchedulerFactory.PROP_DATASOURCE_PREFIX + ".qzDS.password", password);
    props.put(StdSchedulerFactory.PROP_DATASOURCE_PREFIX + ".qzDS.maxConnections", maxConnections);
    props.put(StdSchedulerFactory.PROP_THREAD_POOL_PREFIX + ".threadCount", "10");
    props.put(StdSchedulerFactory.PROP_THREAD_POOL_PREFIX + ".threadPriority", "5");
    props.put(StdSchedulerFactory.PROP_THREAD_POOL_PREFIX + ".threadsInheritContextClassLoaderOfInitializingThread", "true");
    factory.setQuartzProperties(props);
    factory.setJobFactory(jobFactory);
    return factory;
}
```
This way the Quartz configuration can be set without a properties file. When one road is blocked, there is usually another.
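One detail in that configuration deserves a quick check: the values go in as strings, even for a boolean setting. A likely reason is that string-based readers go through `Properties.getProperty`, which returns null for any value that is not a `String`. The sketch below only demonstrates that JDK behaviour; `readFlag` is a hypothetical reader written for this example, not Quartz's own parsing code.

```java
import java.util.Properties;

/** Hypothetical sketch: why a Boolean value in Properties is invisible to getProperty. */
final class StringOnlyPropertiesSketch {
    static final String KEY = "org.quartz.jobStore.acquireTriggersWithinLock";

    /** Reads the flag the way a string-based config reader would. */
    static boolean readFlag(Properties props) {
        // getProperty returns null when the stored value is not a String,
        // and Boolean.parseBoolean(null) is false.
        return Boolean.parseBoolean(props.getProperty(KEY));
    }

    public static void main(String[] args) {
        Properties asBoolean = new Properties();
        asBoolean.put(KEY, Boolean.TRUE); // stored, but not as a String

        Properties asString = new Properties();
        asString.put(KEY, "true");

        System.out.println(readFlag(asBoolean)); // false
        System.out.println(readFlag(asString));  // true
    }
}
```

The same caution applies to numeric settings such as maxConnections: if the value comes from an `int` field, converting it with `String.valueOf` before the `put` avoids the same trap.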
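With that solved, one more question remained: how Quartz handles scheduled jobs in a distributed deployment.
Distributed scheduled jobs in Quartz
In an earlier article, my own scheduler handled the distributed case with a Redis lock. How does Quartz handle it? Searching online shows that Quartz supports distributed deployments, so does it need some extra configuration? Let's go back to the source, starting from the same snippet: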
```
try {
    // Query the triggers that are due to fire within the upcoming time window
    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
    acquiresFailed = 0;
    if (log.isDebugEnabled())
        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
    if (acquiresFailed == 0) {
        qs.notifySchedulerListenersError(
            "An error occurred while scanning for the next triggers to fire.",
            jpe);
    }
    if (acquiresFailed < Integer.MAX_VALUE)
        acquiresFailed++;
    continue;
} catch (RuntimeException e) {
```
Let's look at the implementation of acquireNextTriggers:
```
@SuppressWarnings("unchecked")
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
    String lockName;
    if(isAcquireTriggersWithinLock() || maxCount > 1) {
        lockName = LOCK_TRIGGER_ACCESS;
    } else {
        lockName = null;
    }
    return executeInNonManagedTXLock(lockName,
```
We can see that lockName is set here: when isAcquireTriggersWithinLock() returns true (or maxCount is greater than 1), the lock is taken. Now look at isAcquireTriggersWithinLock():
```
private boolean acquireTriggersWithinLock = false;

public boolean isAcquireTriggersWithinLock() {
    return acquireTriggersWithinLock;
}
```
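The acquireTriggersWithinLock field defaults to false. In other words, unless it is configured, trigger acquisition takes the lock only when maxCount is greater than 1. After some research, the setting to add is: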
```
org.quartz.jobStore.acquireTriggersWithinLock=true
```
This turns the lock on. The corresponding setting in code is:
```
props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".acquireTriggersWithinLock", "true");
```
Although the field is a boolean in the source, the value must be passed as the string "true", otherwise it does not take effect. Keep this in mind.
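To see what the flag changes, the condition from acquireNextTriggers quoted earlier can be isolated into a tiny function. This is just a restatement of that `if` for illustration: the class and method names are invented, and the literal "TRIGGER_ACCESS" is an assumed value for the LOCK_TRIGGER_ACCESS constant.

```java
/** Hypothetical sketch of the lock decision seen in acquireNextTriggers. */
final class LockDecisionSketch {
    // Assumed value of the LOCK_TRIGGER_ACCESS constant, used here only for illustration.
    static final String LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS";

    /** Returns the lock name to use, or null when trigger acquisition runs without the lock. */
    static String lockNameFor(boolean acquireTriggersWithinLock, int maxCount) {
        return (acquireTriggersWithinLock || maxCount > 1) ? LOCK_TRIGGER_ACCESS : null;
    }

    public static void main(String[] args) {
        System.out.println(lockNameFor(false, 1)); // null
        System.out.println(lockNameFor(false, 5)); // TRIGGER_ACCESS
        System.out.println(lockNameFor(true, 1));  // TRIGGER_ACCESS
    }
}
```

So with the default of false, a scheduler that acquires one trigger at a time skips the lock entirely, and setting the flag to "true" makes the lock unconditional.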
With this parameter understood, let's look at how the lock is actually used.
The method ends by calling executeInNonManagedTXLock, which is implemented as follows:
```
protected <T> T executeInNonManagedTXLock(
        String lockName,
        TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
    boolean transOwner = false;
    Connection conn = null;
    try {
        if (lockName != null) {
            // If we aren't using db locks, then delay getting DB connection
            // until after acquiring the lock since it isn't needed.
            if (getLockHandler().requiresConnection()) {
                conn = getNonManagedTXConnection();
            }
            transOwner = getLockHandler().obtainLock(conn, lockName);
        }
        if (conn == null) {
            conn = getNonManagedTXConnection();
        }
        final T result = txCallback.execute(conn);
        try {
            commitConnection(conn);
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
                @Override
                public Boolean execute(Connection conn) throws JobPersistenceException {
                    return txValidator.validate(conn, result);
                }
            })) {
                throw e;
            }
        }
        Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
        if(sigTime != null && sigTime >= 0) {
            signalSchedulingChangeImmediately(sigTime);
        }
        return result;
    } catch (JobPersistenceException e) {
        rollbackConnection(conn);
        throw e;
    } catch (RuntimeException e) {
        rollbackConnection(conn);
        throw new JobPersistenceException("Unexpected runtime exception: "
                + e.getMessage(), e);
    } finally {
        try {
            releaseLock(lockName, transOwner);
        } finally {
            cleanupConnection(conn);
        }
    }
}
```
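Stripped of the connection and transaction handling, the method above has a familiar shape: take the lock if one was requested, run the callback, and release the lock in a finally block so that a failing callback cannot leave it held. Here is a minimal JDK-only sketch of that shape. The `ReentrantLock` is a stand-in for the database lock, and every name in it is made up for this example.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical sketch of the acquire / execute / release template. */
final class LockTemplateSketch {
    // Stand-in for the database-backed lock; an in-process lock is used for illustration only.
    private final ReentrantLock lock = new ReentrantLock();

    /** Runs work, holding the lock for its duration when useLock is true. */
    <T> T executeInLock(boolean useLock, Supplier<T> work) {
        boolean owner = false;
        try {
            if (useLock) {
                lock.lock();
                owner = true;
            }
            return work.get();
        } finally {
            // Release only what this call acquired, even if work threw.
            if (owner) {
                lock.unlock();
            }
        }
    }

    boolean isLocked() {
        return lock.isLocked();
    }

    public static void main(String[] args) {
        LockTemplateSketch template = new LockTemplateSketch();
        System.out.println(template.executeInLock(true, template::isLocked));  // true
        System.out.println(template.executeInLock(false, template::isLocked)); // false
        System.out.println(template.isLocked());                               // false
    }
}
```

The `owner` flag plays the same role as transOwner in the Quartz method: the finally block only releases a lock that this particular call obtained.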
The flow is: acquire the lock, run the logic, release the lock. Let's look at how the lock is acquired:
```
// Call site in executeInNonManagedTXLock:
transOwner = getLockHandler().obtainLock(conn, lockName);

// The obtainLock implementation:
public boolean obtainLock(Connection conn, String lockName)
        throws LockException {
    if(log.isDebugEnabled()) {
        log.debug(
            "Lock '" + lockName + "' is desired by: "
                    + Thread.currentThread().getName());
    }
    if (!isLockOwner(lockName)) {
        executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);
        if(log.isDebugEnabled()) {
            log.debug(
                "Lock '" + lockName + "' given to: "
                        + Thread.currentThread().getName());
        }
        getThreadLocks().add(lockName);
        //getThreadLocksObtainer().put(lockName, new
        // Exception("Obtainer..."));
    } else if(log.isDebugEnabled()) {
        log.debug(
            "Lock '" + lockName + "' Is already owned by: "
                    + Thread.currentThread().getName());
    }
    return true;
}
```
The secret lies in expandedSQL. Here is the SQL:
```
public static final String SELECT_FOR_LOCK = "SELECT * FROM "
+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
```
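The constant above is a template, so it helps to see the statement it turns into. The sketch below assumes the two placeholders are `{0}` (table prefix) and `{1}` (quoted scheduler name) filled in with `java.text.MessageFormat`, and that the table and column constants resolve to LOCKS, SCHED_NAME and LOCK_NAME. Those details are my assumptions for illustration, so treat the output as an approximation of the real statement.

```java
import java.text.MessageFormat;

/** Hypothetical sketch: expanding the lock query template into concrete SQL. */
final class LockSqlSketch {
    // Assumed shape of SELECT_FOR_LOCK once the constants are concatenated.
    static final String SELECT_FOR_LOCK =
            "SELECT * FROM {0}LOCKS WHERE SCHED_NAME = {1} AND LOCK_NAME = ? FOR UPDATE";

    /** Substitutes the table prefix and the quoted scheduler name into the template. */
    static String expand(String tablePrefix, String schedulerName) {
        return MessageFormat.format(SELECT_FOR_LOCK, tablePrefix, "'" + schedulerName + "'");
    }

    public static void main(String[] args) {
        // Prints: SELECT * FROM QRTZ_LOCKS WHERE SCHED_NAME = 'DefaultQuartzScheduler' AND LOCK_NAME = ? FOR UPDATE
        System.out.println(expand("QRTZ_", "DefaultQuartzScheduler"));
    }
}
```

The remaining `?` is bound to the lock name when the statement runs, so each named lock maps to one row in the locks table.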
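The key part is FOR UPDATE, which takes a row-level lock in MySQL (InnoDB). When several nodes run the scheduler at the same time, the first node to arrive locks the row in the database through FOR UPDATE, and the other nodes wait. The node that holds the lock goes on to run its logic, namely: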
```
// In executeInNonManagedTXLock:
final T result = txCallback.execute(conn);

// The callback is supplied by acquireNextTriggers:
return executeInNonManagedTXLock(lockName,
        new TransactionCallback<List<OperableTrigger>>() {
            public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
            }
        },
```
The method behind it is acquireNextTrigger:
```
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
        throws JobPersistenceException {
    if (timeWindow < 0) {
        throw new IllegalArgumentException();
    }
    List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
    Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
    final int MAX_DO_LOOP_RETRY = 3;
    int currentLoopCount = 0;
    do {
        currentLoopCount ++;
        try {
            List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

// The selectTriggerToAcquire implementation on the delegate:
public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
        throws SQLException {
    PreparedStatement ps = null;
    ResultSet rs = null;
    List<TriggerKey> nextTriggers = new LinkedList<TriggerKey>();
    try {
        ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE));
```
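selectTriggerToAcquire queries the triggers that are in the WAITING state. After the query, the state of those triggers is changed to ACQUIRED, and once everything is done the lock is released. When the next node gets the lock, its selectTriggerToAcquire query no longer returns the same rows, which keeps a scheduled job from being picked up by more than one node.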
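The whole mechanism can be imitated in memory to check the reasoning. In the sketch below, threads stand in for nodes, a `ReentrantLock` stands in for the FOR UPDATE row lock, and a map stands in for the triggers table. None of this is Quartz code, and all names are invented for the example. What it shows is that selecting WAITING entries and marking them ACQUIRED under one lock means no trigger is handed out twice.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical in-memory imitation of lock-protected trigger acquisition. */
final class TriggerAcquisitionSketch {
    enum State { WAITING, ACQUIRED }

    private final ReentrantLock triggerAccess = new ReentrantLock();    // stand-in for the row lock
    private final Map<String, State> triggers = new LinkedHashMap<>();  // stand-in for the triggers table

    TriggerAcquisitionSketch(int count) {
        for (int i = 0; i < count; i++) {
            triggers.put("trigger-" + i, State.WAITING);
        }
    }

    /** Selects up to maxCount WAITING triggers and marks them ACQUIRED, all under the lock. */
    List<String> acquireNext(int maxCount) {
        triggerAccess.lock();
        try {
            List<String> acquired = new ArrayList<>();
            for (Map.Entry<String, State> entry : triggers.entrySet()) {
                if (acquired.size() == maxCount) {
                    break;
                }
                if (entry.getValue() == State.WAITING) {
                    entry.setValue(State.ACQUIRED);
                    acquired.add(entry.getKey());
                }
            }
            return acquired;
        } finally {
            triggerAccess.unlock();
        }
    }

    /** Lets several "nodes" compete until nothing is WAITING; returns every acquisition made. */
    static List<String> simulate(int nodes, int triggerCount) {
        TriggerAcquisitionSketch store = new TriggerAcquisitionSketch(triggerCount);
        List<String> all = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(nodes);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int n = 0; n < nodes; n++) {
                futures.add(pool.submit(() -> {
                    List<String> batch;
                    while (!(batch = store.acquireNext(3)).isEmpty()) {
                        all.addAll(batch);
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return all;
    }

    public static void main(String[] args) {
        List<String> all = simulate(2, 20);
        System.out.println("acquisitions: " + all.size());
    }
}
```

With 2 nodes and 20 triggers, the list returned by `simulate` contains each trigger exactly once, whichever node happened to win each round.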