From 732e740d9439e8ca76b418b8e8e9ce2dbe3bf215 Mon Sep 17 00:00:00 2001 From: Looly Date: Mon, 10 May 2021 17:44:57 +0800 Subject: [PATCH] add TimingWheel --- CHANGELOG.md | 4 +- .../hutool/cron/timingwheel/SystemTimer.java | 70 ++++++++++++ .../cn/hutool/cron/timingwheel/TimerTask.java | 54 +++++++++ .../cron/timingwheel/TimerTaskList.java | 98 ++++++++++++++++ .../hutool/cron/timingwheel/TimingWheel.java | 108 ++++++++++++++++++ .../hutool/cron/timingwheel/package-info.java | 12 ++ 6 files changed, 345 insertions(+), 1 deletion(-) create mode 100644 hutool-cron/src/main/java/cn/hutool/cron/timingwheel/SystemTimer.java create mode 100644 hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimerTask.java create mode 100644 hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimerTaskList.java create mode 100644 hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimingWheel.java create mode 100644 hutool-cron/src/main/java/cn/hutool/cron/timingwheel/package-info.java diff --git a/CHANGELOG.md b/CHANGELOG.md index abbcbe8f0..5574e4daf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,11 @@ ------------------------------------------------------------------------------------------------------------- -# 5.6.6 (2021-05-08) +# 5.6.6 (2021-05-10) ### 🐣新特性 +* 【cron 】 增加时间轮简单实现 + ### 🐞Bug修复 ------------------------------------------------------------------------------------------------------------- diff --git a/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/SystemTimer.java b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/SystemTimer.java new file mode 100644 index 000000000..29fba2eef --- /dev/null +++ b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/SystemTimer.java @@ -0,0 +1,70 @@ +package cn.hutool.cron.timingwheel; + +import cn.hutool.core.thread.ThreadUtil; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public class SystemTimer { + /** + * 底层时间轮 + */ + private final TimingWheel timeWheel; + + /** + * 一个Timer只有一个delayQueue + */ + private final DelayQueue delayQueue = new DelayQueue<>(); + + /** + * 过期任务执行线程 + */ + private final ExecutorService workerThreadPool; + + /** + * 轮询delayQueue获取过期任务线程 + */ + private ExecutorService bossThreadPool; + + /** + * 构造函数 + */ + public SystemTimer(int timeout) { + timeWheel = new TimingWheel(1, 20, System.currentTimeMillis(), delayQueue); + workerThreadPool = ThreadUtil.newExecutor(100); + bossThreadPool = ThreadUtil.newSingleExecutor(); + bossThreadPool.submit(() -> { + while (true) { + this.advanceClock(timeout); + } + }); + } + + /** + * 添加任务 + */ + public void addTask(TimerTask timerTask) { + //添加失败任务直接执行 + if (!timeWheel.addTask(timerTask)) { + workerThreadPool.submit(timerTask.getTask()); + } + } + + /** + * 获取过期任务 + */ + private void advanceClock(long timeout) { + try { + TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS); + if (timerTaskList != null) { + //推进时间 + timeWheel.advanceClock(timerTaskList.getExpiration()); + //执行过期任务(包含降级操作) + timerTaskList.flush(this::addTask); + } + } catch (InterruptedException ignore) { + // ignore + } + } +} diff --git a/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimerTask.java b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimerTask.java new file mode 100644 index 000000000..f894e88b0 --- /dev/null +++ b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimerTask.java @@ -0,0 +1,54 @@ +package cn.hutool.cron.timingwheel; + +public class TimerTask { + /** + * 延迟时间 + */ + private final long delayMs; + + /** + * 任务 + */ + private final Runnable task; + + /** + * 时间槽 + */ + protected TimerTaskList timerTaskList; + + /** + * 下一个节点 + */ + protected TimerTask next; + + /** + * 上一个节点 + */ + protected TimerTask prev; + + /** + * 描述 + */ + public String desc; + + public TimerTask(Runnable task, long delayMs) { + this.delayMs = System.currentTimeMillis() + delayMs; + this.task = task; + this.timerTaskList = null; + this.next = null; + this.prev = null; + } + + public Runnable getTask() { + return task; + } + + public long getDelayMs() { + return delayMs; + } + + @Override + public String toString() { + return desc; + } +} diff --git a/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimerTaskList.java b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimerTaskList.java new file mode 100644 index 000000000..c8185adde --- /dev/null +++ b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimerTaskList.java @@ -0,0 +1,98 @@ +package cn.hutool.cron.timingwheel; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +/** + * @Author: siran.yao + * @time: 2020/5/8:上午11:13 + */ +public class TimerTaskList implements Delayed { + /** + * 过期时间 + */ + private AtomicLong expiration = new AtomicLong(-1L); + + /** + * 根节点 + */ + private TimerTask root = new TimerTask( null,-1L); + + { + root.prev = root; + root.next = root; + } + + /** + * 设置过期时间 + */ + public boolean setExpiration(long expire) { + return expiration.getAndSet(expire) != expire; + } + + /** + * 获取过期时间 + */ + public long getExpiration() { + return expiration.get(); + } + + /** + * 新增任务 + */ + public void addTask(TimerTask timerTask) { + synchronized (this) { + if (timerTask.timerTaskList == null) { + timerTask.timerTaskList = this; + TimerTask tail = root.prev; + timerTask.next = root; + timerTask.prev = tail; + tail.next = timerTask; + root.prev = timerTask; + } + } + } + + /** + * 移除任务 + */ + public void removeTask(TimerTask timerTask) { + synchronized (this) { + if (timerTask.timerTaskList.equals(this)) { + timerTask.next.prev = timerTask.prev; + timerTask.prev.next = timerTask.next; + timerTask.timerTaskList = null; + timerTask.next = null; + timerTask.prev = null; + } + } + } + + /** + * 重新分配 + */ + public synchronized void flush(Consumer flush) { + TimerTask timerTask = root.next; + while (!timerTask.equals(root)) { + this.removeTask(timerTask); + flush.accept(timerTask); + timerTask = root.next; + } + expiration.set(-1L); + } + + @Override + public long getDelay(TimeUnit unit) { + return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); + } + + @Override + public int compareTo(Delayed o) { + if (o instanceof TimerTaskList) { + return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get()); + } + return 0; + } +} diff --git a/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimingWheel.java b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimingWheel.java new file mode 100644 index 000000000..786e91a38 --- /dev/null +++ b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimingWheel.java @@ -0,0 +1,108 @@ +package cn.hutool.cron.timingwheel; + +import java.util.concurrent.DelayQueue; + +public class TimingWheel { + /** + * 一个时间槽的范围 + */ + private long tickMs; + + /** + * 时间轮大小 + */ + private int wheelSize; + + /** + * 时间跨度 + */ + private long interval; + + /** + * 时间槽 + */ + private TimerTaskList[] timerTaskLists; + + /** + * 当前时间 + */ + private long currentTime; + + /** + * 上层时间轮 + */ + private volatile TimingWheel overflowWheel; + + /** + * 一个Timer只有一个delayQueue + */ + private DelayQueue delayQueue; + + public TimingWheel(long tickMs, int wheelSize, long currentTime, DelayQueue delayQueue) { + this.currentTime = currentTime; + this.tickMs = tickMs; + this.wheelSize = wheelSize; + this.interval = tickMs * wheelSize; + this.timerTaskLists = new TimerTaskList[wheelSize]; + //currentTime为tickMs的整数倍 这里做取整操作 + this.currentTime = currentTime - (currentTime % tickMs); + this.delayQueue = delayQueue; + for (int i = 0; i < wheelSize; i++) { + timerTaskLists[i] = new TimerTaskList(); + } + } + + /** + * 创建或者获取上层时间轮 + */ + private TimingWheel getOverflowWheel() { + if (overflowWheel == null) { + synchronized (this) { + if (overflowWheel == null) { + overflowWheel = new TimingWheel(interval, wheelSize, currentTime, delayQueue); + } + } + } + return overflowWheel; + } + + /** + * 添加任务到时间轮 + */ + public boolean addTask(TimerTask timerTask) { + long expiration = timerTask.getDelayMs(); + //过期任务直接执行 + if (expiration < currentTime + tickMs) { + return false; + } else if (expiration < currentTime + interval) { + //当前时间轮可以容纳该任务 加入时间槽 + Long virtualId = expiration / tickMs; + int index = (int) (virtualId % wheelSize); + System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration); + TimerTaskList timerTaskList = timerTaskLists[index]; + timerTaskList.addTask(timerTask); + if (timerTaskList.setExpiration(virtualId * tickMs)) { + //添加到delayQueue中 + delayQueue.offer(timerTaskList); + } + } else { + //放到上一层的时间轮 + TimingWheel timeWheel = getOverflowWheel(); + timeWheel.addTask(timerTask); + } + return true; + } + + /** + * 推进时间 + */ + public void advanceClock(long timestamp) { + if (timestamp >= currentTime + tickMs) { + currentTime = timestamp - (timestamp % tickMs); + if (overflowWheel != null) { + //推进上层时间轮时间 + this.getOverflowWheel().advanceClock(timestamp); + } + } + } +} diff --git a/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/package-info.java b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/package-info.java new file mode 100644 index 000000000..220d1ce6b --- /dev/null +++ b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/package-info.java @@ -0,0 +1,12 @@ +/** + * 时间轮实现,重写了kafka的TimingWheel
+ * 时间轮一般会实现成一个环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽使用双向链表存储定时任务。指针周期性地跳动,跳动到一个槽位,就执行该槽位的定时任务。 + * + *

+ * 时间轮算法介绍:https://www.confluent.io/blog/apache-kafka-purgatory-hierarchical-timing-wheels/
+ * 参考:https://github.com/eliasyaoyc/timingwheel + * + * @author looly + * + */ +package cn.hutool.cron.timingwheel;