From 3987f5b7ee31be03d970fdcde9652b014556c95f Mon Sep 17 00:00:00 2001 From: Looly Date: Thu, 31 Mar 2022 21:30:32 +0800 Subject: [PATCH] add methods --- CHANGELOG.md | 1 + .../cn/hutool/core/thread/BlockPolicy.java | 28 +++ .../hutool/core/thread/ExecutorBuilder.java | 2 +- .../cn/hutool/core/thread/RejectPolicy.java | 4 +- .../core/thread/ThreadFactoryBuilder.java | 28 ++- .../cn/hutool/core/thread/ThreadUtil.java | 187 ++++++++---------- .../RejectedExecutionHandlerUtility.java | 32 --- 7 files changed, 129 insertions(+), 153 deletions(-) create mode 100755 hutool-core/src/main/java/cn/hutool/core/thread/BlockPolicy.java delete mode 100644 hutool-core/src/main/java/cn/hutool/core/thread/rejected/RejectedExecutionHandlerUtility.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 95daf4a7e..ac3e4c630 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ * 【core 】 增加NodeListIter、ResettableIter * 【crypto 】 HmacAlgorithm增加SM4CMAC(issue#2206@Github) * 【http 】 增加HttpConfig,响应支持拦截(issue#2217@Github) +* 【core 】 增加BlockPolicy,ThreadUtil增加newFixedExecutor方法(pr#2231@Github) ### 🐞Bug修复 * 【core 】 IdcardUtil#getCityCodeByIdCard位数问题(issue#2224@Github) diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/BlockPolicy.java b/hutool-core/src/main/java/cn/hutool/core/thread/BlockPolicy.java new file mode 100755 index 000000000..8128a483b --- /dev/null +++ b/hutool-core/src/main/java/cn/hutool/core/thread/BlockPolicy.java @@ -0,0 +1,28 @@ +package cn.hutool.core.thread; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 当任务队列过长时处于阻塞状态,直到添加到队列中 + * 如果阻塞过程中被中断,就会抛出{@link InterruptedException}异常
+ * 有时候在线程池内访问第三方接口,只希望固定并发数去访问,并且不希望丢弃任务时使用此策略,队列满的时候会处于阻塞状态(例如刷库的场景) + * + * @author luozongle + * @since 5.8.0 + */ +public class BlockPolicy implements RejectedExecutionHandler { + + public BlockPolicy() { + } + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + try { + e.getQueue().put(r); + } catch (InterruptedException ex) { + throw new RejectedExecutionException("Task " + r + " rejected from " + e); + } + } +} diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/ExecutorBuilder.java b/hutool-core/src/main/java/cn/hutool/core/thread/ExecutorBuilder.java index bdd5c95ca..ccb9c86d3 100644 --- a/hutool-core/src/main/java/cn/hutool/core/thread/ExecutorBuilder.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/ExecutorBuilder.java @@ -242,7 +242,7 @@ public class ExecutorBuilder implements Builder { workQueue = (corePoolSize <= 0) ? new SynchronousQueue<>() : new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); } final ThreadFactory threadFactory = (null != builder.threadFactory) ? builder.threadFactory : Executors.defaultThreadFactory(); - RejectedExecutionHandler handler = ObjectUtil.defaultIfNull(builder.handler, ThreadPoolExecutor.AbortPolicy::new); + RejectedExecutionHandler handler = ObjectUtil.defaultIfNull(builder.handler, RejectPolicy.ABORT.getValue()); final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(// corePoolSize, // diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/RejectPolicy.java b/hutool-core/src/main/java/cn/hutool/core/thread/RejectPolicy.java index dda8d1350..36956ca47 100644 --- a/hutool-core/src/main/java/cn/hutool/core/thread/RejectPolicy.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/RejectPolicy.java @@ -21,7 +21,9 @@ public enum RejectPolicy { /** 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程) */ DISCARD_OLDEST(new ThreadPoolExecutor.DiscardOldestPolicy()), /** 由主线程来直接执行 */ - CALLER_RUNS(new ThreadPoolExecutor.CallerRunsPolicy()); + CALLER_RUNS(new ThreadPoolExecutor.CallerRunsPolicy()), + /** 当任务队列过长时处于阻塞状态,直到添加到队列中,固定并发数去访问,并且不希望丢弃任务时使用此策略 */ + BLOCK(new BlockPolicy()); private final RejectedExecutionHandler value; diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/ThreadFactoryBuilder.java b/hutool-core/src/main/java/cn/hutool/core/thread/ThreadFactoryBuilder.java index 80d33cb6e..686f95cfd 100644 --- a/hutool-core/src/main/java/cn/hutool/core/thread/ThreadFactoryBuilder.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/ThreadFactoryBuilder.java @@ -15,24 +15,34 @@ import java.util.concurrent.atomic.AtomicLong; * @author looly * @since 4.1.9 */ -public class ThreadFactoryBuilder implements Builder{ +public class ThreadFactoryBuilder implements Builder { private static final long serialVersionUID = 1L; - /** 用于线程创建的线程工厂类 */ + /** + * 用于线程创建的线程工厂类 + */ private ThreadFactory backingThreadFactory; - /** 线程名的前缀 */ + /** + * 线程名的前缀 + */ private String namePrefix; - /** 是否守护线程,默认false */ + /** + * 是否守护线程,默认false + */ private Boolean daemon; - /** 线程优先级 */ + /** + * 线程优先级 + */ private Integer priority; - /** 未捕获异常处理器 */ + /** + * 未捕获异常处理器 + */ private UncaughtExceptionHandler uncaughtExceptionHandler; /** - * 创建{@link ThreadFactoryBuilder} + * 创建{@code ThreadFactoryBuilder} * - * @return {@link ThreadFactoryBuilder} + * @return {@code ThreadFactoryBuilder} */ public static ThreadFactoryBuilder create() { return new ThreadFactoryBuilder(); @@ -115,7 +125,7 @@ public class ThreadFactoryBuilder implements Builder{ /** * 构建 * - * @param builder {@link ThreadFactoryBuilder} + * @param builder {@code ThreadFactoryBuilder} * @return {@link ThreadFactory} */ private static ThreadFactory build(ThreadFactoryBuilder builder) { diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java b/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java index 28c526135..b6f6d242b 100644 --- a/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java @@ -1,7 +1,5 @@ package cn.hutool.core.thread; -import cn.hutool.core.thread.rejected.RejectedExecutionHandlerUtility; - import java.lang.Thread.UncaughtExceptionHandler; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; @@ -43,114 +41,6 @@ public class ThreadUtil { return builder.build(); } - /** - * 获取一个新的线程池,默认的策略如下
- *
-	 *     1. 核心线程数与最大线程数为nThreads指定的大小
-	 *     2. 默认使用LinkedBlockingQueue,默认队列大小为1024
-	 * 
- * - * @param nThreads 线程池大小 - * @param threadNamePrefix 线程名称前缀 - * @return ExecutorService - */ - public static ExecutorService newFixedExecutor(int nThreads, String threadNamePrefix) { - ThreadFactory threadFactory = ThreadFactoryBuilder.create().setNamePrefix(threadNamePrefix).build(); - ExecutorBuilder builder = ExecutorBuilder.create().setThreadFactory(threadFactory); - if (nThreads > 0) { - builder.setCorePoolSize(nThreads) - .setMaxPoolSize(nThreads); - } - return builder.build(); - } - - /** - * 获取一个新的线程池,默认的策略如下
- *
-	 *     1. 核心线程数与最大线程数为nThreads指定的大小
-	 *     2. 默认使用LinkedBlockingQueue,默认队列大小为1024
-	 *     3. 当执行拒绝策略的时候会处于阻塞状态,直到能添加到队列中或者被{@link Thread#interrupt()}中断
-	 * 
- * - * @param nThreads 线程池大小 - * @param threadNamePrefix 线程名称前缀 - * @return ExecutorService - */ - public static ExecutorService newFixedBlockedExecutor(int nThreads, String threadNamePrefix) { - ThreadFactory threadFactory = ThreadFactoryBuilder.create().setNamePrefix(threadNamePrefix).build(); - ExecutorBuilder builder = ExecutorBuilder.create() - .setCorePoolSize(nThreads) - .setMaxPoolSize(nThreads) - .setThreadFactory(threadFactory) - .setHandler(new RejectedExecutionHandlerUtility.BlockPolicy()); - return builder.build(); - } - - /** - * 获取一个新的线程池,默认的策略如下
- *
-	 *     1. 核心线程数与最大线程数为nThreads指定的大小
-	 *     2. 默认使用LinkedBlockingQueue
-	 * 
- * - * @param nThreads 线程池大小 - * @param maximumQueueSize 队列大小 - * @param threadNamePrefix 线程名称前缀 - * @return ExecutorService - */ - public static ExecutorService newFixedExecutor(int nThreads, int maximumQueueSize, String threadNamePrefix) { - return newFixedExecutor(nThreads, maximumQueueSize, threadNamePrefix, null); - } - - /** - * 获取一个新的线程池,默认的策略如下
- *
-	 *     1. 核心线程数与最大线程数为nThreads指定的大小
-	 *     2. 默认使用LinkedBlockingQueue
-	 *     3. 当执行拒绝策略的时候会处于阻塞状态,直到能添加到队列中或者被{@link Thread#interrupt()}中断
-	 * 
- * - * @param nThreads 线程池大小 - * @param threadNamePrefix 线程名称前缀 - * @return ExecutorService - */ - public static ExecutorService newFixedBlockingExecutor(int nThreads, int maximumQueueSize, String threadNamePrefix) { - return newFixedExecutor(nThreads, - maximumQueueSize, - threadNamePrefix, - new RejectedExecutionHandlerUtility.BlockPolicy()); - } - - /** - * 获得一个新的线程池,默认策略如下
- *
-	 *     1. 核心线程数与最大线程数为nThreads指定的大小
-	 *     2. 默认使用LinkedBlockingQueue
-	 * 
- * - * @param nThreads 线程池大小 - * @param maximumQueueSize 队列大小 - * @param threadNamePrefix 线程名称前缀 - * @param handler 拒绝策略 - * @return ExecutorService - */ - public static ExecutorService newFixedExecutor(int nThreads, - int maximumQueueSize, - String threadNamePrefix, - RejectedExecutionHandler handler) { - - ThreadFactory threadFactory = ThreadFactoryBuilder.create().setNamePrefix(threadNamePrefix).build(); - - ExecutorBuilder builder = ExecutorBuilder.create() - .setCorePoolSize(nThreads) - .setMaxPoolSize(nThreads) - .setWorkQueue(new LinkedBlockingQueue<>(maximumQueueSize)) - .setThreadFactory(threadFactory) - .setHandler(handler); - - return builder.build(); - } - /** * 获得一个新的线程池,默认的策略如下: *
@@ -240,6 +130,71 @@ public class ThreadUtil {
 		return ExecutorBuilder.create().setCorePoolSize(poolSize).setMaxPoolSize(poolSize).setKeepAliveTime(0L).build();
 	}
 
+	/**
+	 * 获取一个新的线程池,默认的策略如下
+ *
+	 *     1. 核心线程数与最大线程数为nThreads指定的大小
+	 *     2. 默认使用LinkedBlockingQueue,默认队列大小为1024
+	 *     3. 如果isBlocked为{code true},当执行拒绝策略的时候会处于阻塞状态,直到能添加到队列中或者被{@link Thread#interrupt()}中断
+	 * 
+ * + * @param nThreads 线程池大小 + * @param threadNamePrefix 线程名称前缀 + * @param isBlocked 是否使用{@link BlockPolicy}策略 + * @return ExecutorService + * @author luozongle + * @since 5.8.0 + */ + public static ExecutorService newFixedExecutor(int nThreads, String threadNamePrefix, boolean isBlocked) { + return newFixedExecutor(nThreads, 1024, threadNamePrefix, isBlocked); + } + + /** + * 获取一个新的线程池,默认的策略如下
+ *
+	 *     1. 核心线程数与最大线程数为nThreads指定的大小
+	 *     2. 默认使用LinkedBlockingQueue
+	 *     3. 当执行拒绝策略的时候会处于阻塞状态,直到能添加到队列中或者被{@link Thread#interrupt()}中断
+	 * 
+ * + * @param nThreads 线程池大小 + * @param threadNamePrefix 线程名称前缀 + * @return ExecutorService + * @author luozongle + * @since 5.8.0 + */ + public static ExecutorService newFixedExecutor(int nThreads, int maximumQueueSize, String threadNamePrefix, boolean isBlocked) { + return newFixedExecutor(nThreads, maximumQueueSize, threadNamePrefix, + (isBlocked ? RejectPolicy.BLOCK : RejectPolicy.ABORT).getValue()); + } + + /** + * 获得一个新的线程池,默认策略如下
+ *
+	 *     1. 核心线程数与最大线程数为nThreads指定的大小
+	 *     2. 默认使用LinkedBlockingQueue
+	 * 
+ * + * @param nThreads 线程池大小 + * @param maximumQueueSize 队列大小 + * @param threadNamePrefix 线程名称前缀 + * @param handler 拒绝策略 + * @return ExecutorService + * @author luozongle + * @since 5.8.0 + */ + public static ExecutorService newFixedExecutor(int nThreads, + int maximumQueueSize, + String threadNamePrefix, + RejectedExecutionHandler handler) { + return ExecutorBuilder.create() + .setCorePoolSize(nThreads).setMaxPoolSize(nThreads) + .setWorkQueue(new LinkedBlockingQueue<>(maximumQueueSize)) + .setThreadFactory(createThreadFactory(threadNamePrefix)) + .setHandler(handler) + .build(); + } + /** * 直接在公共线程池中执行线程 * @@ -501,6 +456,18 @@ public class ThreadUtil { return ThreadFactoryBuilder.create(); } + /** + * 创建自定义线程名称前缀的{@link ThreadFactory} + * + * @param threadNamePrefix 线程名称前缀 + * @return {@link ThreadFactory} + * @see ThreadFactoryBuilder#build() + * @since 5.8.0 + */ + public static ThreadFactory createThreadFactory(String threadNamePrefix) { + return ThreadFactoryBuilder.create().setNamePrefix(threadNamePrefix).build(); + } + /** * 结束线程,调用此方法后,线程将抛出 {@link InterruptedException}异常 * diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/rejected/RejectedExecutionHandlerUtility.java b/hutool-core/src/main/java/cn/hutool/core/thread/rejected/RejectedExecutionHandlerUtility.java deleted file mode 100644 index d85a6f907..000000000 --- a/hutool-core/src/main/java/cn/hutool/core/thread/rejected/RejectedExecutionHandlerUtility.java +++ /dev/null @@ -1,32 +0,0 @@ -package cn.hutool.core.thread.rejected; - -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; - -/** - * 线程池拒绝策略工具类 - * - * @author luozongle - */ -public class RejectedExecutionHandlerUtility { - - /** - * 当任务队列过长时处于阻塞状态,直到添加到队列中 - * 如果阻塞过程中被中断,就会抛出{@link InterruptedException}异常 - */ - public static class BlockPolicy implements RejectedExecutionHandler { - - public BlockPolicy() { - } - - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { - try { - e.getQueue().put(r); - } catch (InterruptedException ex) { - throw new RejectedExecutionException("Task " + r + " rejected from " + e); - } - } - } -}