add TokenBucketRateLimiter

This commit is contained in:
Looly 2024-08-21 09:00:27 +08:00
parent f40092c95e
commit d2a00a6268
5 changed files with 115 additions and 144 deletions

View File

@ -1,77 +0,0 @@
/*
* Copyright (c) 2024 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.hutool.core.thread.ratelimiter;
import org.dromara.hutool.core.thread.NamedThreadFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
/**
* 基于{@link Semaphore} 实现的限流器<br>
* 此算法实现了固定窗口(Fixed Window)计数法即设置固定窗口<br>
* 窗口时间为{@link RateLimiterConfig#getRefreshPeriod()}每次窗口内请求数不超过{@link RateLimiterConfig#getCapacity()}<br>
* 在窗口期允许的请求数是固定的请求结束后拒绝访问直到下一个窗口开始则重新开始计数<br>
* 参考https://github.com/TFdream/juice/blob/master/juice-ratelimiter/src/main/java/juice/ratelimiter/internal/SemaphoreBasedRateLimiter.java
*
* <ul>
* <li>优点内存占用小实现简单</li>
* <li>缺点不够平滑在窗口期开始时可能请求暴增窗口结束时大量请求丢失突刺现象</li>
* </ul>
*
* @author Ricky Fung
* @author Looly
* @since 6.0.0
*/
public class FixedRateLimiter extends SemaphoreRateLimiter {
/**
* 构造
*
* @param config 配置
*/
public FixedRateLimiter(final RateLimiterConfig config) {
super(config, null, configureScheduler());
}
/**
* 刷新限制填满许可数为{@link RateLimiterConfig#getCapacity()}<br>
* 用户可手动调用此方法填满许可<br>
*
* @see RateLimiterConfig#getCapacity()
*/
@Override
public void refreshLimit() {
final int permitsToFill = this.config.getCapacity() - semaphore.availablePermits();
if (permitsToFill > 0) {
// 只有在周期内不满时才填充
semaphore.release(permitsToFill);
}
}
/**
* 创建定时器
*
* @return 定时器
*/
private static ScheduledExecutorService configureScheduler() {
final ThreadFactory threadFactory = new NamedThreadFactory("FixedRateLimiterScheduler-", true);
return new ScheduledThreadPoolExecutor(1, threadFactory);
}
}

View File

@ -29,59 +29,69 @@ public class RateLimiterConfig {
/** /**
* 创建限流配置 * 创建限流配置
* *
* @param timeout 超时时间即超过这个时间没有获取到许可则返回false
* @param limitRefreshPeriod 限制刷新周期即每多少时间刷新一次
* @param limitForPeriod 个周期的许可数
* @return RateLimiterConfig * @return RateLimiterConfig
*/ */
public static RateLimiterConfig of(final Duration timeout, final Duration limitRefreshPeriod, final int limitForPeriod) { public static RateLimiterConfig of() {
return new RateLimiterConfig(timeout, limitRefreshPeriod, limitForPeriod); return new RateLimiterConfig();
} }
/** /**
* 超时时间即超过这个时间没有获取到许可则返回false * 超时时间即超过这个时间没有获取到许可则返回false
*/ */
private final Duration timeout; private Duration timeout;
/** /**
* 限制刷新周期即每多少时间刷新一次 * 限制刷新周期即每多少时间刷新一次
*/ */
private final Duration refreshPeriod; private Duration refreshPeriod;
/** /**
* 容量可以是总容量或者每个周期的容量 * 容量可以是总容量或者每个周期的容量
*/ */
private final int capacity; private int capacity;
/** /**
* 构造 * 在刷新周期内释放的最大数量不能超过{@link #capacity}
*
* @param timeout 超时时间即超过这个时间没有获取到许可则返回false
* @param refreshPeriod 刷新周期即每多少时间刷新一次
* @param capacity 容量
*/ */
public RateLimiterConfig(final Duration timeout, final Duration refreshPeriod, final int capacity) { private int maxReleaseCount;
this.timeout = timeout;
this.refreshPeriod = refreshPeriod;
this.capacity = capacity;
}
/** /**
* 超时时间即超过这个时间没有获取到许可则返回false单位毫秒 * 超时时间即超过这个时间没有获取到许可则返回false
* *
* @return 超时时间单位毫秒 * @return 超时时间
*/ */
public Duration getTimeout() { public Duration getTimeout() {
return timeout; return timeout;
} }
/** /**
* 刷新周期即每多少时间刷新一次单位毫秒 * 设置超时时间即超过这个时间没有获取到许可则返回false
* *
* @return 刷新周期单位毫秒 * @param timeout 超时时间
* @return this
*/
public RateLimiterConfig setTimeout(final Duration timeout) {
this.timeout = timeout;
return this;
}
/**
* 刷新周期即每多少时间刷新一次
*
* @return 刷新周期
*/ */
public Duration getRefreshPeriod() { public Duration getRefreshPeriod() {
return refreshPeriod; return refreshPeriod;
} }
/**
* 设置刷新周期即每多少时间刷新一次单位毫秒
*
* @param refreshPeriod 刷新周期
* @return this
*/
public RateLimiterConfig setRefreshPeriod(final Duration refreshPeriod) {
this.refreshPeriod = refreshPeriod;
return this;
}
/** /**
* 容量可以是总容量或者每个周期的容量 * 容量可以是总容量或者每个周期的容量
* *
@ -90,4 +100,34 @@ public class RateLimiterConfig {
public int getCapacity() { public int getCapacity() {
return capacity; return capacity;
} }
/**
* 设置容量可以是总容量或者每个周期的容量
* @param capacity 容量
* @return this
*/
public RateLimiterConfig setCapacity(final int capacity) {
this.capacity = capacity;
return this;
}
/**
* 在刷新周期内释放的最大数量不能超过{@link #capacity}
*
* @return 在刷新周期内释放的最大数量
*/
public int getMaxReleaseCount() {
return maxReleaseCount;
}
/**
* 设置在刷新周期内释放的最大数量不能超过{@link #capacity}
*
* @param maxReleaseCount 在刷新周期内释放的最大数量
* @return this
*/
public RateLimiterConfig setMaxReleaseCount(final int maxReleaseCount) {
this.maxReleaseCount = maxReleaseCount;
return this;
}
} }

