add TokenBucketRateLimiter

This commit is contained in:
Looly 2024-08-20 20:36:25 +08:00
parent 9ee9fb8866
commit f40092c95e
5 changed files with 187 additions and 56 deletions

View File

@ -0,0 +1,77 @@
/*
* Copyright (c) 2024 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.hutool.core.thread.ratelimiter;
import org.dromara.hutool.core.thread.NamedThreadFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
/**
* 基于{@link Semaphore} 实现的限流器<br>
* 此算法实现了固定窗口(Fixed Window)计数法即设置固定窗口<br>
* 窗口时间为{@link RateLimiterConfig#getRefreshPeriod()}每次窗口内请求数不超过{@link RateLimiterConfig#getCapacity()}<br>
* 在窗口期允许的请求数是固定的请求结束后拒绝访问直到下一个窗口开始则重新开始计数<br>
* 参考https://github.com/TFdream/juice/blob/master/juice-ratelimiter/src/main/java/juice/ratelimiter/internal/SemaphoreBasedRateLimiter.java
*
* <ul>
* <li>优点内存占用小实现简单</li>
* <li>缺点不够平滑在窗口期开始时可能请求暴增窗口结束时大量请求丢失突刺现象</li>
* </ul>
*
* @author Ricky Fung
* @author Looly
* @since 6.0.0
*/
public class FixedRateLimiter extends SemaphoreRateLimiter {
/**
* 构造
*
* @param config 配置
*/
public FixedRateLimiter(final RateLimiterConfig config) {
super(config, null, configureScheduler());
}
/**
* 刷新限制填满许可数为{@link RateLimiterConfig#getCapacity()}<br>
* 用户可手动调用此方法填满许可<br>
*
* @see RateLimiterConfig#getCapacity()
*/
@Override
public void refreshLimit() {
final int permitsToFill = this.config.getCapacity() - semaphore.availablePermits();
if (permitsToFill > 0) {
// 只有在周期内不满时才填充
semaphore.release(permitsToFill);
}
}
/**
* 创建定时器
*
* @return 定时器
*/
private static ScheduledExecutorService configureScheduler() {
final ThreadFactory threadFactory = new NamedThreadFactory("FixedRateLimiterScheduler-", true);
return new ScheduledThreadPoolExecutor(1, threadFactory);
}
}

View File

