深入浅出定时任务调度:实现与优化

基本概念

定时任务主要是指在一定的时间间隔或指定的时间点触发执行特定代码逻辑的功能。它在实际开发中应用广泛,比如日志清理、数据备份、定时发送通知等。

定时任务的核心要素有三个:

  • 任务(Task):需要执行的代码逻辑。
  • 时间间隔/触发条件:指定任务执行的时间点或周期。
  • 调度器(Scheduler):负责管理任务的触发与调度。

本文将结合代码,深入理解定时任务的这三大核心要素。

核心要素一:任务与触发条件

我们以一个生活中简单的场景为例:半个小时后要吃饭了。

定任务三要素:

  • 任务:吃饭
  • 触发条件:半个小时后
  • 调度器:暂无

为了演示方便,将触发条件简化为 3秒后 执行。以下是一个简单的 Java 示例代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

public class Person {

private String name;
//省略getter setter和构造方法
}

public class Task{

private Person person;

private Runnable task;

private int delay;

//省略getter setter和构造方法

public void execute(){
LogUtil.log(person.getName()+"正在执行任务");
task.run();
LogUtil.log(person.getName()+"任务执行完毕");
}

}

public class SimpleTaskDemo {
/**
* 添加、执行定时任务
*
* @param task 任务
* @param delay 延迟时间(单位 s)
*/
public static void submit(Task task) {
LogUtil.log("添加任务");
// 延迟delay秒执行任务
sleepSeconds(task.getDelay());
// 执行任务
task.execute();
}

public static void main(String[] args) {
//要素一:任务-->吃饭
Runnable eat = new Runnable() {
@Override
public void run() {
LogUtil.log("吃饭中~~~~~");
sleepSeconds(3);
}
};
Person person = new Person("张三");
//要素二:时间间隔-->3s
int delay = 3;
//张三3秒后开始吃饭的定时任务
Task zhangSanTask = new Task(person, eat);
submit(zhangSanTask,3);
}

private static void sleepSeconds(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}

执行 SimpleTaskDemo.main 方法,命令行打印结果为:

1
2
3
4
2025-01-23 15:59:42:添加任务
2025-01-23 15:59:45:张三正在执行任务
2025-01-23 15:59:45:吃饭中~~~~~
2025-01-23 15:59:48:张三任务执行完毕

上述代码通过 submit 方法延迟指定时间后执行任务,实现了一个简单的定时任务功能。

核心要素二:调度器

上面的实现有一个明显问题:阻塞提交任务的线程,且无法同时执行多个任务
例如:张三和李四半小时后同时吃饭,当前实现无法满足要求,因为李四必须等待张三完成任务后才能开始。在现实生活中,李四只能吃剩饭了。

为了解决这个问题,需要引入定时任务的核心要素——调度器(Scheduler)

调度器的功能

