1 引言
在高性能网络编程中,定时任务的管理无处不在:心跳检测、请求超时控制、会话清理、任务调度等。这些看似简单的“在 X 秒后执行 Y 操作”的需求,在低延迟、高并发的场景下,却对底层定时器提出了极其苛刻的性能要求。
传统的做法是直接使用 JDK 提供的 Timer 或 ScheduledThreadPoolExecutor。它们简单易用,足以应对常规场景。然而,当海量(例如百万级)的短周期定时任务如潮水般涌来时,这些基于堆结构的定时器在任务调度和取消上的性能开销便会成为系统的瓶颈,导致吞吐量下降和延迟飙升。
那么,Netty 等众多高性能框架(如 Kafka、Akka)是如何优雅地解决这一问题的呢?答案就是一种名为“时间轮”(Time Wheel)的巧妙算法。它并非一种新鲜事物,却在现代异步编程中焕发了新生。
本文将深入浅出地探讨 Netty 如何实现并运用时间轮,剖析其为何能成为海量定时任务场景下的“性能利器”,同时也客观分析其自身的局限性。
2 时间轮原理
时间轮的原理是非常简单的。
可以想象一个钟表的表盘,它被均匀的分为很多个格子(比如 8 个),格子上挂载了所有在该时间点需要执行的任务,还有一个指针以固定的时间间隔跳动一格,每当指针跳动到格子上时,就会执行该格子里到期的所有任务。
指针以固定的时间间隔进行跳动,这个时间间隔也可以说成是格子的时间间隔。
举一些简单的例子。
假设一个时间轮有 8 格格子,每个格子的时间间隔是 1 秒(tickDuration = 1s)。
- 现在添加一个 3 秒后执行的任务 task,计算 3 % 8 = 3,所以将这个任务放在第 3 个格子。
- 指针每秒跳动一次(此时指向 0),在第 3 秒时,指针执行格子 3,发现任务 task,立刻执行它。
对于延迟时间更长的任务(超出一个时间轮一圈的时长),比如一个任务需要在 10 秒后执行,而时间轮只有 8 个格子,这就引入了圈数的概念。
- 计算 10 % 8 = 2,所以它应该放在第 2 个格子。
- 同时记录它的圈数:10 / 8 = 1,商为 1,所有这个任务需要等待 1 圈零 2 格才执行。
- 指针每跳完一圈,所有任务的圈数减 1,当指针再次指向格子 2 时,发现该任务的圈数减为 0,就执行它。
从上面的例子可以看出,时间轮类似于 HashMap,如果多个任务需要放入同一个格子,就采用双向链表拉链。
3 基础示例
下面看一个 Netty 实现的基于哈希算法的时间轮的使用。
class TimerTest {
public static void main(String[] args) {
Timer timer = new HashedWheelTimer();
TimerTask task = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("1s 后执行");
}
};
Timeout timeout = timer.newTimeout(task, 1000, TimeUnit.MILLISECONDS);
// 检查任务状态
if (!timeout.isExpired() && !timeout.isCancelled()) {
boolean canceled = timeout.cancel();
System.out.println("任务取消结果: " + canceled);
}
}
}
基于上面的示例,可以看出 Timer、TimerTask 和 Timeout 是 Netty 时间轮定时器 API 的核心组件。
首先,TimerTask 表示实际的定时任务,run 方法中是具体的任务逻辑。
import java.util.concurrent.TimeUnit;
/**
* 定时任务接口,表示在通过{@link Timer#newTimeout(TimerTask, long, TimeUnit)}指定的延迟后执行的任务。
* <p>
* 这是定时器系统中的核心抽象,定义了定时任务的生命周期回调:
* <ul>
* <li>{@link #run(Timeout)} - 任务执行回调</li>
* <li>{@link #cancelled(Timeout)} - 任务取消回调</li>
* </ul>
* <p>
* 实现此接口的类应该:
* <ul>
* <li>在run方法中实现具体的业务逻辑</li>
* <li>处理可能抛出的异常</li>
* <li>在cancelled方法中进行必要的清理工作</li>
* </ul>
*
* <h3>使用示例:</h3>
* <pre>{@code
* TimerTask task = new TimerTask() {
* @Override
* public void run(Timeout timeout) throws Exception {
* System.out.println("任务执行了!");
* }
*
* @Override
* public void cancelled(Timeout timeout) {
* System.out.println("任务被取消了,进行清理...");
* }
* };
* }</pre>
*
* @see Timer 定时器接口
* @see Timeout 超时任务句柄
*/
public interface TimerTask {
/**
* 在通过{@link Timer#newTimeout(TimerTask, long, TimeUnit)}指定的延迟后执行。
* <p>
* 这是任务的主要执行方法,包含具体的业务逻辑。此方法在定时器的工作线程中被调用,
* 因此应该避免执行耗时过长的操作,以免影响其他任务的调度。
* <p>
* 如果此方法抛出异常,异常会被定时器捕获并记录,但不会影响定时器的正常运行。
*
* @param timeout 与此任务关联的超时句柄,可用于获取任务状态信息
* @throws Exception 任务执行过程中可能抛出的任何异常
*/
void run(Timeout timeout) throws Exception;
/**
* 当{@link TimerTask}通过{@link Timeout#cancel()}成功取消时被调用。
* <p>
* 重写此方法允许执行一些清理工作,例如:
* <ul>
* <li>释放资源</li>
* <li>记录日志</li>
* <li>通知其他组件</li>
* <li>执行回滚操作</li>
* </ul>
* <p>
* 默认实现不执行任何操作。此方法在任务被成功取消后调用,
* 如果任务已经开始执行或已经完成,则不会调用此方法。
*
* @param timeout 与此任务关联的超时句柄
*/
default void cancelled(Timeout timeout) {
// 默认不执行任何操作
}
}
其次是 Timer 接口,它表示定时器的调度器,核心职责是管理时间轮和任务调度。
它提供了 newTimeout 方法,让你提交一个 TimerTask,并指定延迟时间,它负责计算在任务应该放在时间轮的哪个格子里,并在正确的时间触发它(执行 TimerTask 的 run 回调)。
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/**
* 定时器接口,用于在后台线程中调度{@link TimerTask}进行一次性的未来执行。
* <p>
* 这是定时器的核心抽象接口,定义了定时器的基本功能:
* <ul>
* <li>调度定时任务</li>
* <li>停止定时器并清理资源</li>
* </ul>
* <p>
* 实现类应该提供高效的任务调度机制,并确保资源的正确管理。
*
* @see TimerTask 定时任务接口
* @see Timeout 超时任务句柄
* @see HashedWheelTimer 基于时间轮的高性能实现
*/
public interface Timer {
/**
* 调度指定的{@link TimerTask}在指定延迟后执行一次。
* <p>
* 此方法将任务添加到定时器的调度队列中,任务将在延迟时间到达后被执行。
* 如果定时器已经停止,则会抛出IllegalStateException异常。
*
* @param task 要执行的定时任务,不能为null
* @param delay 延迟时间,必须为非负数
* @param unit 延迟时间的时间单位,不能为null
* @return 与指定任务关联的超时句柄,可用于取消任务或查询状态
*
* @throws IllegalStateException 如果此定时器已经被{@linkplain #stop() 停止}
* @throws RejectedExecutionException 如果待处理的超时任务过多,创建新的超时任务可能导致系统不稳定
* @throws NullPointerException 如果task或unit为null
* @throws IllegalArgumentException 如果delay为负数
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
/**
* 释放此{@link Timer}获取的所有资源,并取消所有已调度但尚未执行的任务。
* <p>
* 此方法会:
* <ul>
* <li>停止定时器的工作线程</li>
* <li>取消所有待执行的任务</li>
* <li>释放相关资源</li>
* <li>返回被取消的任务句柄</li>
* </ul>
* <p>
* 调用此方法后,定时器将无法再调度新的任务。
*
* @return 与被此方法取消的任务关联的句柄集合
*/
Set<Timeout> stop();
}
最后是 Timeout,它表示一个定时任务的句柄,每次调用 timer.newTimeout 时,都会返回一个 Timeout 对象。
通过这个 Timeout 对象,可以查询任务的状态(是否已过期、是否被取消),也可以调用 cancel 方法来取消这个尚未执行的任务,最后它关联了对应的 Timer 和 TimerTask。
/**
* 定时任务句柄,与{@link TimerTask}关联,由{@link Timer}返回。
* <p>
* 这是定时器系统中的重要抽象,提供了对已调度任务的控制和状态查询功能:
* <ul>
* <li>查询任务状态(是否过期、是否取消)</li>
* <li>取消任务执行</li>
* <li>获取关联的定时器和任务对象</li>
* </ul>
* <p>
* 定时任务的生命周期状态:
* <ol>
* <li><b>初始状态</b>:任务刚被调度,等待执行</li>
* <li><b>已过期</b>:任务已执行完成</li>
* <li><b>已取消</b>:任务在执行前被取消</li>
* </ol>
* <p>
* 一旦任务进入"已过期"或"已取消"状态,就不能再改变状态。
*
* <h3>使用示例:</h3>
* <pre>{@code
* Timer timer = new HashedWheelTimer();
* Timeout timeout = timer.newTimeout(task, 5, TimeUnit.SECONDS);
*
* // 检查任务状态
* if (!timeout.isExpired() && !timeout.isCancelled()) {
* // 取消任务
* boolean cancelled = timeout.cancel();
* System.out.println("任务取消结果: " + cancelled);
* }
* }</pre>
*
* @see Timer 定时器接口
* @see TimerTask 定时任务接口
*/
public interface Timeout {
/**
* 返回创建此句柄的{@link Timer}。
* <p>
* 此方法用于获取创建此定时任务的定时器实例,可用于:
* <ul>
* <li>调度更多任务</li>
* <li>查询定时器状态</li>
* <li>停止定时器</li>
* </ul>
*
* @return 创建此句柄的定时器实例,永远不会为null
*/
Timer timer();
/**
* 返回与此句柄关联的{@link TimerTask}。
* <p>
* 此方法用于获取此定时任务要执行的具体任务对象,可用于:
* <ul>
* <li>获取任务的具体信息</li>
* <li>在取消时进行特定处理</li>
* </ul>
*
* @return 与此句柄关联的定时任务,永远不会为null
*/
TimerTask task();
/**
* 当且仅当与此句柄关联的{@link TimerTask}已过期时返回{@code true}。
* <p>
* 任务过期意味着:
* <ul>
* <li>任务已经执行完成</li>
* <li>任务不能再被取消</li>
* <li>任务的生命周期已结束</li>
* </ul>
* <p>
* 注意:已过期的任务不一定执行成功,可能在执行过程中抛出了异常,只是表示执行过
*
* @return 如果任务已过期则返回true,否则返回false
*/
boolean isExpired();
/**
* 当且仅当与此句柄关联的{@link TimerTask}已被取消时返回{@code true}。
* <p>
* 任务被取消意味着:
* <ul>
* <li>任务在执行前被主动取消</li>
* <li>任务不会再被执行</li>
* <li>可能会调用{@link TimerTask#cancelled(Timeout)}回调</li>
* </ul>
* <p>
* 注意:已经开始执行或已经完成的任务不能被取消。
*
* @return 如果任务已被取消则返回true,否则返回false
*/
boolean isCancelled();
/**
* 尝试取消与此句柄关联的{@link TimerTask}。
* <p>
* 取消操作的行为:
* <ul>
* <li>如果任务尚未执行,则取消成功,返回true</li>
* <li>如果任务已经执行或已经取消,则无副作用,返回false</li>
* <li>成功取消的任务会调用{@link TimerTask#cancelled(Timeout)}回调</li>
* </ul>
* <p>
* 此方法是线程安全的,可以从任何线程调用。
*
* @return 如果取消成功则返回true,否则返回false
*/
boolean cancel();
}
在 Netty 中,HashedWheelTimeout 是 Timeout 的唯一实现,HashedWheelTimer 是 Timer 的唯一实现。
4 实现源码
在 HashedWheelTimer 的实现中,时间轮本质是一个数组(固定大小),数组元素是 HashedWheelBucket,每个 bucket 对应时间轮上的一个格子,存储这个时间段应该执行的所有定时任务,这些任务基于双向链表进行串联。
双向链表的节点就是 HashedWheelTimeout,其内部通过 prev 和 next 链接前后节点,而 HashedWheelBucket 中维护了该槽位的链表头尾节点。
同时,整个时间轮由单个工作线程完成运转,避免多线程锁竞争开销。
当任务被添加时,并不是直接加入到时间轮对应的槽位链表中,而是先进入一个 Mpsc timeouts 队列,在每一次 tick 到来时,会将 timeouts 队列中至多 100000 个任务加入到时间轮上,此外,在通过 Timeout cancel 任务时,也只是将任务状态从初始化更新为取消,然后将任务句柄加入到另一个 Mpsc cancelledTimeouts 队列,等到下一次 tick 时,才会在时间轮中进行实际的移出操作。
整个 HashedWheelTimer 的实现是比较简单的,这里就直接给出代码。
4.1 添加任务
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
Validate.notNull(task, "task");
Validate.notNull(unit, "unit");
// 增加待处理定时任务计数
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
// 检查是否超过最大待处理任务数量限制
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 确保定时器已启动
start(); // 内部会启动工作线程
// 将定时任务添加到超时队列中,将在下一个tick时处理
// 在处理过程中,所有排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket中
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// 防止溢出
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
4.2 时间轮调度
整个时间轮的调度是由单个线程完成的,该线程会在第一次添加任务时启动。
public void run() {
// 初始化启动时间
startTime = System.nanoTime();
if (startTime == 0) {
// 使用0作为未初始化值的指示器,所以确保初始化时不为0
startTime = 1;
}
// 通知在start()方法中等待初始化的其他线程
startTimeInitialized.countDown();
// 主循环:处理时间轮的每个tick
do {
final long deadline = waitForNextTick();
if (deadline > 0) {
// 计算当前tick对应的bucket索引
int idx = (int) (tick & mask);
// 处理已取消的任务
processCancelledTasks();
// 获取当前tick对应的bucket
HashedWheelBucket bucket = wheel[idx];
// 将新提交的定时任务分配到相应的bucket中
transferTimeoutsToBuckets();
// 执行当前bucket中到期的定时任务
bucket.expireTimeouts(deadline);
// 增加tick计数
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// 收集未处理的定时任务,以便从stop()方法返回
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
// 处理队列中剩余的定时任务
for (; ; ) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
// 最后处理一次已取消的任务
processCancelledTasks();
}
private long waitForNextTick() {
// 计算下一个tick的目标时间
long deadline = tickDuration * (tick + 1);
for (; ; ) {
final long currentTime = System.nanoTime() - startTime;
// 计算需要睡眠的毫秒数,+999999是为了向上取整
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
// 已经到达或超过目标时间
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// 检查是否在Windows上运行,如果是,需要调整睡眠时间作为JVM在Windows上的bug的解决方案
if (SystemUtils.IS_OS_WINDOWS) {
sleepTimeMs = sleepTimeMs / 10 * 10;
if (sleepTimeMs == 0) {
sleepTimeMs = 1;
}
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
// 如果收到关闭请求,立即返回
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
private void processCancelledTasks() {
for (; ; ) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// 所有已取消的任务都已处理
break;
}
try {
// 先从对应的槽中移出该 HashedWheelTimeout 节点(链表节点删除)
// 然后执行可能的 cancelled 回调方法
timeout.removeAfterCancellation();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}
private void transferTimeoutsToBuckets() {
// 每个tick最多只转移100000个定时任务,防止当有线程不断添加新定时任务时阻塞工作线程
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// 所有任务都已处理
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// 任务在此期间被取消了
continue;
}
// 计算任务应该在哪个tick执行
// deadline 是相对于定时器 startTime 的时间差
long calculated = timeout.deadline / tickDuration;
// 计算需要经过多少轮时间轮才能执行此任务
timeout.remainingRounds = (calculated - tick) / wheel.length;
// 确保不会调度到过去的时间
final long ticks = Math.max(calculated, tick);
// 计算应该放入哪个bucket
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
// 添加 HashedWheelTimeout 节点到 bucket 槽
bucket.addTimeout(timeout);
}
}
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// 处理所有定时任务
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
// 剩余轮数为0,检查是否到期
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// 定时任务被放入了错误的槽位,这不应该发生
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (!timeout.isCancelled()) {
// 还需要等待更多轮,减少剩余轮数
timeout.remainingRounds--;
}
timeout = next;
}
}
public void expire() {
// 更新任务状态
if (!STATE_UPDATER.compareAndSet(this, ST_INIT, ST_EXPIRED)) {
return;
}
try {
// 先从链表中移除节点
remove();
// 实际执行任务
timer.taskExecutor.execute(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown" +
" while submit {} for execution.", TimerTask.class.getSimpleName(), t);
}
}
}
4.3 取消任务
public boolean cancel() {
// 只更新状态,实际的移除操作将在下一个tick时进行
if (!STATE_UPDATER.compareAndSet(this, ST_INIT, ST_CANCELLED)) {
return false;
}
// 如果任务需要被取消,将其放入另一个队列,该队列将在每个tick时处理。
// 这意味着我们最多有1个tick持续时间的GC延迟,这已经足够好了。
// 这样我们可以再次使用MpscLinkedQueue,从而尽可能减少锁定/开销。
timer.cancelledTimeouts.add(this);
return true;
}
5 时间轮与 JDK 定时器
ScheduledThreadPoolExecutor 和 Timer 其内部使用堆来存储定时任务,每次任务的插入(schedule)和取消(cancel)时间复杂度都为 O(log n)。当任务数量巨大时,频繁的插入和取消操作会带来显著的性能开销。
而在时间轮上,任务的添加和取消都是近乎 O(1) 的时间复杂度,在添加任务时,只需要计算出对应的槽位,然后添加到槽位对应的链表尾。在取消任务时,也是同样的道理,计算出对应的槽位之后,移除对应的链表节点即可。
双向链表的设计可以非常方便进行链表节点的尾插和任意节点的移除。
尽管时间轮在特定场景下优势巨大,但它也并非银弹,存在以下劣势:
1)调度精度(误差)问题:这是时间轮最显著的缺点。时间轮的调度精度取决于一个 tick 的时长(tickDuration)。如果时间轮设置为每秒移动一个刻度(tickDuration = 1s),那么所有任务的执行时间都会对齐到整秒。例如,一个希望在 150ms 后执行的任务,可能会被放入时间轮的第 1 个格子中,并在 1000ms 后才被执行,存在高达 850ms 的误差。虽然可以通过减小 tickDuration(如 1ms)来提高精度,但这会增大空转开销。
2)内存占用:时间轮是一种“以空间换时间”的算法。为了降低冲突(多个任务落入同一个槽),需要设置足够多的槽位数(ticksPerWheel)。一个拥有 512 个或 1024 个槽位的时间轮并不少见,这会预先占用一块连续的内存空间。虽然单个槽位开销很小,但相比基于堆的定时器动态分配的模式,其初始空间开销是固定的。
3)不适合处理到期过长的定时任务:基本的时间轮模型是一个环形数组,其能表示的时间范围是有限的(tickDuration * ticksPerWheel)。对于超出这个范围的超时任务,需要额外的机制(例如层次时间轮或溢出轮)来管理,增加了实现的复杂性。而 JDK 的定时器基于绝对时间,理论上可以处理任何未来的任务。
总的来说,时间轮是管理海量、短周期、对精度要求不极端的定时任务的绝佳选择,尤其是在网络 IO 超时、心跳等场景下,其性能优势是 JDK 定时器无法比拟的。
但在需要高精度调度或处理到期时间过长的大型任务时,则需要慎重评估或选择其他方案。