@ -45,23 +45,23 @@ public class RateLimiterConfig {
/**
* 限制刷新周期即每多少时间刷新一次
*/
private final Duration limitRefreshPeriod;
private final Duration refreshPeriod;
/**
* 每个周期的许可数
* 容量可以是总容量或者每个周期的容量
*/
private final int limitForPeriod;
private final int capacity;
/**
* 构造
*
* @param timeout 超时时间即超过这个时间没有获取到许可则返回false
* @param limitRefreshPeriod 限制刷新周期即每多少时间刷新一次
* @param limitForPeriod 个周期的许可数
* @param refreshPeriod 刷新周期即每多少时间刷新一次
* @param capacity 容量
*/
public RateLimiterConfig(final Duration timeout, final Duration limitRefreshPeriod, final int limitForPeriod) {
public RateLimiterConfig(final Duration timeout, final Duration refreshPeriod, final int capacity) {
this.timeout = timeout;
this.limitRefreshPeriod = limitRefreshPeriod;
this.limitForPeriod = limitForPeriod;
this.refreshPeriod = refreshPeriod;
this.capacity = capacity;
}
/**
@ -74,20 +74,20 @@ public class RateLimiterConfig {
}
/**
* 限制刷新周期即每多少时间刷新一次单位毫秒
* 刷新周期即每多少时间刷新一次单位毫秒
*
* @return 限制刷新周期单位毫秒
* @return 刷新周期单位毫秒
*/
public Duration getLimitRefreshPeriod() {
return limitRefreshPeriod;
public Duration getRefreshPeriod() {
return refreshPeriod;
}
/**
* 每个周期的许可数
* 容量可以是总容量或者每个周期的容量
*
* @return 每个周期的许可数
* @return 容量
*/
public int getLimitForPeriod() {
return limitForPeriod;
public int getCapacity() {
return capacity;
}
}

View File

@ -18,31 +18,25 @@ package org.dromara.hutool.core.thread.ratelimiter;
import org.dromara.hutool.core.lang.Assert;
import org.dromara.hutool.core.lang.Opt;
import org.dromara.hutool.core.thread.NamedThreadFactory;
import java.util.concurrent.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* 基于{@link Semaphore} 实现的限流器<br>
* 此算法实现了固定窗口(Fixed Window)计数法即设置固定窗口<br>
* 窗口时间为{@link RateLimiterConfig#getLimitRefreshPeriod()}每次窗口内请求数不超过{@link RateLimiterConfig#getLimitForPeriod()}<br>
* 在窗口期允许的请求数是固定的请求结束后拒绝访问直到下一个窗口开始则重新开始计数<br>
* 参考https://github.com/TFdream/juice/blob/master/juice-ratelimiter/src/main/java/juice/ratelimiter/internal/SemaphoreBasedRateLimiter.java
*
* <ul>
* <li>优点内存占用小实现简单</li>
* <li>缺点不够平滑在窗口期开始时可能请求暴增窗口结束时大量请求丢失突刺现象</li>
* </ul>
* 基于{@link Semaphore} 实现的限流器
*
* @author Ricky Fung
* @author Looly
* @since 6.0.0
*/
public class SemaphoreRateLimiter implements RateLimiter {
public abstract class SemaphoreRateLimiter implements RateLimiter {
private final RateLimiterConfig config;
private final ScheduledExecutorService scheduler;
private final Semaphore semaphore;
protected final RateLimiterConfig config;
protected final Semaphore semaphore;
protected final ScheduledExecutorService scheduler;
// region ----- Constructor
/**
* 构造
*
@ -66,16 +60,17 @@ public class SemaphoreRateLimiter implements RateLimiter {
* 构造
*
* @param config 限流配置
* @param semaphore {@link Semaphore}
* @param scheduler 定时器
* @param semaphore {@link Semaphore}默认使用{@link RateLimiterConfig#getCapacity()}创建
* @param scheduler 定时器{@code null}表示不定时
*/
public SemaphoreRateLimiter(final RateLimiterConfig config, final Semaphore semaphore, final ScheduledExecutorService scheduler) {
this.config = Assert.notNull(config);
this.semaphore = Opt.ofNullable(semaphore).orElseGet(() -> new Semaphore(config.getLimitForPeriod()));
this.scheduler = Opt.ofNullable(scheduler).orElseGet(this::configureScheduler);
this.semaphore = Opt.ofNullable(semaphore).orElseGet(() -> new Semaphore(config.getCapacity()));
this.scheduler = scheduler;
//启动定时器
scheduleLimitRefresh();
}
// endregion
@Override
public boolean tryAcquire(final int permits) {
@ -88,30 +83,26 @@ public class SemaphoreRateLimiter implements RateLimiter {
}
/**
* 刷新限制填满许可数为{@link RateLimiterConfig#getLimitForPeriod()}<br>
* 用户可手动调用此方法填满许可
* 刷新限制用户可重写此方法改变填充许可方式
* <ul>
* <li>填满窗口一般用于固定窗口Fixed Window</li>
* <li>固定频率填充如每个周期只填充1个配合{@link RateLimiterConfig#getRefreshPeriod()}可实现令牌桶Token Bucket</li>
* </ul>
* 同样用户可通过调用此方法手动刷新<br>
* 注意重写此方法前需判断许可是否已满
*
* @see RateLimiterConfig#getLimitForPeriod()
* @see RateLimiterConfig#getCapacity()
*/
public void refreshLimit() {
semaphore.release(this.config.getLimitForPeriod() - semaphore.availablePermits());
}
public abstract void refreshLimit();
/**
* 创建定时器
*
* @return 定时器
*/
private ScheduledExecutorService configureScheduler() {
final ThreadFactory threadFactory = new NamedThreadFactory("SemaphoreRateLimiterScheduler-", true);
return new ScheduledThreadPoolExecutor(1, threadFactory);
}
/**
* 启动定时器
* 启动定时器未定义则不启动
*/
private void scheduleLimitRefresh() {
final long limitRefreshPeriod = this.config.getLimitRefreshPeriod().toNanos();
if (null == this.scheduler) {
return;
}
final long limitRefreshPeriod = this.config.getRefreshPeriod().toNanos();
scheduler.scheduleAtFixedRate(
this::refreshLimit,
limitRefreshPeriod,

View File

@ -0,0 +1,63 @@
/*
* Copyright (c) 2024 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.hutool.core.thread.ratelimiter;
import org.dromara.hutool.core.thread.NamedThreadFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
/**
* 令牌桶Token Bucket限流器<br>
* 令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发传输<br>
* 概念见https://zhuanlan.zhihu.com/p/110596981<br>
* 此限流器通过{@link #refreshLimit()}方法配合{@link RateLimiterConfig#getRefreshPeriod()} 实现按照固定速率填充令牌到桶中
*
* @author looly
* @since 6.0.0
*/
public class TokenBucketRateLimiter extends SemaphoreRateLimiter {
/**
* 构造
*
* @param config 配置
*/
public TokenBucketRateLimiter(final RateLimiterConfig config) {
super(config, null, configureScheduler());
}
@Override
public void refreshLimit() {
if (this.config.getCapacity() - semaphore.availablePermits() > 0) {
// 只有在周期内不满时才填充
// 令牌桶的填充主要依靠刷新周期调整令牌填充速度每次只填充1个令牌
semaphore.release(1);
}
}
/**
* 创建定时器
*
* @return 定时器
*/
private static ScheduledExecutorService configureScheduler() {
final ThreadFactory threadFactory = new NamedThreadFactory("TokenBucketLimiterScheduler-", true);
return new ScheduledThreadPoolExecutor(1, threadFactory);
}
}

View File

@ -11,7 +11,7 @@ public class SemaphoreRateLimiterTest {
@Test
void test() {
final RateLimiterConfig rateLimiterConfig = RateLimiterConfig.of(Duration.ofSeconds(5), Duration.ofMillis(300), 5);
final RateLimiter rateLimiter = new SemaphoreRateLimiter(rateLimiterConfig);
final RateLimiter rateLimiter = new FixedRateLimiter(rateLimiterConfig);
final boolean b = rateLimiter.tryAcquire(5);
Assertions.assertTrue(b);