  • 管理任务触发与调度。
  • 避免阻塞提交任务的线程。
  • 支持任务的并发执行。

以下是一个简单的调度器实现: TaskScheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import util.LogUtil;  

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskScheduler {
//执行任务的线程池
private ExecutorService workers = Executors.newFixedThreadPool(10);
//任务队列
private List<TaskWrapper> tasks = new LinkedList<>();

private class TaskWrapper implements Runnable {
//任务
Task task;
//任务执行的时间
long executeTime;

public TaskWrapper(Task task) {
this.task = task;
this.executeTime = System.currentTimeMillis() + task.getDelay() * 1000;
}

@Override
public void run() {
task.execute();
}
}

public TaskScheduler() {
new Thread(() -> {
schedule();
}).start();
}

//提交任务
public void submit(Task task) {
synchronized (tasks) {
tasks.add(new TaskWrapper(task));
// 排序任务队列,把最先要执行的放在队列前面
Collections.sort(tasks, (o1, o2) -> (int) (o1.executeTime - o2.executeTime));
tasks.notify();
}
}

public void schedule() {
while (true) {
synchronized (tasks) {
while (tasks.isEmpty()) {//没有任务,等待
try {
LogUtil.log("无任务,等待唤醒");
tasks.wait();
LogUtil.log("有任务,被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//取第一个任务,判断是否已经过期,过期则执行,否则等待
final TaskWrapper taskWrapper = tasks.get(0);
if (taskWrapper.executeTime <= System.currentTimeMillis()) {
LogUtil.log("任务放入 workers 执行:" + taskWrapper.task.getPerson().getName());
workers.execute(taskWrapper);
//删除已经执行的任务
tasks.remove(taskWrapper);
} else {
long waiteTime = taskWrapper.executeTime - System.currentTimeMillis();
LogUtil.log("任务未到执行时间,等待 " + waiteTime + " ms");
try {
//等待执行时间,释放锁
tasks.wait(waiteTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

}

调度器的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
SimpleTaskDemo {  

public static void main(String[] args) {
Runnable eat = new Runnable() {
@Override
public void run() {
LogUtil.log("吃饭中~~~~~");
sleepSeconds(3);
}
};
TaskScheduler taskScheduler = new TaskScheduler();
//张三3秒后开始吃饭
Person person = new Person("张三");
int delay = 3;
Task zhangSanTask = new Task(person, eat, delay);
taskScheduler.submit(zhangSanTask);
//张三3秒后开始吃饭
Person lisi = new Person("李四");
Task lisiTask = new Task(lisi, eat, delay);
taskScheduler.submit(lisiTask);
}
}

//输出结果:
2025-01-24 10:39:03 534:无任务,等待唤醒
2025-01-24 10:39:03 538:有任务,被唤醒
2025-01-24 10:39:03 538:任务未到执行时间,等待 2998 ms
2025-01-24 10:39:06 537:任务放入 workers 执行:张三
2025-01-24 10:39:06 538:任务放入 workers 执行:李四
2025-01-24 10:39:06 539:张三正在执行任务
2025-01-24 10:39:06 539:无任务,等待唤醒
2025-01-24 10:39:06 539:吃饭中~~~~~
2025-01-24 10:39:06 539:李四正在执行任务
2025-01-24 10:39:06 539:吃饭中~~~~~
2025-01-24 10:39:09 540:张三任务执行完毕
2025-01-24 10:39:09 540:李四任务执行完毕

从输出结果可以看出,张三和李四同时在 3s 后,开始了吃饭。

调度器代码分析

我们从调度器的功能角度,分析下调度器代码的逻辑。

  1. 要能存储任务:
    1. List<TaskWrapper> tasks = new LinkedList<>();task 列表实现了存储任务的功能。
    2. submit 方法:使用 synchronizedtask 加锁,防止并发操作队列问题。接收一个任务,设置任务执行的时间,封装成 taskWrapper 然后再放入 task 队列中。同时对 task 队列按照执行时间进行排序,最先执行的放在队列第一位。最后执行 task.notify 唤醒 schedule 方法;
  2. 要能触发任务:TaskScheduler 构造方法内部,使用一个线程,执行了 schedule() 方法,该方法内部是一个死循环,执行逻辑如下:
    1. task 对象加锁,防止并发操作问题;
    2. 如果 task 列表是空的,说明当前没有任务,执行 task.wait 方法,释放锁,并使当前线程进入等待状态(会释放 CPU 资源),直到其它线程调用 task.notify 方法唤醒。
    3. 一旦其它线程调用 submit 方法,则该线程被唤醒,取 task 队列的第一个任务,判断是否达到了任务的执行时间,如果达到了,则把任务放入 worker 线程池异步执行任务,同时再 task 队列中删除该任务。如果没达到,则调用 task.wait(time) 方法,休眠一段时间后,再次执行上面步骤.
  3. 任务可以同时执行:使用线程池 workers 可以对任务进行并发执行。

调度器总结

通过以上实现,我们了解了定时任务的核心要素,并通过自定义调度器解决了简单场景中的问题。然而,当前实现还不支持循环任务(如每天早上7点吃饭)。大家可以思考如何扩展调度器以支持循环任务。

Timer 分析

TaskScheduler 是我们自己实现的一个简单任务调度器,而 JDK 中的 java.util.Timer 类也提供了任务调度的功能。下面将简单分析 Timer 的实现,并与我们自定义的调度器进行对比,揭示其中的优缺点。

核心代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
public class Timer{
//任务队列:一个优先级队列,按照执行时间排序的小顶堆结构
private final TaskQueue queue = new TaskQueue();
//类似TaskScheduler构造方法中的new Thread(),
private final TimerThread thread = new TimerThread(queue);

public Timer(String name) {
thread.setName(name);
//启动线程,循环获取队列任务执行
thread.start();
}
//添加调度任务,再delay时间(ms)后执行
public void schedule(TimerTask task, long delay) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
// System.currentTimeMillis()+delay 是任务执行的时间,0表示只执行一次
sched(task, System.currentTimeMillis()+delay, 0);
}

private void sched(TimerTask task, long time, long period) {
if (time < 0)
throw new IllegalArgumentException("Illegal execution time.");

if (Math.abs(period) > (Long.MAX_VALUE >> 1))
period >>= 1;
//对queue加锁
synchronized(queue) {
if (!thread.newTasksMayBeScheduled)
throw new IllegalStateException("Timer already cancelled.");

synchronized(task.lock) {
if (task.state != TimerTask.VIRGIN)
throw new IllegalStateException(
"Task already scheduled or cancelled");
task.nextExecutionTime = time;
task.period = period;
task.state = TimerTask.SCHEDULED;
}
//任务加入队列中
queue.add(task);
//如果当前任务=堆顶元素,通知准备执行任务
if (queue.getMin() == task)
queue.notify();
}
}

class TimerThread extends Thread {
boolean newTasksMayBeScheduled = true;

private TaskQueue queue;

TimerThread(TaskQueue queue) {
this.queue = queue;
}

public void run() {
try {
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // Eliminate obsolete references
}
}
}

private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
//对队列queue进行加锁
synchronized(queue) {
//如果队列是空,wait 等待
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; //再次判断队列如果是空,退出循环
//任务队列不为空,取堆顶任务
task = queue.getMin();
//对任务进行加锁
synchronized(task.lock) {
//如果任务是取消状态,则移除该任务,队列重新堆化。返回while循环处再次执行流程
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue;
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
//判断是否到达任务执行时间
if (taskFired = (executionTime<=currentTime)) {
//判断任务是否还需要执行,这里任务可能是周期性任务,需要执行多次的
if (task.period == 0) { // Non-repeating, remove
//不需要再执行,则移除任务,重新堆化
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
//设置任务下一次执行时间,同时重新堆化
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
//任务还没到执行时间,wait 等待一段时间
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
//任务可以执行
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
} }
}
}
}

class TaskQueue {
private TimerTask[] queue = new TimerTask[128];

void add(TimerTask task) {
// Grow backing store if necessary
if (size + 1 == queue.length)
queue = Arrays.copyOf(queue, 2*queue.length);

queue[++size] = task;
fixUp(size);
}

TimerTask getMin() {
return queue[1];
}

void removeMin() {
queue[1] = queue[size];
queue[size--] = null; // Drop extra reference to prevent memory leak
fixDown(1);
}

void rescheduleMin(long newTime) {
queue[1].nextExecutionTime = newTime;
fixDown(1);
}
}

public abstract class TimerTask implements Runnable {
public abstract void run();
//是指任务的执行间隔时间,等于0表示不需要重复执行
long period = 0;
//......
}

Timer 的核心实现与我们自己的 TaskScheduler 类似,但有以下几个特点:

  1. 单线程执行Timer 是由单线程 (TimerThread) 执行任务的,这意味着它无法并行处理多个任务。当任务执行时间较长时,可能会导致任务堆积,从而影响调度的及时性。
  2. 周期性任务支持TimerTask 类有一个 period 字段,用于控制任务是否是周期性执行的任务。period=0 时表示任务只执行一次,否则任务会按照周期不断重复。
  3. 优先队列Timer 使用小顶堆结构作为任务队列,确保最早执行的任务排在队列前面。通过这种方式,任务按顺序执行,避免了无序执行的情况。
  4. 性能优化:只有任务处于队列堆顶时,才会触发 notify 通知任务执行,避免了无谓的资源消耗。

如果要并行执行调度器,可以用 java.util.ScheduledThreadPoolExecutor,使用了 java 线程池来执行任务。

ScheduledThreadPoolExecutor 分析

ScheduledThreadPoolExecutor 是 Java 中用于执行延迟或周期性任务的工具类,基于线程池设计,相比 Timer 提供了更多的灵活性和性能优势。它通过 ThreadPoolExecutor 来管理工作线程,支持并发执行任务,具有更强的容错能力。

主要特点

  1. 线程池执行:与 Timer 的单线程模式不同,ScheduledThreadPoolExecutor 使用线程池(ThreadPoolExecutor)来并发执行任务,因此多个任务可以同时执行,避免了单线程模型可能带来的瓶颈。
  2. 延迟队列ScheduledThreadPoolExecutor 使用 DelayedWorkQueue 来存储任务,并根据任务的执行时间进行排序。任务只有到期后才能出队执行。
  3. 灵活的调度策略:它支持周期性任务,并提供了多种调度策略(例如固定频率和固定延迟)。对于周期性任务,它会在每次执行后重新计算下一次执行的时间。
  4. 任务取消和错误处理ScheduledThreadPoolExecutor 支持任务的取消,并能有效处理任务执行中的异常,确保系统稳定性。
  5. 高并发支持:通过线程池,它能够处理高并发场景,并且能更好地分配计算资源,避免了 Timer 在任务量较大时可能发生的阻塞和延迟。

核心代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public class ScheduledThreadPoolExecutor  
extends ThreadPoolExecutor
implements ScheduledExecutorService {

//提交任务的入口方法
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
//任务包装成 ScheduledFutureTask
RunnableScheduledFuture<?> t = new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit));
delayedExecute(t);
return t;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {
//放入队列中
super.getQueue().add(task);
}

//任务包装类
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {

private long time;

ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
//返回当前时间和执行时间的时间差
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
//执行的方法
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run(); //非周期性任务,直接运行
else if (ScheduledFutureTask.super.runAndReset()) {
//周期性任务,设置下一个执行时间
setNextRunTime();
//重新把任务放入队列
reExecutePeriodic(outerTask);
}
}
}
//周期性任务重新放入队列
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
//周期性任务重新放入队列中
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
//存储任务的核心类DelayedWorkQueue
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {

private final Condition available = lock.newCondition();

//获取任务的方法,没有要执行的任务会阻塞
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//获取锁
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();//没有任务await等待
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);//返回到期可以执行的任务
first = null;
//任务未到期,await 一段时间
available.awaitNanos(delay);
}
}
}
} finally {
lock.unlock();//释放锁
}
}

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
}

}

