mirror of
https://gitee.com/chinabugotech/hutool.git
synced 2025-05-09 23:51:34 +08:00
add TimingWheel
This commit is contained in:
parent
9da63b9316
commit
732e740d94
@ -3,9 +3,11 @@
|
|||||||
|
|
||||||
-------------------------------------------------------------------------------------------------------------
|
-------------------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
# 5.6.6 (2021-05-08)
|
# 5.6.6 (2021-05-10)
|
||||||
|
|
||||||
### 🐣新特性
|
### 🐣新特性
|
||||||
|
* 【cron 】 增加时间轮简单实现
|
||||||
|
|
||||||
### 🐞Bug修复
|
### 🐞Bug修复
|
||||||
|
|
||||||
-------------------------------------------------------------------------------------------------------------
|
-------------------------------------------------------------------------------------------------------------
|
||||||
|
@ -0,0 +1,70 @@
|
|||||||
|
package cn.hutool.cron.timingwheel;
|
||||||
|
|
||||||
|
import cn.hutool.core.thread.ThreadUtil;
|
||||||
|
|
||||||
|
import java.util.concurrent.DelayQueue;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class SystemTimer {
|
||||||
|
/**
|
||||||
|
* 底层时间轮
|
||||||
|
*/
|
||||||
|
private final TimingWheel timeWheel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 一个Timer只有一个delayQueue
|
||||||
|
*/
|
||||||
|
private final DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 过期任务执行线程
|
||||||
|
*/
|
||||||
|
private final ExecutorService workerThreadPool;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 轮询delayQueue获取过期任务线程
|
||||||
|
*/
|
||||||
|
private ExecutorService bossThreadPool;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 构造函数
|
||||||
|
*/
|
||||||
|
public SystemTimer(int timeout) {
|
||||||
|
timeWheel = new TimingWheel(1, 20, System.currentTimeMillis(), delayQueue);
|
||||||
|
workerThreadPool = ThreadUtil.newExecutor(100);
|
||||||
|
bossThreadPool = ThreadUtil.newSingleExecutor();
|
||||||
|
bossThreadPool.submit(() -> {
|
||||||
|
while (true) {
|
||||||
|
this.advanceClock(timeout);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 添加任务
|
||||||
|
*/
|
||||||
|
public void addTask(TimerTask timerTask) {
|
||||||
|
//添加失败任务直接执行
|
||||||
|
if (!timeWheel.addTask(timerTask)) {
|
||||||
|
workerThreadPool.submit(timerTask.getTask());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取过期任务
|
||||||
|
*/
|
||||||
|
private void advanceClock(long timeout) {
|
||||||
|
try {
|
||||||
|
TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
|
||||||
|
if (timerTaskList != null) {
|
||||||
|
//推进时间
|
||||||
|
timeWheel.advanceClock(timerTaskList.getExpiration());
|
||||||
|
//执行过期任务(包含降级操作)
|
||||||
|
timerTaskList.flush(this::addTask);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ignore) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,54 @@
|
|||||||
|
package cn.hutool.cron.timingwheel;
|
||||||
|
|
||||||
|
public class TimerTask {
|
||||||
|
/**
|
||||||
|
* 延迟时间
|
||||||
|
*/
|
||||||
|
private final long delayMs;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 任务
|
||||||
|
*/
|
||||||
|
private final Runnable task;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 时间槽
|
||||||
|
*/
|
||||||
|
protected TimerTaskList timerTaskList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 下一个节点
|
||||||
|
*/
|
||||||
|
protected TimerTask next;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 上一个节点
|
||||||
|
*/
|
||||||
|
protected TimerTask prev;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 描述
|
||||||
|
*/
|
||||||
|
public String desc;
|
||||||
|
|
||||||
|
public TimerTask(Runnable task, long delayMs) {
|
||||||
|
this.delayMs = System.currentTimeMillis() + delayMs;
|
||||||
|
this.task = task;
|
||||||
|
this.timerTaskList = null;
|
||||||
|
this.next = null;
|
||||||
|
this.prev = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Runnable getTask() {
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getDelayMs() {
|
||||||
|
return delayMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return desc;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,98 @@
|
|||||||
|
package cn.hutool.cron.timingwheel;
|
||||||
|
|
||||||
|
import java.util.concurrent.Delayed;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author: siran.yao
|
||||||
|
* @time: 2020/5/8:上午11:13
|
||||||
|
*/
|
||||||
|
public class TimerTaskList implements Delayed {
|
||||||
|
/**
|
||||||
|
* 过期时间
|
||||||
|
*/
|
||||||
|
private AtomicLong expiration = new AtomicLong(-1L);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根节点
|
||||||
|
*/
|
||||||
|
private TimerTask root = new TimerTask( null,-1L);
|
||||||
|
|
||||||
|
{
|
||||||
|
root.prev = root;
|
||||||
|
root.next = root;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设置过期时间
|
||||||
|
*/
|
||||||
|
public boolean setExpiration(long expire) {
|
||||||
|
return expiration.getAndSet(expire) != expire;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取过期时间
|
||||||
|
*/
|
||||||
|
public long getExpiration() {
|
||||||
|
return expiration.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 新增任务
|
||||||
|
*/
|
||||||
|
public void addTask(TimerTask timerTask) {
|
||||||
|
synchronized (this) {
|
||||||
|
if (timerTask.timerTaskList == null) {
|
||||||
|
timerTask.timerTaskList = this;
|
||||||
|
TimerTask tail = root.prev;
|
||||||
|
timerTask.next = root;
|
||||||
|
timerTask.prev = tail;
|
||||||
|
tail.next = timerTask;
|
||||||
|
root.prev = timerTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 移除任务
|
||||||
|
*/
|
||||||
|
public void removeTask(TimerTask timerTask) {
|
||||||
|
synchronized (this) {
|
||||||
|
if (timerTask.timerTaskList.equals(this)) {
|
||||||
|
timerTask.next.prev = timerTask.prev;
|
||||||
|
timerTask.prev.next = timerTask.next;
|
||||||
|
timerTask.timerTaskList = null;
|
||||||
|
timerTask.next = null;
|
||||||
|
timerTask.prev = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 重新分配
|
||||||
|
*/
|
||||||
|
public synchronized void flush(Consumer<TimerTask> flush) {
|
||||||
|
TimerTask timerTask = root.next;
|
||||||
|
while (!timerTask.equals(root)) {
|
||||||
|
this.removeTask(timerTask);
|
||||||
|
flush.accept(timerTask);
|
||||||
|
timerTask = root.next;
|
||||||
|
}
|
||||||
|
expiration.set(-1L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getDelay(TimeUnit unit) {
|
||||||
|
return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(Delayed o) {
|
||||||
|
if (o instanceof TimerTaskList) {
|
||||||
|
return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,108 @@
|
|||||||
|
package cn.hutool.cron.timingwheel;
|
||||||
|
|
||||||
|
import java.util.concurrent.DelayQueue;
|
||||||
|
|
||||||
|
public class TimingWheel {
|
||||||
|
/**
|
||||||
|
* 一个时间槽的范围
|
||||||
|
*/
|
||||||
|
private long tickMs;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 时间轮大小
|
||||||
|
*/
|
||||||
|
private int wheelSize;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 时间跨度
|
||||||
|
*/
|
||||||
|
private long interval;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 时间槽
|
||||||
|
*/
|
||||||
|
private TimerTaskList[] timerTaskLists;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 当前时间
|
||||||
|
*/
|
||||||
|
private long currentTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 上层时间轮
|
||||||
|
*/
|
||||||
|
private volatile TimingWheel overflowWheel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 一个Timer只有一个delayQueue
|
||||||
|
*/
|
||||||
|
private DelayQueue<TimerTaskList> delayQueue;
|
||||||
|
|
||||||
|
public TimingWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {
|
||||||
|
this.currentTime = currentTime;
|
||||||
|
this.tickMs = tickMs;
|
||||||
|
this.wheelSize = wheelSize;
|
||||||
|
this.interval = tickMs * wheelSize;
|
||||||
|
this.timerTaskLists = new TimerTaskList[wheelSize];
|
||||||
|
//currentTime为tickMs的整数倍 这里做取整操作
|
||||||
|
this.currentTime = currentTime - (currentTime % tickMs);
|
||||||
|
this.delayQueue = delayQueue;
|
||||||
|
for (int i = 0; i < wheelSize; i++) {
|
||||||
|
timerTaskLists[i] = new TimerTaskList();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建或者获取上层时间轮
|
||||||
|
*/
|
||||||
|
private TimingWheel getOverflowWheel() {
|
||||||
|
if (overflowWheel == null) {
|
||||||
|
synchronized (this) {
|
||||||
|
if (overflowWheel == null) {
|
||||||
|
overflowWheel = new TimingWheel(interval, wheelSize, currentTime, delayQueue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return overflowWheel;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 添加任务到时间轮
|
||||||
|
*/
|
||||||
|
public boolean addTask(TimerTask timerTask) {
|
||||||
|
long expiration = timerTask.getDelayMs();
|
||||||
|
//过期任务直接执行
|
||||||
|
if (expiration < currentTime + tickMs) {
|
||||||
|
return false;
|
||||||
|
} else if (expiration < currentTime + interval) {
|
||||||
|
//当前时间轮可以容纳该任务 加入时间槽
|
||||||
|
Long virtualId = expiration / tickMs;
|
||||||
|
int index = (int) (virtualId % wheelSize);
|
||||||
|
System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration);
|
||||||
|
TimerTaskList timerTaskList = timerTaskLists[index];
|
||||||
|
timerTaskList.addTask(timerTask);
|
||||||
|
if (timerTaskList.setExpiration(virtualId * tickMs)) {
|
||||||
|
//添加到delayQueue中
|
||||||
|
delayQueue.offer(timerTaskList);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
//放到上一层的时间轮
|
||||||
|
TimingWheel timeWheel = getOverflowWheel();
|
||||||
|
timeWheel.addTask(timerTask);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 推进时间
|
||||||
|
*/
|
||||||
|
public void advanceClock(long timestamp) {
|
||||||
|
if (timestamp >= currentTime + tickMs) {
|
||||||
|
currentTime = timestamp - (timestamp % tickMs);
|
||||||
|
if (overflowWheel != null) {
|
||||||
|
//推进上层时间轮时间
|
||||||
|
this.getOverflowWheel().advanceClock(timestamp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,12 @@
|
|||||||
|
/**
|
||||||
|
* 时间轮实现,重写了kafka的TimingWheel<br>
|
||||||
|
* 时间轮一般会实现成一个环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽使用双向链表存储定时任务。指针周期性地跳动,跳动到一个槽位,就执行该槽位的定时任务。
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* 时间轮算法介绍:https://www.confluent.io/blog/apache-kafka-purgatory-hierarchical-timing-wheels/<br>
|
||||||
|
* 参考:https://github.com/eliasyaoyc/timingwheel
|
||||||
|
*
|
||||||
|
* @author looly
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package cn.hutool.cron.timingwheel;
|
Loading…
x
Reference in New Issue
Block a user