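Preface
As noted earlier, QuartzSchedulerThread only really starts working once paused is set to false. This article picks up from the QuartzSchedulerThread startup covered in "Quartz and Spring Integration: Creating the Scheduler" and "Quartz and Spring Integration: Starting the Scheduler". The run method of QuartzSchedulerThread next asks the thread pool how many threads are available: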
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
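Real scheduling only happens when availThreadCount is greater than 0; otherwise the thread keeps polling until a worker thread is released. So let's look at what happens when enough threads are available.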
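The waiting behaviour described above can be pictured with a small monitor-based sketch. The class TinyPool and its methods are hypothetical and are not Quartz's thread pool; they only illustrate a call that blocks until at least one worker is free and then reports the free count.

```java
final class TinyPool {
    private final Object lock = new Object();
    private int available;

    TinyPool(int size) {
        this.available = size;
    }

    /** Blocks until at least one worker is free; returns 0 only if interrupted. */
    int blockForAvailableThreads() {
        synchronized (lock) {
            while (available < 1) {
                try {
                    lock.wait(500); // wake up periodically and re-check
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return 0;
                }
            }
            return available;
        }
    }

    /** Marks one worker as busy; returns false when none is free. */
    boolean take() {
        synchronized (lock) {
            if (available < 1) {
                return false;
            }
            available--;
            return true;
        }
    }

    /** Returns a worker to the pool and wakes any waiting caller. */
    void release() {
        synchronized (lock) {
            available++;
            lock.notifyAll();
        }
    }

    public static void main(String[] args) throws Exception {
        TinyPool pool = new TinyPool(1);
        pool.take(); // pool is now exhausted
        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
                return;
            }
            pool.release();
        });
        releaser.start();
        // Blocks for roughly 100 ms, until the releaser frees the worker.
        System.out.println("available = " + pool.blockForAvailableThreads());
        releaser.join();
    }
}
```

The scheduler loop only needs the returned count to decide how many triggers it may acquire in one batch.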
Acquiring triggers
The code that acquires triggers is shown in Listing 1. It calls the acquireNextTriggers method of JobStore to obtain the triggers.
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
// exception handling omitted
} catch (RuntimeException e) {
// exception handling omitted
}
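Let's use the JobStore implementation LocalDataSourceJobStore to see what acquireNextTriggers actually does. LocalDataSourceJobStore inherits acquireNextTriggers from its parent class JobStoreSupport (Listing 2); this method fetches triggers from the data source.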
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {
String lockName;
if(isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
}
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>() {
public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
}
},
new TransactionValidator<List<OperableTrigger>>() {
public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
try {
List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
Set<String> fireInstanceIds = new HashSet<String>();
for (FiredTriggerRecord ft : acquired) {
fireInstanceIds.add(ft.getFireInstanceId());
}
for (OperableTrigger tr : result) {
if (fireInstanceIds.contains(tr.getFireInstanceId())) {
return true;
}
}
return false;
} catch (SQLException e) {
throw new JobPersistenceException("error validating trigger acquisition", e);
}
}
});
}
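The acquireNextTriggers method of JobStoreSupport mainly calls executeInNonManagedTXLock (Listing 3), whose logic is as follows:
- obtain a database connection (and, when lockName is not null, the named lock);
- call back the execute method of txCallback (the anonymous TransactionCallback in Listing 2), which in turn calls acquireNextTrigger to fetch the triggers;
- call commitConnection to commit all SQL issued in the previous step;
- return the acquired triggers.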
protected <T> T executeInNonManagedTXLock(
String lockName,
TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
try {
if (lockName != null) {
// If we aren't using db locks, then delay getting DB connection
// until after acquiring the lock since it isn't needed.
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
}
transOwner = getLockHandler().obtainLock(conn, lockName);
}
if (conn == null) {
conn = getNonManagedTXConnection();
}
final T result = txCallback.execute(conn);
try {
commitConnection(conn);
} catch (JobPersistenceException e) {
rollbackConnection(conn);
if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
@Override
public Boolean execute(Connection conn) throws JobPersistenceException {
return txValidator.validate(conn, result);
}
})) {
throw e;
}
}
Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
if(sigTime != null && sigTime >= 0) {
signalSchedulingChangeImmediately(sigTime);
}
return result;
} catch (JobPersistenceException e) {
rollbackConnection(conn);
throw e;
} catch (RuntimeException e) {
rollbackConnection(conn);
throw new JobPersistenceException("Unexpected runtime exception: "
+ e.getMessage(), e);
} finally {
try {
releaseLock(lockName, transOwner);
} finally {
cleanupConnection(conn);
}
}
}
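The shape of Listing 3 (take a lock, run a callback, commit, roll back on failure, always release) can be sketched without a database. Everything below is a simplified illustration under stated assumptions: FakeConnection, Callback and executeInLock are hypothetical names, a ReentrantLock stands in for the lock handler, and the commit-failure validation step is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

final class TxTemplateSketch {
    /** Hypothetical stand-in for a JDBC connection; it only records calls. */
    static final class FakeConnection {
        final List<String> log = new ArrayList<>();
        void commit() { log.add("commit"); }
        void rollback() { log.add("rollback"); }
        void close() { log.add("close"); }
    }

    /** Hypothetical callback type, playing the role of TransactionCallback. */
    interface Callback<T> {
        T execute(FakeConnection conn);
    }

    private static final ReentrantLock TRIGGER_ACCESS = new ReentrantLock();

    static <T> T executeInLock(boolean useLock, FakeConnection conn, Callback<T> work) {
        boolean locked = false;
        try {
            if (useLock) {
                TRIGGER_ACCESS.lock();
                locked = true;
            }
            T result = work.execute(conn); // the unit of work runs under the lock
            conn.commit();
            return result;
        } catch (RuntimeException e) {
            conn.rollback(); // undo whatever the callback did
            throw e;
        } finally {
            try {
                if (locked) {
                    TRIGGER_ACCESS.unlock();
                }
            } finally {
                conn.close(); // always clean up the connection last
            }
        }
    }

    public static void main(String[] args) {
        FakeConnection conn = new FakeConnection();
        String result = executeInLock(true, conn, c -> "acquired 3 triggers");
        System.out.println(result + " " + conn.log);
    }
}
```

The nested finally mirrors the listing: the lock is released first, and the connection is cleaned up even if releasing the lock fails.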
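The acquireNextTrigger method does the actual acquisition. Its steps are as follows.
First, it queries the triggers whose state is WAITING (Listing 4). Taking StdJDBCDelegate as the example, its selectTriggerToAcquire method (plain JDBC API calls, left for the reader to explore) simply runs a SQL query for triggers:
SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
The TRIGGER_STATE parameter is WAITING. NEXT_FIRE_TIME must be no later than the latest acceptable fire time (noLaterThan + timeWindow) and, unless MISFIRE_INSTR is -1, no earlier than the misfire threshold returned by getMisfireTime().
Note: in every SQL statement in this article, {0} is QRTZ_ and {1} is 'schedulerFactoryBean'.
Listing 4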
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
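To make the WHERE and ORDER BY clauses of that query concrete, here is an in-memory sketch of the same selection rule. The Row class and the selectToAcquire method are hypothetical names used only for illustration, and truncating to maxCount in memory is an assumption of this sketch rather than a statement about how the delegate limits its result.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SelectTriggersSketch {
    static final int MISFIRE_IGNORE = -1; // mirrors MISFIRE_INSTR = -1 in the SQL

    /** Hypothetical in-memory view of one QRTZ_TRIGGERS row. */
    static final class Row {
        final String name;
        final String state;
        final long nextFireTime;
        final int priority;
        final int misfireInstr;

        Row(String name, String state, long nextFireTime, int priority, int misfireInstr) {
            this.name = name;
            this.state = state;
            this.nextFireTime = nextFireTime;
            this.priority = priority;
            this.misfireInstr = misfireInstr;
        }
    }

    static List<String> selectToAcquire(List<Row> rows, long noLaterThan,
                                        long noEarlierThan, int maxCount) {
        List<Row> hits = new ArrayList<>();
        for (Row r : rows) {
            boolean due = r.nextFireTime <= noLaterThan;
            boolean notMisfired = r.misfireInstr == MISFIRE_IGNORE
                    || r.nextFireTime >= noEarlierThan;
            if ("WAITING".equals(r.state) && due && notMisfired) {
                hits.add(r);
            }
        }
        // ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
        hits.sort(Comparator.comparingLong((Row r) -> r.nextFireTime)
                .thenComparing(Comparator.comparingInt((Row r) -> r.priority).reversed()));
        List<String> names = new ArrayList<>();
        for (Row r : hits) {
            if (names.size() >= maxCount) {
                break;
            }
            names.add(r.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("low", "WAITING", 100, 1, 0));
        rows.add(new Row("high", "WAITING", 100, 9, 0));
        System.out.println(selectToAcquire(rows, 200, 50, 10));
    }
}
```

Sorting by fire time first and priority second means priority only breaks ties between triggers due at the same instant.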
Next, it iterates over the TriggerKey entries in keys and performs the following steps inside the loop:
- Load the trigger from table QRTZ_TRIGGERS:
OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
retrieveTrigger internally calls the Delegate's selectTrigger method. Taking StdJDBCDelegate as the example, selectTrigger looks the trigger up by its TriggerKey with this SQL:
SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
- Use the JobKey held by the OperableTrigger to load the job from table QRTZ_JOB_DETAILS. If the job disallows concurrent execution, its key is added to the set acquiredJobKeysForNoConcurrentExec, and any later trigger of the same job is skipped in this batch:
JobKey jobKey = nextTrigger.getJobKey();
JobDetail job = getDelegate().selectJobDetail(conn, jobKey, getClassLoadHelper());
if (job.isConcurrentExectionDisallowed()) {
    if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
        continue; // next trigger
    } else {
        acquiredJobKeysForNoConcurrentExec.add(jobKey);
    }
}
Taking StdJDBCDelegate as the example, selectJobDetail executes:
SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?
- Using the TriggerKey, change the trigger's state in QRTZ_TRIGGERS from WAITING to ACQUIRED:
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
Taking StdJDBCDelegate as the example, updateTriggerStateFromOtherState executes:
UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ? AND TRIGGER_STATE = ?
- Set the fire instance ID on the OperableTrigger, then insert a record for it into table QRTZ_FIRED_TRIGGERS:
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
Taking StdJDBCDelegate as the example, insertFiredTrigger executes:
INSERT INTO {0}FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- Add the OperableTrigger to acquiredTriggers:
acquiredTriggers.add(nextTrigger);
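The loop above combines two guards: a per-batch set that lets only one trigger through for a job that disallows concurrent execution, and a conditional state update that only succeeds if the trigger is still WAITING. A minimal in-memory sketch, with hypothetical maps standing in for the QRTZ_ tables and hypothetical method names:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class AcquireLoopSketch {
    final Map<String, String> triggerState = new HashMap<>(); // trigger -> state
    final Map<String, String> triggerJob = new HashMap<>();   // trigger -> job
    final Set<String> nonConcurrentJobs = new HashSet<>();
    final List<String> firedRecords = new ArrayList<>();      // stands in for QRTZ_FIRED_TRIGGERS

    /** Emulates "UPDATE ... SET state = new WHERE ... AND state = old"; returns rows updated. */
    int updateStateFromOtherState(String trigger, String newState, String oldState) {
        if (oldState.equals(triggerState.get(trigger))) {
            triggerState.put(trigger, newState);
            return 1;
        }
        return 0;
    }

    List<String> acquire(List<String> candidateKeys) {
        List<String> acquired = new ArrayList<>();
        Set<String> seenNonConcurrentJobs = new HashSet<>();
        for (String key : candidateKeys) {
            String job = triggerJob.get(key);
            if (job == null) {
                continue; // trigger disappeared in the meantime
            }
            // Only one trigger per non-concurrent job may be acquired in a batch.
            if (nonConcurrentJobs.contains(job) && !seenNonConcurrentJobs.add(job)) {
                continue;
            }
            // Someone else changed the state first: skip this trigger.
            if (updateStateFromOtherState(key, "ACQUIRED", "WAITING") <= 0) {
                continue;
            }
            firedRecords.add(key + ":ACQUIRED");
            acquired.add(key);
        }
        return acquired;
    }

    public static void main(String[] args) {
        AcquireLoopSketch store = new AcquireLoopSketch();
        store.triggerState.put("t1", "WAITING");
        store.triggerJob.put("t1", "job1");
        System.out.println(store.acquire(List.of("t1")));
    }
}
```

The conditional update is what makes the step safe against another scheduler instance grabbing the same trigger: whoever flips WAITING to ACQUIRED first wins.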
Firing triggers
Once the triggers have been acquired, the next step is to fire them (Listing 5), which is done by calling the triggersFired method of JobStore.
Listing 5
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
// exception handling omitted
}
}
Again using the JobStore implementation LocalDataSourceJobStore, let's see what triggersFired does. LocalDataSourceJobStore inherits triggersFired from its parent class JobStoreSupport (Listing 6); this method fires the triggers.
Listing 6
public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
new TransactionCallback<List<TriggerFiredResult>>() {
public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
TriggerFiredResult result;
for (OperableTrigger trigger : triggers) {
try {
TriggerFiredBundle bundle = triggerFired(conn, trigger);
result = new TriggerFiredResult(bundle);
} catch (JobPersistenceException jpe) {
result = new TriggerFiredResult(jpe);
} catch(RuntimeException re) {
result = new TriggerFiredResult(re);
}
results.add(result);
}
return results;
}
},
new TransactionValidator<List<TriggerFiredResult>>() {
@Override
public Boolean validate(Connection conn, List<TriggerFiredResult> result) throws JobPersistenceException {
try {
List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
Set<String> executingTriggers = new HashSet<String>();
for (FiredTriggerRecord ft : acquired) {
if (STATE_EXECUTING.equals(ft.getFireInstanceState())) {
executingTriggers.add(ft.getFireInstanceId());
}
}
for (TriggerFiredResult tr : result) {
if (tr.getTriggerFiredBundle() != null && executingTriggers.contains(tr.getTriggerFiredBundle().getTrigger().getFireInstanceId())) {
return true;
}
}
return false;
} catch (SQLException e) {
throw new JobPersistenceException("error validating trigger acquisition", e);
}
}
});
}
As you can see, triggersFired also goes through executeInNonManagedTXLock. From the earlier analysis we know this ends up calling back the execute method of the new anonymous TransactionCallback, whose main logic simply loops over the triggers list and calls triggerFired on each one to obtain a TriggerFiredBundle. triggerFired performs these steps:
- Read the trigger's state from table QRTZ_TRIGGERS:
try { // if trigger was deleted, state will be STATE_DELETED
    String state = getDelegate().selectTriggerState(conn, trigger.getKey());
    if (!state.equals(STATE_ACQUIRED)) {
        return null;
    }
} catch (SQLException e) {
    throw new JobPersistenceException("Couldn't select trigger state: " + e.getMessage(), e);
}
Taking StdJDBCDelegate as the example, selectTriggerState executes:
SELECT TRIGGER_STATE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
- Look up the job the trigger belongs to:
try {
    job = retrieveJob(conn, trigger.getJobKey());
    if (job == null) {
        return null;
    }
} catch (JobPersistenceException jpe) {
    try {
        getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
        getDelegate().updateTriggerState(conn, trigger.getKey(), STATE_ERROR);
    } catch (SQLException sqle) {
        getLog().error("Unable to set trigger state to ERROR.", sqle);
    }
    throw jpe;
}
retrieveJob also ends up calling the Delegate's selectJobDetail method, so it is not repeated here.
- Update the record for this trigger in table QRTZ_FIRED_TRIGGERS to STATE_EXECUTING:
try {
    getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
} catch (SQLException e) {
    throw new JobPersistenceException("Couldn't insert fired trigger: " + e.getMessage(), e);
}
The SQL actually executed is:
UPDATE {0}FIRED_TRIGGERS SET INSTANCE_NAME = ?, FIRED_TIME = ?, SCHED_TIME = ?, STATE = ?, JOB_NAME = ?, JOB_GROUP = ?, IS_NONCONCURRENT = ?, REQUESTS_RECOVERY = ? WHERE SCHED_NAME = {1} AND ENTRY_ID = ?
- Tell the trigger that it has fired, so that it updates its own fire times:
trigger.triggered(cal);
- If the job disallows concurrent execution (isConcurrentExectionDisallowed returns true, that is, the job class carries DisallowConcurrentExecution), move the job's triggers that are in STATE_WAITING or STATE_ACQUIRED to STATE_BLOCKED, and those in STATE_PAUSED to STATE_PAUSED_BLOCKED:
if (job.isConcurrentExectionDisallowed()) {
    state = STATE_BLOCKED;
    force = false;
    try {
        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_BLOCKED, STATE_WAITING);
        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_BLOCKED, STATE_ACQUIRED);
        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_PAUSED_BLOCKED, STATE_PAUSED);
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't update states of blocked triggers: " + e.getMessage(), e);
    }
}
- Store the trigger again:
storeTrigger(conn, trigger, job, true, state, force, false);
storeTrigger first calls triggerExists to check whether the trigger already exists:
boolean existingTrigger = triggerExists(conn, newTrigger.getKey());
Taking StdJDBCDelegate as the example, triggerExists executes:
SELECT TRIGGER_NAME FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
Then, depending on the value of existingTrigger, it updates or inserts the trigger's row in QRTZ_TRIGGERS, which carries the next fire time:
if (existingTrigger) {
    getDelegate().updateTrigger(conn, newTrigger, state, job);
} else {
    getDelegate().insertTrigger(conn, newTrigger, state, job);
}
Taking StdJDBCDelegate as the example, the insert executes:
INSERT INTO {0}TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- Return the TriggerFiredBundle object:
return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup().equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
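The blocking step in triggerFired is essentially a state mapping applied to every trigger of the job. A small sketch of that mapping, using plain strings for the states; the class and method names are hypothetical and only the three transitions shown in the code above are modelled:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BlockSiblingsSketch {
    /** WAITING and ACQUIRED become BLOCKED, PAUSED becomes PAUSED_BLOCKED, the rest is untouched. */
    static String blockedStateFor(String current) {
        switch (current) {
            case "WAITING":
            case "ACQUIRED":
                return "BLOCKED";
            case "PAUSED":
                return "PAUSED_BLOCKED";
            default:
                return current;
        }
    }

    /** Returns the states of a job's triggers after one of them has fired. */
    static Map<String, String> onFired(Map<String, String> statesOfJobTriggers,
                                       boolean concurrentExecutionDisallowed) {
        Map<String, String> out = new LinkedHashMap<>(statesOfJobTriggers);
        if (concurrentExecutionDisallowed) {
            out.replaceAll((trigger, state) -> blockedStateFor(state));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> states = new LinkedHashMap<>();
        states.put("nightly", "ACQUIRED");
        states.put("hourly", "WAITING");
        states.put("manual", "PAUSED");
        System.out.println(onFired(states, true));
    }
}
```

Keeping a separate PAUSED_BLOCKED state lets a paused trigger return to PAUSED rather than WAITING once the job finishes.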
Creating the JobRunShell
QuartzSchedulerThread then iterates over each TriggerFiredBundle and creates the shell that will run the job; see Listing 7. Note that the shell here is a JobRunShell, a Runnable wrapper around the job, not an operating-system shell script.
for (int i = 0; i < bndles.size(); i++) {
    TriggerFiredResult result = bndles.get(i);
    TriggerFiredBundle bndle = result.getTriggerFiredBundle();
    Exception exception = result.getException();
    if (exception instanceof RuntimeException) {
        getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
        qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
        continue;
    }
    // it's possible to get 'null' if the triggers was paused,
    // blocked, or other similar occurrences that prevent it being
    // fired at this time... or if the scheduler was shutdown (halted)
    if (bndle == null) {
        qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
        continue;
    }
    JobRunShell shell = null;
    try {
        shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
        shell.initialize(qs);
    } catch (SchedulerException se) {
        qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
        continue;
    }
}
The most important call here is createJobRunShell on JobRunShellFactory. Taking its implementation class StdJobRunShellFactory as the example, createJobRunShell (Listing 8) creates the shell that runs the job.
public JobRunShell createJobRunShell(TriggerFiredBundle bndle) throws SchedulerException {
return new JobRunShell(scheduler, bndle);
}
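The role of the shell is easier to see in miniature: it is a Runnable that wraps one job execution and contains failures so that they do not kill the worker thread. The sketch below is a deliberately reduced illustration; RunShellSketch and SimpleJob are hypothetical, and the real JobRunShell does considerably more (listener notification, completion handling and so on).

```java
final class RunShellSketch implements Runnable {
    /** Hypothetical minimal job contract used only in this sketch. */
    interface SimpleJob {
        void execute(String firedTriggerName) throws Exception;
    }

    private final SimpleJob job;
    private final String firedTriggerName;
    private volatile String outcome = "not run";

    RunShellSketch(SimpleJob job, String firedTriggerName) {
        this.job = job;
        this.firedTriggerName = firedTriggerName;
    }

    @Override
    public void run() {
        try {
            job.execute(firedTriggerName);
            outcome = "completed";
        } catch (Exception e) {
            // A failing job must not take the worker thread down with it.
            outcome = "failed: " + e.getMessage();
        }
    }

    String outcome() {
        return outcome;
    }

    public static void main(String[] args) {
        RunShellSketch shell = new RunShellSketch(
                name -> System.out.println("running job for " + name), "nightlyTrigger");
        shell.run(); // a thread pool would normally call this on a worker thread
        System.out.println(shell.outcome());
    }
}
```

Because the shell is just a Runnable, the scheduler thread can hand it to any thread pool without knowing anything about the job inside.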
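Submitting the job to the thread pool
Once the JobRunShell has been created, QuartzSchedulerThread hands it to the thread pool by calling runInThread (Listing 9):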
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
Executing the job
The implementation of runInThread (here, the one in SimpleThreadPool) is shown in Listing 10.
Listing 10
public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}
synchronized (nextRunnableLock) {
handoffPending = true;
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the pool).
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}
return true;
}
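As the listing shows, in normal operation runInThread takes an idle WorkerThread from availWorkers and hands it the Runnable; only when the pool is shutting down does it create and start an additional WorkerThread. The main code in WorkerThread's run method is: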
synchronized(lock) {
while (runnable == null && run.get()) {
lock.wait(500);
}
if (runnable != null) {
ran = true;
runnable.run();
}
}
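The hand-off between runInThread and the worker's wait loop is a classic monitor pattern: the worker sleeps on a lock until a Runnable is placed in its slot. A minimal sketch under stated assumptions: WorkerSketch, hand and shutdown are hypothetical names, and unlike the excerpt above this sketch runs the task outside the synchronized block.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class WorkerSketch extends Thread {
    private final Object lock = new Object();
    private Runnable runnable;              // slot filled by hand(...)
    private volatile boolean running = true;

    /** Gives the worker something to run and wakes it up. */
    void hand(Runnable task) {
        synchronized (lock) {
            runnable = task;
            lock.notifyAll();
        }
    }

    /** Asks the worker to stop once it has finished any pending task. */
    void shutdown() {
        running = false;
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    @Override
    public void run() {
        while (running) {
            Runnable task;
            synchronized (lock) {
                while (runnable == null && running) {
                    try {
                        lock.wait(500); // wake periodically to re-check the flags
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                task = runnable;
                runnable = null;
            }
            if (task != null) {
                task.run();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        WorkerSketch worker = new WorkerSketch();
        worker.start();
        CountDownLatch done = new CountDownLatch(1);
        worker.hand(() -> {
            System.out.println("job ran on " + Thread.currentThread().getName());
            done.countDown();
        });
        done.await(2, TimeUnit.SECONDS);
        worker.shutdown();
        worker.join();
    }
}
```

The timed wait is a safety net: even if a notification were missed, the worker would notice new work or a shutdown request within half a second.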
The runnable in the code above is the JobRunShell created earlier, so WorkerThread's run method in effect delegates to JobRunShell's run method. The most important statement in JobRunShell's run method is:
job.execute(jec);
Taking NativeJob as the example, its implementation is shown in Listing 11.
public void execute(JobExecutionContext context)
throws JobExecutionException {
JobDataMap data = context.getMergedJobDataMap();
String command = data.getString(PROP_COMMAND);
String parameters = data.getString(PROP_PARAMETERS);
if (parameters == null) {
parameters = "";
}
boolean wait = true;
if(data.containsKey(PROP_WAIT_FOR_PROCESS)) {
wait = data.getBooleanValue(PROP_WAIT_FOR_PROCESS);
}
boolean consumeStreams = false;
if(data.containsKey(PROP_CONSUME_STREAMS)) {
consumeStreams = data.getBooleanValue(PROP_CONSUME_STREAMS);
}
Integer exitCode = this.runNativeCommand(command, parameters, wait, consumeStreams);
context.setResult(exitCode);
}
The key call in NativeJob's execute method is runNativeCommand, whose implementation is shown in Listing 12. It shows how Quartz runs a native shell command.
private Integer runNativeCommand(String command, String parameters, boolean wait, boolean consumeStreams) throws JobExecutionException {
String[] cmd;
String[] args = new String[2];
Integer result = null;
args[0] = command;
args[1] = parameters;
try {
//with this variable will be done the swithcing
String osName = System.getProperty("os.name");
// specific for Windows
if (osName.startsWith("Windows")) {
cmd = new String[args.length + 2];
if (osName.equals("Windows 95")) { // windows 95 only
cmd[0] = "command.com";
} else {
cmd[0] = "cmd.exe";
}
cmd[1] = "/C";
System.arraycopy(args, 0, cmd, 2, args.length);
} else if (osName.equals("Linux")) {
cmd = new String[3];
cmd[0] = "/bin/sh";
cmd[1] = "-c";
cmd[2] = args[0] + " " + args[1];
} else { // try this...
cmd = args;
}
Runtime rt = Runtime.getRuntime();
// Executes the command
getLog().info("About to run " + cmd[0] + " " + cmd[1] + " " + (cmd.length>2 ? cmd[2] : "") + " ...");
Process proc = rt.exec(cmd);
// Consumes the stdout from the process
StreamConsumer stdoutConsumer = new StreamConsumer(proc.getInputStream(), "stdout");
// Consumes the stderr from the process
if(consumeStreams) {
StreamConsumer stderrConsumer = new StreamConsumer(proc.getErrorStream(), "stderr");
stdoutConsumer.start();
stderrConsumer.start();
}
if(wait) {
result = proc.waitFor();
}
// any error message?
} catch (Throwable x) {
throw new JobExecutionException("Error launching native command: ", x, false);
}
return result;
}
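The OS-specific part of Listing 12 is just the construction of the command array. Pulled out as a pure function, the branching becomes easy to check. The class and method names below are hypothetical; the branching itself follows the listing.

```java
import java.util.Arrays;

final class NativeCommandSketch {
    /** Builds the argument array that would be passed to Runtime.exec. */
    static String[] buildCommand(String osName, String command, String parameters) {
        if (osName.startsWith("Windows")) {
            String shell = osName.equals("Windows 95") ? "command.com" : "cmd.exe";
            return new String[] { shell, "/C", command, parameters };
        } else if (osName.equals("Linux")) {
            // A single string so that /bin/sh parses the parameters itself.
            return new String[] { "/bin/sh", "-c", command + " " + parameters };
        }
        // Any other OS: try to run the command directly.
        return new String[] { command, parameters };
    }

    public static void main(String[] args) {
        String os = System.getProperty("os.name");
        System.out.println(Arrays.toString(buildCommand(os, "echo", "hello")));
    }
}
```

One consequence of the last branch is that on other systems, macOS for example, the command is executed directly without a shell, so shell features such as pipes in the command string will not work there.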
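Processing after the job completes
After a job has been executed, QuartzSchedulerThread also performs some follow-up processing; see Listing 9. Taking JobStoreSupport as the example, the implementation of its triggeredJobComplete method is shown in Listing 13.
Listing 13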
public void triggeredJobComplete(final OperableTrigger trigger,
final JobDetail jobDetail, final CompletedExecutionInstruction triggerInstCode) {
retryExecuteInNonManagedTXLock(
LOCK_TRIGGER_ACCESS,
new VoidTransactionCallback() {
public void executeVoid(Connection conn) throws JobPersistenceException {
triggeredJobComplete(conn, trigger, jobDetail,triggerInstCode);
}
});
}
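As you can see, it calls retryExecuteInNonManagedTXLock, whose logic is very similar to executeInNonManagedTXLock, and which eventually calls back the executeVoid method of the anonymous VoidTransactionCallback. The inner triggeredJobComplete method then does the final bookkeeping, such as removing the trigger's fired instance from table QRTZ_FIRED_TRIGGERS and updating the trigger's state to complete or error.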
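The two pieces of bookkeeping named above (set the trigger's final state, remove its fired record) can be sketched in memory as follows. This is an illustration only: the Instruction enum is a hypothetical subset, the maps stand in for the QRTZ_ tables, and the real method handles more instruction codes than shown here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class JobCompleteSketch {
    /** Hypothetical subset of completion instructions. */
    enum Instruction { NOOP, SET_TRIGGER_COMPLETE, SET_TRIGGER_ERROR }

    final Map<String, String> triggerState = new HashMap<>(); // stands in for QRTZ_TRIGGERS
    final Set<String> firedTriggers = new HashSet<>();        // stands in for QRTZ_FIRED_TRIGGERS

    void triggeredJobComplete(String trigger, Instruction instruction) {
        switch (instruction) {
            case SET_TRIGGER_COMPLETE:
                triggerState.put(trigger, "COMPLETE");
                break;
            case SET_TRIGGER_ERROR:
                triggerState.put(trigger, "ERROR");
                break;
            default:
                break; // NOOP: leave the trigger's state alone
        }
        // In every case the record of this firing is no longer needed.
        firedTriggers.remove(trigger);
    }

    public static void main(String[] args) {
        JobCompleteSketch store = new JobCompleteSketch();
        store.triggerState.put("t1", "WAITING");
        store.firedTriggers.add("t1");
        store.triggeredJobComplete("t1", Instruction.SET_TRIGGER_ERROR);
        System.out.println(store.triggerState + " " + store.firedTriggers);
    }
}
```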
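Summary
After this analysis we have a reasonably deep understanding of how Quartz performs timed scheduling. Reading the Quartz source is not a particularly easy process, though. Compared with the source of Tomcat or Spark, its design feels heavy, its logic is tightly coupled to the database, and its comments are not as rich as in those two projects.
Postscript: my book 《深入理解Spark:核心思想与源码分析》 (Understanding Spark in Depth: Core Ideas and Source Code Analysis) has now been officially published and is on sale at JD.com, Dangdang, Tmall and other sites. Interested readers are welcome to buy it.
JD.com: http://item.jd.com/11846120.html
Dangdang: http://product.dangdang.com/23838168.html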