diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/AbstractEnhancedStreamWrapper.java b/hutool-core/src/main/java/cn/hutool/core/stream/AbstractEnhancedStreamWrapper.java new file mode 100644 index 000000000..02a9a6789 --- /dev/null +++ b/hutool-core/src/main/java/cn/hutool/core/stream/AbstractEnhancedStreamWrapper.java @@ -0,0 +1,40 @@ +package cn.hutool.core.stream; + +import java.util.Objects; +import java.util.stream.Stream; + +/** + * {@link StreamWrapper}的基本实现,用于包装一个已有的流实例, + * 使其支持相对原生{@link Stream}更多的中间操作与终端操作。 + * + * @author huangchengxing + * @see EasyStream + * @see EntryStream + */ +public abstract class AbstractEnhancedStreamWrapper> + implements TerminableStreamWrapper, TransformableStreamWrapper { + + /** + * 原始的流实例 + */ + protected final Stream stream; + + /** + * 获取被包装的元素流实例 + */ + @Override + public Stream stream() { + return stream; + } + + /** + * 创建一个流包装器 + * + * @param stream 包装的流对象 + * @throws NullPointerException 当{@code stream}为{@code null}时抛出 + */ + protected AbstractEnhancedStreamWrapper(Stream stream) { + this.stream = Objects.requireNonNull(stream, "stream must not null"); + } + +} diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/EasyStream.java b/hutool-core/src/main/java/cn/hutool/core/stream/EasyStream.java index 5de688e5c..6fed9712c 100644 --- a/hutool-core/src/main/java/cn/hutool/core/stream/EasyStream.java +++ b/hutool-core/src/main/java/cn/hutool/core/stream/EasyStream.java @@ -1,23 +1,16 @@ package cn.hutool.core.stream; -import cn.hutool.core.collection.ListUtil; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.lang.Console; import cn.hutool.core.lang.Opt; import cn.hutool.core.lang.mutable.MutableInt; import cn.hutool.core.lang.mutable.MutableObj; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.text.StrUtil; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.ObjUtil; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.*; -import java.util.stream.Collector; -import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -56,11 +49,7 @@ import java.util.stream.StreamSupport; * @see java.util.stream.Stream * @since 6.0.0 */ -public class EasyStream extends StreamWrapper> implements Stream, Iterable { - /** - * 代表不存在的下标, 一般用于并行流的下标, 或者未找到元素时的下标 - */ - private static final int NOT_FOUND_INDEX = -1; +public class EasyStream extends AbstractEnhancedStreamWrapper> { /** * 构造 @@ -266,32 +255,6 @@ public class EasyStream extends StreamWrapper> implements St return filter(e -> Objects.equals(mapper.apply(e), value)); } - /** - * 过滤元素,返回与指定断言匹配的元素组成的流,断言带下标,并行流时下标永远为-1 - * 这是一个无状态中间操作 - * - * @param predicate 断言 - * @return 返回叠加过滤操作后的流 - */ - public EasyStream filterIdx(final BiPredicate predicate) { - Objects.requireNonNull(predicate); - if (isParallel()) { - return filter(e -> predicate.test(e, NOT_FOUND_INDEX)); - } else { - final MutableInt index = new MutableInt(NOT_FOUND_INDEX); - return filter(e -> predicate.test(e, index.incrementAndGet())); - } - } - - /** - * 过滤掉空元素 - * - * @return 过滤后的流 - */ - public EasyStream nonNull() { - return new EasyStream<>(stream.filter(Objects::nonNull)); - } - /** * 返回与指定函数将元素作为参数执行的结果组成的流 * 这是一个无状态中间操作 @@ -459,175 +422,6 @@ public class EasyStream extends StreamWrapper> implements St } } - /** - * 返回与指定函数将元素作为参数执行后组成的流。操作带下标,并行流时下标永远为-1 - * 这是一个无状态中间操作 - * @param action 指定的函数 - * @return 返回叠加操作后的FastStream - * @apiNote 该方法存在的意义主要是用来调试 - * 当你需要查看经过操作管道某处的元素和下标,可以执行以下操作: - *
{@code
-	 *     .of("one", "two", "three", "four")
-	 * 				.filter(e -> e.length() > 3)
-	 * 				.peekIdx((e,i) -> System.out.println("Filtered value: " + e + " Filtered idx:" + i))
-	 * 				.map(String::toUpperCase)
-	 * 				.peekIdx((e,i) -> System.out.println("Mapped value: " + e + " Mapped idx:" + i))
-	 * 				.collect(Collectors.toList());
-	 * }
- */ - public EasyStream peekIdx(BiConsumer action) { - Objects.requireNonNull(action); - if (isParallel()) { - return peek(e -> action.accept(e, NOT_FOUND_INDEX)); - } else { - AtomicInteger index = new AtomicInteger(NOT_FOUND_INDEX); - return peek(e -> action.accept(e, index.incrementAndGet())); - } - } - - /** - * 返回叠加调用{@link Console#log(Object)}打印出结果的流 - * - * @return 返回叠加操作后的FastStream - */ - public EasyStream log() { - return peek(Console::log); - } - - /** - * 对流里面的每一个元素执行一个操作,操作带下标,并行流时下标永远为-1 - * 这是一个终端操作 - * - * @param action 操作 - */ - public void forEachIdx(final BiConsumer action) { - Objects.requireNonNull(action); - if (isParallel()) { - stream.forEach(e -> action.accept(e, NOT_FOUND_INDEX)); - } else { - final MutableInt index = new MutableInt(NOT_FOUND_INDEX); - stream.forEach(e -> action.accept(e, index.incrementAndGet())); - } - } - - /** - * 对流里面的每一个元素按照顺序执行一个操作,操作带下标,并行流时下标永远为-1 - * 这是一个终端操作 - * - * @param action 操作 - */ - public void forEachOrderedIdx(final BiConsumer action) { - Objects.requireNonNull(action); - if (isParallel()) { - stream.forEachOrdered(e -> action.accept(e, NOT_FOUND_INDEX)); - } else { - final MutableInt index = new MutableInt(NOT_FOUND_INDEX); - stream.forEachOrdered(e -> action.accept(e, index.incrementAndGet())); - } - } - - /** - * 获取与给定断言匹配的第一个元素 - * - * @param predicate 断言 - * @return 与给定断言匹配的第一个元素 - */ - public Optional findFirst(final Predicate predicate) { - return stream.filter(predicate).findFirst(); - } - - /** - * 获取与给定断言匹配的第一个元素的下标,并行流下标永远为-1 - * - * @param predicate 断言 - * @return 与给定断言匹配的第一个元素的下标,如果不存在则返回-1 - */ - public int findFirstIdx(final Predicate predicate) { - Objects.requireNonNull(predicate); - if (isParallel()) { - return NOT_FOUND_INDEX; - } else { - final MutableInt index = new MutableInt(NOT_FOUND_INDEX); - //noinspection ResultOfMethodCallIgnored - stream.filter(e -> { - index.increment(); - return predicate.test(e); - }).findFirst(); - return index.get(); - } - } - - /** - * 获取最后一个元素 - * - * @return 最后一个元素 - */ - public Optional findLast() { - final MutableObj last = new MutableObj<>(null); - spliterator().forEachRemaining(last::set); - return Optional.ofNullable(last.get()); - } - - /** - * 获取与给定断言匹配的最后一个元素 - * - * @param predicate 断言 - * @return 与给定断言匹配的最后一个元素 - */ - public Optional findLast(final Predicate predicate) { - Objects.requireNonNull(predicate); - final MutableObj last = new MutableObj<>(null); - spliterator().forEachRemaining(e -> { - if (predicate.test(e)) { - last.set(e); - } - }); - return Optional.ofNullable(last.get()); - } - - /** - * 获取与给定断言匹配的最后一个元素的下标,并行流下标永远为-1 - * - * @param predicate 断言 - * @return 与给定断言匹配的最后一个元素的下标,如果不存在则返回-1 - */ - public int findLastIdx(final Predicate predicate) { - Objects.requireNonNull(predicate); - if (isParallel()) { - return NOT_FOUND_INDEX; - } else { - final MutableInt idxRef = new MutableInt(NOT_FOUND_INDEX); - forEachIdx((e, i) -> { - if (predicate.test(e)) { - idxRef.set(i); - } - }); - return idxRef.get(); - } - } - - /** - * 反转顺序 - * - * @return 反转元素顺序 - */ - @SuppressWarnings("unchecked") - public EasyStream reverse() { - final T[] array = (T[]) toArray(); - ArrayUtil.reverse(array); - return of(array).parallel(isParallel()).onClose(stream::close); - } - - /** - * 更改流的并行状态 - * - * @param parallel 是否并行 - * @return 流 - */ - public EasyStream parallel(final boolean parallel) { - return parallel ? parallel() : sequential(); - } - /** * 与给定元素组成的流合并,成为新的流 * @@ -638,16 +432,6 @@ public class EasyStream extends StreamWrapper> implements St return EasyStream.concat(this.stream, of(obj)); } - /** - * 与给定元素组成的流合并,成为新的流 - * - * @param obj 元素 - * @return 流 - */ - @SuppressWarnings("unchecked") - public EasyStream push(final T... obj) { - return EasyStream.concat(this.stream, of(obj)); - } /** * 给定元素组成的流与当前流合并,成为新的流 @@ -659,27 +443,6 @@ public class EasyStream extends StreamWrapper> implements St return EasyStream.concat(of(obj), this.stream); } - /** - * 给定元素组成的流与当前流合并,成为新的流 - * - * @param obj 元素 - * @return 流 - */ - @SafeVarargs - public final EasyStream unshift(final T... obj) { - return EasyStream.concat(of(obj), this.stream); - } - - /** - * 获取流中指定下标的元素,如果是负数,则从最后一个开始数起 - * - * @param idx 下标 - * @return 指定下标的元素 - */ - @SuppressWarnings("unchecked") - public Optional at(final Integer idx) { - return Opt.ofNullable(idx).map(i -> (T) ArrayUtil.get(toArray(), i)).toOptional(); - } /** * 根据一个原始的流,返回一个新包装类实例 @@ -688,158 +451,10 @@ public class EasyStream extends StreamWrapper> implements St * @return 实现类 */ @Override - protected EasyStream convertToStreamImpl(Stream stream) { + public EasyStream wrapping(Stream stream) { return new EasyStream<>(stream); } - /** - * 转换成集合 - * - * @param collectionFactory 集合工厂(可以是集合构造器) - * @param 集合类型 - * @return 集合 - */ - public > C toColl(final Supplier collectionFactory) { - return collect(Collectors.toCollection(collectionFactory)); - } - - /** - * 转换为ArrayList - * - * @return list - */ - public List toList() { - return collect(Collectors.toList()); - } - - /** - * 转换为HashSet - * - * @return hashSet - */ - public Set toSet() { - return collect(Collectors.toSet()); - } - - /** - * 与给定的可迭代对象转换成Map,key为现有元素,value为给定可迭代对象迭代的元素
- * Map的大小与两个集合中较小的数量一致, 即, 只合并下标位置相同的部分 - * - * @param other 可迭代对象 - * @param 可迭代对象迭代的元素类型 - * @return map,key为现有元素,value为给定可迭代对象迭代的元素 - */ - public Map toZip(final Iterable other) { - final Spliterator keys = spliterator(); - final Spliterator values = Opt.ofNullable(other).map(Iterable::spliterator).orElseGet(Spliterators::emptySpliterator); - // 获取两个Spliterator的中较小的数量 - // 如果Spliterator经过流操作, getExactSizeIfKnown()可能会返回-1, 所以默认大小为 MapUtil.DEFAULT_INITIAL_CAPACITY - final int sizeIfKnown = (int) Math.max(Math.min(keys.getExactSizeIfKnown(), values.getExactSizeIfKnown()), MapUtil.DEFAULT_INITIAL_CAPACITY); - final Map map = MapUtil.newHashMap(sizeIfKnown); - // 保存第一个Spliterator的值 - final MutableObj key = new MutableObj<>(); - // 保存第二个Spliterator的值 - final MutableObj value = new MutableObj<>(); - // 当两个Spliterator中都还有剩余元素时 - while (keys.tryAdvance(key::set) && values.tryAdvance(value::set)) { - map.put(key.get(), value.get()); - } - return map; - } - - /** - * 返回拼接后的字符串 - * - * @return 拼接后的字符串 - */ - public String join() { - return join(StrUtil.EMPTY); - } - - /** - * 返回拼接后的字符串 - * - * @param delimiter 分隔符 - * @return 拼接后的字符串 - */ - public String join(final CharSequence delimiter) { - return join(delimiter, StrUtil.EMPTY, StrUtil.EMPTY); - } - - /** - * 返回拼接后的字符串 - * - * @param delimiter 分隔符 - * @param prefix 前缀 - * @param suffix 后缀 - * @return 拼接后的字符串 - */ - public String join(final CharSequence delimiter, - final CharSequence prefix, - final CharSequence suffix) { - return map(String::valueOf).collect(Collectors.joining(delimiter, prefix, suffix)); - } - - /** - * 转换为map,key为给定操作执行后的返回值,value为当前元素 - * - * @param keyMapper 指定的key操作 - * @param key类型 - * @return map - */ - public Map toMap(final Function keyMapper) { - return toMap(keyMapper, Function.identity()); - } - - /** - * 转换为map,key,value为给定操作执行后的返回值 - * - * @param keyMapper 指定的key操作 - * @param valueMapper 指定value操作 - * @param key类型 - * @param value类型 - * @return map - */ - public Map toMap(final Function keyMapper, - final Function valueMapper) { - return toMap(keyMapper, valueMapper, (l, r) -> r); - } - - /** - * 转换为map,key,value为给定操作执行后的返回值 - * - * @param keyMapper 指定的key操作 - * @param valueMapper 指定value操作 - * @param mergeFunction 合并操作 - * @param key类型 - * @param value类型 - * @return map - */ - public Map toMap(final Function keyMapper, - final Function valueMapper, - final BinaryOperator mergeFunction) { - return toMap(keyMapper, valueMapper, mergeFunction, HashMap::new); - } - - /** - * 转换为map,key,value为给定操作执行后的返回值 - * - * @param keyMapper 指定的key操作 - * @param valueMapper 指定value操作 - * @param mergeFunction 合并操作 - * @param mapSupplier map工厂 - * @param key类型 - * @param value类型 - * @param map类型 - * @return map - */ - public > M toMap(final Function keyMapper, - final Function valueMapper, - final BinaryOperator mergeFunction, - final Supplier mapSupplier) { - return collect(CollectorUtil.toMap(keyMapper, valueMapper, mergeFunction, mapSupplier)); - } - /** * 将集合转换为树,默认用 {@code parentId == null} 作为顶部,内置一个小递归 * 因为需要在当前传入数据里查找,所以这是一个结束操作 @@ -929,51 +544,6 @@ public class EasyStream extends StreamWrapper> implements St return flat(recursive).peek(e -> childrenSetter.accept(e, null)); } - - /** - * 通过给定分组依据进行分组 - * - * @param classifier 分组依据 - * @param 实体中的分组依据对应类型,也是Map中key的类型 - * @return {@link Collector} - */ - public Map> group(final Function classifier) { - return group(classifier, Collectors.toList()); - } - - /** - * 通过给定分组依据进行分组 - * - * @param classifier 分组依据 - * @param downstream 下游操作 - * @param 实体中的分组依据对应类型,也是Map中key的类型 - * @param 下游操作对应返回类型,也是Map中value的类型 - * @param 下游操作在进行中间操作时对应类型 - * @return {@link Collector} - */ - public Map group(final Function classifier, - final Collector downstream) { - return group(classifier, HashMap::new, downstream); - } - - /** - * 通过给定分组依据进行分组 - * - * @param classifier 分组依据 - * @param mapFactory 提供的map - * @param downstream 下游操作 - * @param 实体中的分组依据对应类型,也是Map中key的类型 - * @param 下游操作对应返回类型,也是Map中value的类型 - * @param 下游操作在进行中间操作时对应类型 - * @param 最后返回结果Map类型 - * @return {@link Collector} - */ - public > M group(final Function classifier, - final Supplier mapFactory, - final Collector downstream) { - return collect(CollectorUtil.groupingBy(classifier, mapFactory, downstream)); - } - /** * 将 现有元素 与 给定迭代器中对应位置的元素 使用 zipper 转换为新的元素,并返回新元素组成的流
* 新流的数量为两个集合中较小的数量, 即, 只合并下标位置相同的部分
@@ -1004,21 +574,6 @@ public class EasyStream extends StreamWrapper> implements St return of(list).parallel(isParallel()).onClose(stream::close); } - /** - * 类似js的
splice函数 - * - * @param start 起始下标 - * @param deleteCount 删除个数,正整数 - * @param items 放入值 - * @return 操作后的流 - */ - @SafeVarargs - public final EasyStream splice(final int start, final int deleteCount, final T... items) { - return of(ListUtil.splice(toList(), start, deleteCount, items)) - .parallel(isParallel()) - .onClose(stream::close); - } - /** * 按指定长度切分为双层流 *

@@ -1055,85 +610,6 @@ public class EasyStream extends StreamWrapper> implements St return split(batchSize).map(EasyStream::toList); } - /** - * 保留 与指定断言 匹配时的元素, 在第一次不匹配时终止, 抛弃当前(第一个不匹配元素)及后续所有元素 - *

与 jdk9 中的 takeWhile 方法不太一样, 这里的实现是个 顺序的、有状态的中间操作

- *
本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
-	 * FastStream.iterate(1, i -> i + 1)
-	 * 	.parallel()
-	 * 	// 顺序执行
-	 * 	.takeWhile(e -> e < 50)
-	 * 	// 并发
-	 * 	.map(e -> e + 1)
-	 * 	// 并发
-	 * 	.map(String::valueOf)
-	 * 	.toList();
-	 * }
- *

但是不建议在并行流中使用, 除非你确定 takeWhile 之后的操作能在并行流中受益很多

- * - * @param predicate 断言 - * @return 与指定断言匹配的元素组成的流 - */ - public EasyStream takeWhile(final Predicate predicate) { - Objects.requireNonNull(predicate); - return of(StreamUtil.takeWhile(stream, predicate)); - } - - - /** - * 删除 与指定断言 匹配的元素, 在第一次不匹配时终止, 返回当前(第一个不匹配元素)及剩余元素组成的新流 - *

与 jdk9 中的 dropWhile 方法不太一样, 这里的实现是个 顺序的、有状态的中间操作

- *
本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
-	 * FastStream.iterate(1, i <= 100, i -> i + 1)
-	 * 	.parallel()
-	 * 	// 顺序执行
-	 * 	.dropWhile(e -> e < 50)
-	 * 	// 并发
-	 * 	.map(e -> e + 1)
-	 * 	// 并发
-	 * 	.map(String::valueOf)
-	 * 	.toList();
-	 * }
- *

但是不建议在并行流中使用, 除非你确定 dropWhile 之后的操作能在并行流中受益很多

- * - * @param predicate 断言 - * @return 剩余元素组成的流 - */ - public EasyStream dropWhile(final Predicate predicate) { - Objects.requireNonNull(predicate); - return of(StreamUtil.dropWhile(stream, predicate)); - } - - /** - * 流是否为空 - * - * @return 流是否为空 - */ - public boolean isEmpty() { - return !findAny().isPresent(); - } - - /** - * 流是否不为空 - * - * @return 流是否不为空 - */ - public boolean isNotEmpty() { - return !isEmpty(); - } - - /** - * 将当前流转为另一对象。用于提供针对流本身而非流中元素的操作 - * - * @param 转换类型 - * @param transform 转换 - * @return 转换后的流 - */ - public Optional transform(final Function, R> transform) { - Assert.notNull(transform, "transform must not null"); - return Optional.ofNullable(transform.apply(this)); - } - public interface FastStreamBuilder extends Consumer, cn.hutool.core.builder.Builder> { /** diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/EntryStream.java b/hutool-core/src/main/java/cn/hutool/core/stream/EntryStream.java index 81b7a6ecd..c9eadd81c 100644 --- a/hutool-core/src/main/java/cn/hutool/core/stream/EntryStream.java +++ b/hutool-core/src/main/java/cn/hutool/core/stream/EntryStream.java @@ -22,7 +22,7 @@ import java.util.stream.StreamSupport; * @param 值类型 * @author huangchengxing */ -public class EntryStream extends StreamWrapper, EntryStream> { +public class EntryStream extends AbstractEnhancedStreamWrapper, EntryStream> { /** * 默认的空键值对 @@ -157,7 +157,7 @@ public class EntryStream extends StreamWrapper, EntryStrea */ public EntryStream distinctByKey() { Set accessed = new ConcurrentHashSet<>(16); - return convertToStreamImpl(stream.filter(e -> { + return wrapping(stream.filter(e -> { K key = e.getKey(); if (accessed.contains(key)) { return false; @@ -174,7 +174,7 @@ public class EntryStream extends StreamWrapper, EntryStrea */ public EntryStream distinctByValue() { Set accessed = new ConcurrentHashSet<>(16); - return convertToStreamImpl(stream.filter(e -> { + return wrapping(stream.filter(e -> { V val = e.getValue(); if (accessed.contains(val)) { return false; @@ -299,7 +299,7 @@ public class EntryStream extends StreamWrapper, EntryStrea * @return {@link EntryStream}实例 */ public EntryStream append(K key, V value) { - return convertToStreamImpl(Stream.concat(stream, Stream.of(ofEntry(key, value)))); + return wrapping(Stream.concat(stream, Stream.of(ofEntry(key, value)))); } /** @@ -310,7 +310,7 @@ public class EntryStream extends StreamWrapper, EntryStrea * @return {@link EntryStream}实例 */ public EntryStream prepend(K key, V value) { - return convertToStreamImpl(Stream.concat(Stream.of(ofEntry(key, value)), stream)); + return wrapping(Stream.concat(Stream.of(ofEntry(key, value)), stream)); } /** @@ -325,7 +325,7 @@ public class EntryStream extends StreamWrapper, EntryStrea } final Stream> contacted = StreamSupport.stream(entries.spliterator(), isParallel()) .map(EntryStream::ofEntry); - return convertToStreamImpl(Stream.concat(stream, contacted)); + return wrapping(Stream.concat(stream, contacted)); } /** @@ -340,7 +340,7 @@ public class EntryStream extends StreamWrapper, EntryStrea } final Stream> contacted = StreamSupport.stream(entries.spliterator(), isParallel()) .map(EntryStream::ofEntry); - return convertToStreamImpl(Stream.concat(contacted, stream)); + return wrapping(Stream.concat(contacted, stream)); } /** @@ -744,7 +744,7 @@ public class EntryStream extends StreamWrapper, EntryStrea * @return 实现类 */ @Override - public EntryStream convertToStreamImpl(Stream> stream) { + public EntryStream wrapping(Stream> stream) { return new EntryStream<>(stream); } diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/SimpleStreamWrapper.java b/hutool-core/src/main/java/cn/hutool/core/stream/SimpleStreamWrapper.java new file mode 100644 index 000000000..636e54f1f --- /dev/null +++ b/hutool-core/src/main/java/cn/hutool/core/stream/SimpleStreamWrapper.java @@ -0,0 +1,55 @@ +package cn.hutool.core.stream; + +import java.util.function.Function; +import java.util.stream.Stream; + +/** + * {@link AbstractEnhancedStreamWrapper}的基本实现 + * + * @author huangchengxing + */ +public class SimpleStreamWrapper extends AbstractEnhancedStreamWrapper> { + + /** + * 创建一个流包装器 + * + * @param stream 包装的流对象 + * @throws NullPointerException 当{@code stream}为{@code null}时抛出 + */ + SimpleStreamWrapper(Stream stream) { + super(stream); + } + + /** + * 将一个流包装为{@link SimpleStreamWrapper} + * + * @param source 被包装的流 + * @return 包装后的流 + */ + @Override + public SimpleStreamWrapper wrapping(Stream source) { + return new SimpleStreamWrapper<>(source); + } + + /** + * 将当前流中元素映射为另一类型 + * + * @param mapper 映射方法 + * @return 映射后的流 + */ + @Override + public SimpleStreamWrapper map(Function mapper) { + return new SimpleStreamWrapper<>(stream().map(mapper)); + } + + /** + * 将当前流中元素展开为流,然后返回由这些新流中元素组成的流 + * + * @param mapper 映射方法 + * @return 映射后的流 + */ + @Override + public SimpleStreamWrapper flatMap(Function> mapper) { + return new SimpleStreamWrapper<>(stream().flatMap(mapper)); + } +} diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/StreamWrapper.java b/hutool-core/src/main/java/cn/hutool/core/stream/StreamWrapper.java index 60da8e88d..8c6519f66 100644 --- a/hutool-core/src/main/java/cn/hutool/core/stream/StreamWrapper.java +++ b/hutool-core/src/main/java/cn/hutool/core/stream/StreamWrapper.java @@ -1,32 +1,57 @@ package cn.hutool.core.stream; -import java.util.*; +import cn.hutool.core.util.ObjUtil; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.Optional; +import java.util.Spliterator; import java.util.function.*; import java.util.stream.*; /** - * {@link Stream}的包装类,用于基于一个已有的流实例进行扩展 + *

表示一个用于增强原始{@link Stream}对象的包装器,当调用{@link Stream}中的方法时, + * 将会代理到被包装的原始流对象,并返回指定的包装器实例。 * + * @param 流中的元素类型 + * @param 链式调用获得的实现类类型 * @author huangchengxing - * @see EasyStream + * @see TerminableStreamWrapper + * @see TransformableStreamWrapper + * @see AbstractEnhancedStreamWrapper */ -abstract class StreamWrapper> implements Stream, Iterable { +public interface StreamWrapper> extends Stream, Iterable { /** - * 原始的流实例 + * 代表不存在的下标, 一般用于并行流的下标, 或者未找到元素时的下标 */ - protected final Stream stream; + int NOT_FOUND_INDEX = -1; /** - * 创建一个流包装器 + * 将一个流包装为简单增强流,若{@code stream}为{@code null}则默认返回一个空串行流 * - * @param stream 包装的流对象 + * @param stream 被包装的流 + * @return {@link SimpleStreamWrapper}实例 */ - protected StreamWrapper(Stream stream) { - Objects.requireNonNull(stream, "stream must not null"); - this.stream = stream; + static SimpleStreamWrapper create(Stream stream) { + return new SimpleStreamWrapper<>(ObjUtil.defaultIfNull(stream, Stream.empty())); } + /** + * 获取被包装的原始流 + * + * @return 被包装的原始流 + */ + Stream stream(); + + /** + * 将一个原始流包装为指定类型的增强流 + * + * @param source 被包装的流 + * @return I + */ + I wrapping(Stream source); + /** * 过滤元素,返回与指定断言匹配的元素组成的流 * 这是一个无状态中间操作 @@ -35,8 +60,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 返回叠加过滤操作后的流 */ @Override - public I filter(Predicate predicate) { - return convertToStreamImpl(stream.filter(predicate)); + default I filter(Predicate predicate) { + return wrapping(stream().filter(predicate)); } /** @@ -47,8 +72,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 叠加操作后元素类型全为int的流 */ @Override - public IntStream mapToInt(ToIntFunction mapper) { - return stream.mapToInt(mapper); + default IntStream mapToInt(ToIntFunction mapper) { + return stream().mapToInt(mapper); } /** @@ -59,8 +84,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 叠加操作后元素类型全为long的流 */ @Override - public LongStream mapToLong(ToLongFunction mapper) { - return stream.mapToLong(mapper); + default LongStream mapToLong(ToLongFunction mapper) { + return stream().mapToLong(mapper); } /** @@ -71,8 +96,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 叠加操作后元素类型全为double的流 */ @Override - public DoubleStream mapToDouble(ToDoubleFunction mapper) { - return stream.mapToDouble(mapper); + default DoubleStream mapToDouble(ToDoubleFunction mapper) { + return stream().mapToDouble(mapper); } /** @@ -83,8 +108,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 返回叠加拆分操作后的IntStream */ @Override - public IntStream flatMapToInt(Function mapper) { - return stream.flatMapToInt(mapper); + default IntStream flatMapToInt(Function mapper) { + return stream().flatMapToInt(mapper); } /** @@ -95,8 +120,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 返回叠加拆分操作后的LongStream */ @Override - public LongStream flatMapToLong(Function mapper) { - return stream.flatMapToLong(mapper); + default LongStream flatMapToLong(Function mapper) { + return stream().flatMapToLong(mapper); } /** @@ -107,8 +132,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 返回叠加拆分操作后的DoubleStream */ @Override - public DoubleStream flatMapToDouble(Function mapper) { - return stream.flatMapToDouble(mapper); + default DoubleStream flatMapToDouble(Function mapper) { + return stream().flatMapToDouble(mapper); } /** @@ -118,8 +143,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 一个具有去重特征的流 */ @Override - public I distinct() { - return convertToStreamImpl(stream.distinct()); + default I distinct() { + return wrapping(stream().distinct()); } /** @@ -131,8 +156,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 一个元素按自然顺序排序的流 */ @Override - public I sorted() { - return convertToStreamImpl(stream.sorted()); + default I sorted() { + return wrapping(stream().sorted()); } /** @@ -145,8 +170,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 一个元素按指定的Comparator排序的流 */ @Override - public I sorted(Comparator comparator) { - return convertToStreamImpl(stream.sorted(comparator)); + default I sorted(Comparator comparator) { + return wrapping(stream().sorted(comparator)); } /** @@ -167,8 +192,8 @@ abstract class StreamWrapper> implements Stream, Itera * } */ @Override - public I peek(Consumer action) { - return convertToStreamImpl(stream.peek(action)); + default I peek(Consumer action) { + return wrapping(stream().peek(action)); } /** @@ -179,8 +204,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 截取后的流 */ @Override - public I limit(long maxSize) { - return convertToStreamImpl(stream.limit(maxSize)); + default I limit(long maxSize) { + return wrapping(stream().limit(maxSize)); } /** @@ -191,8 +216,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 丢弃前面n个元素后的剩余元素组成的流 */ @Override - public I skip(long n) { - return convertToStreamImpl(stream.skip(n)); + default I skip(long n) { + return wrapping(stream().skip(n)); } /** @@ -202,8 +227,8 @@ abstract class StreamWrapper> implements Stream, Itera * @param action 操作 */ @Override - public void forEach(Consumer action) { - stream.forEach(action); + default void forEach(Consumer action) { + stream().forEach(action); } /** @@ -213,8 +238,8 @@ abstract class StreamWrapper> implements Stream, Itera * @param action 操作 */ @Override - public void forEachOrdered(Consumer action) { - stream.forEachOrdered(action); + default void forEachOrdered(Consumer action) { + stream().forEachOrdered(action); } /** @@ -224,8 +249,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 包含此流元素的数组 */ @Override - public Object[] toArray() { - return stream.toArray(); + default Object[] toArray() { + return stream().toArray(); } /** @@ -238,8 +263,8 @@ abstract class StreamWrapper> implements Stream, Itera * @throws ArrayStoreException 如果元素转换失败,例如不是该元素类型及其父类,则抛出该异常 */ @Override - public A[] toArray(IntFunction generator) { - return stream.toArray(generator); + default A[] toArray(IntFunction generator) { + return stream().toArray(generator); } /** @@ -263,8 +288,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 聚合计算后的值 */ @Override - public T reduce(T identity, BinaryOperator accumulator) { - return stream.reduce(identity, accumulator); + default T reduce(T identity, BinaryOperator accumulator) { + return stream().reduce(identity, accumulator); } /** @@ -298,8 +323,8 @@ abstract class StreamWrapper> implements Stream, Itera * @see #max(Comparator) */ @Override - public Optional reduce(BinaryOperator accumulator) { - return stream.reduce(accumulator); + default Optional reduce(BinaryOperator accumulator) { + return stream().reduce(accumulator); } /** @@ -315,8 +340,8 @@ abstract class StreamWrapper> implements Stream, Itera * @see #reduce(Object, BinaryOperator) */ @Override - public U reduce(U identity, BiFunction accumulator, BinaryOperator combiner) { - return stream.reduce(identity, accumulator, combiner); + default U reduce(U identity, BiFunction accumulator, BinaryOperator combiner) { + return stream().reduce(identity, accumulator, combiner); } /** @@ -333,8 +358,8 @@ abstract class StreamWrapper> implements Stream, Itera * } */ @Override - public R collect(Supplier supplier, BiConsumer accumulator, BiConsumer combiner) { - return stream.collect(supplier, accumulator, combiner); + default R collect(Supplier supplier, BiConsumer accumulator, BiConsumer combiner) { + return stream().collect(supplier, accumulator, combiner); } /** @@ -347,8 +372,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 收集后的容器 */ @Override - public R collect(Collector collector) { - return stream.collect(collector); + default R collect(Collector collector) { + return stream().collect(collector); } /** @@ -358,8 +383,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 最小值 */ @Override - public Optional min(Comparator comparator) { - return stream.min(comparator); + default Optional min(Comparator comparator) { + return stream().min(comparator); } /** @@ -369,8 +394,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 最大值 */ @Override - public Optional max(Comparator comparator) { - return stream.max(comparator); + default Optional max(Comparator comparator) { + return stream().max(comparator); } /** @@ -379,8 +404,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 流元素个数 */ @Override - public long count() { - return stream.count(); + default long count() { + return stream().count(); } /** @@ -390,8 +415,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 是否有任何一个元素满足给定断言 */ @Override - public boolean anyMatch(Predicate predicate) { - return stream.anyMatch(predicate); + default boolean anyMatch(Predicate predicate) { + return stream().anyMatch(predicate); } /** @@ -401,8 +426,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 是否所有元素满足给定断言 */ @Override - public boolean allMatch(Predicate predicate) { - return stream.allMatch(predicate); + default boolean allMatch(Predicate predicate) { + return stream().allMatch(predicate); } /** @@ -412,8 +437,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 是否没有元素满足给定断言 */ @Override - public boolean noneMatch(Predicate predicate) { - return stream.noneMatch(predicate); + default boolean noneMatch(Predicate predicate) { + return stream().noneMatch(predicate); } /** @@ -422,8 +447,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 第一个元素 */ @Override - public Optional findFirst() { - return stream.findFirst(); + default Optional findFirst() { + return stream().findFirst(); } /** @@ -432,8 +457,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 随便取一个 */ @Override - public Optional findAny() { - return stream.findAny(); + default Optional findAny() { + return stream().findAny(); } /** @@ -442,8 +467,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 流的迭代器 */ @Override - public Iterator iterator() { - return stream.iterator(); + default Iterator iterator() { + return stream().iterator(); } /** @@ -452,8 +477,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 流的拆分器 */ @Override - public Spliterator spliterator() { - return stream.spliterator(); + default Spliterator spliterator() { + return stream().spliterator(); } /** @@ -462,8 +487,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 流的并行状态 */ @Override - public boolean isParallel() { - return stream.isParallel(); + default boolean isParallel() { + return stream().isParallel(); } /** @@ -472,8 +497,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 串行流 */ @Override - public I sequential() { - return convertToStreamImpl(stream.sequential()); + default I sequential() { + return wrapping(stream().sequential()); } /** @@ -482,8 +507,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 并行流 */ @Override - public I parallel() { - return convertToStreamImpl(stream.parallel()); + default I parallel() { + return wrapping(stream().parallel()); } /** @@ -493,8 +518,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 无序流 */ @Override - public I unordered() { - return convertToStreamImpl(stream.unordered()); + default I unordered() { + return wrapping(stream().unordered()); } /** @@ -504,8 +529,8 @@ abstract class StreamWrapper> implements Stream, Itera * @return 流 */ @Override - public I onClose(Runnable closeHandler) { - return convertToStreamImpl(stream.onClose(closeHandler)); + default I onClose(Runnable closeHandler) { + return wrapping(stream().onClose(closeHandler)); } /** @@ -514,50 +539,33 @@ abstract class StreamWrapper> implements Stream, Itera * @see AutoCloseable#close() */ @Override - public void close() { - stream.close(); + default void close() { + stream().close(); } /** - * hashcode + * 获取当前实例的哈希值 * - * @return hashcode + * @return 哈希值 */ @Override - public int hashCode() { - return stream.hashCode(); - } + int hashCode(); /** - * equals + * 比较实例是否相等 * * @param obj 对象 - * @return 结果 + * @return 是否相等 */ @Override - public boolean equals(final Object obj) { - if (obj instanceof Stream) { - return stream.equals(obj); - } - return false; - } + boolean equals(final Object obj); /** - * toString + * 将当前实例转为字符串 * - * @return string + * @return 字符串 */ @Override - public String toString() { - return stream.toString(); - } - - /** - * 根据一个原始的流,返回一个新包装类实例 - * - * @param stream 流 - * @return 实现类 - */ - protected abstract I convertToStreamImpl(Stream stream); + String toString(); } diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/TerminableStreamWrapper.java b/hutool-core/src/main/java/cn/hutool/core/stream/TerminableStreamWrapper.java new file mode 100644 index 000000000..eee30f86f --- /dev/null +++ b/hutool-core/src/main/java/cn/hutool/core/stream/TerminableStreamWrapper.java @@ -0,0 +1,475 @@ +package cn.hutool.core.stream; + +import cn.hutool.core.lang.Opt; +import cn.hutool.core.lang.mutable.MutableInt; +import cn.hutool.core.lang.mutable.MutableObj; +import cn.hutool.core.util.ArrayUtil; + +import java.util.*; +import java.util.function.*; +import java.util.stream.Collector; +import java.util.stream.Collectors; + +/** + * {@link StreamWrapper}的扩展,为实现类提供更多终端操作方法 + * + * @param 流中的元素类型 + * @param 链式调用获得的实现类类型 + * @author huangchengxing + */ +public interface TerminableStreamWrapper> extends StreamWrapper { + + // region ============ join ============ + + /** + * 返回拼接后的字符串 + * + * @return 拼接后的字符串 + */ + default String join() { + return this.join(""); + } + + /** + * 返回拼接后的字符串 + * + * @param delimiter 分隔符 + * @return 拼接后的字符串 + */ + default String join(CharSequence delimiter) { + return this.join(delimiter, "", ""); + } + + /** + * 返回拼接后的字符串 + * + * @param delimiter 分隔符 + * @param prefix 前缀 + * @param suffix 后缀 + * @return 拼接后的字符串 + */ + default String join(CharSequence delimiter, + CharSequence prefix, + CharSequence suffix) { + return stream().map(String::valueOf).collect(Collectors.joining(delimiter, prefix, suffix)); + } + + // endregion + + // region ============ to collection ============ + + /** + * 转换成集合 + * + * @param collectionFactory 集合工厂(可以是集合构造器) + * @param 集合类型 + * @return 集合 + */ + default > C toColl(Supplier collectionFactory) { + return stream().collect(Collectors.toCollection(collectionFactory)); + } + + /** + * 转换为{@link ArrayList} + * + * @return 集合 + */ + default List toList() { + return this.toColl(ArrayList::new); + } + + /** + * 换为不可变集合 + * + * @return 集合 + */ + default List toUnmodifiableList() { + return Collections.unmodifiableList(this.toList()); + } + + /** + * 转换为HashSet + * + * @return 集合 + */ + default Set toSet() { + return this.toColl(HashSet::new); + } + + /** + * 换为不可变集合 + * + * @return 集合 + */ + default Set toUnmodifiableSet() { + return Collections.unmodifiableSet(this.toSet()); + } + + // endregion + + // region ============ to map ============ + + /** + * 转换为map,key为给定操作执行后的返回值,value为当前元素 + * + * @param keyMapper 指定的key操作 + * @param key类型 + * @return map + */ + default Map toMap(Function keyMapper) { + return this.toMap(keyMapper, Function.identity()); + } + + /** + * 转换为map,key,value为给定操作执行后的返回值 + * + * @param keyMapper 指定的key操作 + * @param valueMapper 指定value操作 + * @param key类型 + * @param value类型 + * @return map + */ + default Map toMap( + Function keyMapper, Function valueMapper) { + return this.toMap(keyMapper, valueMapper, (l, r) -> r); + } + + /** + * 转换为不可变map,key,value为给定操作执行后的返回值 + * + * @param keyMapper 指定的key操作 + * @param valueMapper 指定value操作 + * @param key类型 + * @param value类型 + * @return map + */ + default Map toUnmodifiableMap( + Function keyMapper, Function valueMapper) { + return Collections.unmodifiableMap(this.toMap(keyMapper, valueMapper)); + } + + /** + * 转换为map,key,value为给定操作执行后的返回值 + * + * @param keyMapper 指定的key操作 + * @param valueMapper 指定value操作 + * @param mergeFunction 合并操作 + * @param key类型 + * @param value类型 + * @return map + */ + default Map toMap( + Function keyMapper, + Function valueMapper, + BinaryOperator mergeFunction) { + return this.toMap(keyMapper, valueMapper, mergeFunction, HashMap::new); + } + + /** + * 转换为不可变map,key,value为给定操作执行后的返回值 + * + * @param keyMapper 指定的key操作 + * @param valueMapper 指定value操作 + * @param mergeFunction 合并操作 + * @param key类型 + * @param value类型 + * @return map + */ + default Map toUnmodifiableMap( + Function keyMapper, + Function valueMapper, + BinaryOperator mergeFunction) { + return Collections.unmodifiableMap( + this.toMap(keyMapper, valueMapper, mergeFunction, HashMap::new) + ); + } + + /** + * 转换为map,key,value为给定操作执行后的返回值 + * + * @param keyMapper 指定的key操作 + * @param valueMapper 指定value操作 + * @param mergeFunction 合并操作 + * @param mapSupplier map工厂 + * @param key类型 + * @param value类型 + * @param map类型 + * @return map + */ + default > M toMap( + Function keyMapper, + Function valueMapper, + BinaryOperator mergeFunction, + Supplier mapSupplier) { + return stream().collect(Collectors.toMap(keyMapper, valueMapper, mergeFunction, mapSupplier)); + } + + // endregion + + // region ============ to zip ============ + + /** + * 与给定的可迭代对象转换成map,key为现有元素,value为给定可迭代对象迭代的元素
+ * 至少包含全部的key,如果对应位置上的value不存在,则为null + * + * @param other 可迭代对象 + * @param 可迭代对象迭代的元素类型 + * @return map,key为现有元素,value为给定可迭代对象迭代的元素;
+ * 至少包含全部的key,如果对应位置上的value不存在,则为null;
+ * 如果key重复, 则保留最后一个关联的value;
+ */ + default Map toZip(Iterable other) { + // value对象迭代器 + final Iterator iterator = Opt.ofNullable(other).map(Iterable::iterator).orElseGet(Collections::emptyIterator); + if (this.isParallel()) { + List keyList = toList(); + final Map map = new HashMap<>(keyList.size()); + for (T key : keyList) { + map.put(key, iterator.hasNext() ? iterator.next() : null); + } + return map; + } else { + return this.toMap(Function.identity(), e -> iterator.hasNext() ? iterator.next() : null); + } + } + + // endregion + + // region ============ group ============ + + /** + * 通过给定分组依据进行分组 + * + * @param classifier 分组依据,得到的键为{@code null}时不会抛出异常 + * @param 实体中的分组依据对应类型,也是Map中key的类型 + * @return map + */ + default Map> group(Function classifier) { + return this.group(classifier, Collectors.toList()); + } + + /** + * 通过给定分组依据进行分组 + * + * @param classifier 分组依据,得到的键为{@code null}时不会抛出异常 + * @param downstream 下游操作 + * @param 实体中的分组依据对应类型,也是Map中key的类型 + * @param 下游操作对应返回类型,也是Map中value的类型 + * @param
下游操作在进行中间操作时对应类型 + * @return map + */ + default Map group( + Function classifier, Collector downstream) { + return this.group(classifier, HashMap::new, downstream); + } + + /** + * 通过给定分组依据进行分组 + * + * @param classifier 分组依据,得到的键为{@code null}时不会抛出异常 + * @param mapFactory 提供的map + * @param downstream 下游操作 + * @param 实体中的分组依据对应类型,也是Map中key的类型 + * @param 下游操作对应返回类型,也是Map中value的类型 + * @param 下游操作在进行中间操作时对应类型 + * @param 最后返回结果Map类型 + * @return map + * @see CollectorUtil#groupingBy(Function, Supplier, Collector) + */ + default > M group( + Function classifier, + Supplier mapFactory, + Collector downstream) { + return stream().collect(CollectorUtil.groupingBy(classifier, mapFactory, downstream)); + } + + /** + * 根据给定判断条件分组 + * + * @param predicate 判断条件 + * @return map + */ + default Map> partition(Predicate predicate) { + return this.partition(predicate, ArrayList::new); + } + + /** + * 根据给定判断条件分组 + * + * @param predicate 判断条件 + * @param collFactory 提供的集合 + * @return map + */ + default > Map partition(Predicate predicate, Supplier collFactory) { + return this.partition(predicate, Collectors.toCollection(collFactory)); + } + + /** + * 根据给定判断条件分组 + * + * @param predicate 判断条件 + * @param downstream 下游操作 + * @param 返回值类型 + * @return map + */ + default Map partition(Predicate predicate, Collector downstream) { + return stream().collect(Collectors.partitioningBy(predicate, downstream)); + } + + // endregion + + // region ============ foreach ============ + + /** + * 对流里面的每一个元素执行一个操作,操作带下标,并行流时下标永远为-1 + * 这是一个终端操作 + * + * @param action 操作 + */ + default void forEachIdx(final BiConsumer action) { + Objects.requireNonNull(action); + if (isParallel()) { + stream().forEach(e -> action.accept(e, NOT_FOUND_INDEX)); + } else { + final MutableInt index = new MutableInt(NOT_FOUND_INDEX); + stream().forEach(e -> action.accept(e, index.incrementAndGet())); + } + } + + /** + * 对流里面的每一个元素按照顺序执行一个操作,操作带下标,并行流时下标永远为-1 + * 这是一个终端操作 + * + * @param action 操作 + */ + default void forEachOrderedIdx(final BiConsumer action) { + Objects.requireNonNull(action); + if (isParallel()) { + stream().forEachOrdered(e -> action.accept(e, NOT_FOUND_INDEX)); + } else { + final MutableInt index = new MutableInt(NOT_FOUND_INDEX); + stream().forEachOrdered(e -> action.accept(e, index.incrementAndGet())); + } + } + + // endregion + + // region ============ find & get ============ + + + + /** + * 获取与给定断言匹配的第一个元素 + * + * @param predicate 断言 + * @return 与给定断言匹配的第一个元素 + */ + default Optional findFirst(final Predicate predicate) { + return stream().filter(predicate).findFirst(); + } + + /** + * 获取与给定断言匹配的第一个元素的下标,并行流下标永远为-1 + * + * @param predicate 断言 + * @return 与给定断言匹配的第一个元素的下标,如果不存在则返回-1 + */ + default int findFirstIdx(final Predicate predicate) { + Objects.requireNonNull(predicate); + if (isParallel()) { + return NOT_FOUND_INDEX; + } else { + final MutableInt index = new MutableInt(NOT_FOUND_INDEX); + stream().filter(e -> { + index.increment(); + return predicate.test(e); + }).findFirst(); + return index.get(); + } + } + + /** + * 获取最后一个元素 + * + * @return 最后一个元素 + */ + default Optional findLast() { + final MutableObj last = new MutableObj<>(null); + spliterator().forEachRemaining(last::set); + return Optional.ofNullable(last.get()); + } + + /** + * 获取与给定断言匹配的最后一个元素 + * + * @param predicate 断言 + * @return 与给定断言匹配的最后一个元素 + */ + default Optional findLast(final Predicate predicate) { + Objects.requireNonNull(predicate); + final MutableObj last = new MutableObj<>(null); + spliterator().forEachRemaining(e -> { + if (predicate.test(e)) { + last.set(e); + } + }); + return Optional.ofNullable(last.get()); + } + + /** + * 获取与给定断言匹配的最后一个元素的下标,并行流下标永远为-1 + * + * @param predicate 断言 + * @return 与给定断言匹配的最后一个元素的下标,如果不存在则返回-1 + */ + default int findLastIdx(final Predicate predicate) { + Objects.requireNonNull(predicate); + if (isParallel()) { + return NOT_FOUND_INDEX; + } else { + final MutableInt idxRef = new MutableInt(NOT_FOUND_INDEX); + forEachIdx((e, i) -> { + if (predicate.test(e)) { + idxRef.set(i); + } + }); + return idxRef.get(); + } + } + + /** + * 获取流中指定下标的元素,如果是负数,则从最后一个开始数起 + * + * @param idx 下标 + * @return 指定下标的元素 + */ + @SuppressWarnings("unchecked") + default Optional at(final Integer idx) { + return Opt.ofNullable(idx).map(i -> (T) ArrayUtil.get(toArray(), i)).toOptional(); + } + + // endregion + + // region ============ is ============ + + /** + * 流是否为空 + * + * @return 流是否为空 + */ + default boolean isEmpty() { + return !findAny().isPresent(); + } + + /** + * 流是否不为空 + * + * @return 流是否不为空 + */ + default boolean isNotEmpty() { + return !isEmpty(); + } + + // endregion + +} diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/TransformableStreamWrapper.java b/hutool-core/src/main/java/cn/hutool/core/stream/TransformableStreamWrapper.java new file mode 100644 index 000000000..10ad264bf --- /dev/null +++ b/hutool-core/src/main/java/cn/hutool/core/stream/TransformableStreamWrapper.java @@ -0,0 +1,216 @@ +package cn.hutool.core.stream; + +import cn.hutool.core.collection.ListUtil; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.lang.Console; +import cn.hutool.core.lang.mutable.MutableInt; +import cn.hutool.core.util.ArrayUtil; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.BiPredicate; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * {@link StreamWrapper}的扩展,为实现类提供更多中间操作方法 + * + * @param 流中的元素类型 + * @param 链式调用获得的实现类类型 + * @author huangchengxing + */ +public interface TransformableStreamWrapper> extends StreamWrapper { + + /** + * 过滤元素,返回与指定断言匹配的元素组成的流,断言带下标,并行流时下标永远为-1 + * 这是一个无状态中间操作 + * + * @param predicate 断言 + * @return 返回叠加过滤操作后的流 + */ + default I filterIdx(final BiPredicate predicate) { + Objects.requireNonNull(predicate); + if (isParallel()) { + return filter(e -> predicate.test(e, NOT_FOUND_INDEX)); + } else { + final MutableInt index = new MutableInt(NOT_FOUND_INDEX); + return filter(e -> predicate.test(e, index.incrementAndGet())); + } + } + + /** + * 过滤掉空元素 + * + * @return 过滤后的流 + */ + default I nonNull() { + return filter(Objects::nonNull); + } + + /** + * 返回与指定函数将元素作为参数执行后组成的流。操作带下标,并行流时下标永远为-1 + * 这是一个无状态中间操作 + * @param action 指定的函数 + * @return 返回叠加操作后的FastStream + * @apiNote 该方法存在的意义主要是用来调试 + * 当你需要查看经过操作管道某处的元素和下标,可以执行以下操作: + *

{@code
+	 *     Stream.of("one", "two", "three", "four")
+	 * 				.filter(e -> e.length() > 3)
+	 * 				.peekIdx((e,i) -> System.out.println("Filtered value: " + e + " Filtered idx:" + i))
+	 * 				.map(String::toUpperCase)
+	 * 				.peekIdx((e,i) -> System.out.println("Mapped value: " + e + " Mapped idx:" + i))
+	 * 				.collect(Collectors.toList());
+	 * }
+ */ + default I peekIdx(BiConsumer action) { + Objects.requireNonNull(action); + if (isParallel()) { + return peek(e -> action.accept(e, NOT_FOUND_INDEX)); + } else { + AtomicInteger index = new AtomicInteger(NOT_FOUND_INDEX); + return peek(e -> action.accept(e, index.incrementAndGet())); + } + } + + /** + * 返回叠加调用{@link Console#log(Object)}打印出结果的流 + * + * @return 返回叠加操作后的FastStream + */ + default I log() { + return peek(Console::log); + } + + /** + * 反转顺序 + * + * @return 反转元素顺序 + */ + @SuppressWarnings("unchecked") + default I reverse() { + final T[] array = (T[]) toArray(); + ArrayUtil.reverse(array); + return wrapping(Stream.of(array)).parallel(isParallel()); + } + + /** + * 更改流的并行状态 + * + * @param parallel 是否并行 + * @return 流 + */ + default I parallel(final boolean parallel) { + return parallel ? parallel() : sequential(); + } + + /** + * 与给定元素组成的流合并,成为新的流 + * + * @param obj 元素 + * @return 流 + */ + @SuppressWarnings("unchecked") + default I push(final T... obj) { + Stream result = stream(); + if (ArrayUtil.isNotEmpty(obj)) { + result = Stream.concat(stream(), Stream.of(obj)); + } + return wrapping(result); + } + + /** + * 给定元素组成的流与当前流合并,成为新的流 + * + * @param obj 元素 + * @return 流 + */ + default I unshift(final T... obj) { + Stream result = stream(); + if (ArrayUtil.isNotEmpty(obj)) { + result = Stream.concat(Stream.of(obj), stream()); + } + return wrapping(result); + } + + /** + * 通过删除或替换现有元素或者原地添加新的元素来修改列表,并以列表形式返回被修改的内容。此方法不会改变原列表。 + * 类似js的splice函数 + * + * @param start 起始下标 + * @param deleteCount 删除个数,正整数 + * @param items 放入值 + * @return 操作后的流 + */ + default I splice(final int start, final int deleteCount, final T... items) { + final List elements = stream().collect(Collectors.toList()); + return wrapping(ListUtil.splice(elements, start, deleteCount, items).stream()) + .parallel(isParallel()); + } + + /** + * 保留 与指定断言 匹配时的元素, 在第一次不匹配时终止, 抛弃当前(第一个不匹配元素)及后续所有元素 + *

与 jdk9 中的 takeWhile 方法不太一样, 这里的实现是个 顺序的、有状态的中间操作

+ *
本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
+	 * FastStream.iterate(1, i -> i + 1)
+	 * 	.parallel()
+	 * 	// 顺序执行
+	 * 	.takeWhile(e -> e < 50)
+	 * 	// 并发
+	 * 	.map(e -> e + 1)
+	 * 	// 并发
+	 * 	.map(String::valueOf)
+	 * 	.toList();
+	 * }
+ *

但是不建议在并行流中使用, 除非你确定 takeWhile 之后的操作能在并行流中受益很多

+ * + * @param predicate 断言 + * @return 与指定断言匹配的元素组成的流 + */ + default I takeWhile(final Predicate predicate) { + Objects.requireNonNull(predicate); + return wrapping(StreamUtil.takeWhile(stream(), predicate)); + } + + /** + * 删除 与指定断言 匹配的元素, 在第一次不匹配时终止, 返回当前(第一个不匹配元素)及剩余元素组成的新流 + *

与 jdk9 中的 dropWhile 方法不太一样, 这里的实现是个 顺序的、有状态的中间操作

+ *
本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
+	 * FastStream.iterate(1, i <= 100, i -> i + 1)
+	 * 	.parallel()
+	 * 	// 顺序执行
+	 * 	.dropWhile(e -> e < 50)
+	 * 	// 并发
+	 * 	.map(e -> e + 1)
+	 * 	// 并发
+	 * 	.map(String::valueOf)
+	 * 	.toList();
+	 * }
+ *

但是不建议在并行流中使用, 除非你确定 dropWhile 之后的操作能在并行流中受益很多

+ * + * @param predicate 断言 + * @return 剩余元素组成的流 + */ + default I dropWhile(final Predicate predicate) { + Objects.requireNonNull(predicate); + return wrapping(StreamUtil.dropWhile(stream(), predicate)); + } + + /** + * 将当前流转为另一对象。用于提供针对流本身而非流中元素的操作 + * + * @param 转换类型 + * @param transform 转换 + * @return 转换后的流 + */ + default Optional transform(final Function transform) { + Assert.notNull(transform, "transform must not null"); + return Optional.ofNullable(transform.apply(wrapping(this))); + } + +} diff --git a/hutool-core/src/test/java/cn/hutool/core/stream/EntryStreamTest.java b/hutool-core/src/test/java/cn/hutool/core/stream/EntryStreamTest.java index c841d031f..7e91c0144 100644 --- a/hutool-core/src/test/java/cn/hutool/core/stream/EntryStreamTest.java +++ b/hutool-core/src/test/java/cn/hutool/core/stream/EntryStreamTest.java @@ -6,6 +6,7 @@ import org.junit.Test; import java.util.*; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -327,7 +328,7 @@ public class EntryStreamTest { Map result = EntryStream.of(map).toMap(); Assert.assertEquals(map, result); - result = EntryStream.of(map).toMap(LinkedHashMap::new); + result = EntryStream.of(map).toMap((Supplier>)LinkedHashMap::new); Assert.assertEquals(new LinkedHashMap<>(map), result); result = EntryStream.of(map).toMap(LinkedHashMap::new, (t1, t2) -> t1);