From 9a7367ffe14f28d22c23b73459641a514a140b5f Mon Sep 17 00:00:00 2001 From: Zjp <1215582715@qq.com> Date: Mon, 1 Aug 2022 16:25:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96FastStream=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E7=BB=86=E8=8A=82,=20=E4=BD=BF=E7=94=A8=E9=AB=98=E6=95=88?= =?UTF-8?q?=E4=B8=80=E4=BA=9B=E7=9A=84=E5=86=99=E6=B3=95;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cn/hutool/core/collection/CollUtil.java | 2 +- .../cn/hutool/core/collection/ListUtil.java | 2 +- .../cn/hutool/core/stream/FastStream.java | 325 +++++++++++++----- .../cn/hutool/core/stream/FastStreamTest.java | 104 +++--- 4 files changed, 298 insertions(+), 135 deletions(-) diff --git a/hutool-core/src/main/java/cn/hutool/core/collection/CollUtil.java b/hutool-core/src/main/java/cn/hutool/core/collection/CollUtil.java index aa9f14691..6a541ad79 100755 --- a/hutool-core/src/main/java/cn/hutool/core/collection/CollUtil.java +++ b/hutool-core/src/main/java/cn/hutool/core/collection/CollUtil.java @@ -1467,7 +1467,7 @@ public class CollUtil { * @return 最后一个位置 * @since 5.6.6 */ - public static int lastIndexOf(final Collection collection, final Predicate predicate) { + public static int lastIndexOf(final Collection collection, final Predicate predicate) { if (collection instanceof List) { // List的查找最后一个有优化算法 return ListUtil.lastIndexOf((List) collection, predicate); diff --git a/hutool-core/src/main/java/cn/hutool/core/collection/ListUtil.java b/hutool-core/src/main/java/cn/hutool/core/collection/ListUtil.java index a0147343d..28f446daa 100755 --- a/hutool-core/src/main/java/cn/hutool/core/collection/ListUtil.java +++ b/hutool-core/src/main/java/cn/hutool/core/collection/ListUtil.java @@ -524,7 +524,7 @@ public class ListUtil { * @return 最后一个位置 * @since 5.6.6 */ - public static int lastIndexOf(final List list, final Predicate matcher) { + public static int lastIndexOf(final List list, final Predicate matcher) { if (null != list) { final int size = list.size(); if (size > 0) { diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/FastStream.java b/hutool-core/src/main/java/cn/hutool/core/stream/FastStream.java index f86f62916..fad246885 100644 --- a/hutool-core/src/main/java/cn/hutool/core/stream/FastStream.java +++ b/hutool-core/src/main/java/cn/hutool/core/stream/FastStream.java @@ -1,12 +1,17 @@ package cn.hutool.core.stream; - +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.collection.ListUtil; import cn.hutool.core.lang.Console; import cn.hutool.core.lang.Opt; +import cn.hutool.core.lang.mutable.MutableInt; +import cn.hutool.core.lang.mutable.MutableObj; +import cn.hutool.core.map.MapUtil; import cn.hutool.core.text.StrUtil; import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.*; import java.util.stream.*; @@ -39,9 +44,15 @@ import java.util.stream.*; * 这些异步任务默认使用{@link java.util.concurrent.ForkJoinPool}线程池进行管理 * * @author VampireAchao + * @author emptypoint * @see java.util.stream.Stream + * @since 6.0.0 */ public class FastStream implements Stream, Iterable { + /** + * 代表不存在的下标, 一般用于并行流的下标, 或者未找到元素时的下标 + */ + private static final int NOT_FOUND_INDEX = -1; protected final Stream stream; @@ -215,7 +226,7 @@ public class FastStream implements Stream, Iterable { * @param b 第二个流 * @return 拼接两个流之后的流 */ - public static FastStream concat(FastStream a, FastStream b) { + public static FastStream concat(Stream a, Stream b) { return new FastStream<>(Stream.concat(a, b)); } @@ -239,7 +250,7 @@ public class FastStream implements Stream, Iterable { * @return 流 */ public static FastStream of(Iterable iterable, boolean parallel) { - return Optional.ofNullable(iterable).map(Iterable::spliterator).map(spliterator -> StreamSupport.stream(spliterator, parallel)).map(FastStream::new).orElseGet(FastStream::empty); + return Opt.ofNullable(iterable).map(Iterable::spliterator).map(spliterator -> StreamSupport.stream(spliterator, parallel)).map(FastStream::new).orElseGet(FastStream::empty); } /** @@ -250,7 +261,7 @@ public class FastStream implements Stream, Iterable { * @return 流 */ public static FastStream of(Stream stream) { - return new FastStream<>(stream); + return new FastStream<>(Objects.requireNonNull(stream)); } /** @@ -261,7 +272,7 @@ public class FastStream implements Stream, Iterable { * @return 拆分后元素组成的流 */ public static FastStream split(CharSequence str, String regex) { - return Opt.ofBlankAble(str).map(String::valueOf).map(s -> s.split(regex)).map(FastStream::of).orElseGet(FastStream::empty); + return Opt.ofBlankAble(str).map(CharSequence::toString).map(s -> s.split(regex)).map(FastStream::of).orElseGet(FastStream::empty); } /** @@ -286,6 +297,7 @@ public class FastStream implements Stream, Iterable { * @return 与 指定操作结果 匹配 指定值 的元素组成的流 */ public FastStream filter(Function mapper, R value) { + Objects.requireNonNull(mapper); return filter(e -> Objects.equals(Opt.ofNullable(e).map(mapper).get(), value)); } @@ -298,8 +310,13 @@ public class FastStream implements Stream, Iterable { * @return 返回叠加过滤操作后的流 */ public FastStream filterIdx(BiPredicate predicate) { - AtomicInteger index = new AtomicInteger(-1); - return filter(e -> predicate.test(e, isParallel() ? index.get() : index.incrementAndGet())); + Objects.requireNonNull(predicate); + if (isParallel()) { + return filter(e -> predicate.test(e, NOT_FOUND_INDEX)); + } else { + MutableInt index = new MutableInt(NOT_FOUND_INDEX); + return filter(e -> predicate.test(e, index.increment().get())); + } } /** @@ -333,8 +350,13 @@ public class FastStream implements Stream, Iterable { * @return 返回叠加操作后的流 */ public FastStream mapIdx(BiFunction mapper) { - AtomicInteger index = new AtomicInteger(-1); - return map(e -> mapper.apply(e, isParallel() ? index.get() : index.incrementAndGet())); + Objects.requireNonNull(mapper); + if (isParallel()) { + return map(e -> mapper.apply(e, NOT_FOUND_INDEX)); + } else { + MutableInt index = new MutableInt(NOT_FOUND_INDEX); + return map(e -> mapper.apply(e, index.increment().get())); + } } /** @@ -363,8 +385,13 @@ public class FastStream implements Stream, Iterable { * @return 返回叠加拆分操作后的流 */ public FastStream flatMapIdx(BiFunction> mapper) { - AtomicInteger index = new AtomicInteger(-1); - return flatMap(e -> mapper.apply(e, isParallel() ? index.get() : index.incrementAndGet())); + Objects.requireNonNull(mapper); + if (isParallel()) { + return flatMap(e -> mapper.apply(e, NOT_FOUND_INDEX)); + } else { + MutableInt index = new MutableInt(NOT_FOUND_INDEX); + return flatMap(e -> mapper.apply(e, index.increment().get())); + } } /** @@ -416,6 +443,7 @@ public class FastStream implements Stream, Iterable { * @return 返回叠加拆分操作后的流 */ public FastStream flatMapIter(Function> mapper) { + Objects.requireNonNull(mapper); return flatMap(w -> Opt.of(w).map(mapper).map(FastStream::of).orElseGet(FastStream::empty)); } @@ -463,7 +491,7 @@ public class FastStream implements Stream, Iterable { * @param 拆分后流的元素类型 * @return 返回叠加拆分操作后的流 */ - public FastStream mapMulti(BiConsumer> mapper) { + public FastStream mapMulti(BiConsumer> mapper) { Objects.requireNonNull(mapper); return flatMap(e -> { FastStreamBuilder buffer = FastStream.builder(); @@ -490,8 +518,31 @@ public class FastStream implements Stream, Iterable { * @param keyExtractor 去重依据 * @return 一个具有去重特征的流 */ - public FastStream distinct(Function keyExtractor) { - return new FastStream<>(toMap(keyExtractor).entrySet().stream()).parallel(isParallel()).map(Map.Entry::getValue); + public FastStream distinct(Function keyExtractor) { + Objects.requireNonNull(keyExtractor); + if (isParallel()) { + ConcurrentHashMap exists = MapUtil.newConcurrentHashMap(); + // 标记是否出现过null值,用于保留第一个出现的null + // 由于ConcurrentHashMap的key不能为null,所以用此变量来标记 + AtomicBoolean hasNull = new AtomicBoolean(false); + return of(stream.filter(e -> { + F key = keyExtractor.apply(e); + if (key == null) { + // 已经出现过null值,跳过该值 + if (hasNull.get()) { + return false; + } + hasNull.set(Boolean.TRUE); + return true; + } else { + // 第一次出现的key返回true + return null == exists.putIfAbsent(key, Boolean.TRUE); + } + })).parallel(isParallel()); + } else { + Set exists = new HashSet<>(); + return of(stream.filter(e -> exists.add(keyExtractor.apply(e)))).parallel(isParallel()); + } } /** @@ -583,7 +634,9 @@ public class FastStream implements Stream, Iterable { */ @Override public FastStream sequential() { - return new FastStream<>(stream.sequential()); + //noinspection ResultOfMethodCallIgnored + stream.sequential(); + return this; } /** @@ -604,8 +657,13 @@ public class FastStream implements Stream, Iterable { * @param action 操作 */ public void forEachIdx(BiConsumer action) { - AtomicInteger index = new AtomicInteger(-1); - stream.forEach(e -> action.accept(e, isParallel() ? index.get() : index.incrementAndGet())); + Objects.requireNonNull(action); + if (isParallel()) { + stream.forEach(e -> action.accept(e, NOT_FOUND_INDEX)); + } else { + MutableInt index = new MutableInt(NOT_FOUND_INDEX); + stream.forEach(e -> action.accept(e, index.increment().get())); + } } /** @@ -626,8 +684,13 @@ public class FastStream implements Stream, Iterable { * @param action 操作 */ public void forEachOrderedIdx(BiConsumer action) { - AtomicInteger index = new AtomicInteger(-1); - stream.forEachOrdered(e -> action.accept(e, isParallel() ? index.get() : index.incrementAndGet())); + Objects.requireNonNull(action); + if (isParallel()) { + stream.forEachOrdered(e -> action.accept(e, NOT_FOUND_INDEX)); + } else { + MutableInt index = new MutableInt(NOT_FOUND_INDEX); + stream.forEachOrdered(e -> action.accept(e, index.increment().get())); + } } /** @@ -653,6 +716,7 @@ public class FastStream implements Stream, Iterable { * String[] strings = Stream.builder().add(1).build().toArray(String[]::new); * } */ + @Override public A[] toArray(IntFunction generator) { //noinspection SuspiciousToArrayCall return stream.toArray(generator); @@ -849,23 +913,28 @@ public class FastStream implements Stream, Iterable { * @return 与给定断言匹配的第一个元素 */ public T findFirst(Predicate predicate) { - return filter(predicate).findFirst().orElse(null); + return stream.filter(predicate).findFirst().orElse(null); } /** - * 获取与给定断言匹配的第一个元素的下标 + * 获取与给定断言匹配的第一个元素的下标,并行流下标永远为-1 * * @param predicate 断言 - * @return 与给定断言匹配的第一个元素的下标 + * @return 与给定断言匹配的第一个元素的下标,如果不存在则返回-1 */ public Integer findFirstIdx(Predicate predicate) { - AtomicInteger idxRef = new AtomicInteger(-1); - forEachIdx((e, i) -> { - if (predicate.test(e) && idxRef.get() == -1) { - idxRef.set(i); - } - }); - return idxRef.get(); + Objects.requireNonNull(predicate); + if (isParallel()) { + return NOT_FOUND_INDEX; + } else { + MutableInt index = new MutableInt(NOT_FOUND_INDEX); + //noinspection ResultOfMethodCallIgnored + stream.filter(e -> { + index.increment(); + return predicate.test(e); + }).findFirst(); + return index.get(); + } } /** @@ -874,7 +943,13 @@ public class FastStream implements Stream, Iterable { * @return 最后一个元素 */ public Optional findLast() { - return Optional.of(toList()).filter(l -> !l.isEmpty()).map(l -> l.get(l.size() - 1)); + if (isParallel()) { + return Optional.of(toList()).filter(CollUtil::isNotEmpty).map(l -> l.get(l.size() - 1)); + } else { + MutableObj last = new MutableObj<>(null); + forEach(last::set); + return Optional.ofNullable(last.get()); + } } /** @@ -884,23 +959,41 @@ public class FastStream implements Stream, Iterable { * @return 与给定断言匹配的最后一个元素 */ public T findLast(Predicate predicate) { - return reverse().filter(predicate).findFirst().orElse(null); + Objects.requireNonNull(predicate); + if (isParallel()) { + List list = toList(); + final int index = ListUtil.lastIndexOf(list, predicate); + return index == NOT_FOUND_INDEX ? null : list.get(index); + } else { + MutableObj last = new MutableObj<>(null); + forEach(e -> { + if (predicate.test(e)) { + last.set(e); + } + }); + return last.get(); + } } /** - * 获取与给定断言匹配的最后一个元素的下标 + * 获取与给定断言匹配的最后一个元素的下标,并行流下标永远为-1 * * @param predicate 断言 - * @return 与给定断言匹配的最后一个元素的下标 + * @return 与给定断言匹配的最后一个元素的下标,如果不存在则返回-1 */ public Integer findLastIdx(Predicate predicate) { - AtomicInteger idxRef = new AtomicInteger(-1); - forEachIdx((e, i) -> { - if (predicate.test(e)) { - idxRef.set(i); - } - }); - return idxRef.get(); + Objects.requireNonNull(predicate); + if (isParallel()) { + return NOT_FOUND_INDEX; + } else { + MutableInt idxRef = new MutableInt(NOT_FOUND_INDEX); + forEachIdx((e, i) -> { + if (predicate.test(e)) { + idxRef.set(i); + } + }); + return idxRef.get(); + } } /** @@ -951,7 +1044,9 @@ public class FastStream implements Stream, Iterable { */ @Override public FastStream parallel() { - return new FastStream<>(stream.parallel()); + //noinspection ResultOfMethodCallIgnored + stream.parallel(); + return this; } /** @@ -961,7 +1056,7 @@ public class FastStream implements Stream, Iterable { * @return 流 */ public FastStream parallel(boolean parallel) { - return new FastStream<>(parallel ? stream.parallel() : stream.sequential()); + return parallel ? parallel() : sequential(); } /** @@ -982,7 +1077,9 @@ public class FastStream implements Stream, Iterable { */ @Override public FastStream onClose(Runnable closeHandler) { - return new FastStream<>(stream.onClose(closeHandler)); + //noinspection ResultOfMethodCallIgnored + stream.onClose(closeHandler); + return this; } /** @@ -992,7 +1089,7 @@ public class FastStream implements Stream, Iterable { * @return 流 */ public FastStream push(T obj) { - return FastStream.concat(this, FastStream.of(obj)); + return FastStream.concat(this.stream, Stream.of(obj)); } /** @@ -1003,7 +1100,7 @@ public class FastStream implements Stream, Iterable { */ @SuppressWarnings("unchecked") public FastStream push(T... obj) { - return FastStream.concat(this, FastStream.of(obj)); + return FastStream.concat(this.stream, Stream.of(obj)); } /** @@ -1012,8 +1109,8 @@ public class FastStream implements Stream, Iterable { * @param obj 元素 * @return 流 */ - public FastStream unshift(T obj) { - return FastStream.concat(FastStream.of(obj), this); + public FastStream addFirst(T obj) { + return FastStream.concat(Stream.of(obj), this.stream); } /** @@ -1022,9 +1119,9 @@ public class FastStream implements Stream, Iterable { * @param obj 元素 * @return 流 */ - @SuppressWarnings("unchecked") - public FastStream unshift(T... obj) { - return FastStream.concat(FastStream.of(obj), this); + @SafeVarargs + public final FastStream addFirst(T... obj) { + return FastStream.concat(Stream.of(obj), this.stream); } /** @@ -1037,17 +1134,7 @@ public class FastStream implements Stream, Iterable { if (Objects.isNull(idx)) { return null; } - List list = toList(); - if (idx > -1) { - if (idx >= list.size()) { - return null; - } - return list.get(idx); - } - if (-idx > list.size()) { - return null; - } - return list.get(list.size() + idx); + return CollUtil.get(toList(), idx); } /** @@ -1134,15 +1221,28 @@ public class FastStream implements Stream, Iterable { } /** - * 与给定的可迭代对象转换成map,key为现有元素,value为给定可迭代对象迭代的元素 + * 与给定的可迭代对象转换成map,key为现有元素,value为给定可迭代对象迭代的元素
+ * 至少包含全部的key,如果对应位置上的value不存在,则为null * * @param other 可迭代对象 * @param 可迭代对象迭代的元素类型 - * @return map,key为现有元素,value为给定可迭代对象迭代的元素 + * @return map,key为现有元素,value为给定可迭代对象迭代的元素;
+ * 至少包含全部的key,如果对应位置上的value不存在,则为null;
+ * 如果key重复, 则保留最后一个关联的value;
*/ public Map toZip(Iterable other) { - Iterator iterator = other.iterator(); - return toMap(Function.identity(), e -> iterator.hasNext() ? iterator.next() : null); + // value对象迭代器 + final Iterator iterator = Opt.ofNullable(other).map(Iterable::iterator).orElseGet(Collections::emptyIterator); + if (isParallel()) { + List keyList = toList(); + final Map map = MapUtil.newHashMap(keyList.size()); + for (T key : keyList) { + map.put(key, iterator.hasNext() ? iterator.next() : null); + } + return map; + } else { + return toMap(Function.identity(), e -> iterator.hasNext() ? iterator.next() : null); + } } /** @@ -1283,63 +1383,106 @@ public class FastStream implements Stream, Iterable { return collect(CollectorUtil.groupingBy(classifier, mapFactory, downstream)); } + /** + * 将 现有元素 与 给定迭代器中对应位置的元素 使用 zipper 转换为新的元素,并返回新元素组成的流
+ * 新流的数量等于旧流元素的数量
+ * 使用 zipper 转换时, 如果对应位置上已经没有other元素,则other元素为null
+ * + * @param other 给定的迭代器 + * @param zipper 两个元素的合并器 + * @param 给定的迭代对象类型 + * @param 合并后的结果对象类型 + * @return 合并后的结果对象的流 + */ public FastStream zip(Iterable other, BiFunction zipper) { - Iterator iterator = other.iterator(); - return new FastStream<>(stream.map(e -> zipper.apply(e, iterator.hasNext() ? iterator.next() : null))); + Objects.requireNonNull(zipper); + // 给定对象迭代器 + final Iterator iterator = Opt.ofNullable(other).map(Iterable::iterator).orElseGet(Collections::emptyIterator); + Stream resStream = this.stream; + if (isParallel()) { + resStream = toList().stream(); + } + return of(resStream.map(e -> zipper.apply(e, iterator.hasNext() ? iterator.next() : null))); } /** * 类似js的
splice函数 * * @param start 起始下标 - * @param deleteCount 删除个数 + * @param deleteCount 删除个数,正整数 * @param items 放入值 * @return 操作后的流 */ @SuppressWarnings("unchecked") public FastStream splice(int start, int deleteCount, T... items) { List list = toList(); - if (start > -1) { - if (start >= list.size()) { - return FastStream.concat(FastStream.of(list), FastStream.of(items)); - } - list.removeAll(list.subList(start, start + deleteCount)); - list.addAll(start, Arrays.asList(items)); - return FastStream.of(list); + final int size = list.size(); + // 从后往前查找 + if (start < 0) { + start += size; + } else if (start >= size) { + // 直接在尾部追加,不删除 + start = size; + deleteCount = 0; } - if (-start > list.size()) { - return FastStream.concat(FastStream.of(items), FastStream.of(list)); + // 起始位置 加上 删除的数量 超过 数据长度,需要重新计算需要删除的数量 + if (start + deleteCount > size) { + deleteCount = size - start; } - start = list.size() + start; - list.removeAll(list.subList(start, start + deleteCount)); - list.addAll(start, Arrays.asList(items)); - return FastStream.of(list); + + // 新列表的长度 + final int newSize = size - deleteCount + items.length; + List resList = list; + // 新列表的长度 大于 旧列表,创建新列表 + if (newSize > size) { + resList = new ArrayList<>(newSize); + resList.addAll(list); + } + // 需要删除的部分 + if (deleteCount > 0) { + resList.subList(start, start + deleteCount).clear(); + } + // 新增的部分 + if (items.length > 0) { + resList.addAll(start, Arrays.asList(items)); + } + return FastStream.of(resList); } /** * 按指定长度切分为双层流 + *

+ * 形如:[1,2,3,4,5] -> [[1,2], [3,4], [5,6]] + *

* - * @param batchSize 指定长度 + * @param batchSize 指定长度, 正整数 * @return 切好的流 */ - public FastStream> sub(int batchSize) { + public FastStream> split(final int batchSize) { List list = toList(); - if (list.size() <= batchSize) { - return FastStream.>of(FastStream.of(list)).parallel(isParallel()); + final int size = list.size(); + // 指定长度 大于等于 列表长度 + if (size <= batchSize) { + // 返回第一层只有单个元素的双层流,形如:[[1,2,3,4,5]] + return FastStream.>of(of(list, isParallel())); } - return FastStream.iterate(0, i -> i < list.size(), i -> i + batchSize) - .map(skip -> FastStream.of(list).skip(skip).limit(batchSize)).parallel(isParallel()); + return FastStream.iterate(0, i -> i < size, i -> i + batchSize) + .map(skip -> of(list.subList(skip, Math.min(size, skip + batchSize)), isParallel())) + .parallel(isParallel()); } /** * 按指定长度切分为元素为list的流 + *

+ * 形如:[1,2,3,4,5] -> [[1,2], [3,4], [5,6]] + *

* - * @param batchSize 指定长度 + * @param batchSize 指定长度, 正整数 * @return 切好的流 */ - public FastStream> subList(int batchSize) { - return sub(batchSize).map(FastStream::toList); + public FastStream> splitList(final int batchSize) { + return split(batchSize).map(FastStream::toList); } public interface FastStreamBuilder extends Consumer, cn.hutool.core.builder.Builder> { diff --git a/hutool-core/src/test/java/cn/hutool/core/stream/FastStreamTest.java b/hutool-core/src/test/java/cn/hutool/core/stream/FastStreamTest.java index b4060d6b9..26d985094 100644 --- a/hutool-core/src/test/java/cn/hutool/core/stream/FastStreamTest.java +++ b/hutool-core/src/test/java/cn/hutool/core/stream/FastStreamTest.java @@ -1,10 +1,13 @@ package cn.hutool.core.stream; +import cn.hutool.core.collection.ListUtil; +import cn.hutool.core.map.MapUtil; import org.junit.Assert; import org.junit.Test; import java.util.*; +import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.Collections.singletonList; @@ -62,17 +65,19 @@ public class FastStreamTest { @Test public void testToZip() { - List orders = Arrays.asList(1, 2, 3); - List list = Arrays.asList("dromara", "hutool", "sweet"); - Map toZip = FastStream.of(orders).toZip(list); - Assert.assertEquals(new HashMap() { - private static final long serialVersionUID = 1L; + List orders = Arrays.asList(1, 2, 3, 2); + List list = Arrays.asList("dromara", "guava", "sweet", "hutool"); + final Map map = MapUtil.builder() + .put(1, "dromara") + .put(2, "hutool") + .put(3, "sweet") + .build(); - { - put(1, "dromara"); - put(2, "hutool"); - put(3, "sweet"); - }}, toZip); + Map toZip = FastStream.of(orders).toZip(list); + Assert.assertEquals(map, toZip); + + Map toZipParallel = FastStream.of(orders).parallel().toZip(list); + Assert.assertEquals(map, toZipParallel); } @Test @@ -92,10 +97,10 @@ public class FastStreamTest { private static final long serialVersionUID = 1L; { - put("1", 1); - put("2", 2); - put("3", 3); - }}, identityMap); + put("1", 1); + put("2", 2); + put("3", 3); + }}, identityMap); } @Test @@ -107,10 +112,10 @@ public class FastStreamTest { private static final long serialVersionUID = 1L; { - put("1", singletonList(1)); - put("2", singletonList(2)); - put("3", singletonList(3)); - }}, group); + put("1", singletonList(1)); + put("2", singletonList(2)); + put("3", singletonList(3)); + }}, group); } @Test @@ -126,19 +131,29 @@ public class FastStreamTest { public void testMapMulti() { List list = Arrays.asList(1, 2, 3); List mapMulti = FastStream.of(list).mapMulti((e, buffer) -> { - if (e % 2 == 0) { + for (int i = 0; i < e; i++) { buffer.accept(e); } - buffer.accept(e); }).toList(); - Assert.assertEquals(Arrays.asList(1, 2, 2, 3), mapMulti); + Assert.assertEquals(Arrays.asList(1, 2, 2, 3, 3, 3), mapMulti); } @Test public void testDistinct() { - List list = Arrays.asList(1, 2, 2, 3); - List distinctBy = FastStream.of(list).distinct(String::valueOf).toList(); - Assert.assertEquals(Arrays.asList(1, 2, 3), distinctBy); + List list = ListUtil.of(3, 2, 2, 1, null, null); + for (int i = 0; i < 1000; i++) { + list.add(i); + } + // 使用stream去重 + List collect1 = list.stream().distinct().collect(Collectors.toList()); + List collect2 = list.stream().parallel().distinct().collect(Collectors.toList()); + + // 使用FastStream去重 + List distinctBy1 = FastStream.of(list).distinct().toList(); + List distinctBy2 = FastStream.of(list).parallel().distinct(String::valueOf).toList(); + + Assert.assertEquals(collect1, distinctBy1); + Assert.assertEquals(collect2, distinctBy2); } @Test @@ -165,7 +180,7 @@ public class FastStreamTest { List mapIndex = FastStream.of(list).flatMapIdx((e, i) -> FastStream.of(i + 1 + "." + e)).toList(); Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), mapIndex); // 并行流时为-1 - Assert.assertEquals(Arrays.asList(-1, -1, -1), FastStream.of(1, 2, 3).parallel().mapIdx((e, i) -> i).toList()); + Assert.assertEquals(Arrays.asList(-1, -1, -1), FastStream.of(1, 2, 3).parallel().flatMapIdx((e, i) -> FastStream.of(i)).toList()); } @Test @@ -173,6 +188,9 @@ public class FastStreamTest { List list = Arrays.asList(1, 2, 3); List flatMapIter = FastStream.of(list).flatMapIter(e -> null).toList(); Assert.assertEquals(Collections.emptyList(), flatMapIter); + + flatMapIter = FastStream.of(list).flatMapIter(e -> Arrays.asList(e, e * 10)).toList(); + Assert.assertEquals(ListUtil.of(1, 10, 2, 20, 3, 30), flatMapIter); } @Test @@ -212,9 +230,9 @@ public class FastStreamTest { } @Test - public void testUnshift() { + public void testAddFirst() { List list = Arrays.asList(2, 3); - List unshift = FastStream.of(list).unshift(1).toList(); + List unshift = FastStream.of(list).addFirst(1).toList(); Assert.assertEquals(Arrays.asList(1, 2, 3), unshift); } @@ -256,10 +274,10 @@ public class FastStreamTest { @Test public void testFindLast() { - List list = Arrays.asList(1, null, 3); - Integer find = FastStream.of(list).findLast(Objects::nonNull); + List list = ListUtil.of(1, null, 3); + Integer find = FastStream.of(list).parallel().findLast(Objects::nonNull); Assert.assertEquals(3, (Object) find); - Assert.assertEquals(3, (Object) FastStream.of(list).findLast().orElse(null)); + Assert.assertEquals(3, (Object) FastStream.of(list).parallel().findLast().orElse(null)); } @Test @@ -279,22 +297,24 @@ public class FastStreamTest { } @Test - public void testSub() { + public void testListSplit() { List list = Arrays.asList(1, 2, 3, 4, 5); - List> lists = FastStream.of(list).sub(2).map(FastStream::toList).toList(); - Assert.assertEquals(Arrays.asList(Arrays.asList(1, 2), - Arrays.asList(3, 4), - singletonList(5) - ), lists); + List> lists = FastStream.of(list).split(2).map(FastStream::toList).toList(); + Assert.assertEquals(ListUtil.split(list, 2), lists); + + // 指定长度 大于等于 列表长度 + lists = FastStream.of(list).split(list.size()).map(FastStream::toList).toList(); + Assert.assertEquals(singletonList(list), lists); } @Test - public void testSubList() { + public void testSplitList() { List list = Arrays.asList(1, 2, 3, 4, 5); - List> lists = FastStream.of(list).subList(2).toList(); - Assert.assertEquals(Arrays.asList(Arrays.asList(1, 2), - Arrays.asList(3, 4), - singletonList(5) - ), lists); + List> lists = FastStream.of(list).splitList(2).toList(); + Assert.assertEquals(ListUtil.split(list, 2), lists); + + // 指定长度 大于等于 列表长度 + lists = FastStream.of(list).splitList(list.size()).toList(); + Assert.assertEquals(singletonList(list), lists); } }