diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/AbstractEnhancedWrappedStream.java b/hutool-core/src/main/java/cn/hutool/core/stream/AbstractEnhancedWrappedStream.java index 122b7b9c9..741ae09d8 100644 --- a/hutool-core/src/main/java/cn/hutool/core/stream/AbstractEnhancedWrappedStream.java +++ b/hutool-core/src/main/java/cn/hutool/core/stream/AbstractEnhancedWrappedStream.java @@ -19,7 +19,7 @@ public abstract class AbstractEnhancedWrappedStream stream; + protected Stream stream; /** * 获取被包装的元素流实例 diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/EasyStream.java b/hutool-core/src/main/java/cn/hutool/core/stream/EasyStream.java index 4570b25fe..d6ff7879b 100644 --- a/hutool-core/src/main/java/cn/hutool/core/stream/EasyStream.java +++ b/hutool-core/src/main/java/cn/hutool/core/stream/EasyStream.java @@ -263,6 +263,18 @@ public class EasyStream extends AbstractEnhancedWrappedStream(stream); } + /** + * 转换为子类实现 + * + * @param stream 流 + * @return 子类实现 + */ + @Override + public EasyStream transform(Stream stream) { + this.stream = stream; + return this; + } + /** * 建造者 * diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/EntryStream.java b/hutool-core/src/main/java/cn/hutool/core/stream/EntryStream.java index f712f40fd..d31d9e2b7 100644 --- a/hutool-core/src/main/java/cn/hutool/core/stream/EntryStream.java +++ b/hutool-core/src/main/java/cn/hutool/core/stream/EntryStream.java @@ -756,6 +756,18 @@ public class EntryStream extends AbstractEnhancedWrappedStream(stream); } + /** + * 转换为子类实现 + * + * @param stream 流 + * @return 子类实现 + */ + @Override + public EntryStream transform(Stream> stream) { + this.stream = stream; + return this; + } + /** * key重复时直接抛出异常 */ diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/TerminableWrappedStream.java b/hutool-core/src/main/java/cn/hutool/core/stream/TerminableWrappedStream.java index 58f7ed10c..4dbef1997 100644 --- a/hutool-core/src/main/java/cn/hutool/core/stream/TerminableWrappedStream.java +++ b/hutool-core/src/main/java/cn/hutool/core/stream/TerminableWrappedStream.java @@ -181,14 +181,36 @@ public interface TerminableWrappedStream toIdxMap() { + return toIdxMap(Function.identity()); + } + + /** + * 转换为map,key为下标,value为给定操作执行后的返回值 + * + * @param valueMapper 指定value操作 + * @param value类型 + * @return map + */ + default Map toIdxMap(Function valueMapper) { + final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX); + return EasyStream.of(toList()).toMap(e -> index.incrementAndGet(), valueMapper, (l, r) -> r); + } + + /** *

将集合转换为树,默认用 {@code parentId == null} 来判断树的根节点 * 因为需要在当前传入数据里查找,所以这是一个结束操作
* - * @param idGetter id的getter对应的lambda,可以写作 {@code Student::getId} 会过滤掉id为null的元素 + * @param idGetter id的getter对应的lambda,可以写作 {@code Student::getId} * @param pIdGetter parentId的getter对应的lambda,可以写作 {@code Student::getParentId} * @param childrenSetter children的setter对应的lambda,可以写作{ @code Student::setChildren} * @param 此处是id、parentId的泛型限制 @@ -211,7 +233,7 @@ public interface TerminableWrappedStream Objects.equals(s.getParentId(),0L) } @@ -535,7 +557,8 @@ public interface TerminableWrappedStream action) { Objects.requireNonNull(action); if (isParallel()) { - unwrap().forEach(e -> action.accept(e, NOT_FOUND_ELEMENT_INDEX)); + EasyStream.of(toIdxMap().entrySet()).parallel(isParallel()) + .forEach(e -> action.accept(e.getValue(), e.getKey())); } else { final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX); unwrap().forEach(e -> action.accept(e, index.incrementAndGet())); @@ -551,7 +574,9 @@ public interface TerminableWrappedStream action) { Objects.requireNonNull(action); if (isParallel()) { - unwrap().forEachOrdered(e -> action.accept(e, NOT_FOUND_ELEMENT_INDEX)); + EasyStream.of(toIdxMap().entrySet()) + .parallel(isParallel()) + .forEachOrdered(e -> action.accept(e.getValue(), e.getKey())); } else { final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX); unwrap().forEachOrdered(e -> action.accept(e, index.incrementAndGet())); diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/TransformableWrappedStream.java b/hutool-core/src/main/java/cn/hutool/core/stream/TransformableWrappedStream.java index 3272b9bd0..ef85e8ad1 100644 --- a/hutool-core/src/main/java/cn/hutool/core/stream/TransformableWrappedStream.java +++ b/hutool-core/src/main/java/cn/hutool/core/stream/TransformableWrappedStream.java @@ -9,19 +9,10 @@ import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.SafeConcurrentHashMap; import cn.hutool.core.util.ArrayUtil; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.BiPredicate; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Predicate; +import java.util.function.*; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -51,8 +42,8 @@ public interface TransformableWrappedStream other, final BiFunction zipper) { Objects.requireNonNull(zipper); - final Map idxIdentityMap = mapIdx((e, idx) -> MapUtil.entry(idx, e)).collect(CollectorUtil.entryToMap()); - final Map idxOtherMap = EasyStream.of(other).mapIdx((e, idx) -> MapUtil.entry(idx, e)).collect(CollectorUtil.entryToMap()); + Map idxIdentityMap = mapIdx((e, idx) -> MapUtil.entry(idx, e)).collect(CollectorUtil.entryToMap()); + Map idxOtherMap = EasyStream.of(other).mapIdx((e, idx) -> MapUtil.entry(idx, e)).collect(CollectorUtil.entryToMap()); if (idxIdentityMap.size() <= idxOtherMap.size()) { return EasyStream.of(idxIdentityMap.keySet(), isParallel()).map(k -> zipper.apply(idxIdentityMap.get(k), idxOtherMap.get(k))); } @@ -259,8 +250,7 @@ public interface TransformableWrappedStream action) { Objects.requireNonNull(action); if (isParallel()) { - return peek(e -> action.accept(e, NOT_FOUND_ELEMENT_INDEX)); + final Map idxMap = easyStream().toIdxMap(); + return transform(EasyStream.of(idxMap.entrySet()) + .parallel(isParallel()) + .peek(e -> action.accept(e.getValue(), e.getKey())) + .map(Map.Entry::getValue)); } else { final AtomicInteger index = new AtomicInteger(NOT_FOUND_ELEMENT_INDEX); return peek(e -> action.accept(e, index.incrementAndGet())); @@ -370,8 +364,7 @@ public interface TransformableWrappedStream predicate) { Objects.requireNonNull(predicate); if (isParallel()) { - return filter(e -> predicate.test(e, NOT_FOUND_ELEMENT_INDEX)); + final Map idxMap = easyStream().toIdxMap(); + return transform(EasyStream.of(idxMap.entrySet()) + .parallel(isParallel()) + .filter(e -> predicate.test(e.getValue(), e.getKey())) + .map(Map.Entry::getValue)); } else { final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX); return filter(e -> predicate.test(e, index.incrementAndGet())); @@ -423,8 +420,7 @@ public interface TransformableWrappedStream 拆分后流的元素类型 @@ -433,7 +429,10 @@ public interface TransformableWrappedStream EasyStream flatMapIdx(final BiFunction> mapper) { Objects.requireNonNull(mapper); if (isParallel()) { - return flatMap(e -> mapper.apply(e, NOT_FOUND_ELEMENT_INDEX)); + final Map idxMap = easyStream().toIdxMap(); + return EasyStream.of(idxMap.entrySet()) + .parallel(isParallel()) + .flatMap(e -> mapper.apply(e.getValue(), e.getKey())); } else { final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX); return flatMap(e -> mapper.apply(e, index.incrementAndGet())); @@ -534,8 +533,7 @@ public interface TransformableWrappedStream 函数执行后返回的类型 @@ -544,7 +542,10 @@ public interface TransformableWrappedStream EasyStream mapIdx(final BiFunction mapper) { Objects.requireNonNull(mapper); if (isParallel()) { - return map(e -> mapper.apply(e, NOT_FOUND_ELEMENT_INDEX)); + final Map idxMap = easyStream().toIdxMap(); + return EasyStream.of(idxMap.entrySet()) + .parallel(isParallel()) + .map(e -> mapper.apply(e.getValue(), e.getKey())); } else { final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX); return map(e -> mapper.apply(e, index.incrementAndGet())); diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/WrappedStream.java b/hutool-core/src/main/java/cn/hutool/core/stream/WrappedStream.java index 7e503226a..e89f03262 100644 --- a/hutool-core/src/main/java/cn/hutool/core/stream/WrappedStream.java +++ b/hutool-core/src/main/java/cn/hutool/core/stream/WrappedStream.java @@ -25,7 +25,7 @@ import java.util.stream.*; public interface WrappedStream> extends Stream, Iterable { /** - * 代表不存在的下标, 一般用于并行流的下标, 或者未找到元素时的下标 + * 代表不存在的下标, 或者未找到元素时的下标 */ int NOT_FOUND_ELEMENT_INDEX = -1; @@ -587,4 +587,28 @@ public interface WrappedStream> extends Stream< @Override String toString(); + /** + * 转换为EasyStream + * + * @return 转换后的EasyStream + */ + @SuppressWarnings("unchecked") + default EasyStream easyStream() { + if (this instanceof EasyStream) { + return (EasyStream) this; + } else if (this instanceof Iterator) { + return (EasyStream) EasyStream.of((Iterator) this); + } else { + return EasyStream.of(collect(Collectors.toList())); + } + } + + /** + * 转换为子类实现 + * + * @param stream 流 + * @return 子类实现 + */ + S transform(Stream stream); + } diff --git a/hutool-core/src/test/java/cn/hutool/core/stream/AbstractEnhancedWrappedStreamTest.java b/hutool-core/src/test/java/cn/hutool/core/stream/AbstractEnhancedWrappedStreamTest.java index 88db081cb..9f7563a01 100644 --- a/hutool-core/src/test/java/cn/hutool/core/stream/AbstractEnhancedWrappedStreamTest.java +++ b/hutool-core/src/test/java/cn/hutool/core/stream/AbstractEnhancedWrappedStreamTest.java @@ -181,6 +181,7 @@ public class AbstractEnhancedWrappedStreamTest { final List list = asList(1, 2, 3); final Map> map = new HashMap>() { private static final long serialVersionUID = 1L; + { put(Boolean.TRUE, singletonList(2)); put(Boolean.FALSE, asList(1, 3)); @@ -295,14 +296,8 @@ public class AbstractEnhancedWrappedStreamTest { Assert.assertEquals(asList(1, 2, 3), elements); Assert.assertEquals(asList(0, 1, 2), indexes); - final Set elements2 = new HashSet<>(); - final Set indexes2 = new HashSet<>(); - wrap(1, 2, null).parallel().peekIdx((t, i) -> { - elements2.add(t); - indexes2.add(i); - }).exec(); - Assert.assertEquals(new HashSet<>(asList(1, null, 2)), elements2); - Assert.assertEquals(new HashSet<>(asList(-1, -1, -1)), indexes2); + wrap("one", "two", "three", "four").parallel().filter(e -> e.length() == 4) + .peekIdx((e, i) -> Assert.assertEquals("four:0", e + ":" + i)).exec(); } @Test @@ -630,11 +625,13 @@ public class AbstractEnhancedWrappedStreamTest { public void testToEntries() { final Map expect = new HashMap() { private static final long serialVersionUID = 1L; + { - put(1, 1); - put(2, 2); - put(3, 3); - }}; + put(1, 1); + put(2, 2); + put(3, 3); + } + }; Map map = EasyStream.of(1, 2, 3) .toEntries(Function.identity(), Function.identity()) .toMap(); @@ -709,6 +706,18 @@ public class AbstractEnhancedWrappedStreamTest { return new Wrapper<>(source); } + /** + * 转换为子类实现 + * + * @param stream 流 + * @return 子类实现 + */ + @Override + public Wrapper transform(final Stream stream) { + this.stream = stream; + return this; + } + } @Setter diff --git a/hutool-core/src/test/java/cn/hutool/core/stream/EasyStreamTest.java b/hutool-core/src/test/java/cn/hutool/core/stream/EasyStreamTest.java index 10d462161..de43e3b4b 100644 --- a/hutool-core/src/test/java/cn/hutool/core/stream/EasyStreamTest.java +++ b/hutool-core/src/test/java/cn/hutool/core/stream/EasyStreamTest.java @@ -9,6 +9,7 @@ import org.junit.Assert; import org.junit.Test; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -142,8 +143,9 @@ public class EasyStreamTest { final List list = Arrays.asList("dromara", "hutool", "sweet"); final List mapIndex = EasyStream.of(list).mapIdx((e, i) -> i + 1 + "." + e).toList(); Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), mapIndex); - // 并行流时为-1 - Assert.assertEquals(Arrays.asList(-1, -1, -1), EasyStream.of(1, 2, 3).parallel().mapIdx((e, i) -> i).toList()); + // 并行流时正常 + Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), + EasyStream.of("dromara", "hutool", "sweet").parallel().mapIdx((e, i) -> i + 1 + "." + e).toList()); } @Test @@ -192,8 +194,10 @@ public class EasyStreamTest { final EasyStream.Builder builder = EasyStream.builder(); EasyStream.of(list).forEachIdx((e, i) -> builder.accept(i + 1 + "." + e)); Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), builder.build().toList()); - // 并行流时为-1 - EasyStream.of(1, 2, 3).parallel().forEachIdx((e, i) -> Assert.assertEquals(-1, (Object) i)); + // 并行流时正常 + final AtomicInteger total = new AtomicInteger(0); + EasyStream.of("dromara", "hutool", "sweet").parallel().forEachIdx((e, i) -> total.addAndGet(i)); + Assert.assertEquals(3, total.get()); } @Test @@ -205,7 +209,7 @@ public class EasyStreamTest { final EasyStream.Builder streamBuilder = EasyStream.builder(); EasyStream.of(list).parallel().forEachOrderedIdx((e, i) -> streamBuilder.accept(i + 1 + "." + e)); - Assert.assertEquals(Arrays.asList("0.dromara", "0.hutool", "0.sweet"), streamBuilder.build().toList()); + Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), streamBuilder.build().toList()); } @@ -214,8 +218,10 @@ public class EasyStreamTest { final List list = Arrays.asList("dromara", "hutool", "sweet"); final List mapIndex = EasyStream.of(list).flatMapIdx((e, i) -> EasyStream.of(i + 1 + "." + e)).toList(); Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), mapIndex); - // 并行流时为-1 - Assert.assertEquals(Arrays.asList(-1, -1, -1), EasyStream.of(1, 2, 3).parallel().flatMapIdx((e, i) -> EasyStream.of(i)).toList()); + // 并行流时正常 + Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), + EasyStream.of("dromara", "hutool", "sweet").parallel() + .flatMapIdx((e, i) -> EasyStream.of(i + 1 + "." + e)).toList()); } @Test @@ -269,8 +275,9 @@ public class EasyStreamTest { final List list = Arrays.asList("dromara", "hutool", "sweet"); final List filterIndex = EasyStream.of(list).filterIdx((e, i) -> i < 2).toList(); Assert.assertEquals(Arrays.asList("dromara", "hutool"), filterIndex); - // 并行流时为-1 - Assert.assertEquals(3L, EasyStream.of(1, 2, 3).parallel().filterIdx((e, i) -> i == -1).count()); + // 并行流时正常 + Assert.assertEquals(Arrays.asList("dromara", "hutool"), + EasyStream.of("dromara", "hutool", "sweet").parallel().filterIdx((e, i) -> i < 2).toList()); } @Test @@ -434,8 +441,6 @@ public class EasyStreamTest { Consumer test = o -> { final List studentTree = EasyStream .of( - // 会过滤掉id为null的元素 - Student.builder().name("top").build(), Student.builder().id(1L).name("dromara").build(), Student.builder().id(2L).name("baomidou").build(), Student.builder().id(3L).name("hutool").parentId(1L).build(),