View File

@ -19,9 +19,7 @@ package org.dromara.hutool.core.thread.ratelimiter;
import org.dromara.hutool.core.lang.Assert; import org.dromara.hutool.core.lang.Assert;
import org.dromara.hutool.core.lang.Opt; import org.dromara.hutool.core.lang.Opt;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/** /**
* 基于{@link Semaphore} 实现的限流器 * 基于{@link Semaphore} 实现的限流器
@ -34,7 +32,6 @@ public abstract class SemaphoreRateLimiter implements RateLimiter {
protected final RateLimiterConfig config; protected final RateLimiterConfig config;
protected final Semaphore semaphore; protected final Semaphore semaphore;
protected final ScheduledExecutorService scheduler;
// region ----- Constructor // region ----- Constructor
/** /**
@ -50,25 +47,11 @@ public abstract class SemaphoreRateLimiter implements RateLimiter {
* 构造 * 构造
* *
* @param config 限流配置 * @param config 限流配置
* @param semaphore {@link Semaphore} * @param semaphore {@link Semaphore}默认使用{@link RateLimiterConfig#getCapacity()}创建
*/ */
public SemaphoreRateLimiter(final RateLimiterConfig config, final Semaphore semaphore) { public SemaphoreRateLimiter(final RateLimiterConfig config, final Semaphore semaphore) {
this(config, semaphore, null);
}
/**
* 构造
*
* @param config 限流配置
* @param semaphore {@link Semaphore}默认使用{@link RateLimiterConfig#getCapacity()}创建
* @param scheduler 定时器{@code null}表示不定时
*/
public SemaphoreRateLimiter(final RateLimiterConfig config, final Semaphore semaphore, final ScheduledExecutorService scheduler) {
this.config = Assert.notNull(config); this.config = Assert.notNull(config);
this.semaphore = Opt.ofNullable(semaphore).orElseGet(() -> new Semaphore(config.getCapacity())); this.semaphore = Opt.ofNullable(semaphore).orElseGet(() -> new Semaphore(config.getCapacity()));
this.scheduler = scheduler;
//启动定时器
scheduleLimitRefresh();
} }
// endregion // endregion
@ -94,20 +77,4 @@ public abstract class SemaphoreRateLimiter implements RateLimiter {
* @see RateLimiterConfig#getCapacity() * @see RateLimiterConfig#getCapacity()
*/ */
public abstract void refreshLimit(); public abstract void refreshLimit();
/**
* 启动定时器未定义则不启动
*/
private void scheduleLimitRefresh() {
if (null == this.scheduler) {
return;
}
final long limitRefreshPeriod = this.config.getRefreshPeriod().toNanos();
scheduler.scheduleAtFixedRate(
this::refreshLimit,
limitRefreshPeriod,
limitRefreshPeriod,
TimeUnit.NANOSECONDS
);
}
} }

View File

