diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/FastStream.java b/hutool-core/src/main/java/cn/hutool/core/stream/FastStream.java new file mode 100644 index 000000000..04df601b9 --- /dev/null +++ b/hutool-core/src/main/java/cn/hutool/core/stream/FastStream.java @@ -0,0 +1,1380 @@ +package cn.hutool.core.stream; + + +import cn.hutool.core.lang.Console; +import cn.hutool.core.lang.Opt; + +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.*; +import java.util.stream.*; + +/** + * 对Stream的封装和拓展,作者经对比了vavr、eclipse-collection、stream-ex以及其他语言的api,结合日常使用习惯,进行封装和拓展 + * Stream为集合提供了一些易用api,它让开发人员能使用声明式编程的方式去编写代码 + * 它分为中间操作和结束操作 + * 中间操作分为 + * + * 结束操作分为 + * + * 流只有在 结束操作 时才会真正触发执行以往的 中间操作 + *

+ * 它分为串行流和并行流 + * 并行流会使用拆分器{@link Spliterator}将操作拆分为多个异步任务{@link java.util.concurrent.ForkJoinTask}执行 + * 这些异步任务默认使用{@link java.util.concurrent.ForkJoinPool}线程池进行管理 + * + * @author VampireAchao + * @see java.util.stream.Stream + */ +public class FastStream implements Stream, Iterable { + + protected final Stream stream; + + FastStream(Stream stream) { + this.stream = stream; + } + + /** + * 返回{@code FastStream}的建造器 + * + * @param 元素的类型 + * @return a stream builder + */ + public static FastStreamBuilder builder() { + return new FastStreamBuilder() { + private final Builder streamBuilder = Stream.builder(); + + @Override + public void accept(T t) { + streamBuilder.accept(t); + } + + @Override + public FastStream build() { + return new FastStream<>(streamBuilder.build()); + } + }; + } + + /** + * 返回空的串行流 + * + * @param 元素类型 + * @return 一个空的串行流 + */ + public static FastStream empty() { + return new FastStream<>(Stream.empty()); + } + + /** + * 返回包含单个元素的串行流 + * + * @param t 单个元素 + * @param 元素类型 + * @return 包含单个元素的串行流 + */ + public static FastStream of(T t) { + return new FastStream<>(Stream.of(t)); + } + + /** + * 返回包含指定元素的串行流 + * + * @param values 指定元素 + * @param 元素类型 + * @return 包含指定元素的串行流 + * 从一个安全数组中创建流 + */ + @SafeVarargs + @SuppressWarnings("varargs") + public static FastStream of(T... values) { + return new FastStream<>(Arrays.stream(values)); + } + + /** + * 返回无限有序流 + * 该流由 初始值 以及执行 迭代函数 进行迭代获取到元素 + *

+ * 例如 + * {@code FastStream.iterate(0, i -> i + 1)} + * 就可以创建从0开始,每次+1的无限流,使用{@link FastStream#limit(long)}可以限制元素个数 + *

+ * + * @param 元素类型 + * @param seed 初始值 + * @param f 用上一个元素作为参数执行并返回一个新的元素 + * @return 无限有序流 + */ + public static FastStream iterate(final T seed, final UnaryOperator f) { + return new FastStream<>(Stream.iterate(seed, f)); + } + + /** + * 返回无限有序流 + * 该流由 初始值 然后判断条件 以及执行 迭代函数 进行迭代获取到元素 + *

+ * 例如 + * {@code FastStream.iterate(0, i -> i < 3, i -> ++i)} + * 就可以创建包含元素0,1,2的流,使用{@link FastStream#limit(long)}可以限制元素个数 + *

+ * + * @param 元素类型 + * @param hasNext 条件值 + * @param seed 初始值 + * @param f 用上一个元素作为参数执行并返回一个新的元素 + * @return 无限有序流 + */ + public static FastStream iterate(T seed, Predicate hasNext, UnaryOperator next) { + Objects.requireNonNull(next); + Objects.requireNonNull(hasNext); + Spliterator spliterator = new Spliterators.AbstractSpliterator(Long.MAX_VALUE, + Spliterator.ORDERED | Spliterator.IMMUTABLE) { + T prev; + boolean started; + boolean finished; + + @Override + public boolean tryAdvance(Consumer action) { + Objects.requireNonNull(action); + if (finished) { + return false; + } + T t; + if (started) { + t = next.apply(prev); + } else { + t = seed; + started = true; + } + if (!hasNext.test(t)) { + prev = null; + finished = true; + return false; + } + prev = t; + action.accept(prev); + return true; + } + + @Override + public void forEachRemaining(Consumer action) { + Objects.requireNonNull(action); + if (finished) { + return; + } + finished = true; + T t = started ? next.apply(prev) : seed; + prev = null; + while (hasNext.test(t)) { + action.accept(t); + t = next.apply(t); + } + } + }; + return new FastStream<>(StreamSupport.stream(spliterator, false)); + } + + /** + * 返回无限串行无序流 + * 其中每一个元素都由给定的{@code Supplier}生成 + * 适用场景在一些生成常量流、随机元素等 + * + * @param 元素类型 + * @param s 用来生成元素的 {@code Supplier} + * @return 无限串行无序流 + */ + public static FastStream generate(Supplier s) { + return new FastStream<>(Stream.generate(s)); + } + + /** + * 创建一个惰性拼接流,其元素是第一个流的所有元素,然后是第二个流的所有元素。 + * 如果两个输入流都是有序的,则结果流是有序的,如果任一输入流是并行的,则结果流是并行的。 + * 当结果流关闭时,两个输入流的关闭处理程序都会被调用。 + * + * @param 元素类型 + * @param a 第一个流 + * @param b 第二个流 + * @return 拼接两个流之后的流 + * @implNote 从重复串行流进行拼接时可能会导致深度调用链甚至抛出 {@code StackOverflowException} + */ + public static FastStream concat(FastStream a, FastStream b) { + return new FastStream<>(Stream.concat(a, b)); + } + + /** + * 通过实现了{@link Iterable}接口的对象创建串行流 + * + * @param iterable 实现了{@link Iterable}接口的对象 + * @param 元素类型 + * @return 流 + */ + public static FastStream of(Iterable iterable) { + return of(iterable, false); + } + + /** + * 通过传入的{@link Iterable}创建流 + * + * @param iterable {@link Iterable} + * @param parallel 是否并行 + * @param 元素类型 + * @return 流 + */ + public static FastStream of(Iterable iterable, boolean parallel) { + return Optional.ofNullable(iterable).map(Iterable::spliterator).map(spliterator -> StreamSupport.stream(spliterator, parallel)).map(FastStream::new).orElseGet(FastStream::empty); + } + + /** + * 通过传入的{@link Stream}创建流 + * + * @param stream {@link Stream} + * @param 元素类型 + * @return 流 + */ + public static FastStream of(Stream stream) { + return new FastStream<>(stream); + } + + /** + * 拆分字符串,转换为串行流 + * + * @param str 字符串 + * @param regex 正则 + * @return 拆分后元素组成的流 + */ + public static FastStream split(CharSequence str, String regex) { + return Opt.ofBlankAble(str).map(String::valueOf).map(s -> s.split(regex)).map(FastStream::of).orElseGet(FastStream::empty); + } + + /** + * 过滤元素,返回与指定断言匹配的元素组成的流 + * 这是一个无状态中间操作 + * + * @param predicate 断言 + * @return 返回叠加过滤操作后的流 + */ + @Override + public FastStream filter(Predicate predicate) { + return new FastStream<>(stream.filter(predicate)); + } + + /** + * 过滤元素,返回与 指定操作结果 匹配 指定值 的元素组成的流 + * 这是一个无状态中间操作 + * + * @param mapper 操作 + * @param value 用来匹配的值 + * @return 与 指定操作结果 匹配 指定值 的元素组成的流 + */ + public FastStream filter(Function mapper, R value) { + return filter(e -> Objects.equals(Opt.ofNullable(e).map(mapper).get(), value)); + } + + + /** + * 过滤元素,返回与指定断言匹配的元素组成的流,断言带下标,并行流时下标永远为-1 + * 这是一个无状态中间操作 + * + * @param predicate 断言 + * @return 返回叠加过滤操作后的流 + */ + public FastStream filterIdx(BiPredicate predicate) { + AtomicInteger index = new AtomicInteger(-1); + return filter(e -> predicate.test(e, isParallel() ? index.get() : index.incrementAndGet())); + } + + /** + * 过滤掉空元素 + * + * @return 过滤后的流 + */ + public FastStream nonNull() { + return new FastStream<>(stream.filter(Objects::nonNull)); + } + + /** + * 返回与指定函数将元素作为参数执行的结果组成的流 + * 这是一个无状态中间操作 + * + * @param mapper 指定的函数 + * @param 函数执行后返回的类型 + * @return 返回叠加操作后的流 + */ + @Override + public FastStream map(Function mapper) { + return new FastStream<>(stream.map(mapper)); + } + + /** + * 返回与指定函数将元素作为参数执行的结果组成的流,操作带下标,并行流时下标永远为-1 + * 这是一个无状态中间操作 + * + * @param mapper 指定的函数 + * @param 函数执行后返回的类型 + * @return 返回叠加操作后的流 + */ + public FastStream mapIdx(BiFunction mapper) { + AtomicInteger index = new AtomicInteger(-1); + return map(e -> mapper.apply(e, isParallel() ? index.get() : index.incrementAndGet())); + } + + /** + * 扩散流操作,可能影响流元素个数,将原有流元素执行mapper操作,返回多个流所有元素组成的流 + * 这是一个无状态中间操作 + * 例如,将users里所有user的id和parentId组合在一起,形成一个新的流: + *
{@code
+	 *     FastStream ids = FastStream.of(users).flatMap(user -> FastStream.of(user.getId(), user.getParentId()));
+	 * }
+ * + * @param mapper 操作,返回流 + * @param 拆分后流的元素类型 + * @return 返回叠加拆分操作后的流 + */ + @Override + public FastStream flatMap(Function> mapper) { + return new FastStream<>(stream.flatMap(mapper)); + } + + /** + * 扩散流操作,可能影响流元素个数,将原有流元素执行mapper操作,返回多个流所有元素组成的流,操作带下标,并行流时下标永远为-1 + * 这是一个无状态中间操作 + * + * @param mapper 操作,返回流 + * @param 拆分后流的元素类型 + * @return 返回叠加拆分操作后的流 + */ + public FastStream flatMapIdx(BiFunction> mapper) { + AtomicInteger index = new AtomicInteger(-1); + return flatMap(e -> mapper.apply(e, isParallel() ? index.get() : index.incrementAndGet())); + } + + /** + * 和{@link FastStream#map(Function)}一样,只不过函数的返回值必须为int类型 + * 这是一个无状态中间操作 + * + * @param mapper 返回值为int类型的函数 + * @return 叠加操作后元素类型全为int的流 + */ + @Override + public IntStream mapToInt(ToIntFunction mapper) { + return stream.mapToInt(mapper); + } + + /** + * 和{@link FastStream#map(Function)}一样,只不过函数的返回值必须为long类型 + * 这是一个无状态中间操作 + * + * @param mapper 返回值为long类型的函数 + * @return 叠加操作后元素类型全为long的流 + */ + @Override + public LongStream mapToLong(ToLongFunction mapper) { + return stream.mapToLong(mapper); + } + + /** + * 和{@link FastStream#map(Function)}一样,只不过函数的返回值必须为double类型 + * 这是一个无状态中间操作 + * + * @param mapper 返回值为double类型的函数 + * @return 叠加操作后元素类型全为double的流 + */ + @Override + public DoubleStream mapToDouble(ToDoubleFunction mapper) { + return stream.mapToDouble(mapper); + } + + /** + * 扩散流操作,可能影响流元素个数,将原有流元素执行mapper操作,返回多个流所有元素组成的流 + * 这是一个无状态中间操作 + * 例如,将users里所有user的id和parentId组合在一起,形成一个新的流: + *
{@code
+	 *     FastStream ids = FastStream.of(users).flatMap(user -> FastStream.of(user.getId(), user.getParentId()));
+	 * }
+ * + * @param mapper 操作,返回可迭代对象 + * @param 拆分后流的元素类型 + * @return 返回叠加拆分操作后的流 + */ + public FastStream flatMapIter(Function> mapper) { + return flatMap(w -> Opt.of(w).map(mapper).map(FastStream::of).orElseGet(FastStream::empty)); + } + + /** + * 扩散流操作,可能影响流元素个数,将原有流元素执行mapper操作,返回多个流所有元素组成的流 + * 这是一个无状态中间操作 + * + * @param mapper 操作,返回IntStream + * @return 返回叠加拆分操作后的IntStream + */ + @Override + public IntStream flatMapToInt(Function mapper) { + return stream.flatMapToInt(mapper); + } + + /** + * 扩散流操作,可能影响流元素个数,将原有流元素执行mapper操作,返回多个流所有元素组成的流 + * 这是一个无状态中间操作 + * + * @param mapper 操作,返回LongStream + * @return 返回叠加拆分操作后的LongStream + */ + @Override + public LongStream flatMapToLong(Function mapper) { + return stream.flatMapToLong(mapper); + } + + /** + * 扩散流操作,可能影响流元素个数,将原有流元素执行mapper操作,返回多个流所有元素组成的流 + * 这是一个无状态中间操作 + * + * @param mapper 操作,返回DoubleStream + * @return 返回叠加拆分操作后的DoubleStream + */ + @Override + public DoubleStream flatMapToDouble(Function mapper) { + return stream.flatMapToDouble(mapper); + } + + /** + * 扩散流操作,可能影响流元素个数,将原有流元素执行mapper操作,返回多个流所有元素组成的流,操作带一个方法,调用该方法可增加元素 + * 这是一个无状态中间操作 + * + * @param mapper 操作,返回流 + * @param 拆分后流的元素类型 + * @return 返回叠加拆分操作后的流 + */ + public FastStream mapMulti(BiConsumer> mapper) { + Objects.requireNonNull(mapper); + return flatMap(e -> { + FastStreamBuilder buffer = FastStream.builder(); + mapper.accept(e, buffer); + return buffer.build(); + }); + } + + /** + * 返回一个具有去重特征的流 非并行流(顺序流)下对于重复元素,保留遇到顺序中最先出现的元素,并行流情况下不能保证具体保留哪一个 + * 这是一个有状态中间操作 + * + * @return 一个具有去重特征的流 + */ + @Override + public FastStream distinct() { + return new FastStream<>(stream.distinct()); + } + + /** + * 返回一个具有去重特征的流 非并行流(顺序流)下对于重复元素,保留遇到顺序中最先出现的元素,并行流情况下不能保证具体保留哪一个 + * 这是一个有状态中间操作 + * + * @param keyExtractor 去重依据 + * @return 一个具有去重特征的流 + */ + public FastStream distinct(Function keyExtractor) { + return new FastStream<>(toMap(keyExtractor).entrySet().stream()).parallel(isParallel()).map(Map.Entry::getValue); + } + + /** + * 返回一个元素按自然顺序排序的流 + * 如果此流的元素不是{@code Comparable} ,则在执行终端操作时可能会抛出 {@code java.lang.ClassCastException} + * 对于顺序流,排序是稳定的。 对于无序流,没有稳定性保证。 + * 这是一个有状态中间操作 + * + * @return 一个元素按自然顺序排序的流 + */ + @Override + public FastStream sorted() { + return new FastStream<>(stream.sorted()); + } + + /** + * 返回一个元素按指定的{@link Comparator}排序的流 + * 如果此流的元素不是{@code Comparable} ,则在执行终端操作时可能会抛出{@code java.lang.ClassCastException} + * 对于顺序流,排序是稳定的。 对于无序流,没有稳定性保证。 + * 这是一个有状态中间操作 + * + * @param comparator 排序规则 + * @return 一个元素按指定的Comparator排序的流 + */ + @Override + public FastStream sorted(Comparator comparator) { + return new FastStream<>(stream.sorted(comparator)); + } + + /** + * 返回与指定函数将元素作为参数执行后组成的流。 + * 这是一个无状态中间操作 + * + * @param action 指定的函数 + * @return 返回叠加操作后的FastStream + * @apiNote 该方法存在的意义主要是用来调试 + * 当你需要查看经过操作管道某处的元素,可以执行以下操作: + *
{@code
+	 *     .of("one", "two", "three", "four")
+	 *         .filter(e -> e.length() > 3)
+	 *         .peek(e -> System.out.println("Filtered value: " + e))
+	 *         .map(String::toUpperCase)
+	 *         .peek(e -> System.out.println("Mapped value: " + e))
+	 *         .collect(Collectors.toList());
+	 * }
+ */ + @Override + public FastStream peek(Consumer action) { + return new FastStream<>(stream.peek(action)); + } + + /** + * 返回叠加调用{@link Console#log(Object)}打印出结果的流 + * + * @return 返回叠加操作后的FastStream + */ + public FastStream log() { + return peek(Console::log); + } + + /** + * 返回截取后面一些元素的流 + * 这是一个短路状态中间操作 + * + * @param maxSize 元素截取后的个数 + * @return 截取后的流 + */ + @Override + public FastStream limit(long maxSize) { + return new FastStream<>(stream.limit(maxSize)); + } + + /** + * 返回丢弃前面n个元素后的剩余元素组成的流,如果当前元素个数小于n,则返回一个元素为空的流 + * 这是一个有状态中间操作 + * + * @param n 需要丢弃的元素个数 + * @return 丢弃前面n个元素后的剩余元素组成的流 + */ + @Override + public FastStream skip(long n) { + return new FastStream<>(stream.skip(n)); + } + + /** + * 返回一个串行流,该方法可以将并行流转换为串行流 + * + * @return 串行流 + */ + @Override + public FastStream sequential() { + return new FastStream<>(stream.sequential()); + } + + /** + * 对流里面的每一个元素执行一个操作 + * 这是一个终端操作 + * + * @param action 操作 + */ + @Override + public void forEach(Consumer action) { + stream.forEach(action); + } + + /** + * 对流里面的每一个元素执行一个操作,操作带下标,并行流时下标永远为-1 + * 这是一个终端操作 + * + * @param action 操作 + */ + public void forEachIdx(BiConsumer action) { + AtomicInteger index = new AtomicInteger(-1); + stream.forEach(e -> action.accept(e, isParallel() ? index.get() : index.incrementAndGet())); + } + + /** + * 对流里面的每一个元素按照顺序执行一个操作 + * 这是一个终端操作 + * + * @param action 操作 + */ + @Override + public void forEachOrdered(Consumer action) { + stream.forEachOrdered(action); + } + + /** + * 对流里面的每一个元素按照顺序执行一个操作,操作带下标,并行流时下标永远为-1 + * 这是一个终端操作 + * + * @param action 操作 + */ + public void forEachOrderedIdx(BiConsumer action) { + AtomicInteger index = new AtomicInteger(-1); + stream.forEachOrdered(e -> action.accept(e, isParallel() ? index.get() : index.incrementAndGet())); + } + + /** + * 返回一个包含此流元素的数组 + * 这是一个终端操作 + * + * @return 包含此流元素的数组 + */ + @Override + public Object[] toArray() { + return stream.toArray(); + } + + /** + * 返回一个包含此流元素的指定的数组 + * + * @param generator 这里的IntFunction的参数是元素的个数,返回值为数组类型 + * @param 给定的数组类型 + * @return 包含此流元素的指定的数组 + * @throws ArrayStoreException 如果元素转换失败,例如不是该元素类型及其父类,则抛出该异常 + * @apiNote 例如以下代码编译正常,但运行时会抛出 {@link ArrayStoreException} + *
{@code
+	 * String[] strings = Stream.builder().add(1).build().toArray(String[]::new);
+	 * }
+ */ + public
A[] toArray(IntFunction generator) { + return stream.toArray(generator); + } + + /** + * 对元素进行聚合,并返回聚合后的值,相当于在for循环里写sum=sum+ints[i] + * 这是一个终端操作 + * + * @param identity 初始值,还用于限定泛型 + * @param accumulator 你想要的聚合操作 + * @return 聚合计算后的值 + * @apiNote 求和、最小值、最大值、平均值和转换成一个String字符串均为聚合操作 + * 例如这里对int进行求和可以写成: + * + *
{@code
+	 *     Integer sum = integers.reduce(0, (a, b) -> a+b);
+	 * }
+ *

+ * 或者写成: + * + *

{@code
+	 *     Integer sum = integers.reduce(0, Integer::sum);
+	 * }
+ */ + @Override + public T reduce(T identity, BinaryOperator accumulator) { + return stream.reduce(identity, accumulator); + } + + /** + * 对元素进行聚合,并返回聚合后用 {@link Optional}包裹的值,相当于在for循环里写sum=sum+ints[i] + * 该操作相当于: + *
{@code
+	 *     boolean foundAny = false;
+	 *     T result = null;
+	 *     for (T element : this stream) {
+	 *         if (!foundAny) {
+	 *             foundAny = true;
+	 *             result = element;
+	 *         }
+	 *         else
+	 *             result = accumulator.apply(result, element);
+	 *     }
+	 *     return foundAny ? Optional.of(result) : Optional.empty();
+	 * }
+ * 但它不局限于顺序执行,例如并行流等情况下 + * 这是一个终端操作 + * + * @param accumulator 你想要的聚合操作 + * @return 聚合后用 {@link Optional}包裹的值 + * @throws NullPointerException 如果给定的聚合操作中执行后结果为空,并用于下一次执行,则抛出该异常 + * @apiNote 例如以下场景抛出 NPE : + *
{@code
+	 *      Optional reduce = Stream.builder().add(1).add(1).build().reduce((a, b) -> null);
+	 * }
+ * @see #reduce(Object, BinaryOperator) + * @see #min(Comparator) + * @see #max(Comparator) + */ + @Override + public Optional reduce(BinaryOperator accumulator) { + return stream.reduce(accumulator); + } + + /** + * 对元素进行聚合,并返回聚合后的值,并行流时聚合拿到的初始值不稳定 + * 这是一个终端操作 + * + * @param identity 初始值 + * @param accumulator 累加器,具体为你要的聚合操作 + * @param combiner 用于并行流时组合多个结果 + * @param 初始值 + * @return 聚合操作的结果 + * @see #reduce(BinaryOperator) + * @see #reduce(Object, BinaryOperator) + */ + @Override + public U reduce(U identity, BiFunction accumulator, BinaryOperator combiner) { + return stream.reduce(identity, accumulator, combiner); + } + + /** + * 对元素进行收集,并返回收集后的容器 + * 这是一个终端操作 + * + * @param supplier 提供初始值的函数式接口,一般可以传入构造参数 + * @param accumulator 具体收集操作 + * @param combiner 用于并行流时组合多个结果 + * @param 用于收集元素的容器,大多是集合 + * @return 收集后的容器 + *
{@code
+	 *  List collect = Stream.iterate(1, i -> ++i).limit(10).collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
+	 * }
+ */ + @Override + public R collect(Supplier supplier, BiConsumer accumulator, BiConsumer combiner) { + return stream.collect(supplier, accumulator, combiner); + } + + /** + * 对元素进行收集,并返回收集后的元素 + * 这是一个终端操作 + * + * @param collector 收集器 + * @param 容器类型 + * @param
具体操作时容器类型,例如 {@code List::add} 时它为 {@code List} + * @return 收集后的容器 + */ + @Override + public R collect(Collector collector) { + return stream.collect(collector); + } + + /** + * 获取最小值 + * + * @param comparator 一个用来比较大小的比较器{@link Comparator} + * @return 最小值 + */ + @Override + public Optional min(Comparator comparator) { + return stream.min(comparator); + } + + /** + * 获取最大值 + * + * @param comparator 一个用来比较大小的比较器{@link Comparator} + * @return 最大值 + */ + @Override + public Optional max(Comparator comparator) { + return stream.max(comparator); + } + + /** + * 返回流元素个数 + * + * @return 流元素个数 + */ + @Override + public long count() { + return stream.count(); + } + + /** + * 判断是否有任何一个元素满足给定断言 + * + * @param predicate 断言 + * @return 是否有任何一个元素满足给定断言 + */ + @Override + public boolean anyMatch(Predicate predicate) { + return stream.anyMatch(predicate); + } + + /** + * 判断是否所有元素满足给定断言 + * + * @param predicate 断言 + * @return 是否所有元素满足给定断言 + */ + @Override + public boolean allMatch(Predicate predicate) { + return stream.allMatch(predicate); + } + + /** + * 判断是否没有元素满足给定断言 + * + * @param predicate 断言 + * @return 是否没有元素满足给定断言 + */ + @Override + public boolean noneMatch(Predicate predicate) { + return stream.noneMatch(predicate); + } + + /** + * 获取第一个元素 + * + * @return 第一个元素 + */ + @Override + public Optional findFirst() { + return stream.findFirst(); + } + + /** + * 获取与给定断言匹配的第一个元素 + * + * @param predicate 断言 + * @return 与给定断言匹配的第一个元素 + */ + public T findFirst(Predicate predicate) { + return filter(predicate).findFirst().orElse(null); + } + + /** + * 获取与给定断言匹配的第一个元素的下标 + * + * @param predicate 断言 + * @return 与给定断言匹配的第一个元素的下标 + */ + public Integer findFirstIdx(Predicate predicate) { + AtomicInteger idxRef = new AtomicInteger(-1); + forEachIdx((e, i) -> { + if (predicate.test(e) && idxRef.get() == -1) { + idxRef.set(i); + } + }); + return idxRef.get(); + } + + /** + * 获取最后一个元素 + * + * @return 最后一个元素 + */ + public Optional findLast() { + return Optional.of(toList()).filter(l -> !l.isEmpty()).map(l -> l.get(l.size() - 1)); + } + + /** + * 获取与给定断言匹配的最后一个元素 + * + * @param predicate 断言 + * @return 与给定断言匹配的最后一个元素 + */ + public T findLast(Predicate predicate) { + return reverse().filter(predicate).findFirst().orElse(null); + } + + /** + * 获取与给定断言匹配的最后一个元素的下标 + * + * @param predicate 断言 + * @return 与给定断言匹配的最后一个元素的下标 + */ + public Integer findLastIdx(Predicate predicate) { + AtomicInteger idxRef = new AtomicInteger(-1); + forEachIdx((e, i) -> { + if (predicate.test(e)) { + idxRef.set(i); + } + }); + return idxRef.get(); + } + + /** + * 反转顺序 + * + * @return 反转元素顺序 + */ + public FastStream reverse() { + List list = toList(); + Collections.reverse(list); + return FastStream.of(list, isParallel()); + } + + /** + * 考虑性能,随便取一个,这里不是随机取一个,是随便取一个 + * + * @return 随便取一个 + */ + @Override + public Optional findAny() { + return stream.findAny(); + } + + /** + * 返回流的迭代器 + * + * @return 流的迭代器 + */ + @Override + public Iterator iterator() { + return stream.iterator(); + } + + /** + * 返回流的拆分器 + * + * @return 流的拆分器 + */ + @Override + public Spliterator spliterator() { + return stream.spliterator(); + } + + /** + * 将流转换为并行 + * + * @return 并行流 + */ + @Override + public FastStream parallel() { + return new FastStream<>(stream.parallel()); + } + + /** + * 更改流的并行状态 + * + * @param parallel 是否并行 + * @return 流 + */ + public FastStream parallel(boolean parallel) { + return new FastStream<>(parallel ? stream.parallel() : stream.sequential()); + } + + /** + * 返回一个无序流(无手动排序) + * + * @return 无序流 + */ + @Override + public FastStream unordered() { + return new FastStream<>(stream.unordered()); + } + + /** + * 在流关闭时执行操作 + * + * @param closeHandler 在流关闭时执行的操作 + * @return 流 + */ + @Override + public FastStream onClose(Runnable closeHandler) { + return new FastStream<>(stream.onClose(closeHandler)); + } + + /** + * 与给定元素组成的流合并,成为新的流 + * + * @param obj 元素 + * @return 流 + */ + public FastStream push(T obj) { + return FastStream.concat(this, FastStream.of(obj)); + } + + /** + * 与给定元素组成的流合并,成为新的流 + * + * @param obj 元素 + * @return 流 + */ + @SuppressWarnings("unchecked") + public FastStream push(T... obj) { + return FastStream.concat(this, FastStream.of(obj)); + } + + /** + * 给定元素组成的流与当前流合并,成为新的流 + * + * @param obj 元素 + * @return 流 + */ + public FastStream unshift(T obj) { + return FastStream.concat(FastStream.of(obj), this); + } + + /** + * 给定元素组成的流与当前流合并,成为新的流 + * + * @param obj 元素 + * @return 流 + */ + @SuppressWarnings("unchecked") + public FastStream unshift(T... obj) { + return FastStream.concat(FastStream.of(obj), this); + } + + /** + * 获取流中指定下标的元素,如果是负数,则从最后一个开始数起 + * + * @param idx 下标 + * @return 指定下标的元素 + */ + public T at(Integer idx) { + if (Objects.isNull(idx)) { + return null; + } + List list = toList(); + if (idx > -1) { + if (idx >= list.size()) { + return null; + } + return list.get(idx); + } + if (-idx > list.size()) { + return null; + } + return list.get(list.size() + idx); + } + + /** + * 返回流的并行状态 + * + * @return 流的并行状态 + */ + @Override + public boolean isParallel() { + return stream.isParallel(); + } + + /** + * 关闭流 + * + * @see AutoCloseable#close() + */ + @Override + public void close() { + stream.close(); + } + + /** + * hashcode + * + * @return hashcode + */ + @Override + public int hashCode() { + return stream.hashCode(); + } + + /** + * equals + * + * @param obj 对象 + * @return 结果 + */ + @Override + public boolean equals(Object obj) { + return stream.equals(obj); + } + + /** + * toString + * + * @return string + */ + @Override + public String toString() { + return stream.toString(); + } + + /** + * 转换成集合 + * + * @param collectionFactory 集合工厂(可以是集合构造器) + * @param 集合类型 + * @return 集合 + */ + public > C toColl(Supplier collectionFactory) { + return collect(Collectors.toCollection(collectionFactory)); + } + + /** + * 转换为ArrayList + * + * @return list + */ + public List toList() { + return collect(Collectors.toList()); + } + + /** + * 转换为HashSet + * + * @return hashSet + */ + public Set toSet() { + return collect(Collectors.toSet()); + } + + /** + * 与给定的可迭代对象转换成map,key为现有元素,value为给定可迭代对象迭代的元素 + * + * @param other 可迭代对象 + * @param 可迭代对象迭代的元素类型 + * @return map,key为现有元素,value为给定可迭代对象迭代的元素 + */ + public Map toZip(Iterable other) { + Iterator iterator = other.iterator(); + return toMap(Function.identity(), e -> iterator.hasNext() ? iterator.next() : null); + } + + /** + * 返回拼接后的字符串 + * + * @return 拼接后的字符串 + */ + public String join() { + return join(""); + } + + /** + * 返回拼接后的字符串 + * + * @param delimiter 分隔符 + * @return 拼接后的字符串 + */ + public String join(CharSequence delimiter) { + return join(delimiter, "", ""); + } + + /** + * 返回拼接后的字符串 + * + * @param delimiter 分隔符 + * @param prefix 前缀 + * @param suffix 后缀 + * @return 拼接后的字符串 + */ + public String join(CharSequence delimiter, + CharSequence prefix, + CharSequence suffix) { + return map(String::valueOf).collect(Collectors.joining(delimiter, prefix, suffix)); + } + + /** + * 转换为map,key为给定操作执行后的返回值,value为当前元素 + * + * @param keyMapper 指定的key操作 + * @param key类型 + * @return map + */ + public Map toMap(Function keyMapper) { + return toMap(keyMapper, Function.identity()); + } + + /** + * 转换为map,key,value为给定操作执行后的返回值 + * + * @param keyMapper 指定的key操作 + * @param valueMapper 指定value操作 + * @param key类型 + * @param value类型 + * @return map + */ + public Map toMap(Function keyMapper, + Function valueMapper) { + return toMap(keyMapper, valueMapper, (l, r) -> r); + } + + /** + * 转换为map,key,value为给定操作执行后的返回值 + * + * @param keyMapper 指定的key操作 + * @param valueMapper 指定value操作 + * @param mergeFunction 合并操作 + * @param key类型 + * @param value类型 + * @return map + */ + public Map toMap(Function keyMapper, + Function valueMapper, + BinaryOperator mergeFunction) { + return toMap(keyMapper, valueMapper, mergeFunction, HashMap::new); + } + + /** + * 转换为map,key,value为给定操作执行后的返回值 + * + * @param keyMapper 指定的key操作 + * @param valueMapper 指定value操作 + * @param mergeFunction 合并操作 + * @param mapSupplier map工厂 + * @param key类型 + * @param value类型 + * @param map类型 + * @return map + */ + public > M toMap(Function keyMapper, + Function valueMapper, + BinaryOperator mergeFunction, + Supplier mapSupplier) { + return collect(CollectorUtil.toMap(keyMapper, valueMapper, mergeFunction, mapSupplier)); + } + + + /** + * 通过给定分组依据进行分组 + * + * @param classifier 分组依据 + * @param 实体中的分组依据对应类型,也是Map中key的类型 + * @return {@link Collector} + */ + public Map> group(Function classifier) { + return group(classifier, Collectors.toList()); + } + + /** + * 通过给定分组依据进行分组 + * + * @param classifier 分组依据 + * @param downstream 下游操作 + * @param 实体中的分组依据对应类型,也是Map中key的类型 + * @param 下游操作对应返回类型,也是Map中value的类型 + * @param 下游操作在进行中间操作时对应类型 + * @return {@link Collector} + */ + public Map group(Function classifier, + Collector downstream) { + return group(classifier, HashMap::new, downstream); + } + + /** + * 通过给定分组依据进行分组 + * + * @param classifier 分组依据 + * @param mapFactory 提供的map + * @param downstream 下游操作 + * @param 实体中的分组依据对应类型,也是Map中key的类型 + * @param 下游操作对应返回类型,也是Map中value的类型 + * @param 下游操作在进行中间操作时对应类型 + * @param 最后返回结果Map类型 + * @return {@link Collector} + */ + public > M group(Function classifier, + Supplier mapFactory, + Collector downstream) { + return collect(CollectorUtil.groupingBy(classifier, mapFactory, downstream)); + } + + public FastStream zip(Iterable other, + BiFunction zipper) { + Iterator iterator = other.iterator(); + return new FastStream<>(stream.map(e -> zipper.apply(e, iterator.hasNext() ? iterator.next() : null))); + } + + /** + * 类似js的splice函数 + * + * @param start 起始下标 + * @param deleteCount 删除个数 + * @param items 放入值 + * @return 操作后的流 + */ + @SuppressWarnings("unchecked") + public FastStream splice(int start, int deleteCount, T... items) { + List list = toList(); + if (start > -1) { + if (start >= list.size()) { + return FastStream.concat(FastStream.of(list), FastStream.of(items)); + } + list.removeAll(list.subList(start, start + deleteCount)); + list.addAll(start, Arrays.asList(items)); + return FastStream.of(list); + } + if (-start > list.size()) { + return FastStream.concat(FastStream.of(items), FastStream.of(list)); + } + start = list.size() + start; + list.removeAll(list.subList(start, start + deleteCount)); + list.addAll(start, Arrays.asList(items)); + return FastStream.of(list); + } + + /** + * 按指定长度切分为双层流 + * + * @param batchSize 指定长度 + * @return 切好的流 + */ + public FastStream> sub(int batchSize) { + List list = toList(); + if (list.size() <= batchSize) { + return FastStream.>of(FastStream.of(list)).parallel(isParallel()); + } + return FastStream.iterate(0, i -> i < list.size(), i -> i + batchSize) + .map(skip -> FastStream.of(list).skip(skip).limit(batchSize)).parallel(isParallel()); + } + + /** + * 按指定长度切分为元素为list的流 + * + * @param batchSize 指定长度 + * @return 切好的流 + */ + public FastStream> subList(int batchSize) { + return sub(batchSize).map(FastStream::toList); + } + + public interface FastStreamBuilder extends Consumer, cn.hutool.core.builder.Builder> { + + /** + * Adds an element to the stream being built. + * + * @param t the element to add + * @throws IllegalStateException if the builder has already transitioned to + * the built state + */ + @Override + void accept(T t); + + /** + * Adds an element to the stream being built. + * + * @param t the element to add + * @return {@code this} builder + * @throws IllegalStateException if the builder has already transitioned to + * the built state + * @implSpec The default implementation behaves as if: + *
{@code
+		 *     accept(t)
+		 *     return this;
+		 * }
+ */ + default FastStreamBuilder add(T t) { + accept(t); + return this; + } + + /** + * Builds the stream, transitioning this builder to the built state. + * An {@code IllegalStateException} is thrown if there are further attempts + * to operate on the builder after it has entered the built state. + * + * @return the built stream + * @throws IllegalStateException if the builder has already transitioned to + * the built state + */ + FastStream build(); + + } + +} diff --git a/hutool-core/src/test/java/cn/hutool/core/stream/FastStreamTest.java b/hutool-core/src/test/java/cn/hutool/core/stream/FastStreamTest.java new file mode 100644 index 000000000..18dfd1380 --- /dev/null +++ b/hutool-core/src/test/java/cn/hutool/core/stream/FastStreamTest.java @@ -0,0 +1,291 @@ +package cn.hutool.core.stream; + + +import org.junit.Assert; +import org.junit.Test; + +import java.util.*; +import java.util.stream.Stream; + +import static java.util.Collections.singletonList; + +/** + * @author VampireAchao + */ +public class FastStreamTest { + + @Test + public void testBuilder() { + List list = FastStream.builder().add(1).add(2).add(3).build().toList(); + Assert.assertEquals(Arrays.asList(1, 2, 3), list); + } + + @Test + public void testOf() { + Assert.assertEquals(3, FastStream.of(Arrays.asList(1, 2, 3), true).count()); + Assert.assertEquals(3, FastStream.of(1, 2, 3).count()); + Assert.assertEquals(3, FastStream.of(Stream.builder().add(1).add(2).add(3).build()).count()); + } + + @Test + public void testSplit() { + List list = FastStream.split("1,2,3", ",").map(Integer::valueOf).toList(); + Assert.assertEquals(Arrays.asList(1, 2, 3), list); + } + + @Test + public void testIterator() { + List list = FastStream.iterate(0, i -> i < 3, i -> ++i).toList(); + Assert.assertEquals(Arrays.asList(0, 1, 2), list); + } + + @Test + public void testToCollection() { + List list = Arrays.asList(1, 2, 3); + List toCollection = FastStream.of(list).map(String::valueOf).toColl(LinkedList::new); + Assert.assertEquals(Arrays.asList("1", "2", "3"), toCollection); + } + + @Test + public void testToList() { + List list = Arrays.asList(1, 2, 3); + List toList = FastStream.of(list).map(String::valueOf).toList(); + Assert.assertEquals(Arrays.asList("1", "2", "3"), toList); + } + + @Test + public void testToSet() { + List list = Arrays.asList(1, 2, 3); + Set toSet = FastStream.of(list).map(String::valueOf).toSet(); + Assert.assertEquals(new HashSet<>(Arrays.asList("1", "2", "3")), toSet); + } + + @Test + public void testToZip() { + List orders = Arrays.asList(1, 2, 3); + List list = Arrays.asList("dromara", "hutool", "sweet"); + Map toZip = FastStream.of(orders).toZip(list); + Assert.assertEquals(new HashMap() {{ + put(1, "dromara"); + put(2, "hutool"); + put(3, "sweet"); + }}, toZip); + } + + @Test + public void testJoin() { + List list = Arrays.asList(1, 2, 3); + String joining = FastStream.of(list).join(); + Assert.assertEquals("123", joining); + Assert.assertEquals("1,2,3", FastStream.of(list).join(",")); + Assert.assertEquals("(1,2,3)", FastStream.of(list).join(",", "(", ")")); + } + + @Test + public void testToMap() { + List list = Arrays.asList(1, 2, 3); + Map identityMap = FastStream.of(list).toMap(String::valueOf); + Assert.assertEquals(new HashMap() {{ + put("1", 1); + put("2", 2); + put("3", 3); + }}, identityMap); + } + + @Test + public void testGroup() { + List list = Arrays.asList(1, 2, 3); + Map> group = FastStream.of(list).group(String::valueOf); + Assert.assertEquals( + new HashMap>() {{ + put("1", singletonList(1)); + put("2", singletonList(2)); + put("3", singletonList(3)); + }}, group); + } + + @Test + public void testMapIdx() { + List list = Arrays.asList("dromara", "hutool", "sweet"); + List mapIndex = FastStream.of(list).mapIdx((e, i) -> i + 1 + "." + e).toList(); + Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), mapIndex); + // 并行流时为-1 + Assert.assertEquals(Arrays.asList(-1, -1, -1), FastStream.of(1, 2, 3).parallel().mapIdx((e, i) -> i).toList()); + } + + @Test + public void testMapMulti() { + List list = Arrays.asList(1, 2, 3); + List mapMulti = FastStream.of(list).mapMulti((e, buffer) -> { + if (e % 2 == 0) { + buffer.accept(e); + } + buffer.accept(e); + }).toList(); + Assert.assertEquals(Arrays.asList(1, 2, 2, 3), mapMulti); + } + + @Test + public void testDistinct() { + List list = Arrays.asList(1, 2, 2, 3); + List distinctBy = FastStream.of(list).distinct(String::valueOf).toList(); + Assert.assertEquals(Arrays.asList(1, 2, 3), distinctBy); + } + + @Test + public void testForeachIdx() { + List list = Arrays.asList("dromara", "hutool", "sweet"); + FastStream.FastStreamBuilder builder = FastStream.builder(); + FastStream.of(list).forEachIdx((e, i) -> builder.accept(i + 1 + "." + e)); + Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), builder.build().toList()); + // 并行流时为-1 + FastStream.of(1, 2, 3).parallel().forEachIdx((e, i) -> Assert.assertEquals(-1, (Object) i)); + } + + @Test + public void testForEachOrderedIdx() { + List list = Arrays.asList("dromara", "hutool", "sweet"); + FastStream.FastStreamBuilder builder = FastStream.builder(); + FastStream.of(list).forEachOrderedIdx((e, i) -> builder.accept(i + 1 + "." + e)); + Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), builder.build().toList()); + } + + @Test + public void testFlatMapIdx() { + List list = Arrays.asList("dromara", "hutool", "sweet"); + List mapIndex = FastStream.of(list).flatMapIdx((e, i) -> FastStream.of(i + 1 + "." + e)).toList(); + Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), mapIndex); + // 并行流时为-1 + Assert.assertEquals(Arrays.asList(-1, -1, -1), FastStream.of(1, 2, 3).parallel().mapIdx((e, i) -> i).toList()); + } + + @Test + public void testFlatMapIter() { + List list = Arrays.asList(1, 2, 3); + List flatMapIter = FastStream.of(list).flatMapIter(e -> null).toList(); + Assert.assertEquals(Collections.emptyList(), flatMapIter); + } + + @Test + public void testFilter() { + List list = Arrays.asList(1, 2, 3); + List filterIndex = FastStream.of(list).filter(String::valueOf, "1").toList(); + Assert.assertEquals(Collections.singletonList(1), filterIndex); + } + + @Test + public void testFilterIdx() { + List list = Arrays.asList("dromara", "hutool", "sweet"); + List filterIndex = FastStream.of(list).filterIdx((e, i) -> i < 2).toList(); + Assert.assertEquals(Arrays.asList("dromara", "hutool"), filterIndex); + // 并行流时为-1 + Assert.assertEquals(3L, FastStream.of(1, 2, 3).parallel().filterIdx((e, i) -> i == -1).count()); + } + + @Test + public void testNonNull() { + List list = Arrays.asList(1, null, 2, 3); + List nonNull = FastStream.of(list).nonNull().toList(); + Assert.assertEquals(Arrays.asList(1, 2, 3), nonNull); + } + + @Test + public void testParallel() { + Assert.assertTrue(FastStream.of(1, 2, 3).parallel(true).isParallel()); + Assert.assertFalse(FastStream.of(1, 2, 3).parallel(false).isParallel()); + } + + @Test + public void testPush() { + List list = Arrays.asList(1, 2); + List push = FastStream.of(list).push(3).toList(); + Assert.assertEquals(Arrays.asList(1, 2, 3), push); + } + + @Test + public void testUnshift() { + List list = Arrays.asList(2, 3); + List unshift = FastStream.of(list).unshift(1).toList(); + Assert.assertEquals(Arrays.asList(1, 2, 3), unshift); + } + + @Test + public void testAt() { + List list = Arrays.asList(1, 2, 3); + Assert.assertEquals(1, (Object) FastStream.of(list).at(0)); + Assert.assertEquals(1, (Object) FastStream.of(list).at(-3)); + Assert.assertEquals(3, (Object) FastStream.of(list).at(-1)); + Assert.assertNull(FastStream.of(list).at(-4)); + } + + @Test + public void testSplice() { + List list = Arrays.asList(1, 2, 3); + Assert.assertEquals(Arrays.asList(1, 2, 2, 3), FastStream.of(list).splice(1, 0, 2).toList()); + Assert.assertEquals(Arrays.asList(1, 2, 3, 3), FastStream.of(list).splice(3, 1, 3).toList()); + Assert.assertEquals(Arrays.asList(1, 2, 4), FastStream.of(list).splice(2, 1, 4).toList()); + Assert.assertEquals(Arrays.asList(1, 2), FastStream.of(list).splice(2, 1).toList()); + Assert.assertEquals(Arrays.asList(1, 2, 3), FastStream.of(list).splice(2, 0).toList()); + Assert.assertEquals(Arrays.asList(1, 2), FastStream.of(list).splice(-1, 1).toList()); + Assert.assertEquals(Arrays.asList(1, 2, 3), FastStream.of(list).splice(-2, 2, 2, 3).toList()); + } + + @Test + public void testFindFirst() { + List list = Arrays.asList(1, 2, 3); + Integer find = FastStream.of(list).findFirst(Objects::nonNull); + Assert.assertEquals(1, (Object) find); + } + + @Test + public void testFindFirstIdx() { + List list = Arrays.asList(null, 2, 3); + Integer idx = FastStream.of(list).findFirstIdx(Objects::nonNull); + Assert.assertEquals(1, (Object) idx); + Assert.assertEquals(-1, (Object) FastStream.of(list).parallel().findFirstIdx(Objects::nonNull)); + } + + @Test + public void testFindLast() { + List list = Arrays.asList(1, null, 3); + Integer find = FastStream.of(list).findLast(Objects::nonNull); + Assert.assertEquals(3, (Object) find); + Assert.assertEquals(3, (Object) FastStream.of(list).findLast().orElse(null)); + } + + @Test + public void testFindLastIdx() { + List list = Arrays.asList(1, null, 3); + Integer idx = FastStream.of(list).findLastIdx(Objects::nonNull); + Assert.assertEquals(2, (Object) idx); + Assert.assertEquals(-1, (Object) FastStream.of(list).parallel().findLastIdx(Objects::nonNull)); + } + + @Test + public void testZip() { + List orders = Arrays.asList(1, 2, 3); + List list = Arrays.asList("dromara", "hutool", "sweet"); + List zip = FastStream.of(orders).zip(list, (e1, e2) -> e1 + "." + e2).toList(); + Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), zip); + } + + @Test + void testSub() { + List list = Arrays.asList(1, 2, 3, 4, 5); + List> lists = FastStream.of(list).sub(2).map(FastStream::toList).toList(); + Assert.assertEquals(Arrays.asList(Arrays.asList(1, 2), + Arrays.asList(3, 4), + singletonList(5) + ), lists); + } + + @Test + void testSubList() { + List list = Arrays.asList(1, 2, 3, 4, 5); + List> lists = FastStream.of(list).subList(2).toList(); + Assert.assertEquals(Arrays.asList(Arrays.asList(1, 2), + Arrays.asList(3, 4), + singletonList(5) + ), lists); + } +}