基本概念
定时任务主要是指在一定的时间间隔或指定的时间点触发执行特定代码逻辑的功能。它在实际开发中应用广泛,比如日志清理、数据备份、定时发送通知等。
定时任务的核心要素有三个:
- 任务(Task):需要执行的代码逻辑。
- 时间间隔/触发条件:指定任务执行的时间点或周期。
- 调度器(Scheduler):负责管理任务的触发与调度。
本文将结合代码,深入理解定时任务的这三大核心要素。
核心要素一:任务与触发条件
我们以一个生活中简单的场景为例:半个小时后要吃饭了。
定任务三要素:
为了演示方便,将触发条件简化为 3秒后 执行。以下是一个简单的 Java 示例代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| public class Person { private String name; } public class Task{
private Person person; private Runnable task;
private int delay;
public void execute(){ LogUtil.log(person.getName()+"正在执行任务"); task.run(); LogUtil.log(person.getName()+"任务执行完毕"); } } public class SimpleTaskDemo {
public static void submit(Task task) { LogUtil.log("添加任务"); sleepSeconds(task.getDelay()); task.execute(); } public static void main(String[] args) { Runnable eat = new Runnable() { @Override public void run() { LogUtil.log("吃饭中~~~~~"); sleepSeconds(3); } }; Person person = new Person("张三"); int delay = 3; Task zhangSanTask = new Task(person, eat); submit(zhangSanTask,3); } private static void sleepSeconds(int seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
|
执行 SimpleTaskDemo.main
方法,命令行打印结果为:
1 2 3 4
| 2025-01-23 15:59:42:添加任务 2025-01-23 15:59:45:张三正在执行任务 2025-01-23 15:59:45:吃饭中~~~~~ 2025-01-23 15:59:48:张三任务执行完毕
|
上述代码通过 submit
方法延迟指定时间后执行任务,实现了一个简单的定时任务功能。
核心要素二:调度器
上面的实现有一个明显问题:阻塞提交任务的线程,且无法同时执行多个任务。
例如:张三和李四半小时后同时吃饭,当前实现无法满足要求,因为李四必须等待张三完成任务后才能开始。在现实生活中,李四只能吃剩饭了。
为了解决这个问题,需要引入定时任务的核心要素——调度器(Scheduler)。
调度器的功能
- 管理任务触发与调度。
- 避免阻塞提交任务的线程。
- 支持任务的并发执行。
以下是一个简单的调度器实现: TaskScheduler
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| import util.LogUtil; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TaskScheduler { private ExecutorService workers = Executors.newFixedThreadPool(10); private List<TaskWrapper> tasks = new LinkedList<>(); private class TaskWrapper implements Runnable { Task task; long executeTime; public TaskWrapper(Task task) { this.task = task; this.executeTime = System.currentTimeMillis() + task.getDelay() * 1000; } @Override public void run() { task.execute(); } } public TaskScheduler() { new Thread(() -> { schedule(); }).start(); } public void submit(Task task) { synchronized (tasks) { tasks.add(new TaskWrapper(task)); Collections.sort(tasks, (o1, o2) -> (int) (o1.executeTime - o2.executeTime)); tasks.notify(); } } public void schedule() { while (true) { synchronized (tasks) { while (tasks.isEmpty()) { try { LogUtil.log("无任务,等待唤醒"); tasks.wait(); LogUtil.log("有任务,被唤醒"); } catch (InterruptedException e) { e.printStackTrace(); } } final TaskWrapper taskWrapper = tasks.get(0); if (taskWrapper.executeTime <= System.currentTimeMillis()) { LogUtil.log("任务放入 workers 执行:" + taskWrapper.task.getPerson().getName()); workers.execute(taskWrapper); tasks.remove(taskWrapper); } else { long waiteTime = taskWrapper.executeTime - System.currentTimeMillis(); LogUtil.log("任务未到执行时间,等待 " + waiteTime + " ms"); try { tasks.wait(waiteTime); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }
|
调度器的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| SimpleTaskDemo {
public static void main(String[] args) { Runnable eat = new Runnable() { @Override public void run() { LogUtil.log("吃饭中~~~~~"); sleepSeconds(3); } }; TaskScheduler taskScheduler = new TaskScheduler(); Person person = new Person("张三"); int delay = 3; Task zhangSanTask = new Task(person, eat, delay); taskScheduler.submit(zhangSanTask); Person lisi = new Person("李四"); Task lisiTask = new Task(lisi, eat, delay); taskScheduler.submit(lisiTask); } }
2025-01-24 10:39:03 534:无任务,等待唤醒 2025-01-24 10:39:03 538:有任务,被唤醒 2025-01-24 10:39:03 538:任务未到执行时间,等待 2998 ms 2025-01-24 10:39:06 537:任务放入 workers 执行:张三 2025-01-24 10:39:06 538:任务放入 workers 执行:李四 2025-01-24 10:39:06 539:张三正在执行任务 2025-01-24 10:39:06 539:无任务,等待唤醒 2025-01-24 10:39:06 539:吃饭中~~~~~ 2025-01-24 10:39:06 539:李四正在执行任务 2025-01-24 10:39:06 539:吃饭中~~~~~ 2025-01-24 10:39:09 540:张三任务执行完毕 2025-01-24 10:39:09 540:李四任务执行完毕
|
从输出结果可以看出,张三和李四同时在 3s 后,开始了吃饭。
调度器代码分析
我们从调度器的功能角度,分析下调度器代码的逻辑。
- 要能存储任务:
List<TaskWrapper> tasks = new LinkedList<>();
: task
列表实现了存储任务的功能。
submit
方法:使用 synchronized
对 task
加锁,防止并发操作队列问题。接收一个任务,设置任务执行的时间,封装成 taskWrapper
然后再放入 task
队列中。同时对 task
队列按照执行时间进行排序,最先执行的放在队列第一位。最后执行 task.notify
唤醒 schedule
方法;
- 要能触发任务:
TaskScheduler
构造方法内部,使用一个线程,执行了 schedule()
方法,该方法内部是一个死循环,执行逻辑如下:
- 对
task
对象加锁,防止并发操作问题;
- 如果
task
列表是空的,说明当前没有任务,执行 task.wait
方法,释放锁,并使当前线程进入等待状态(会释放 CPU 资源),直到其它线程调用 task.notify
方法唤醒。
- 一旦其它线程调用
submit
方法,则该线程被唤醒,取 task
队列的第一个任务,判断是否达到了任务的执行时间,如果达到了,则把任务放入 worker 线程池异步执行任务,同时再 task
队列中删除该任务。如果没达到,则调用 task.wait(time)
方法,休眠一段时间后,再次执行上面步骤.
- 任务可以同时执行:使用线程池
workers
可以对任务进行并发执行。
调度器总结
通过以上实现,我们了解了定时任务的核心要素,并通过自定义调度器解决了简单场景中的问题。然而,当前实现还不支持循环任务(如每天早上7点吃饭)。大家可以思考如何扩展调度器以支持循环任务。
Timer 分析
TaskScheduler
是我们自己实现的一个简单任务调度器,而 JDK 中的 java.util.Timer
类也提供了任务调度的功能。下面将简单分析 Timer
的实现,并与我们自定义的调度器进行对比,揭示其中的优缺点。
核心代码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
| public class Timer{ private final TaskQueue queue = new TaskQueue(); private final TimerThread thread = new TimerThread(queue); public Timer(String name) { thread.setName(name); thread.start(); } public void schedule(TimerTask task, long delay) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); sched(task, System.currentTimeMillis()+delay, 0); }
private void sched(TimerTask task, long time, long period) { if (time < 0) throw new IllegalArgumentException("Illegal execution time."); if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; synchronized(queue) { if (!thread.newTasksMayBeScheduled) throw new IllegalStateException("Timer already cancelled."); synchronized(task.lock) { if (task.state != TimerTask.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); task.nextExecutionTime = time; task.period = period; task.state = TimerTask.SCHEDULED; } queue.add(task); if (queue.getMin() == task) queue.notify(); } }
class TimerThread extends Thread { boolean newTasksMayBeScheduled = true; private TaskQueue queue; TimerThread(TaskQueue queue) { this.queue = queue; } public void run() { try { mainLoop(); } finally { synchronized(queue) { newTasksMayBeScheduled = false; queue.clear(); } } } private void mainLoop() { while (true) { try { TimerTask task; boolean taskFired; synchronized(queue) { while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); if (queue.isEmpty()) break; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime<=currentTime)) { if (task.period == 0) { queue.removeMin(); task.state = TimerTask.EXECUTED; } else { queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } if (!taskFired) queue.wait(executionTime - currentTime); } if (taskFired) task.run(); } catch(InterruptedException e) { } } } } }
class TaskQueue { private TimerTask[] queue = new TimerTask[128]; void add(TimerTask task) { if (size + 1 == queue.length) queue = Arrays.copyOf(queue, 2*queue.length); queue[++size] = task; fixUp(size); } TimerTask getMin() { return queue[1]; } void removeMin() { queue[1] = queue[size]; queue[size--] = null; fixDown(1); } void rescheduleMin(long newTime) { queue[1].nextExecutionTime = newTime; fixDown(1); } }
public abstract class TimerTask implements Runnable { public abstract void run(); long period = 0; }
|
Timer
的核心实现与我们自己的 TaskScheduler
类似,但有以下几个特点:
- 单线程执行:
Timer
是由单线程 (TimerThread
) 执行任务的,这意味着它无法并行处理多个任务。当任务执行时间较长时,可能会导致任务堆积,从而影响调度的及时性。
- 周期性任务支持:
TimerTask
类有一个 period
字段,用于控制任务是否是周期性执行的任务。period=0
时表示任务只执行一次,否则任务会按照周期不断重复。
- 优先队列:
Timer
使用小顶堆结构作为任务队列,确保最早执行的任务排在队列前面。通过这种方式,任务按顺序执行,避免了无序执行的情况。
- 性能优化:只有任务处于队列堆顶时,才会触发
notify
通知任务执行,避免了无谓的资源消耗。
如果要并行执行调度器,可以用 java.util.ScheduledThreadPoolExecutor
,使用了 java 线程池来执行任务。
ScheduledThreadPoolExecutor 分析
ScheduledThreadPoolExecutor
是 Java 中用于执行延迟或周期性任务的工具类,基于线程池设计,相比 Timer
提供了更多的灵活性和性能优势。它通过 ThreadPoolExecutor
来管理工作线程,支持并发执行任务,具有更强的容错能力。
主要特点
- 线程池执行:与
Timer
的单线程模式不同,ScheduledThreadPoolExecutor
使用线程池(ThreadPoolExecutor
)来并发执行任务,因此多个任务可以同时执行,避免了单线程模型可能带来的瓶颈。
- 延迟队列:
ScheduledThreadPoolExecutor
使用 DelayedWorkQueue
来存储任务,并根据任务的执行时间进行排序。任务只有到期后才能出队执行。
- 灵活的调度策略:它支持周期性任务,并提供了多种调度策略(例如固定频率和固定延迟)。对于周期性任务,它会在每次执行后重新计算下一次执行的时间。
- 任务取消和错误处理:
ScheduledThreadPoolExecutor
支持任务的取消,并能有效处理任务执行中的异常,确保系统稳定性。
- 高并发支持:通过线程池,它能够处理高并发场景,并且能更好地分配计算资源,避免了
Timer
在任务量较大时可能发生的阻塞和延迟。
核心代码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { RunnableScheduledFuture<?> t = new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)); delayedExecute(t); return t; } private void delayedExecute(RunnableScheduledFuture<?> task) { super.getQueue().add(task); } private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { private long time; ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } } } void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } } static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { private final Condition available = lock.newCondition(); public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; available.awaitNanos(delay); } } } } finally { lock.unlock(); } } private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { int s = --size; RunnableScheduledFuture<?> x = queue[s]; queue[s] = null; if (s != 0) siftDown(0, x); setIndex(f, -1); return f; } }
}
|
线程池中线程获取任务执行的核心代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public class ThreadPoolExecutor extends AbstractExecutorService { private final class Worker implements Runnable { public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { try { beforeExecute(wt, task); try { task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { } } } finally { processWorkerExit(w, completedAbruptly); } } private Runnable getTask() { for (;;) { try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) { return r; } } catch (InterruptedException retry) { timedOut = false; } } } }
}
|
结合上述源码,ScheduledThreadPoolExecutor
的任务获取和执行流程可以概括为以下步骤:
- 任务提交:任务通过
schedule
、scheduleAtFixedRate
或 scheduleWithFixedDelay
方法提交,任务会被封装为 ScheduledFutureTask
并加入到 DelayedWorkQueue
中。
- 任务排序:任务根据执行时间排序,最早到期的任务排在队首,准备执行。
- 线程获取任务:工作线程从队列中获取任务执行。如果任务未到期,工作线程会等待,直到有任务到期。
- 任务执行:取出的任务交由线程池中的线程执行。周期性任务在执行完成后会重新计算下一次执行时间,并加入队列重新调度
Timer和ScheduledThreadPoolExecutor对比
特性 |
Timer |
ScheduledThreadPoolExecutor |
线程模型 |
单线程执行 |
多线程执行(基于线程池) |
任务并行 |
不支持 |
支持并行执行多个任务 |
队列实现 |
小顶堆(TaskQueue ) |
延迟队列(DelayedWorkQueue ) |
任务调度 |
通过 TimerTask 的 period 字段来处理周期性任务 |
支持周期性任务,通过 ScheduledFutureTask 来处理 |
任务取消与错误处理 |
不支持任务取消、异常处理 |
支持任务取消、异常捕获和处理 |
灵活性 |
较低,只有基本的任务调度功能 |
高,支持更多的调度策略,能够调整任务执行间隔 |
使用复杂性 |
简单,适合少量任务调度 |
复杂,适合大规模任务调度和并发执行 |
- 如果你的应用只需要简单的任务调度,并且任务量较少,
Timer
足以满足需求。但在任务量增加,尤其是当任务执行时间较长时,Timer
会受到性能限制。
- 如果需要更强大的调度功能,支持并发执行、复杂的调度策略、任务取消和容错,
ScheduledThreadPoolExecutor
是一个更好的选择。它基于线程池的设计,能够更高效地处理高并发和长时间运行的任务。
总结
定时任务在现代软件开发中扮演着重要的角色,广泛应用于日志清理、数据备份、定时通知等场景。通过本文的学习,我们深入探讨了定时任务的三个核心要素:
- 任务与触发条件:定时任务的核心是任务本身及其触发条件。我们以一个简单的吃饭任务为例,展示了如何通过指定时间延迟来触发任务执行。
- 调度器:调度器是定时任务的关键组件,它负责管理任务的执行与调度。通过引入调度器,我们实现了任务的并发执行,并解决了原始实现中的线程阻塞问题。调度器利用线程池来支持任务并发执行,避免了任务执行过程中的瓶颈。
- 周期性任务与线程池:除了单次任务,定时任务也经常需要支持周期性执行。通过分析
Timer
和 ScheduledThreadPoolExecutor
的实现,我们了解了如何通过不同的调度策略来实现周期性任务的执行,以及如何利用线程池增强任务调度的灵活性和性能。
通过本文的深入分析,我们不仅学习了如何手动实现一个简单的定时任务调度器,还对 JDK 提供的 Timer
类与 ScheduledThreadPoolExecutor
进行了对比,进一步加深了对任务调度机制的理解。未来可以根据实际需求,结合自定义实现和现有工具,灵活应对各种复杂的定时任务调度场景。