@ -18,20 +18,31 @@ package org.dromara.hutool.core.thread.ratelimiter;
import org.dromara.hutool.core.thread.NamedThreadFactory; import org.dromara.hutool.core.thread.NamedThreadFactory;
import java.io.Closeable;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/** /**
* 令牌桶Token Bucket限流器<br> * 令牌桶Token Bucket限流器<br>
* 令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发传输<br> * 令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发传输概念见https://zhuanlan.zhihu.com/p/110596981
* 概念见https://zhuanlan.zhihu.com/p/110596981<br> *
* 此限流器通过{@link #refreshLimit()}方法配合{@link RateLimiterConfig#getRefreshPeriod()} 实现按照固定速率填充令牌到桶中 * <p>
* 令牌发放通过scheduler定时器定时向令牌桶中添加令牌直到令牌桶满<br>
* 令牌发放周期为{@link RateLimiterConfig#getRefreshPeriod()}周期内发放个数为{@link RateLimiterConfig#getMaxReleaseCount()}
* </p>
*
* <p>
* 令牌请求通过{@link #tryAcquire(int)} 方法请求令牌如果令牌桶中数量不足则返回false表示请求失败
* </p>
* *
* @author looly * @author looly
* @since 6.0.0 * @since 6.0.0
*/ */
public class TokenBucketRateLimiter extends SemaphoreRateLimiter { public class TokenBucketRateLimiter extends SemaphoreRateLimiter implements Closeable {
protected final ScheduledExecutorService scheduler;
/** /**
* 构造 * 构造
@ -39,18 +50,26 @@ public class TokenBucketRateLimiter extends SemaphoreRateLimiter {
* @param config 配置 * @param config 配置
*/ */
public TokenBucketRateLimiter(final RateLimiterConfig config) { public TokenBucketRateLimiter(final RateLimiterConfig config) {
super(config, null, configureScheduler()); super(config, null);
this.scheduler = configureScheduler();
//启动定时器
scheduleLimitRefresh();
} }
@Override @Override
public void refreshLimit() { public void refreshLimit() {
if (this.config.getCapacity() - semaphore.availablePermits() > 0) { final int permitsToFill = this.config.getCapacity() - semaphore.availablePermits();
if (permitsToFill > 0) {
// 只有在周期内不满时才填充 // 只有在周期内不满时才填充
// 令牌桶的填充主要依靠刷新周期调整令牌填充速度每次只填充1个令牌 semaphore.release(Math.min(permitsToFill, config.getMaxReleaseCount()));
semaphore.release(1);
} }
} }
@Override
public void close() {
scheduler.shutdown();
}
/** /**
* 创建定时器 * 创建定时器
* *
@ -60,4 +79,20 @@ public class TokenBucketRateLimiter extends SemaphoreRateLimiter {
final ThreadFactory threadFactory = new NamedThreadFactory("TokenBucketLimiterScheduler-", true); final ThreadFactory threadFactory = new NamedThreadFactory("TokenBucketLimiterScheduler-", true);
return new ScheduledThreadPoolExecutor(1, threadFactory); return new ScheduledThreadPoolExecutor(1, threadFactory);
} }
/**
* 启动定时器未定义则不启动
*/
private void scheduleLimitRefresh() {
if (null == this.scheduler) {
return;
}
final long limitRefreshPeriod = this.config.getRefreshPeriod().toNanos();
scheduler.scheduleAtFixedRate(
this::refreshLimit,
limitRefreshPeriod,
limitRefreshPeriod,
TimeUnit.NANOSECONDS
);
}
} }

View File

@ -10,8 +10,12 @@ public class SemaphoreRateLimiterTest {
@Test @Test
void test() { void test() {
final RateLimiterConfig rateLimiterConfig = RateLimiterConfig.of(Duration.ofSeconds(5), Duration.ofMillis(300), 5); final RateLimiterConfig rateLimiterConfig = RateLimiterConfig.of()
final RateLimiter rateLimiter = new FixedRateLimiter(rateLimiterConfig); .setCapacity(5)
.setMaxReleaseCount(5)
.setRefreshPeriod(Duration.ofMillis(300))
.setTimeout(Duration.ofSeconds(5));
final TokenBucketRateLimiter rateLimiter = new TokenBucketRateLimiter(rateLimiterConfig);
final boolean b = rateLimiter.tryAcquire(5); final boolean b = rateLimiter.tryAcquire(5);
Assertions.assertTrue(b); Assertions.assertTrue(b);
@ -27,5 +31,7 @@ public class SemaphoreRateLimiterTest {
// 超过数量 // 超过数量
final boolean b3 = rateLimiter.tryAcquire(1); final boolean b3 = rateLimiter.tryAcquire(1);
Assertions.assertFalse(b3); Assertions.assertFalse(b3);
rateLimiter.close();
} }
} }