线程池中线程获取任务执行的核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ThreadPoolExecutor extends AbstractExecutorService {

private final class Worker implements Runnable {
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
boolean completedAbruptly = true;
try {
//循环调用getTask()方法从队列中获取任务执行
while (task != null || (task = getTask()) != null) {
try {
beforeExecute(wt, task);
try {
//执行任务,这里的task时ScheduledFutureTask
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
}
}
} finally {
processWorkerExit(w, completedAbruptly);
}
}
//调用poll或者take方法从队列中获取任务,
//此处的 workQueue 就是 DelayedWorkQueue
private Runnable getTask() {
for (;;) {
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null) {
return r;
}
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
}

}

结合上述源码,ScheduledThreadPoolExecutor 的任务获取和执行流程可以概括为以下步骤:

  • 任务提交:任务通过 schedulescheduleAtFixedRatescheduleWithFixedDelay 方法提交,任务会被封装为 ScheduledFutureTask 并加入到 DelayedWorkQueue 中。
  • 任务排序:任务根据执行时间排序,最早到期的任务排在队首,准备执行。
  • 线程获取任务:工作线程从队列中获取任务执行。如果任务未到期,工作线程会等待,直到有任务到期。
  • 任务执行:取出的任务交由线程池中的线程执行。周期性任务在执行完成后会重新计算下一次执行时间,并加入队列重新调度

Timer和ScheduledThreadPoolExecutor对比

特性 Timer ScheduledThreadPoolExecutor
线程模型 单线程执行 多线程执行(基于线程池)
任务并行 不支持 支持并行执行多个任务
队列实现 小顶堆(TaskQueue 延迟队列(DelayedWorkQueue
任务调度 通过 TimerTaskperiod 字段来处理周期性任务 支持周期性任务,通过 ScheduledFutureTask 来处理
任务取消与错误处理 不支持任务取消、异常处理 支持任务取消、异常捕获和处理
灵活性 较低,只有基本的任务调度功能 高,支持更多的调度策略,能够调整任务执行间隔
使用复杂性 简单,适合少量任务调度 复杂,适合大规模任务调度和并发执行
  • 如果你的应用只需要简单的任务调度,并且任务量较少,Timer 足以满足需求。但在任务量增加,尤其是当任务执行时间较长时,Timer 会受到性能限制。
  • 如果需要更强大的调度功能,支持并发执行、复杂的调度策略、任务取消和容错,ScheduledThreadPoolExecutor 是一个更好的选择。它基于线程池的设计,能够更高效地处理高并发和长时间运行的任务。

总结

定时任务在现代软件开发中扮演着重要的角色,广泛应用于日志清理、数据备份、定时通知等场景。通过本文的学习,我们深入探讨了定时任务的三个核心要素:

  1. 任务与触发条件:定时任务的核心是任务本身及其触发条件。我们以一个简单的吃饭任务为例,展示了如何通过指定时间延迟来触发任务执行。
  2. 调度器:调度器是定时任务的关键组件,它负责管理任务的执行与调度。通过引入调度器,我们实现了任务的并发执行,并解决了原始实现中的线程阻塞问题。调度器利用线程池来支持任务并发执行,避免了任务执行过程中的瓶颈。
  3. 周期性任务与线程池:除了单次任务,定时任务也经常需要支持周期性执行。通过分析 TimerScheduledThreadPoolExecutor 的实现,我们了解了如何通过不同的调度策略来实现周期性任务的执行,以及如何利用线程池增强任务调度的灵活性和性能。

通过本文的深入分析,我们不仅学习了如何手动实现一个简单的定时任务调度器,还对 JDK 提供的 Timer 类与 ScheduledThreadPoolExecutor 进行了对比,进一步加深了对任务调度机制的理解。未来可以根据实际需求,结合自定义实现和现有工具,灵活应对各种复杂的定时任务调度场景。


深入浅出定时任务调度:实现与优化
https://wydpp.com/posts/b829a8b5.html
作者
老段
发布于
2025年1月26日
许可协议