!1022 异步工具类添加一次性获取所有结果方法

Merge pull request !1022 from 孔皮皮/v6-dev-async
This commit is contained in:
Looly 2023-06-24 12:01:38 +00:00 committed by Gitee
commit e526afcbee
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
2 changed files with 236 additions and 1 deletions

View File

@ -12,15 +12,24 @@
package org.dromara.hutool.core.thread; package org.dromara.hutool.core.thread;
import org.dromara.hutool.core.lang.Assert;
import org.dromara.hutool.core.stream.StreamUtil;
import java.lang.reflect.UndeclaredThrowableException; import java.lang.reflect.UndeclaredThrowableException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* {@link CompletableFuture}异步工具类<br> * {@link CompletableFuture}异步工具类<br>
* {@link CompletableFuture} Future 的改进可以通过传入回调对象在任务完成后调用之 * {@link CompletableFuture} Future 的改进可以通过传入回调对象在任务完成后调用之
* *
* @author achao1441470436@gmail.com * @author achao1441470436@gmail.com
* @author kongweiguang
* @since 5.7.17 * @since 5.7.17
*/ */
public class AsyncUtil { public class AsyncUtil {
@ -42,7 +51,7 @@ public class AsyncUtil {
/** /**
* 等待任意一个任务执行完毕包裹了异常 * 等待任意一个任务执行完毕包裹了异常
* *
* @param <T> 任务返回值类型 * @param <T> 任务返回值类型
* @param tasks 并行任务 * @param tasks 并行任务
* @return 执行结束的任务返回值 * @return 执行结束的任务返回值
* @throws UndeclaredThrowableException 未受检异常 * @throws UndeclaredThrowableException 未受检异常
@ -72,4 +81,141 @@ public class AsyncUtil {
} }
} }
/**
* 获取所有任务的返回值
*
* @param <T> 任务返回值类型
* @param tasks 任务集合
* @return 任务结果集合
*/
public static <T> List<T> allOfGet(final List<CompletableFuture<T>> tasks) {
Assert.notEmpty(tasks);
return allOfGet(tasks, null);
}
/**
* 获取所有任务的返回值重载方法
*
* @param <T> 任务返回值类型
* @param tasks 任务集合
* @return 任务结果集合
*/
@SafeVarargs
public static <T> List<T> allOfGet(final CompletableFuture<T>... tasks) {
Assert.notEmpty(tasks);
return allOfGet(Arrays.asList(tasks), null);
}
/**
* 获取所有任务的返回值可以为异常任务添加异常处理方法
*
* @param <T> 任务内返回值的类型
* @param tasks 任务集合
* @param eHandler 异常处理方法
* @return 任务结果集合
*/
public static <T> List<T> allOfGet(final CompletableFuture<T>[] tasks, Function<Exception, T> eHandler) {
Assert.notEmpty(tasks);
return allOfGet(Arrays.asList(tasks), eHandler);
}
/**
* 获取所有任务的返回值可以为异常任务添加异常处理方法重载方法
*
* @param <T> 任务返回值类型
* @param tasks 任务集合
* @param eHandler 异常处理方法
* @return 任务结果集合
*/
public static <T> List<T> allOfGet(final List<CompletableFuture<T>> tasks, Function<Exception, T> eHandler) {
Assert.notEmpty(tasks);
return execute(tasks, eHandler, false);
}
/**
* 获取所有任务的返回值并行执行重载方法
*
* @param <T> 任务返回值类型
* @param tasks 任务集合
* @return 任务结果集合
*/
@SafeVarargs
public static <T> List<T> parallelAllOfGet(final CompletableFuture<T>... tasks) {
Assert.notEmpty(tasks);
return parallelAllOfGet(Arrays.asList(tasks), null);
}
/**
* 获取所有任务的返回值并行执行
*
* @param <T> 任务返回值类型
* @param tasks 任务集合
* @return 任务结果集合
*/
public static <T> List<T> parallelAllOfGet(final List<CompletableFuture<T>> tasks) {
Assert.notEmpty(tasks);
return parallelAllOfGet(tasks, null);
}
/**
* 获取所有任务的返回值并行执行可以为异常任务添加异常处理方法
*
* @param <T> 任务返回值类型
* @param tasks 任务集合
* @param eHandler 异常处理方法
* @return 任务结果集合
*/
public static <T> List<T> parallelAllOfGet(final CompletableFuture<T>[] tasks, Function<Exception, T> eHandler) {
Assert.notEmpty(tasks);
return parallelAllOfGet(Arrays.asList(tasks), eHandler);
}
/**
* 获取所有任务的返回值并行执行可以为异常任务添加异常处理方法重载方法
*
* @param <T> 任务返回值类型
* @param tasks 任务集合
* @param eHandler 异常处理方法
* @return 任务结果集合
*/
public static <T> List<T> parallelAllOfGet(final List<CompletableFuture<T>> tasks, Function<Exception, T> eHandler) {
Assert.notEmpty(tasks);
return execute(tasks, eHandler, true);
}
/**
* 处理任务集合
*
* @param <T> 任务返回值类型
* @param tasks 任务集合
* @param eHandler 异常处理方法
* @param isParallel 是否是并行 {@link Stream}
* @return 任务结果集合
*/
private static <T> List<T> execute(List<CompletableFuture<T>> tasks, Function<Exception, T> eHandler, boolean isParallel) {
return StreamUtil.of(tasks, isParallel)
.map(e -> {
try {
return e.get();
} catch (InterruptedException | ExecutionException ex) {
if (eHandler != null) {
return eHandler.apply(ex);
} else {
throw new RuntimeException(ex);
}
}
})
.collect(Collectors.toList());
}
} }

View File

@ -1,9 +1,13 @@
package org.dromara.hutool.core.thread; package org.dromara.hutool.core.thread;
import org.dromara.hutool.core.collection.ListUtil;
import org.dromara.hutool.core.lang.Console;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -35,4 +39,89 @@ public class AsyncUtilTest {
// 获取结果 // 获取结果
Assertions.assertEquals("hutool卫衣真暖和", AsyncUtil.get(hutool) + AsyncUtil.get(sweater) + AsyncUtil.get(warm)); Assertions.assertEquals("hutool卫衣真暖和", AsyncUtil.get(hutool) + AsyncUtil.get(sweater) + AsyncUtil.get(warm));
} }
@Test
@Disabled
public void allGetTest() {
final CompletableFuture<String> hutool = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(1, TimeUnit.SECONDS);
return "hutool";
});
final CompletableFuture<String> sweater = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(2, TimeUnit.SECONDS);
return "卫衣";
});
final CompletableFuture<String> warm = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(3, TimeUnit.SECONDS);
return "真暖和";
});
// 等待完成
List<String> list = AsyncUtil.allOfGet(ListUtil.of(hutool, sweater, warm));
// 获取结果
Assertions.assertEquals(Arrays.asList("hutool", "卫衣", "真暖和"), list);
}
@Test
@Disabled
public void allGetTestException() {
final CompletableFuture<String> hutool = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(1, TimeUnit.SECONDS);
return "hutool";
});
final CompletableFuture<String> sweater = CompletableFuture.supplyAsync(() -> {
int a = 1 / 0;
ThreadUtil.sleep(2, TimeUnit.SECONDS);
return "卫衣";
});
final CompletableFuture<String> warm = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(3, TimeUnit.SECONDS);
return "真暖和";
});
// 等待完成
List<String> list = AsyncUtil.allOfGet(ListUtil.of(hutool, sweater, warm), (e) -> "出错了");
// 获取结果
Assertions.assertEquals(Arrays.asList("hutool", "卫衣", "真暖和"), list);
}
@Test
@Disabled
public void parallelAllOfGetTest() {
final CompletableFuture<String> hutool = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(1, TimeUnit.SECONDS);
return "hutool";
});
final CompletableFuture<String> sweater = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(2, TimeUnit.SECONDS);
return "卫衣";
});
final CompletableFuture<String> warm = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(3, TimeUnit.SECONDS);
return "真暖和";
});
// 等待完成
List<String> list = AsyncUtil.parallelAllOfGet(ListUtil.of(hutool, sweater, warm));
// 获取结果
Assertions.assertEquals(Arrays.asList("hutool", "卫衣", "真暖和"), list);
}
@Test
@Disabled
public void parallelAllOfGetTestException() {
final CompletableFuture<String> hutool = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(1, TimeUnit.SECONDS);
return "hutool";
});
final CompletableFuture<String> sweater = CompletableFuture.supplyAsync(() -> {
int a = 1 / 0;
ThreadUtil.sleep(2, TimeUnit.SECONDS);
return "卫衣";
});
final CompletableFuture<String> warm = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(3, TimeUnit.SECONDS);
return "真暖和";
});
// 等待完成
List<String> list = AsyncUtil.parallelAllOfGet(ListUtil.of(hutool, sweater, warm), (e) -> "出错了");
Assertions.assertEquals(Arrays.asList("hutool", "出错了", "真暖和"), list);
}
} }