From 74344566a5a602067d0d60c3e1c16731fc3fc2b5 Mon Sep 17 00:00:00 2001 From: kongweiguang Date: Mon, 19 Jun 2023 13:05:25 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=BC=82=E6=AD=A5=E5=B7=A5?= =?UTF-8?q?=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dromara/hutool/core/thread/AsyncUtil.java | 148 +++++++++++++++++- .../hutool/core/thread/AsyncUtilTest.java | 89 +++++++++++ 2 files changed, 236 insertions(+), 1 deletion(-) diff --git a/hutool-core/src/main/java/org/dromara/hutool/core/thread/AsyncUtil.java b/hutool-core/src/main/java/org/dromara/hutool/core/thread/AsyncUtil.java index b7b5e87b4..1ab283d99 100644 --- a/hutool-core/src/main/java/org/dromara/hutool/core/thread/AsyncUtil.java +++ b/hutool-core/src/main/java/org/dromara/hutool/core/thread/AsyncUtil.java @@ -12,15 +12,24 @@ package org.dromara.hutool.core.thread; +import org.dromara.hutool.core.lang.Assert; +import org.dromara.hutool.core.stream.StreamUtil; + import java.lang.reflect.UndeclaredThrowableException; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * {@link CompletableFuture}异步工具类
* {@link CompletableFuture} 是 Future 的改进,可以通过传入回调对象,在任务完成后调用之 * * @author achao1441470436@gmail.com + * @author kongweiguang * @since 5.7.17 */ public class AsyncUtil { @@ -42,7 +51,7 @@ public class AsyncUtil { /** * 等待任意一个任务执行完毕,包裹了异常 * - * @param 任务返回值类型 + * @param 任务返回值类型 * @param tasks 并行任务 * @return 执行结束的任务返回值 * @throws UndeclaredThrowableException 未受检异常 @@ -72,4 +81,141 @@ public class AsyncUtil { } } + /** + * 获取所有任务的返回值 + * + * @param 任务返回值类型 + * @param tasks 任务集合 + * @return 任务结果集合 + */ + public static List allOfGet(final List> tasks) { + Assert.notEmpty(tasks); + + return allOfGet(tasks, null); + } + + /** + * 获取所有任务的返回值,重载方法 + * + * @param 任务返回值类型 + * @param tasks 任务集合 + * @return 任务结果集合 + */ + @SafeVarargs + public static List allOfGet(final CompletableFuture... tasks) { + Assert.notEmpty(tasks); + + return allOfGet(Arrays.asList(tasks), null); + } + + + /** + * 获取所有任务的返回值,可以为异常任务添加异常处理方法 + * + * @param 任务内返回值的类型 + * @param tasks 任务集合 + * @param eHandler 异常处理方法 + * @return 任务结果集合 + */ + public static List allOfGet(final CompletableFuture[] tasks, Function eHandler) { + Assert.notEmpty(tasks); + + return allOfGet(Arrays.asList(tasks), eHandler); + } + + /** + * 获取所有任务的返回值,可以为异常任务添加异常处理方法,重载方法 + * + * @param 任务返回值类型 + * @param tasks 任务集合 + * @param eHandler 异常处理方法 + * @return 任务结果集合 + */ + public static List allOfGet(final List> tasks, Function eHandler) { + Assert.notEmpty(tasks); + + return execute(tasks, eHandler, false); + } + + /** + * 获取所有任务的返回值,并行执行,重载方法 + * + * @param 任务返回值类型 + * @param tasks 任务集合 + * @return 任务结果集合 + */ + @SafeVarargs + public static List parallelAllOfGet(final CompletableFuture... tasks) { + Assert.notEmpty(tasks); + + return parallelAllOfGet(Arrays.asList(tasks), null); + } + + /** + * 获取所有任务的返回值,并行执行 + * + * @param 任务返回值类型 + * @param tasks 任务集合 + * @return 任务结果集合 + */ + public static List parallelAllOfGet(final List> tasks) { + Assert.notEmpty(tasks); + + return parallelAllOfGet(tasks, null); + } + + + /** + * 获取所有任务的返回值,并行执行,可以为异常任务添加异常处理方法 + * + * @param 任务返回值类型 + * @param tasks 任务集合 + * @param eHandler 异常处理方法 + * @return 任务结果集合 + */ + public static List parallelAllOfGet(final CompletableFuture[] tasks, Function eHandler) { + Assert.notEmpty(tasks); + + return parallelAllOfGet(Arrays.asList(tasks), eHandler); + } + + /** + * 获取所有任务的返回值,并行执行,可以为异常任务添加异常处理方法,重载方法 + * + * @param 任务返回值类型 + * @param tasks 任务集合 + * @param eHandler 异常处理方法 + * @return 任务结果集合 + */ + public static List parallelAllOfGet(final List> tasks, Function eHandler) { + Assert.notEmpty(tasks); + + return execute(tasks, eHandler, true); + } + + /** + * 处理任务集合 + * + * @param 任务返回值类型 + * @param tasks 任务集合 + * @param eHandler 异常处理方法 + * @param isParallel 是否是并行 {@link Stream} + * @return 任务结果集合 + */ + private static List execute(List> tasks, Function eHandler, boolean isParallel) { + return StreamUtil.of(tasks, isParallel) + .map(e -> { + try { + return e.get(); + } catch (InterruptedException | ExecutionException ex) { + if (eHandler != null) { + return eHandler.apply(ex); + } else { + throw new RuntimeException(ex); + } + } + }) + .collect(Collectors.toList()); + } + } diff --git a/hutool-core/src/test/java/org/dromara/hutool/core/thread/AsyncUtilTest.java b/hutool-core/src/test/java/org/dromara/hutool/core/thread/AsyncUtilTest.java index a6600fce0..171588232 100644 --- a/hutool-core/src/test/java/org/dromara/hutool/core/thread/AsyncUtilTest.java +++ b/hutool-core/src/test/java/org/dromara/hutool/core/thread/AsyncUtilTest.java @@ -1,9 +1,13 @@ package org.dromara.hutool.core.thread; +import org.dromara.hutool.core.collection.ListUtil; +import org.dromara.hutool.core.lang.Console; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -35,4 +39,89 @@ public class AsyncUtilTest { // 获取结果 Assertions.assertEquals("hutool卫衣真暖和", AsyncUtil.get(hutool) + AsyncUtil.get(sweater) + AsyncUtil.get(warm)); } + + @Test + @Disabled + public void allGetTest() { + final CompletableFuture hutool = CompletableFuture.supplyAsync(() -> { + ThreadUtil.sleep(1, TimeUnit.SECONDS); + return "hutool"; + }); + final CompletableFuture sweater = CompletableFuture.supplyAsync(() -> { + ThreadUtil.sleep(2, TimeUnit.SECONDS); + return "卫衣"; + }); + final CompletableFuture warm = CompletableFuture.supplyAsync(() -> { + ThreadUtil.sleep(3, TimeUnit.SECONDS); + return "真暖和"; + }); + // 等待完成 + List list = AsyncUtil.allOfGet(ListUtil.of(hutool, sweater, warm)); + // 获取结果 + Assertions.assertEquals(Arrays.asList("hutool", "卫衣", "真暖和"), list); + } + + @Test + @Disabled + public void allGetTestException() { + final CompletableFuture hutool = CompletableFuture.supplyAsync(() -> { + ThreadUtil.sleep(1, TimeUnit.SECONDS); + return "hutool"; + }); + final CompletableFuture sweater = CompletableFuture.supplyAsync(() -> { + int a = 1 / 0; + ThreadUtil.sleep(2, TimeUnit.SECONDS); + return "卫衣"; + }); + final CompletableFuture warm = CompletableFuture.supplyAsync(() -> { + ThreadUtil.sleep(3, TimeUnit.SECONDS); + return "真暖和"; + }); + // 等待完成 + List list = AsyncUtil.allOfGet(ListUtil.of(hutool, sweater, warm), (e) -> "出错了"); + // 获取结果 + Assertions.assertEquals(Arrays.asList("hutool", "卫衣", "真暖和"), list); + } + + @Test + @Disabled + public void parallelAllOfGetTest() { + final CompletableFuture hutool = CompletableFuture.supplyAsync(() -> { + ThreadUtil.sleep(1, TimeUnit.SECONDS); + return "hutool"; + }); + final CompletableFuture sweater = CompletableFuture.supplyAsync(() -> { + ThreadUtil.sleep(2, TimeUnit.SECONDS); + return "卫衣"; + }); + final CompletableFuture warm = CompletableFuture.supplyAsync(() -> { + ThreadUtil.sleep(3, TimeUnit.SECONDS); + return "真暖和"; + }); + // 等待完成 + List list = AsyncUtil.parallelAllOfGet(ListUtil.of(hutool, sweater, warm)); + // 获取结果 + Assertions.assertEquals(Arrays.asList("hutool", "卫衣", "真暖和"), list); + } + + @Test + @Disabled + public void parallelAllOfGetTestException() { + final CompletableFuture hutool = CompletableFuture.supplyAsync(() -> { + ThreadUtil.sleep(1, TimeUnit.SECONDS); + return "hutool"; + }); + final CompletableFuture sweater = CompletableFuture.supplyAsync(() -> { + int a = 1 / 0; + ThreadUtil.sleep(2, TimeUnit.SECONDS); + return "卫衣"; + }); + final CompletableFuture warm = CompletableFuture.supplyAsync(() -> { + ThreadUtil.sleep(3, TimeUnit.SECONDS); + return "真暖和"; + }); + // 等待完成 + List list = AsyncUtil.parallelAllOfGet(ListUtil.of(hutool, sweater, warm), (e) -> "出错了"); + Assertions.assertEquals(Arrays.asList("hutool", "出错了", "真暖和"), list); + } }