This commit is contained in:
Looly 2024-08-20 09:02:57 +08:00
parent ec5fcaa76b
commit 77e4600d44
5 changed files with 97 additions and 31 deletions

View File

@ -19,6 +19,7 @@ package org.dromara.hutool.core.thread;
import org.dromara.hutool.core.util.RuntimeUtil; import org.dromara.hutool.core.util.RuntimeUtil;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.time.Duration;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -203,9 +204,9 @@ public class ThreadUtil {
* @since 5.8.0 * @since 5.8.0
*/ */
public static ThreadPoolExecutor newFixedExecutor(final int nThreads, public static ThreadPoolExecutor newFixedExecutor(final int nThreads,
final int maximumQueueSize, final int maximumQueueSize,
final String threadNamePrefix, final String threadNamePrefix,
final RejectedExecutionHandler handler) { final RejectedExecutionHandler handler) {
return ExecutorBuilder.of() return ExecutorBuilder.of()
.setCorePoolSize(nThreads).setMaxPoolSize(nThreads) .setCorePoolSize(nThreads).setMaxPoolSize(nThreads)
.setWorkQueue(new LinkedBlockingQueue<>(maximumQueueSize)) .setWorkQueue(new LinkedBlockingQueue<>(maximumQueueSize))
@ -429,6 +430,8 @@ public class ThreadUtil {
return t; return t;
} }
// region ----- sleep
/** /**
* 挂起当前线程 * 挂起当前线程
* *
@ -458,6 +461,20 @@ public class ThreadUtil {
return sleep(millis.longValue()); return sleep(millis.longValue());
} }
/**
* 挂起当前线程
*
* @param duration 挂起的时长
* @return 被中断返回false否则true
* @since 6.0.0
*/
public static boolean sleep(final Duration duration) {
if (duration == null) {
return true;
}
return sleep(duration.toMillis());
}
/** /**
* 挂起当前线程 * 挂起当前线程
* *
@ -491,6 +508,20 @@ public class ThreadUtil {
return safeSleep(millis.longValue()); return safeSleep(millis.longValue());
} }
/**
* 挂起当前线程
*
* @param duration 挂起的时长
* @return 被中断返回false否则true
* @since 6.0.0
*/
public static boolean safeSleep(final Duration duration) {
if (duration == null) {
return true;
}
return safeSleep(duration.toMillis());
}
/** /**
* 考虑{@link Thread#sleep(long)}方法有可能时间不足给定毫秒数此方法保证sleep时间不小于给定的毫秒数 * 考虑{@link Thread#sleep(long)}方法有可能时间不足给定毫秒数此方法保证sleep时间不小于给定的毫秒数
* *
@ -513,6 +544,7 @@ public class ThreadUtil {
} }
return true; return true;
} }
// endregion
/** /**
* @return 获得堆栈列表 * @return 获得堆栈列表

View File

@ -25,6 +25,15 @@ package org.dromara.hutool.core.thread.ratelimiter;
*/ */
public interface RateLimiter { public interface RateLimiter {
/**
* 尝试获取许可非阻塞方法
*
* @return {@code true}表示成功获取{@code false}表示无足够的许可可获取此时需要等待给定的时间
*/
default boolean tryAcquire() {
return tryAcquire(1);
}
/** /**
* 尝试获取许可非阻塞方法 * 尝试获取许可非阻塞方法
* *
@ -32,4 +41,18 @@ public interface RateLimiter {
* @return {@code true}表示成功获取{@code false}表示无足够的许可可获取此时需要等待给定的时间 * @return {@code true}表示成功获取{@code false}表示无足够的许可可获取此时需要等待给定的时间
*/ */
boolean tryAcquire(int permits); boolean tryAcquire(int permits);
/**
* 获取许可阻塞方法如果没有足够的许可则阻塞等待
*/
default void acquire() {
acquire(1);
}
/**
* 获取许可阻塞方法如果没有足够的许可则阻塞等待
*
* @param permits 需要的许可数
*/
void acquire(int permits);
} }

View File

@ -16,6 +16,8 @@
package org.dromara.hutool.core.thread.ratelimiter; package org.dromara.hutool.core.thread.ratelimiter;
import java.time.Duration;
/** /**
* 限流通用配置 * 限流通用配置
* *
@ -27,23 +29,23 @@ public class RateLimiterConfig {
/** /**
* 创建限流配置 * 创建限流配置
* *
* @param timeoutDuration 超时时间即超过这个时间没有获取到许可则返回false * @param timeout 超时时间即超过这个时间没有获取到许可则返回false
* @param limitRefreshPeriod 限制刷新周期即每多少时间刷新一次 * @param limitRefreshPeriod 限制刷新周期即每多少时间刷新一次
* @param limitForPeriod 个周期的许可数 * @param limitForPeriod 个周期的许可数
* @return RateLimiterConfig * @return RateLimiterConfig
*/ */
public static RateLimiterConfig of(final long timeoutDuration, final long limitRefreshPeriod, final int limitForPeriod) { public static RateLimiterConfig of(final Duration timeout, final Duration limitRefreshPeriod, final int limitForPeriod) {
return new RateLimiterConfig(timeoutDuration, limitRefreshPeriod, limitForPeriod); return new RateLimiterConfig(timeout, limitRefreshPeriod, limitForPeriod);
} }
/** /**
* 超时时间即超过这个时间没有获取到许可则返回false单位毫秒 * 超时时间即超过这个时间没有获取到许可则返回false
*/ */
private final long timeoutDuration; private final Duration timeout;
/** /**
* 限制刷新周期即每多少时间刷新一次单位毫秒 * 限制刷新周期即每多少时间刷新一次
*/ */
private final long limitRefreshPeriod; private final Duration limitRefreshPeriod;
/** /**
* 每个周期的许可数 * 每个周期的许可数
*/ */
@ -52,12 +54,12 @@ public class RateLimiterConfig {
/** /**
* 构造 * 构造
* *
* @param timeoutDuration 超时时间即超过这个时间没有获取到许可则返回false * @param timeout 超时时间即超过这个时间没有获取到许可则返回false
* @param limitRefreshPeriod 限制刷新周期即每多少时间刷新一次 * @param limitRefreshPeriod 限制刷新周期即每多少时间刷新一次
* @param limitForPeriod 个周期的许可数 * @param limitForPeriod 个周期的许可数
*/ */
public RateLimiterConfig(final long timeoutDuration, final long limitRefreshPeriod, final int limitForPeriod) { public RateLimiterConfig(final Duration timeout, final Duration limitRefreshPeriod, final int limitForPeriod) {
this.timeoutDuration = timeoutDuration; this.timeout = timeout;
this.limitRefreshPeriod = limitRefreshPeriod; this.limitRefreshPeriod = limitRefreshPeriod;
this.limitForPeriod = limitForPeriod; this.limitForPeriod = limitForPeriod;
} }
@ -67,8 +69,8 @@ public class RateLimiterConfig {
* *
* @return 超时时间单位毫秒 * @return 超时时间单位毫秒
*/ */
public long getTimeoutDuration() { public Duration getTimeout() {
return timeoutDuration; return timeout;
} }
/** /**
@ -76,7 +78,7 @@ public class RateLimiterConfig {
* *
* @return 限制刷新周期单位毫秒 * @return 限制刷新周期单位毫秒
*/ */
public long getLimitRefreshPeriod() { public Duration getLimitRefreshPeriod() {
return limitRefreshPeriod; return limitRefreshPeriod;
} }

View File

@ -31,39 +31,39 @@ import java.util.concurrent.*;
*/ */
public class SemaphoreRateLimiter implements RateLimiter { public class SemaphoreRateLimiter implements RateLimiter {
private final RateLimiterConfig rateLimiterConfig; private final RateLimiterConfig config;
private final ScheduledExecutorService scheduler; private final ScheduledExecutorService scheduler;
private final Semaphore semaphore; private final Semaphore semaphore;
/** /**
* 构造 * 构造
* *
* @param rateLimiterConfig 限流配置 * @param config 限流配置
*/ */
public SemaphoreRateLimiter(final RateLimiterConfig rateLimiterConfig) { public SemaphoreRateLimiter(final RateLimiterConfig config) {
this(rateLimiterConfig, null); this(config, null);
} }
/** /**
* 构造 * 构造
* *
* @param rateLimiterConfig 限流配置 * @param config 限流配置
* @param semaphore {@link Semaphore} * @param semaphore {@link Semaphore}
*/ */
public SemaphoreRateLimiter(final RateLimiterConfig rateLimiterConfig, final Semaphore semaphore) { public SemaphoreRateLimiter(final RateLimiterConfig config, final Semaphore semaphore) {
this(rateLimiterConfig, semaphore, null); this(config, semaphore, null);
} }
/** /**
* 构造 * 构造
* *
* @param rateLimiterConfig 限流配置 * @param config 限流配置
* @param semaphore {@link Semaphore} * @param semaphore {@link Semaphore}
* @param scheduler 定时器 * @param scheduler 定时器
*/ */
public SemaphoreRateLimiter(final RateLimiterConfig rateLimiterConfig, final Semaphore semaphore, final ScheduledExecutorService scheduler) { public SemaphoreRateLimiter(final RateLimiterConfig config, final Semaphore semaphore, final ScheduledExecutorService scheduler) {
this.rateLimiterConfig = Assert.notNull(rateLimiterConfig); this.config = Assert.notNull(config);
this.semaphore = Opt.ofNullable(semaphore).orElseGet(() -> new Semaphore(rateLimiterConfig.getLimitForPeriod())); this.semaphore = Opt.ofNullable(semaphore).orElseGet(() -> new Semaphore(config.getLimitForPeriod()));
this.scheduler = Opt.ofNullable(scheduler).orElseGet(this::configureScheduler); this.scheduler = Opt.ofNullable(scheduler).orElseGet(this::configureScheduler);
//启动定时器 //启动定时器
scheduleLimitRefresh(); scheduleLimitRefresh();
@ -74,6 +74,11 @@ public class SemaphoreRateLimiter implements RateLimiter {
return semaphore.tryAcquire(permits); return semaphore.tryAcquire(permits);
} }
@Override
public void acquire(final int permits) {
semaphore.acquireUninterruptibly(permits);
}
/** /**
* 刷新限制填满许可数为{@link RateLimiterConfig#getLimitForPeriod()}<br> * 刷新限制填满许可数为{@link RateLimiterConfig#getLimitForPeriod()}<br>
* 用户可手动调用此方法填满许可 * 用户可手动调用此方法填满许可
@ -81,7 +86,7 @@ public class SemaphoreRateLimiter implements RateLimiter {
* @see RateLimiterConfig#getLimitForPeriod() * @see RateLimiterConfig#getLimitForPeriod()
*/ */
public void refreshLimit() { public void refreshLimit() {
semaphore.release(this.rateLimiterConfig.getLimitForPeriod() - semaphore.availablePermits()); semaphore.release(this.config.getLimitForPeriod() - semaphore.availablePermits());
} }
/** /**
@ -98,12 +103,12 @@ public class SemaphoreRateLimiter implements RateLimiter {
* 启动定时器 * 启动定时器
*/ */
private void scheduleLimitRefresh() { private void scheduleLimitRefresh() {
final long limitRefreshPeriod = this.rateLimiterConfig.getLimitRefreshPeriod(); final long limitRefreshPeriod = this.config.getLimitRefreshPeriod().toNanos();
scheduler.scheduleAtFixedRate( scheduler.scheduleAtFixedRate(
this::refreshLimit, this::refreshLimit,
limitRefreshPeriod, limitRefreshPeriod,
limitRefreshPeriod, limitRefreshPeriod,
TimeUnit.MILLISECONDS TimeUnit.NANOSECONDS
); );
} }
} }

View File

@ -4,11 +4,13 @@ import org.dromara.hutool.core.thread.ThreadUtil;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.time.Duration;
public class SemaphoreRateLimiterTest { public class SemaphoreRateLimiterTest {
@Test @Test
void test() { void test() {
final RateLimiterConfig rateLimiterConfig = RateLimiterConfig.of(5000, 300, 5); final RateLimiterConfig rateLimiterConfig = RateLimiterConfig.of(Duration.ofSeconds(5), Duration.ofMillis(300), 5);
final RateLimiter rateLimiter = new SemaphoreRateLimiter(rateLimiterConfig); final RateLimiter rateLimiter = new SemaphoreRateLimiter(rateLimiterConfig);
final boolean b = rateLimiter.tryAcquire(5); final boolean b = rateLimiter.tryAcquire(5);
@ -16,7 +18,9 @@ public class SemaphoreRateLimiterTest {
// 超过数量 // 超过数量
final boolean b1 = rateLimiter.tryAcquire(1); final boolean b1 = rateLimiter.tryAcquire(1);
Assertions.assertFalse(b1); Assertions.assertFalse(b1);
ThreadUtil.sleep(310); ThreadUtil.sleep(310);
// 填充新的许可 // 填充新的许可
final boolean b2 = rateLimiter.tryAcquire(5); final boolean b2 = rateLimiter.tryAcquire(5);
Assertions.assertTrue(b2); Assertions.assertTrue(b2);