重构增强流:

1.移除SimpleStreamWrapper;
2.将EasyStream中部分方法移动至接口中;
3.调整类名、方法名、变量名与部分注释;
4.为实例方法添加空值校验;
5.补充测试用例;
This commit is contained in:
huangchengxing 2022-09-06 11:21:38 +08:00
parent 2d1255cbff
commit 741f0aa53a
13 changed files with 1787 additions and 870 deletions

View File

@ -1,40 +0,0 @@
package cn.hutool.core.stream;
import java.util.Objects;
import java.util.stream.Stream;
/**
* {@link StreamWrapper}的基本实现用于包装一个已有的流实例
* 使其支持相对原生{@link Stream}更多的中间操作与终端操作
*
* @author huangchengxing
* @see EasyStream
* @see EntryStream
*/
public abstract class AbstractEnhancedStreamWrapper<T, I extends AbstractEnhancedStreamWrapper<T, I>>
implements TerminableStreamWrapper<T, I>, TransformableStreamWrapper<T, I> {
/**
* 原始的流实例
*/
protected final Stream<T> stream;
/**
* 获取被包装的元素流实例
*/
@Override
public Stream<T> stream() {
return stream;
}
/**
* 创建一个流包装器
*
* @param stream 包装的流对象
* @throws NullPointerException {@code stream}{@code null}时抛出
*/
protected AbstractEnhancedStreamWrapper(Stream<T> stream) {
this.stream = Objects.requireNonNull(stream, "stream must not null");
}
}

View File

@ -0,0 +1,80 @@
package cn.hutool.core.stream;
import java.util.Objects;
import java.util.stream.Stream;
/**
* {@link WrappedStream}接口的公共实现用于包装并增强一个已有的流实例
*
* @param <T> 流中的元素类型
* @param <S> {@link AbstractEnhancedWrappedStream}的实现类类型
* @author huangchengxing
* @see EasyStream
* @see EntryStream
* @since 6.0.0
*/
public abstract class AbstractEnhancedWrappedStream<T, S extends AbstractEnhancedWrappedStream<T, S>>
implements TerminableWrappedStream<T, S>, TransformableWrappedStream<T, S> {
/**
* 原始的流实例
*/
protected final Stream<T> stream;
/**
* 获取被包装的元素流实例
*/
@Override
public Stream<T> stream() {
return stream;
}
/**
* 创建一个流包装器
*
* @param stream 包装的流对象
* @throws NullPointerException {@code stream}{@code null}时抛出
*/
protected AbstractEnhancedWrappedStream(Stream<T> stream) {
this.stream = Objects.requireNonNull(stream, "stream must not null");
}
/**
* 获取当前被包装的实例的哈希值
*
* @return 哈希值
*/
@Override
public int hashCode() {
return stream.hashCode();
}
/**
* 比较被包装的实例是否相等
*
* @param obj 对象
* @return 是否相等
*/
@Override
public boolean equals(Object obj) {
return obj instanceof Stream && stream.equals(obj);
}
/**
* 将被包装的实例转为字符串
*
* @return 字符串
*/
@Override
public String toString() {
return stream.toString();
}
/**
* 触发流的执行这是一个终端操作
*/
public void exec() {
stream.forEach(t -> {});
}
}

View File

@ -15,7 +15,9 @@ import java.util.stream.Collectors;
/**
* 可变的汇聚操作{@link Collector} 相关工具封装
*
* @author looly, VampireAchao
* @author looly
* @author VampireAchao
* @author huangchengxing
* @since 5.6.7
*/
public class CollectorUtil {
@ -243,6 +245,20 @@ public class CollectorUtil {
);
}
/**
* 将流转为{@link EntryStream}
*
* @param keyMapper 键的映射方法
* @param <T> 输入元素类型
* @param <K> 元素的键类型
* @return 收集器
* @since 6.0.0
*/
public static <T, K> Collector<T, List<T>, EntryStream<K, T>> toEntryStream(
Function<? super T, ? extends K> keyMapper) {
return toEntryStream(keyMapper, Function.identity());
}
/**
* 将流转为{@link EntryStream}
*
@ -252,6 +268,7 @@ public class CollectorUtil {
* @param <K> 元素的键类型
* @param <V> 元素的值类型
* @return 收集器
* @since 6.0.0
*/
public static <T, K, V> Collector<T, List<T>, EntryStream<K, V>> toEntryStream(
Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper) {
@ -265,6 +282,7 @@ public class CollectorUtil {
*
* @param <T> 输入元素类型
* @return 收集器
* @since 6.0.0
*/
public static <T> Collector<T, ?, EasyStream<T>> toEasyStream() {
return transform(ArrayList::new, EasyStream::of);
@ -275,7 +293,7 @@ public class CollectorUtil {
* 返回的收集器的效果等同于
* <pre>{@code
* Collection<T> coll = Stream.of(a, b, c, d)
* .collect(Collectors.toCollection(collFactory));
* .collect(Collectors.toColl(collFactory));
* R result = mapper.apply(coll);
* }</pre>
*
@ -285,6 +303,7 @@ public class CollectorUtil {
* @param <T> 输入元素类型
* @param <C> 中间收集输入元素的集合类型
* @return 收集器
* @since 6.0.0
*/
public static <T, R, C extends Collection<T>> Collector<T, C, R> transform(
Supplier<C> collFactory, Function<C, R> mapper) {
@ -308,6 +327,7 @@ public class CollectorUtil {
* @param <R> 返回值类型
* @param <T> 输入元素类型
* @return 收集器
* @since 6.0.0
*/
public static <T, R> Collector<T, List<T>, R> transform(Function<List<T>, R> mapper) {
return transform(ArrayList::new, mapper);

View File

@ -1,21 +1,20 @@
package cn.hutool.core.stream;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.lang.mutable.MutableInt;
import cn.hutool.core.lang.mutable.MutableObj;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ObjUtil;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* <p>{@link Stream}的扩展实现基于原生Stream进行了封装和增强<br>
* <p>单元素的扩展流实现基于原生Stream进行了封装和增强<br>
* 作者经对比了vavreclipse-collectionstream-ex以及其他语言的api结合日常使用习惯进行封装和拓展
* Stream为集合提供了一些易用api它让开发人员能使用声明式编程的方式去编写代码
*
@ -46,10 +45,11 @@ import java.util.stream.StreamSupport;
*
* @author VampireAchao
* @author emptypoint
* @author huangchengxing
* @see java.util.stream.Stream
* @since 6.0.0
*/
public class EasyStream<T> extends AbstractEnhancedStreamWrapper<T, EasyStream<T>> {
public class EasyStream<T> extends AbstractEnhancedWrappedStream<T, EasyStream<T>> {
/**
* 构造
@ -69,10 +69,10 @@ public class EasyStream<T> extends AbstractEnhancedStreamWrapper<T, EasyStream<T
* @param <T> 元素的类型
* @return a stream builder
*/
public static <T> FastStreamBuilder<T> builder() {
return new FastStreamBuilder<T>() {
public static <T> Builder<T> builder() {
return new Builder<T>() {
private static final long serialVersionUID = 1L;
private final Builder<T> streamBuilder = Stream.builder();
private final Stream.Builder<T> streamBuilder = Stream.builder();
@Override
public void accept(final T t) {
@ -241,20 +241,6 @@ public class EasyStream<T> extends AbstractEnhancedStreamWrapper<T, EasyStream<T
// --------------------------------------------------------------- Static method end
// endregion
/**
* 过滤元素返回与 指定操作结果 匹配 指定值 的元素组成的流
* 这是一个无状态中间操作
*
* @param <R> 返回类型
* @param mapper 操作
* @param value 用来匹配的值
* @return 指定操作结果 匹配 指定值 的元素组成的流
*/
public <R> EasyStream<T> filter(final Function<? super T, ? extends R> mapper, final R value) {
Objects.requireNonNull(mapper);
return filter(e -> Objects.equals(mapper.apply(e), value));
}
/**
* 返回与指定函数将元素作为参数执行的结果组成的流
* 这是一个无状态中间操作
@ -265,185 +251,10 @@ public class EasyStream<T> extends AbstractEnhancedStreamWrapper<T, EasyStream<T
*/
@Override
public <R> EasyStream<R> map(final Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new EasyStream<>(stream.map(mapper));
}
/**
* 返回 元素 转换后 并且不为 {@code null} 新元素组成的流<br>
* 这是一个无状态中间操作<br>
* <pre>{@code
* // 等价于先调用map再调用nonNull
* .map(...).nonNull()...
* }</pre>
*
* @param mapper 指定的函数
* @param <R> 函数执行后返回的类型
* @return 新元素组成的流
*/
public <R> EasyStream<R> mapNonNull(final Function<? super T, ? extends R> mapper) {
return nonNull().<R>map(mapper).nonNull();
}
/**
* 返回与指定函数将元素作为参数执行的结果组成的流操作带下标并行流时下标永远为-1
* 这是一个无状态中间操作
*
* @param mapper 指定的函数
* @param <R> 函数执行后返回的类型
* @return 返回叠加操作后的流
*/
public <R> EasyStream<R> mapIdx(final BiFunction<? super T, Integer, ? extends R> mapper) {
Objects.requireNonNull(mapper);
if (isParallel()) {
return map(e -> mapper.apply(e, NOT_FOUND_INDEX));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
return map(e -> mapper.apply(e, index.incrementAndGet()));
}
}
/**
* 扩散流操作可能影响流元素个数将原有流元素执行mapper操作返回多个流所有元素组成的流<br>
* 这是一个无状态中间操作<br>
* 例如将users里所有user的id和parentId组合在一起形成一个新的流:
* <pre>{@code
* FastStream<Long> ids = FastStream.of(users).flatMap(user -> FastStream.of(user.getId(), user.getParentId()));
* }</pre>
*
* @param mapper 操作返回流
* @param <R> 拆分后流的元素类型
* @return 返回叠加拆分操作后的流
*/
@Override
public <R> EasyStream<R> flatMap(final Function<? super T, ? extends Stream<? extends R>> mapper) {
return new EasyStream<>(stream.flatMap(mapper));
}
/**
* 扩散流操作可能影响流元素个数将原有流元素执行mapper操作返回多个流所有元素组成的流操作带下标并行流时下标永远为-1
* 这是一个无状态中间操作
*
* @param mapper 操作返回流
* @param <R> 拆分后流的元素类型
* @return 返回叠加拆分操作后的流
*/
public <R> EasyStream<R> flatMapIdx(final BiFunction<? super T, Integer, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
if (isParallel()) {
return flatMap(e -> mapper.apply(e, NOT_FOUND_INDEX));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
return flatMap(e -> mapper.apply(e, index.incrementAndGet()));
}
}
/**
* 扩散流操作可能影响流元素个数将原有流元素执行mapper操作, 转换为迭代器元素,
* 最后返回所有迭代器的所有元素组成的流<br>
* 这是一个无状态中间操作<br>
* 例如将users里所有user的id和parentId组合在一起形成一个新的流:
* <pre>{@code
* FastStream<Long> ids = FastStream.of(users).flat(user -> FastStream.of(user.getId(), user.getParentId()));
* }</pre>
*
* @param mapper 操作返回可迭代对象
* @param <R> 拆分后流的元素类型
* @return 返回叠加拆分操作后的流
*/
public <R> EasyStream<R> flat(final Function<? super T, ? extends Iterable<? extends R>> mapper) {
Objects.requireNonNull(mapper);
return flatMap(w -> of(mapper.apply(w)));
}
/**
* 扩散流操作可能影响流元素个数对过滤后的非{@code null}元素执行mapper操作转换为迭代器,
* 并过滤迭代器中为{@code null}的元素, 返回所有迭代器的所有非空元素组成的流<br>
* 这是一个无状态中间操作<br>
*
* @param mapper 操作返回流
* @param <R> 拆分后流的元素类型
* @return 返回叠加拆分操作后的流
* @see #flat(Function)
* @see #nonNull()
*/
public <R> EasyStream<R> flatNonNull(final Function<? super T, ? extends Iterable<? extends R>> mapper) {
return nonNull().flat(mapper).nonNull();
}
/**
* 扩散流操作可能影响流元素个数将原有流元素执行mapper操作返回多个流所有元素组成的流操作带一个方法调用该方法可增加元素
* 这是一个无状态中间操作
*
* @param mapper 操作返回流
* @param <R> 拆分后流的元素类型
* @return 返回叠加拆分操作后的流
*/
public <R> EasyStream<R> mapMulti(final BiConsumer<? super T, ? super Consumer<R>> mapper) {
Objects.requireNonNull(mapper);
return flatMap(e -> {
final FastStreamBuilder<R> buffer = EasyStream.builder();
mapper.accept(e, buffer);
return buffer.build();
});
}
/**
* 返回一个具有去重特征的流 非并行流(顺序流)下对于重复元素保留遇到顺序中最先出现的元素并行流情况下不能保证具体保留哪一个
* 这是一个有状态中间操作
*
* @param <F> 参数类型
* @param keyExtractor 去重依据
* @return 一个具有去重特征的流
*/
public <F> EasyStream<T> distinct(final Function<? super T, F> keyExtractor) {
Objects.requireNonNull(keyExtractor);
if (isParallel()) {
final ConcurrentHashMap<F, Boolean> exists = MapUtil.newConcurrentHashMap();
// 标记是否出现过null值用于保留第一个出现的null
// 由于ConcurrentHashMap的key不能为null所以用此变量来标记
final AtomicBoolean hasNull = new AtomicBoolean(false);
return of(stream.filter(e -> {
final F key = keyExtractor.apply(e);
if (key == null) {
// 已经出现过null值跳过该值
if (hasNull.get()) {
return false;
}
hasNull.set(Boolean.TRUE);
return true;
} else {
// 第一次出现的key返回true
return null == exists.putIfAbsent(key, Boolean.TRUE);
}
})).parallel();
} else {
final Set<F> exists = new HashSet<>();
return of(stream.filter(e -> exists.add(keyExtractor.apply(e))));
}
}
/**
* 与给定元素组成的流合并成为新的流
*
* @param obj 元素
* @return
*/
public EasyStream<T> push(final T obj) {
return EasyStream.concat(this.stream, of(obj));
}
/**
* 给定元素组成的流与当前流合并成为新的流
*
* @param obj 元素
* @return
*/
public EasyStream<T> unshift(final T obj) {
return EasyStream.concat(of(obj), this.stream);
}
/**
* 根据一个原始的流返回一个新包装类实例
*
@ -456,16 +267,19 @@ public class EasyStream<T> extends AbstractEnhancedStreamWrapper<T, EasyStream<T
}
/**
* 将集合转换为树默认用 {@code parentId == null} 作为顶部内置一个小递归
* 因为需要在当前传入数据里查找所以这是一个结束操作
* <p>将集合转换为树默认用 {@code parentId == null} 作为顶部内置一个小递归
* 因为需要在当前传入数据里查找所以这是一个结束操作 <br>
*
* @param idGetter id的getter对应的lambda可以写作 {@code Student::getId}
* @param pIdGetter parentId的getter对应的lambda可以写作 {@code Student::getParentId}
* @param childrenSetter children的setter对应的lambda可以写作{ @code Student::setChildren}
* @param <R> 此处是idparentId的泛型限制
* @return list 组装好的树
* @return list 组装好的树 <br>
* eg:
* {@code List studentTree = EasyStream.of(students).toTree(Student::getId, Student::getParentId, Student::setChildren) }
* <pre>{@code
* List<Student> studentTree = EasyStream.of(students).
* toTree(Student::getId, Student::getParentId, Student::setChildren);
* }</pre>
*/
public <R extends Comparable<R>> List<T> toTree(Function<T, R> idGetter,
Function<T, R> pIdGetter,
@ -483,15 +297,19 @@ public class EasyStream<T> extends AbstractEnhancedStreamWrapper<T, EasyStream<T
* @param childrenSetter children的setter对应的lambda可以写作 {@code Student::setChildren}
* @param parentPredicate 树顶部的判断条件可以写作 {@code s -> Objects.equals(s.getParentId(),0L) }
* @param <R> 此处是idparentId的泛型限制
* @return list 组装好的树
* @return list 组装好的树 <br>
* eg:
* {@code List studentTree = EasyStream.of(students).toTree(Student::getId, Student::getParentId, Student::setChildren, Student::getMatchParent) }
* <pre>{@code
* List<Student> studentTree = EasyStream.of(students).
* .toTree(Student::getId, Student::getParentId, Student::setChildren, Student::getMatchParent);
* }</pre>
*/
public <R extends Comparable<R>> List<T> toTree(Function<T, R> idGetter,
Function<T, R> pIdGetter,
BiConsumer<T, List<T>> childrenSetter,
Predicate<T> parentPredicate) {
Objects.requireNonNull(parentPredicate);
List<T> list = toList();
List<T> parents = EasyStream.of(list).filter(e ->
// 此处是为了适配 parentPredicate.test空指针 情况
@ -516,6 +334,9 @@ public class EasyStream<T> extends AbstractEnhancedStreamWrapper<T, EasyStream<T
BiConsumer<T, List<T>> childrenSetter,
Map<R, List<T>> pIdValuesMap,
List<T> parents) {
Objects.requireNonNull(idGetter);
Objects.requireNonNull(childrenSetter);
Objects.requireNonNull(pIdValuesMap);
MutableObj<Consumer<List<T>>> recursiveRef = new MutableObj<>();
Consumer<List<T>> recursive = values -> EasyStream.of(values, isParallel()).forEach(value -> {
List<T> children = pIdValuesMap.get(idGetter.apply(value));
@ -528,89 +349,11 @@ public class EasyStream<T> extends AbstractEnhancedStreamWrapper<T, EasyStream<T
}
/**
* 将树递归扁平化为集合内置一个小递归(没错lambda可以写递归)
* 这是一个无状态中间操作
* 建造者
*
* @param childrenGetter 获取子节点的lambda可以写作 {@code Student::getChildren}
* @param childrenSetter 设置子节点的lambda可以写作 {@code Student::setChildren}
* @return EasyStream 一个流
* eg:
* {@code List students = EasyStream.of(studentTree).flatTree(Student::getChildren, Student::setChildren).toList() }
* @author VampireAchao
*/
public EasyStream<T> flatTree(Function<T, List<T>> childrenGetter, BiConsumer<T, List<T>> childrenSetter) {
MutableObj<Function<T, EasyStream<T>>> recursiveRef = new MutableObj<>();
Function<T, EasyStream<T>> recursive = e -> EasyStream.of(childrenGetter.apply(e)).flat(recursiveRef.get()).unshift(e);
recursiveRef.set(recursive);
return flat(recursive).peek(e -> childrenSetter.accept(e, null));
}
/**
* 现有元素 给定迭代器中对应位置的元素 使用 zipper 转换为新的元素并返回新元素组成的流<br>
* 新流的数量为两个集合中较小的数量, , 只合并下标位置相同的部分<br>
*
* @param other 给定的迭代器
* @param zipper 两个元素的合并器
* @param <U> 给定的迭代对象类型
* @param <R> 合并后的结果对象类型
* @return 合并后的结果对象的流
*/
public <U, R> EasyStream<R> zip(final Iterable<U> other,
final BiFunction<? super T, ? super U, ? extends R> zipper) {
Objects.requireNonNull(zipper);
final Spliterator<T> keys = spliterator();
final Spliterator<U> values = Opt.ofNullable(other).map(Iterable::spliterator).orElseGet(Spliterators::emptySpliterator);
// 获取两个Spliterator的中较小的数量
// 如果Spliterator经过流操作, getExactSizeIfKnown()可能会返回-1, 所以默认大小为 ArrayList.DEFAULT_CAPACITY
final int sizeIfKnown = (int) Math.max(Math.min(keys.getExactSizeIfKnown(), values.getExactSizeIfKnown()), 10);
final List<R> list = new ArrayList<>(sizeIfKnown);
// 保存第一个Spliterator的值
final MutableObj<T> key = new MutableObj<>();
// 保存第二个Spliterator的值
final MutableObj<U> value = new MutableObj<>();
// 当两个Spliterator中都还有剩余元素时
while (keys.tryAdvance(key::set) && values.tryAdvance(value::set)) {
list.add(zipper.apply(key.get(), value.get()));
}
return of(list).parallel(isParallel()).onClose(stream::close);
}
/**
* 按指定长度切分为双层流
* <p>
* 形如[1,2,3,4,5] -&gt; [[1,2], [3,4], [5,6]]
* </p>
*
* @param batchSize 指定长度, 正整数
* @return 切好的流
*/
public EasyStream<EasyStream<T>> split(final int batchSize) {
final List<T> list = toList();
final int size = list.size();
// 指定长度 大于等于 列表长度
if (size <= batchSize) {
// 返回第一层只有单个元素的双层流形如[[1,2,3,4,5]]
return EasyStream.<EasyStream<T>>of(of(list, isParallel()));
}
return iterate(0, i -> i < size, i -> i + batchSize)
.map(skip -> of(list.subList(skip, Math.min(size, skip + batchSize)), isParallel()))
.parallel(isParallel())
.onClose(stream::close);
}
/**
* 按指定长度切分为元素为list的流
* <p>
* 形如[1,2,3,4,5] -&gt; [[1,2], [3,4], [5,6]]
* </p>
*
* @param batchSize 指定长度, 正整数
* @return 切好的流
*/
public EasyStream<List<T>> splitList(final int batchSize) {
return split(batchSize).map(EasyStream::toList);
}
public interface FastStreamBuilder<T> extends Consumer<T>, cn.hutool.core.builder.Builder<EasyStream<T>> {
public interface Builder<T> extends Consumer<T>, cn.hutool.core.builder.Builder<EasyStream<T>> {
/**
* Adds an element to the stream being built.
@ -625,7 +368,7 @@ public class EasyStream<T> extends AbstractEnhancedStreamWrapper<T, EasyStream<T
* return this;
* }</pre>
*/
default FastStreamBuilder<T> add(final T t) {
default Builder<T> add(final T t) {
accept(t);
return this;
}

View File

@ -14,15 +14,16 @@ import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* <p>参考StreamEx的EntryStream与vavr的Map是针对键值对对象{@link Map.Entry}特化的增强流实现<br>
* <p>参考StreamEx的EntryStream与vavr的Map是针对键值对对象{@link Map.Entry}特化的单元素增强流实现<br>
* 本身可视为一个元素类型为{@link Map.Entry}{@link Stream}
* 用于支持流式处理{@link Map}集合中的或其他键值对类型的数据
*
* @param <K> 键类型
* @param <V> 值类型
* @author huangchengxing
* @since 6.0.0
*/
public class EntryStream<K, V> extends AbstractEnhancedStreamWrapper<Map.Entry<K, V>, EntryStream<K, V>> {
public class EntryStream<K, V> extends AbstractEnhancedWrappedStream<Map.Entry<K, V>, EntryStream<K, V>> {
/**
* 默认的空键值对
@ -222,8 +223,7 @@ public class EntryStream<K, V> extends AbstractEnhancedStreamWrapper<Map.Entry<K
*
* @return {@link EntryStream}实例
*/
@Override
public EntryStream<K, V> nonNull() {
public EntryStream<K, V> nonNullKeyValue() {
return super.filter(e -> ObjUtil.isNotNull(e) && ObjUtil.isNotNull(e.getKey()) && ObjUtil.isNotNull(e.getValue()));
}
@ -232,7 +232,7 @@ public class EntryStream<K, V> extends AbstractEnhancedStreamWrapper<Map.Entry<K
*
* @return {@link EntryStream}实例
*/
public EntryStream<K, V> keyNonNull() {
public EntryStream<K, V> nonNullKey() {
return super.filter(e -> ObjUtil.isNotNull(e) && ObjUtil.isNotNull(e.getKey()));
}
@ -241,7 +241,7 @@ public class EntryStream<K, V> extends AbstractEnhancedStreamWrapper<Map.Entry<K
*
* @return {@link EntryStream}实例
*/
public EntryStream<K, V> valueNonNull() {
public EntryStream<K, V> nonNullValue() {
return super.filter(e -> ObjUtil.isNotNull(e) && ObjUtil.isNotNull(e.getValue()));
}
@ -298,7 +298,7 @@ public class EntryStream<K, V> extends AbstractEnhancedStreamWrapper<Map.Entry<K
* @param value
* @return {@link EntryStream}实例
*/
public EntryStream<K, V> append(K key, V value) {
public EntryStream<K, V> push(K key, V value) {
return wrapping(Stream.concat(stream, Stream.of(ofEntry(key, value))));
}
@ -309,7 +309,7 @@ public class EntryStream<K, V> extends AbstractEnhancedStreamWrapper<Map.Entry<K
* @param value
* @return {@link EntryStream}实例
*/
public EntryStream<K, V> prepend(K key, V value) {
public EntryStream<K, V> unshift(K key, V value) {
return wrapping(Stream.concat(Stream.of(ofEntry(key, value)), stream));
}
@ -319,6 +319,7 @@ public class EntryStream<K, V> extends AbstractEnhancedStreamWrapper<Map.Entry<K
* @param entries 键值对
* @return {@link EntryStream}实例
*/
@Override
public EntryStream<K, V> append(Iterable<? extends Map.Entry<K, V>> entries) {
if (IterUtil.isEmpty(entries)) {
return this;
@ -334,6 +335,7 @@ public class EntryStream<K, V> extends AbstractEnhancedStreamWrapper<Map.Entry<K
* @param entries 键值对
* @return {@link EntryStream}实例
*/
@Override
public EntryStream<K, V> prepend(Iterable<? extends Map.Entry<K, V>> entries) {
if (IterUtil.isEmpty(entries)) {
return this;
@ -724,7 +726,7 @@ public class EntryStream<K, V> extends AbstractEnhancedStreamWrapper<Map.Entry<K
* 将键值对转为{@link AbstractMap.SimpleImmutableEntry}
*/
@SuppressWarnings("unchecked")
private static <K, V> Map.Entry<K, V> ofEntry(Map.Entry<K, V> entry) {
static <K, V> Map.Entry<K, V> ofEntry(Map.Entry<K, V> entry) {
return ObjUtil.defaultIfNull(
entry, e -> ofEntry(e.getKey(), e.getValue()), (Map.Entry<K, V>)EMPTY_ENTRY
);
@ -733,7 +735,7 @@ public class EntryStream<K, V> extends AbstractEnhancedStreamWrapper<Map.Entry<K
/**
* 将键值对转为{@link AbstractMap.SimpleImmutableEntry}
*/
private static <K, V> Map.Entry<K, V> ofEntry(K key, V value) {
static <K, V> Map.Entry<K, V> ofEntry(K key, V value) {
return new AbstractMap.SimpleImmutableEntry<>(key, value);
}

View File

@ -1,55 +0,0 @@
package cn.hutool.core.stream;
import java.util.function.Function;
import java.util.stream.Stream;
/**
* {@link AbstractEnhancedStreamWrapper}的基本实现
*
* @author huangchengxing
*/
public class SimpleStreamWrapper<T> extends AbstractEnhancedStreamWrapper<T, SimpleStreamWrapper<T>> {
/**
* 创建一个流包装器
*
* @param stream 包装的流对象
* @throws NullPointerException {@code stream}{@code null}时抛出
*/
SimpleStreamWrapper(Stream<T> stream) {
super(stream);
}
/**
* 将一个流包装为{@link SimpleStreamWrapper}
*
* @param source 被包装的流
* @return 包装后的流
*/
@Override
public SimpleStreamWrapper<T> wrapping(Stream<T> source) {
return new SimpleStreamWrapper<>(source);
}
/**
* 将当前流中元素映射为另一类型
*
* @param mapper 映射方法
* @return 映射后的流
*/
@Override
public <R> SimpleStreamWrapper<R> map(Function<? super T, ? extends R> mapper) {
return new SimpleStreamWrapper<>(stream().map(mapper));
}
/**
* 将当前流中元素展开为流然后返回由这些新流中元素组成的流
*
* @param mapper 映射方法
* @return 映射后的流
*/
@Override
public <R> SimpleStreamWrapper<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
return new SimpleStreamWrapper<>(stream().flatMap(mapper));
}
}

View File

@ -9,70 +9,26 @@ import java.util.*;
import java.util.function.*;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* {@link StreamWrapper}的扩展为实现类提供更多终端操作方法
* {@link WrappedStream}的扩展用于为实现类提供更多终端操作方法的增强接口
* 该接口提供的方法返回值类型都不为{@link Stream}
*
* @param <T> 流中的元素类型
* @param <I> 链式调用获得的实现类类型
* @param <S> {@link TerminableWrappedStream}的实现类类型
* @author huangchengxing
* @since 6.0.0
*/
public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T, I>> extends StreamWrapper<T, I> {
// region ============ join ============
/**
* 返回拼接后的字符串
*
* @return 拼接后的字符串
*/
default String join() {
return this.join("");
}
/**
* 返回拼接后的字符串
*
* @param delimiter 分隔符
* @return 拼接后的字符串
*/
default String join(CharSequence delimiter) {
return this.join(delimiter, "", "");
}
/**
* 返回拼接后的字符串
*
* @param delimiter 分隔符
* @param prefix 前缀
* @param suffix 后缀
* @return 拼接后的字符串
*/
default String join(CharSequence delimiter,
CharSequence prefix,
CharSequence suffix) {
return stream().map(String::valueOf).collect(Collectors.joining(delimiter, prefix, suffix));
}
// endregion
public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T, S>> extends WrappedStream<T, S> {
// region ============ to collection ============
/**
* 转换成集合
*
* @param collectionFactory 集合工厂(可以是集合构造器)
* @param <C> 集合类型
* @return 集合
*/
default <C extends Collection<T>> C toColl(Supplier<C> collectionFactory) {
return stream().collect(Collectors.toCollection(collectionFactory));
}
/**
* 转换为{@link ArrayList}
*
* @return 集合
* @see #toColl(Supplier)
*/
default List<T> toList() {
return this.toColl(ArrayList::new);
@ -82,7 +38,8 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
* 换为不可变集合
*
* @return 集合
*/
* @see #toColl(Supplier)
*/
default List<T> toUnmodifiableList() {
return Collections.unmodifiableList(this.toList());
}
@ -91,7 +48,8 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
* 转换为HashSet
*
* @return 集合
*/
* @see #toColl(Supplier)
*/
default Set<T> toSet() {
return this.toColl(HashSet::new);
}
@ -100,11 +58,24 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
* 换为不可变集合
*
* @return 集合
*/
* @see #toColl(Supplier)
*/
default Set<T> toUnmodifiableSet() {
return Collections.unmodifiableSet(this.toSet());
}
/**
* 转换成集合
*
* @param collectionFactory 集合工厂(可以是集合构造器)
* @param <C> 集合类型
* @return 集合
*/
default <C extends Collection<T>> C toColl(Supplier<C> collectionFactory) {
Objects.requireNonNull(collectionFactory);
return stream().collect(Collectors.toCollection(collectionFactory));
}
// endregion
// region ============ to map ============
@ -115,6 +86,7 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
* @param keyMapper 指定的key操作
* @param <K> key类型
* @return map
* @see #toMap(Function, Function, BinaryOperator, Supplier)
*/
default <K> Map<K, T> toMap(Function<? super T, ? extends K> keyMapper) {
return this.toMap(keyMapper, Function.identity());
@ -128,6 +100,7 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
* @param <K> key类型
* @param <U> value类型
* @return map
* @see #toMap(Function, Function, BinaryOperator, Supplier)
*/
default <K, U> Map<K, U> toMap(
Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper) {
@ -142,6 +115,7 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
* @param <K> key类型
* @param <U> value类型
* @return map
* @see #toMap(Function, Function, BinaryOperator, Supplier)
*/
default <K, U> Map<K, U> toUnmodifiableMap(
Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper) {
@ -157,6 +131,7 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
* @param <K> key类型
* @param <U> value类型
* @return map
* @see #toMap(Function, Function, BinaryOperator, Supplier)
*/
default <K, U> Map<K, U> toMap(
Function<? super T, ? extends K> keyMapper,
@ -174,6 +149,7 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
* @param <K> key类型
* @param <U> value类型
* @return map
* @see #toMap(Function, Function, BinaryOperator, Supplier)
*/
default <K, U> Map<K, U> toUnmodifiableMap(
Function<? super T, ? extends K> keyMapper,
@ -201,6 +177,10 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction,
Supplier<M> mapSupplier) {
Objects.requireNonNull(keyMapper);
Objects.requireNonNull(valueMapper);
Objects.requireNonNull(mergeFunction);
Objects.requireNonNull(mapSupplier);
return stream().collect(Collectors.toMap(keyMapper, valueMapper, mergeFunction, mapSupplier));
}
@ -219,6 +199,7 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
* 如果key重复, 则保留最后一个关联的value;<br>
*/
default <R> Map<T, R> toZip(Iterable<R> other) {
Objects.requireNonNull(other);
// value对象迭代器
final Iterator<R> iterator = Opt.ofNullable(other).map(Iterable::iterator).orElseGet(Collections::emptyIterator);
if (this.isParallel()) {
@ -235,129 +216,20 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
// endregion
// region ============ group ============
/**
* 通过给定分组依据进行分组
*
* @param classifier 分组依据得到的键为{@code null}时不会抛出异常
* @param <K> 实体中的分组依据对应类型也是Map中key的类型
* @return map
*/
default <K> Map<K, List<T>> group(Function<? super T, ? extends K> classifier) {
return this.group(classifier, Collectors.toList());
}
/**
* 通过给定分组依据进行分组
*
* @param classifier 分组依据得到的键为{@code null}时不会抛出异常
* @param downstream 下游操作
* @param <K> 实体中的分组依据对应类型也是Map中key的类型
* @param <D> 下游操作对应返回类型也是Map中value的类型
* @param <A> 下游操作在进行中间操作时对应类型
* @return map
*/
default <K, A, D> Map<K, D> group(
Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream) {
return this.group(classifier, HashMap::new, downstream);
}
/**
* 通过给定分组依据进行分组
*
* @param classifier 分组依据得到的键为{@code null}时不会抛出异常
* @param mapFactory 提供的map
* @param downstream 下游操作
* @param <K> 实体中的分组依据对应类型也是Map中key的类型
* @param <D> 下游操作对应返回类型也是Map中value的类型
* @param <A> 下游操作在进行中间操作时对应类型
* @param <M> 最后返回结果Map类型
* @return map
* @see CollectorUtil#groupingBy(Function, Supplier, Collector)
*/
default <K, D, A, M extends Map<K, D>> M group(
Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream) {
return stream().collect(CollectorUtil.groupingBy(classifier, mapFactory, downstream));
}
/**
* 根据给定判断条件分组
*
* @param predicate 判断条件
* @return map
*/
default Map<Boolean, List<T>> partition(Predicate<T> predicate) {
return this.partition(predicate, ArrayList::new);
}
/**
* 根据给定判断条件分组
*
* @param predicate 判断条件
* @param collFactory 提供的集合
* @return map
*/
default <C extends Collection<T>> Map<Boolean, C> partition(Predicate<T> predicate, Supplier<C> collFactory) {
return this.partition(predicate, Collectors.toCollection(collFactory));
}
/**
* 根据给定判断条件分组
*
* @param predicate 判断条件
* @param downstream 下游操作
* @param <R> 返回值类型
* @return map
*/
default <R> Map<Boolean, R> partition(Predicate<T> predicate, Collector<T, ?, R> downstream) {
return stream().collect(Collectors.partitioningBy(predicate, downstream));
}
// endregion
// region ============ foreach ============
// region ============ to optional ============
/**
* 对流里面的每一个元素执行一个操作操作带下标并行流时下标永远为-1
* 这是一个终端操作
* 将当前流转为另一对象用于提供针对流本身而非流中元素的操作
*
* @param action 操作
* @param <R> 转换类型
* @param transform 转换
* @return 转换后的流
*/
default void forEachIdx(final BiConsumer<? super T, Integer> action) {
Objects.requireNonNull(action);
if (isParallel()) {
stream().forEach(e -> action.accept(e, NOT_FOUND_INDEX));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
stream().forEach(e -> action.accept(e, index.incrementAndGet()));
}
default <R> Optional<R> transform(final Function<? super S, R> transform) {
Objects.requireNonNull(transform);
return Optional.ofNullable(transform.apply(wrapping(this)));
}
/**
* 对流里面的每一个元素按照顺序执行一个操作操作带下标并行流时下标永远为-1
* 这是一个终端操作
*
* @param action 操作
*/
default void forEachOrderedIdx(final BiConsumer<? super T, Integer> action) {
Objects.requireNonNull(action);
if (isParallel()) {
stream().forEachOrdered(e -> action.accept(e, NOT_FOUND_INDEX));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
stream().forEachOrdered(e -> action.accept(e, index.incrementAndGet()));
}
}
// endregion
// region ============ find & get ============
/**
* 获取与给定断言匹配的第一个元素
*
@ -365,6 +237,7 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
* @return 与给定断言匹配的第一个元素
*/
default Optional<T> findFirst(final Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
return stream().filter(predicate).findFirst();
}
@ -377,9 +250,9 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
default int findFirstIdx(final Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
if (isParallel()) {
return NOT_FOUND_INDEX;
return NOT_FOUND_ELEMENT_INDEX;
} else {
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
stream().filter(e -> {
index.increment();
return predicate.test(e);
@ -425,9 +298,9 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
default int findLastIdx(final Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
if (isParallel()) {
return NOT_FOUND_INDEX;
return NOT_FOUND_ELEMENT_INDEX;
} else {
final MutableInt idxRef = new MutableInt(NOT_FOUND_INDEX);
final MutableInt idxRef = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
forEachIdx((e, i) -> {
if (predicate.test(e)) {
idxRef.set(i);
@ -450,7 +323,7 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
// endregion
// region ============ is ============
// region ============ to boolean ============
/**
* 流是否为空
@ -472,4 +345,169 @@ public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T,
// endregion
// region ============ join ============
/**
* 返回拼接后的字符串
*
* @return 拼接后的字符串
* @see #join(CharSequence, CharSequence, CharSequence)
*/
default String join() {
return this.join("");
}
/**
* 返回拼接后的字符串
*
* @param delimiter 分隔符
* @return 拼接后的字符串
* @see #join(CharSequence, CharSequence, CharSequence)
*/
default String join(CharSequence delimiter) {
return this.join(delimiter, "", "");
}
/**
* 返回拼接后的字符串
*
* @param delimiter 分隔符
* @param prefix 前缀
* @param suffix 后缀
* @return 拼接后的字符串
*/
default String join(CharSequence delimiter, CharSequence prefix, CharSequence suffix) {
return stream().map(String::valueOf).collect(Collectors.joining(delimiter, prefix, suffix));
}
// endregion
// region ============ group ============
/**
* 通过给定分组依据进行分组
*
* @param classifier 分组依据得到的键为{@code null}时不会抛出异常
* @param <K> 实体中的分组依据对应类型也是Map中key的类型
* @return map
* @see #group(Function, Supplier, Collector)
*/
default <K> Map<K, List<T>> group(Function<? super T, ? extends K> classifier) {
return this.group(classifier, Collectors.toList());
}
/**
* 通过给定分组依据进行分组
*
* @param classifier 分组依据得到的键为{@code null}时不会抛出异常
* @param downstream 下游操作
* @param <K> 实体中的分组依据对应类型也是Map中key的类型
* @param <D> 下游操作对应返回类型也是Map中value的类型
* @param <A> 下游操作在进行中间操作时对应类型
* @return map
* @see #group(Function, Supplier, Collector)
*/
default <K, A, D> Map<K, D> group(
Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream) {
return this.group(classifier, HashMap::new, downstream);
}
/**
* 通过给定分组依据进行分组
*
* @param classifier 分组依据得到的键为{@code null}时不会抛出异常
* @param mapFactory 提供的map
* @param downstream 下游操作
* @param <K> 实体中的分组依据对应类型也是Map中key的类型
* @param <D> 下游操作对应返回类型也是Map中value的类型
* @param <A> 下游操作在进行中间操作时对应类型
* @param <M> 最后返回结果Map类型
* @return map
* @see CollectorUtil#groupingBy(Function, Supplier, Collector)
*/
default <K, D, A, M extends Map<K, D>> M group(
Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream) {
Objects.requireNonNull(classifier);
Objects.requireNonNull(mapFactory);
Objects.requireNonNull(downstream);
return stream().collect(CollectorUtil.groupingBy(classifier, mapFactory, downstream));
}
/**
* 根据给定判断条件分组
*
* @param predicate 判断条件
* @return map
* @see #partitioning(Predicate, Collector)
*/
default Map<Boolean, List<T>> partitioning(Predicate<T> predicate) {
return this.partitioning(predicate, ArrayList::new);
}
/**
* 根据给定判断条件分组
*
* @param predicate 判断条件
* @param collFactory 提供的集合
* @return map
* @see #partitioning(Predicate, Collector)
*/
default <C extends Collection<T>> Map<Boolean, C> partitioning(Predicate<T> predicate, Supplier<C> collFactory) {
return this.partitioning(predicate, Collectors.toCollection(collFactory));
}
/**
* 根据给定判断条件分组
*
* @param predicate 判断条件
* @param downstream 下游操作
* @param <R> 返回值类型
* @return map
*/
default <R> Map<Boolean, R> partitioning(Predicate<T> predicate, Collector<T, ?, R> downstream) {
Objects.requireNonNull(predicate);
Objects.requireNonNull(downstream);
return stream().collect(Collectors.partitioningBy(predicate, downstream));
}
// endregion
// region ============ foreach ============
/**
* 对流里面的每一个元素执行一个操作操作带下标并行流时下标永远为-1
* 这是一个终端操作
*
* @param action 操作
*/
default void forEachIdx(final BiConsumer<? super T, Integer> action) {
Objects.requireNonNull(action);
if (isParallel()) {
stream().forEach(e -> action.accept(e, NOT_FOUND_ELEMENT_INDEX));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
stream().forEach(e -> action.accept(e, index.incrementAndGet()));
}
}
/**
* 对流里面的每一个元素按照顺序执行一个操作操作带下标并行流时下标永远为-1
* 这是一个终端操作
*
* @param action 操作
*/
default void forEachOrderedIdx(final BiConsumer<? super T, Integer> action) {
Objects.requireNonNull(action);
if (isParallel()) {
stream().forEachOrdered(e -> action.accept(e, NOT_FOUND_ELEMENT_INDEX));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
stream().forEachOrdered(e -> action.accept(e, index.incrementAndGet()));
}
}
// endregion
}

View File

@ -1,216 +0,0 @@
package cn.hutool.core.stream;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Console;
import cn.hutool.core.lang.mutable.MutableInt;
import cn.hutool.core.util.ArrayUtil;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* {@link StreamWrapper}的扩展为实现类提供更多中间操作方法
*
* @param <T> 流中的元素类型
* @param <I> 链式调用获得的实现类类型
* @author huangchengxing
*/
public interface TransformableStreamWrapper<T, I extends TransformableStreamWrapper<T, I>> extends StreamWrapper<T, I> {
/**
* 过滤元素返回与指定断言匹配的元素组成的流断言带下标并行流时下标永远为-1
* 这是一个无状态中间操作
*
* @param predicate 断言
* @return 返回叠加过滤操作后的流
*/
default I filterIdx(final BiPredicate<? super T, Integer> predicate) {
Objects.requireNonNull(predicate);
if (isParallel()) {
return filter(e -> predicate.test(e, NOT_FOUND_INDEX));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
return filter(e -> predicate.test(e, index.incrementAndGet()));
}
}
/**
* 过滤掉空元素
*
* @return 过滤后的流
*/
default I nonNull() {
return filter(Objects::nonNull);
}
/**
* 返回与指定函数将元素作为参数执行后组成的流操作带下标并行流时下标永远为-1
* 这是一个无状态中间操作
* @param action 指定的函数
* @return 返回叠加操作后的FastStream
* @apiNote 该方法存在的意义主要是用来调试
* 当你需要查看经过操作管道某处的元素和下标可以执行以下操作:
* <pre>{@code
* Stream.of("one", "two", "three", "four")
* .filter(e -> e.length() > 3)
* .peekIdx((e,i) -> System.out.println("Filtered value: " + e + " Filtered idx:" + i))
* .map(String::toUpperCase)
* .peekIdx((e,i) -> System.out.println("Mapped value: " + e + " Mapped idx:" + i))
* .collect(Collectors.toList());
* }</pre>
*/
default I peekIdx(BiConsumer<? super T, Integer> action) {
Objects.requireNonNull(action);
if (isParallel()) {
return peek(e -> action.accept(e, NOT_FOUND_INDEX));
} else {
AtomicInteger index = new AtomicInteger(NOT_FOUND_INDEX);
return peek(e -> action.accept(e, index.incrementAndGet()));
}
}
/**
* 返回叠加调用{@link Console#log(Object)}打印出结果的流
*
* @return 返回叠加操作后的FastStream
*/
default I log() {
return peek(Console::log);
}
/**
* 反转顺序
*
* @return 反转元素顺序
*/
@SuppressWarnings("unchecked")
default I reverse() {
final T[] array = (T[]) toArray();
ArrayUtil.reverse(array);
return wrapping(Stream.of(array)).parallel(isParallel());
}
/**
* 更改流的并行状态
*
* @param parallel 是否并行
* @return
*/
default I parallel(final boolean parallel) {
return parallel ? parallel() : sequential();
}
/**
* 与给定元素组成的流合并成为新的流
*
* @param obj 元素
* @return
*/
@SuppressWarnings("unchecked")
default I push(final T... obj) {
Stream<T> result = stream();
if (ArrayUtil.isNotEmpty(obj)) {
result = Stream.concat(stream(), Stream.of(obj));
}
return wrapping(result);
}
/**
* 给定元素组成的流与当前流合并成为新的流
*
* @param obj 元素
* @return
*/
default I unshift(final T... obj) {
Stream<T> result = stream();
if (ArrayUtil.isNotEmpty(obj)) {
result = Stream.concat(Stream.of(obj), stream());
}
return wrapping(result);
}
/**
* 通过删除或替换现有元素或者原地添加新的元素来修改列表并以列表形式返回被修改的内容此方法不会改变原列表
* 类似js的<a href="https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Array/splice">splice</a>函数
*
* @param start 起始下标
* @param deleteCount 删除个数正整数
* @param items 放入值
* @return 操作后的流
*/
default I splice(final int start, final int deleteCount, final T... items) {
final List<T> elements = stream().collect(Collectors.toList());
return wrapping(ListUtil.splice(elements, start, deleteCount, items).stream())
.parallel(isParallel());
}
/**
* 保留 与指定断言 匹配时的元素, 在第一次不匹配时终止, 抛弃当前(第一个不匹配元素)及后续所有元素
* <p> jdk9 中的 takeWhile 方法不太一样, 这里的实现是个 顺序的有状态的中间操作</p>
* <pre>本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
* FastStream.iterate(1, i -> i + 1)
* .parallel()
* // 顺序执行
* .takeWhile(e -> e < 50)
* // 并发
* .map(e -> e + 1)
* // 并发
* .map(String::valueOf)
* .toList();
* }</pre>
* <p>但是不建议在并行流中使用, 除非你确定 takeWhile 之后的操作能在并行流中受益很多</p>
*
* @param predicate 断言
* @return 与指定断言匹配的元素组成的流
*/
default I takeWhile(final Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
return wrapping(StreamUtil.takeWhile(stream(), predicate));
}
/**
* 删除 与指定断言 匹配的元素, 在第一次不匹配时终止, 返回当前(第一个不匹配元素)及剩余元素组成的新流
* <p> jdk9 中的 dropWhile 方法不太一样, 这里的实现是个 顺序的有状态的中间操作</p>
* <pre>本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
* FastStream.iterate(1, i <= 100, i -> i + 1)
* .parallel()
* // 顺序执行
* .dropWhile(e -> e < 50)
* // 并发
* .map(e -> e + 1)
* // 并发
* .map(String::valueOf)
* .toList();
* }</pre>
* <p>但是不建议在并行流中使用, 除非你确定 dropWhile 之后的操作能在并行流中受益很多</p>
*
* @param predicate 断言
* @return 剩余元素组成的流
*/
default I dropWhile(final Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
return wrapping(StreamUtil.dropWhile(stream(), predicate));
}
/**
* 将当前流转为另一对象用于提供针对流本身而非流中元素的操作
*
* @param <R> 转换类型
* @param transform 转换
* @return 转换后的流
*/
default <R> Optional<R> transform(final Function<? super I, R> transform) {
Assert.notNull(transform, "transform must not null");
return Optional.ofNullable(transform.apply(wrapping(this)));
}
}

View File

@ -0,0 +1,570 @@
package cn.hutool.core.stream;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.collection.iter.IterUtil;
import cn.hutool.core.lang.Console;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.lang.mutable.MutableInt;
import cn.hutool.core.lang.mutable.MutableObj;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ArrayUtil;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* {@link WrappedStream}的扩展用于为实现类提供更多中间操作方法的增强接口
* 该接口提供的方法返回值类型都为{@link Stream}
*
* @param <T> 流中的元素类型
* @param <S> {@link TransformableWrappedStream}的实现类类型
* @author huangchengxing
* @since 6.0.0
*/
public interface TransformableWrappedStream<T, S extends TransformableWrappedStream<T, S>> extends WrappedStream<T, S> {
/**
* 现有元素 给定迭代器中对应位置的元素 使用 zipper 转换为新的元素并返回新元素组成的流<br>
* 新流的数量为两个集合中较小的数量, , 只合并下标位置相同的部分<br>
*
* @param other 给定的迭代器
* @param zipper 两个元素的合并器
* @param <U> 给定的迭代对象类型
* @param <R> 合并后的结果对象类型
* @return 合并后的结果对象的流
*/
default <U, R> EasyStream<R> zip(final Iterable<U> other,
final BiFunction<? super T, ? super U, ? extends R> zipper) {
Objects.requireNonNull(zipper);
final Spliterator<T> keys = spliterator();
final Spliterator<U> values = Opt.ofNullable(other).map(Iterable::spliterator).orElseGet(Spliterators::emptySpliterator);
// 获取两个Spliterator的中较小的数量
// 如果Spliterator经过流操作, getExactSizeIfKnown()可能会返回-1, 所以默认大小为 ArrayList.DEFAULT_CAPACITY
final int sizeIfKnown = (int) Math.max(Math.min(keys.getExactSizeIfKnown(), values.getExactSizeIfKnown()), 10);
final List<R> list = new ArrayList<>(sizeIfKnown);
// 保存第一个Spliterator的值
final MutableObj<T> key = new MutableObj<>();
// 保存第二个Spliterator的值
final MutableObj<U> value = new MutableObj<>();
// 当两个Spliterator中都还有剩余元素时
while (keys.tryAdvance(key::set) && values.tryAdvance(value::set)) {
list.add(zipper.apply(key.get(), value.get()));
}
return EasyStream.of(list).parallel(isParallel()).onClose(stream()::close);
}
/**
* 按指定长度切分为双层流
* <p>
* 形如[1,2,3,4,5] -&gt; [[1,2], [3,4], [5,6]]
* </p>
*
* @param batchSize 指定长度, 正整数
* @return 切好的流
*/
default EasyStream<EasyStream<T>> split(final int batchSize) {
final List<T> list = this.collect(Collectors.toList());
final int size = list.size();
// 指定长度 大于等于 列表长度
if (size <= batchSize) {
// 返回第一层只有单个元素的双层流形如[[1,2,3,4,5]]
return EasyStream.<EasyStream<T>>of(EasyStream.of(list, isParallel()));
}
return EasyStream.iterate(0, i -> i < size, i -> i + batchSize)
.map(skip -> EasyStream.of(list.subList(skip, Math.min(size, skip + batchSize)), isParallel()))
.parallel(isParallel())
.onClose(stream()::close);
}
/**
* 按指定长度切分为元素为list的流
* <p>
* 形如[1,2,3,4,5] -&gt; [[1,2], [3,4], [5,6]]
* </p>
*
* @param batchSize 指定长度, 正整数
* @return 切好的流
*/
default EasyStream<List<T>> splitList(final int batchSize) {
return split(batchSize).map(EasyStream::toList);
}
/**
* 将当前流转为键值对流
*
* @param keyMapper 键的映射方法
* @param valueMapper 值的映射方法
* @param <K> 键类型
* @param <V> 值类型
* @return {@link EntryStream}实例
*/
default <K, V> EntryStream<K, V> toEntries(Function<T, K> keyMapper, Function<T, V> valueMapper) {
Objects.requireNonNull(keyMapper);
Objects.requireNonNull(valueMapper);
return new EntryStream<>(map(t -> EntryStream.ofEntry(keyMapper.apply(t), valueMapper.apply(t))));
}
/**
* 将当前流转为键值对流
*
* @param keyMapper 键的映射方法
* @param <K> 键类型
* @return {@link EntryStream}实例
*/
default <K> EntryStream<K, T> toEntries(Function<T, K> keyMapper) {
return toEntries(keyMapper, Function.identity());
}
// region ============ generic ============
/**
* 反转顺序
*
* @return 反转元素顺序
*/
@SuppressWarnings("unchecked")
default S reverse() {
final T[] array = (T[]) toArray();
ArrayUtil.reverse(array);
return wrapping(Stream.of(array)).parallel(isParallel());
}
/**
* 更改流的并行状态
*
* @param parallel 是否并行
* @return
*/
default S parallel(final boolean parallel) {
return parallel ? parallel() : sequential();
}
/**
* 通过删除或替换现有元素或者原地添加新的元素来修改列表并以列表形式返回被修改的内容此方法不会改变原列表
* 类似js的<a href="https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Array/splice">splice</a>函数
*
* @param start 起始下标
* @param deleteCount 删除个数正整数
* @param items 放入值
* @return 操作后的流
*/
default S splice(final int start, final int deleteCount, final T... items) {
final List<T> elements = stream().collect(Collectors.toList());
return wrapping(ListUtil.splice(elements, start, deleteCount, items).stream())
.parallel(isParallel());
}
/**
* <p>遍历流中与断言匹配的元素当遇到第一个不匹配的元素时终止返回由匹配的元素组成的流<br>
* eg:
* <pre>{@code
* EasyStream.of(1, 2, 3, 4, 5)
* .takeWhile(i -> Objects.equals(3, i)) // 获取元素一直到遇到第一个3为止
* .toList(); // = [1, 2]
* }</pre>
*
* <p>{@code JDK9}中的{@code takeWhile}方法不太一样此操作为顺序且有状态的中间操作
* 即使在并行流中该操作仍然是顺序执行的并且不影响后续的并行操作
* <pre>{@code
* EasyStream.iterate(1, i -> i + 1)
* .parallel()
* .takeWhile(e -> e < 50) // 顺序执行
* .map(e -> e + 1) // 并发
* .map(String::valueOf) // 并发
* .toList();
* }</pre>
* 若非必要不推荐在并行流中进行该操作
*
* @param predicate 断言
* @return 与指定断言匹配的元素组成的流
*/
default S takeWhile(final Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
return wrapping(StreamUtil.takeWhile(stream(), predicate));
}
/**
* <<p>删除流中与断言匹配的元素当遇到第一个不匹配的元素时终止返回由剩余不匹配的元素组成的流<br>
* eg:
* <pre>{@code
* EasyStream.of(1, 2, 3, 4, 5)
* .dropWhile(i -> !Objects.equals(3, i)) // 删除不为3的元素一直到遇到第一个3为止
* .toList(); // = [3, 4, 5]
* }</pre>
*
* <p>{@code JDK9}中的{@code dropWhile}方法不太一样此操作为顺序且有状态的中间操作
* 即使在并行流中该操作仍然是顺序执行的并且不影响后续的并行操作
* <pre>{@code
* EasyStream.iterate(1, i -> i + 1)
* .parallel()
* .dropWhile(e -> e < 50) // 顺序执行
* .map(e -> e + 1) // 并发
* .map(String::valueOf) // 并发
* .toList();
* }</pre>
* 若非必要不推荐在并行流中进行该操作
*
* @param predicate 断言
* @return 剩余元素组成的流
*/
default S dropWhile(final Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
return wrapping(StreamUtil.dropWhile(stream(), predicate));
}
/**
* 返回一个具有去重特征的流 非并行流(顺序流)下对于重复元素保留遇到顺序中最先出现的元素并行流情况下不能保证具体保留哪一个
* 这是一个有状态中间操作
*
* @param <F> 参数类型
* @param keyExtractor 去重依据
* @return 一个具有去重特征的流
*/
default <F> EasyStream<T> distinct(final Function<? super T, F> keyExtractor) {
Objects.requireNonNull(keyExtractor);
if (isParallel()) {
final ConcurrentHashMap<F, Boolean> exists = MapUtil.newConcurrentHashMap();
// 标记是否出现过null值用于保留第一个出现的null
// 由于ConcurrentHashMap的key不能为null所以用此变量来标记
final AtomicBoolean hasNull = new AtomicBoolean(false);
return EasyStream.of(stream().filter(e -> {
final F key = keyExtractor.apply(e);
if (key == null) {
// 已经出现过null值跳过该值
if (hasNull.get()) {
return false;
}
hasNull.set(Boolean.TRUE);
return true;
} else {
// 第一次出现的key返回true
return null == exists.putIfAbsent(key, Boolean.TRUE);
}
})).parallel();
} else {
final Set<F> exists = new HashSet<>();
return EasyStream.of(stream().filter(e -> exists.add(keyExtractor.apply(e))));
}
}
// endregion
// region ============ peek ============
/**
* 返回与指定函数将元素作为参数执行后组成的流操作带下标并行流时下标永远为-1
* 这是一个无状态中间操作
* @param action 指定的函数
* @return 返回叠加操作后的FastStream
* @apiNote 该方法存在的意义主要是用来调试
* 当你需要查看经过操作管道某处的元素和下标可以执行以下操作:
* <pre>{@code
* Stream.of("one", "two", "three", "four")
* .filter(e -> e.length() > 3)
* .peekIdx((e,i) -> System.out.println("Filtered value: " + e + " Filtered idx:" + i))
* .map(String::toUpperCase)
* .peekIdx((e,i) -> System.out.println("Mapped value: " + e + " Mapped idx:" + i))
* .collect(Collectors.toList());
* }</pre>
*/
default S peekIdx(BiConsumer<? super T, Integer> action) {
Objects.requireNonNull(action);
if (isParallel()) {
return peek(e -> action.accept(e, NOT_FOUND_ELEMENT_INDEX));
} else {
AtomicInteger index = new AtomicInteger(NOT_FOUND_ELEMENT_INDEX);
return peek(e -> action.accept(e, index.incrementAndGet()));
}
}
/**
* 返回叠加调用{@link Console#log(Object)}打印出结果的流
*
* @return 返回叠加操作后的FastStream
*/
default S log() {
return peek(Console::log);
}
// endregion
// region ============ concat ============
/**
* 与给定元素组成的流合并成为新的流
*
* @param obj 元素
* @return
*/
@SuppressWarnings("unchecked")
default S push(final T... obj) {
Stream<T> result = stream();
if (ArrayUtil.isNotEmpty(obj)) {
result = Stream.concat(stream(), Stream.of(obj));
}
return wrapping(result);
}
/**
* 给定元素组成的流与当前流合并成为新的流
*
* @param obj 元素
* @return
*/
default S unshift(final T... obj) {
Stream<T> result = stream();
if (ArrayUtil.isNotEmpty(obj)) {
result = Stream.concat(Stream.of(obj), stream());
}
return wrapping(result);
}
/**
* 将输入元素转为流返回一个前半段为当前流后半段为新流的新实例
*
* @param iterable 集合
* @return {@link EntryStream}实例
*/
default S append(Iterable<? extends T> iterable) {
if (IterUtil.isEmpty(iterable)) {
return wrapping(this);
}
final Stream<? extends T> contacted = StreamSupport.stream(iterable.spliterator(), isParallel());
return wrapping(Stream.concat(this, contacted));
}
/**
* 将输入元素转为流返回一个前半段为新流后半段为当前流的新实例
*
* @param iterable 集合
* @return {@link EntryStream}实例
*/
default S prepend(Iterable<? extends T> iterable) {
if (IterUtil.isEmpty(iterable)) {
return wrapping(this);
}
final Stream<? extends T> contacted = StreamSupport.stream(iterable.spliterator(), isParallel());
return wrapping(Stream.concat(contacted, this));
}
// endregion
// region ============ filter ============
/**
* 过滤掉空元素
*
* @return 过滤后的流
*/
default S nonNull() {
return filter(Objects::nonNull);
}
/**
* 过滤元素返回与指定断言匹配的元素组成的流断言带下标并行流时下标永远为-1
* 这是一个无状态中间操作
*
* @param predicate 断言
* @return 返回叠加过滤操作后的流
*/
default S filterIdx(final BiPredicate<? super T, Integer> predicate) {
Objects.requireNonNull(predicate);
if (isParallel()) {
return filter(e -> predicate.test(e, NOT_FOUND_ELEMENT_INDEX));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
return filter(e -> predicate.test(e, index.incrementAndGet()));
}
}
/**
* 过滤元素返回与 指定操作结果 匹配 指定值 的元素组成的流
* 这是一个无状态中间操作
*
* @param <R> 返回类型
* @param mapper 操作
* @param value 用来匹配的值
* @return 指定操作结果 匹配 指定值 的元素组成的流
*/
default <R> S filter(final Function<? super T, ? extends R> mapper, final R value) {
Objects.requireNonNull(mapper);
return filter(e -> Objects.equals(mapper.apply(e), value));
}
// endregion
// region ============ flat ============
/**
* 扩散流操作可能影响流元素个数将原有流元素执行mapper操作返回多个流所有元素组成的流<br>
* 这是一个无状态中间操作<br>
* 例如将users里所有user的id和parentId组合在一起形成一个新的流:
* <pre>{@code
* EasyStream<Long> ids = EasyStream.of(users).flatMap(user -> FastStream.of(user.getId(), user.getParentId()));
* }</pre>
*
* @param mapper 操作返回流
* @param <R> 拆分后流的元素类型
* @return 返回叠加拆分操作后的流
*/
@Override
default <R> EasyStream<R> flatMap(final Function<? super T, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
return new EasyStream<>(stream().flatMap(mapper));
}
/**
* 扩散流操作可能影响流元素个数将原有流元素执行mapper操作返回多个流所有元素组成的流操作带下标并行流时下标永远为-1
* 这是一个无状态中间操作
*
* @param mapper 操作返回流
* @param <R> 拆分后流的元素类型
* @return 返回叠加拆分操作后的流
*/
default <R> EasyStream<R> flatMapIdx(final BiFunction<? super T, Integer, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
if (isParallel()) {
return flatMap(e -> mapper.apply(e, NOT_FOUND_ELEMENT_INDEX));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
return flatMap(e -> mapper.apply(e, index.incrementAndGet()));
}
}
/**
* 扩散流操作可能影响流元素个数将原有流元素执行mapper操作, 转换为迭代器元素,
* 最后返回所有迭代器的所有元素组成的流<br>
* 这是一个无状态中间操作<br>
* 例如将users里所有user的id和parentId组合在一起形成一个新的流:
* <pre>{@code
* EasyStream<Long> ids = EasyStream.of(users).flat(user -> FastStream.of(user.getId(), user.getParentId()));
* }</pre>
*
* @param mapper 操作返回可迭代对象
* @param <R> 拆分后流的元素类型
* @return 返回叠加拆分操作后的流
*/
default <R> EasyStream<R> flat(final Function<? super T, ? extends Iterable<? extends R>> mapper) {
Objects.requireNonNull(mapper);
return flatMap(w -> EasyStream.of(mapper.apply(w)));
}
/**
* 扩散流操作可能影响流元素个数对过滤后的非{@code null}元素执行mapper操作转换为迭代器,
* 并过滤迭代器中为{@code null}的元素, 返回所有迭代器的所有非空元素组成的流<br>
* 这是一个无状态中间操作<br>
*
* @param mapper 操作返回流
* @param <R> 拆分后流的元素类型
* @return 返回叠加拆分操作后的流
* @see #flat(Function)
* @see #nonNull()
*/
default <R> EasyStream<R> flatNonNull(final Function<? super T, ? extends Iterable<? extends R>> mapper) {
return nonNull().flat(mapper).nonNull();
}
/**
* 将树递归扁平化为集合内置一个小递归
* 这是一个无状态中间操作 <br>
* eg:
* <pre>{@code
* List<Student> students = EasyStream.of(studentTree)
* .flatTree(Student::getChildren, Student::setChildren)
* .toList();
* }</pre>
*
* @param childrenGetter 获取子节点的lambda可以写作 {@code Student::getChildren}
* @param childrenSetter 设置子节点的lambda可以写作 {@code Student::setChildren}
* @return EasyStream 一个流
*/
default S flatTree(Function<T, List<T>> childrenGetter, BiConsumer<T, List<T>> childrenSetter) {
Objects.requireNonNull(childrenGetter);
Objects.requireNonNull(childrenSetter);
MutableObj<Function<T, EasyStream<T>>> recursiveRef = new MutableObj<>();
Function<T, EasyStream<T>> recursive = e -> EasyStream.of(childrenGetter.apply(e))
.flat(recursiveRef.get())
.unshift(e);
recursiveRef.set(recursive);
return wrapping(flatMap(recursive).peek(e -> childrenSetter.accept(e, null)));
}
// endregion
// region ============ map ============
/**
* 返回与指定函数将元素作为参数执行的结果组成的流
* 这是一个无状态中间操作
*
* @param mapper 指定的函数
* @param <R> 函数执行后返回的类型
* @return 返回叠加操作后的流
*/
@Override
default <R> EasyStream<R> map(final Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new EasyStream<>(stream().map(mapper));
}
/**
* 返回 元素 转换后 并且不为 {@code null} 新元素组成的流<br>
* 这是一个无状态中间操作<br>
* <pre>{@code
* // 等价于先调用map再调用nonNull
* .nonNull().map(...).nonNull()...
* }</pre>
*
* @param mapper 指定的函数
* @param <R> 函数执行后返回的类型
* @return 新元素组成的流
*/
default <R> EasyStream<R> mapNonNull(final Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new EasyStream<>(nonNull().<R>map(mapper).nonNull());
}
/**
* 返回与指定函数将元素作为参数执行的结果组成的流操作带下标并行流时下标永远为-1
* 这是一个无状态中间操作
*
* @param mapper 指定的函数
* @param <R> 函数执行后返回的类型
* @return 返回叠加操作后的流
*/
default <R> EasyStream<R> mapIdx(final BiFunction<? super T, Integer, ? extends R> mapper) {
Objects.requireNonNull(mapper);
if (isParallel()) {
return map(e -> mapper.apply(e, NOT_FOUND_ELEMENT_INDEX));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
return map(e -> mapper.apply(e, index.incrementAndGet()));
}
}
/**
* 扩散流操作可能影响流元素个数将原有流元素执行mapper操作返回多个流所有元素组成的流操作带一个方法调用该方法可增加元素
* 这是一个无状态中间操作
*
* @param mapper 操作返回流
* @param <R> 拆分后流的元素类型
* @return 返回叠加拆分操作后的流
*/
default <R> EasyStream<R> mapMulti(final BiConsumer<? super T, ? super Consumer<R>> mapper) {
Objects.requireNonNull(mapper);
return flatMap(e -> {
final EasyStream.Builder<R> buffer = EasyStream.builder();
mapper.accept(e, buffer);
return buffer.build();
});
}
// endregion
}

View File

@ -1,41 +1,33 @@
package cn.hutool.core.stream;
import cn.hutool.core.util.ObjUtil;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
/**
* <p>表示一个用于增强原始{@link Stream}对象的包装器当调用{@link Stream}中的方法时
* 将会代理到被包装的原始流对象并返回指定的包装器实例
* <p>{@link Stream}实例的包装器用于增强原始的{@link Stream}提供一些额外的中间与终端操作 <br>
* 默认提供两个可用实现
* <ul>
* <li>{@link EasyStream}针对单元素的通用增强流实现</li>
* <li>{@link EntryStream}针对键值对类型元素的增强流实现</li>
* </ul>
*
* @param <T> 流中的元素类型
* @param <I> 链式调用获得的实现类类型
* @param <S> {@link WrappedStream}的实现类类型
* @author huangchengxing
* @see TerminableStreamWrapper
* @see TransformableStreamWrapper
* @see AbstractEnhancedStreamWrapper
* @see TerminableWrappedStream
* @see TransformableWrappedStream
* @see AbstractEnhancedWrappedStream
* @see EasyStream
* @see EntryStream
* @since 6.0.0
*/
public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<T>, Iterable<T> {
public interface WrappedStream<T, S extends WrappedStream<T, S>> extends Stream<T>, Iterable<T> {
/**
* 代表不存在的下标, 一般用于并行流的下标, 或者未找到元素时的下标
*/
int NOT_FOUND_INDEX = -1;
/**
* 将一个流包装为简单增强流{@code stream}{@code null}则默认返回一个空串行流
*
* @param stream 被包装的流
* @return {@link SimpleStreamWrapper}实例
*/
static <T> SimpleStreamWrapper<T> create(Stream<T> stream) {
return new SimpleStreamWrapper<>(ObjUtil.defaultIfNull(stream, Stream.empty()));
}
int NOT_FOUND_ELEMENT_INDEX = -1;
/**
* 获取被包装的原始流
@ -48,9 +40,9 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
* 将一个原始流包装为指定类型的增强流
*
* @param source 被包装的流
* @return I
* @return S
*/
I wrapping(Stream<T> source);
S wrapping(Stream<T> source);
/**
* 过滤元素返回与指定断言匹配的元素组成的流
@ -60,7 +52,8 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
* @return 返回叠加过滤操作后的流
*/
@Override
default I filter(Predicate<? super T> predicate) {
default S filter(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
return wrapping(stream().filter(predicate));
}
@ -73,6 +66,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default IntStream mapToInt(ToIntFunction<? super T> mapper) {
Objects.requireNonNull(mapper);
return stream().mapToInt(mapper);
}
@ -85,6 +79,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default LongStream mapToLong(ToLongFunction<? super T> mapper) {
Objects.requireNonNull(mapper);
return stream().mapToLong(mapper);
}
@ -97,6 +92,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
Objects.requireNonNull(mapper);
return stream().mapToDouble(mapper);
}
@ -109,6 +105,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
Objects.requireNonNull(mapper);
return stream().flatMapToInt(mapper);
}
@ -121,6 +118,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
Objects.requireNonNull(mapper);
return stream().flatMapToLong(mapper);
}
@ -133,6 +131,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
Objects.requireNonNull(mapper);
return stream().flatMapToDouble(mapper);
}
@ -143,7 +142,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
* @return 一个具有去重特征的流
*/
@Override
default I distinct() {
default S distinct() {
return wrapping(stream().distinct());
}
@ -156,7 +155,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
* @return 一个元素按自然顺序排序的流
*/
@Override
default I sorted() {
default S sorted() {
return wrapping(stream().sorted());
}
@ -170,7 +169,8 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
* @return 一个元素按指定的Comparator排序的流
*/
@Override
default I sorted(Comparator<? super T> comparator) {
default S sorted(Comparator<? super T> comparator) {
Objects.requireNonNull(comparator);
return wrapping(stream().sorted(comparator));
}
@ -192,7 +192,8 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
* }</pre>
*/
@Override
default I peek(Consumer<? super T> action) {
default S peek(Consumer<? super T> action) {
Objects.requireNonNull(action);
return wrapping(stream().peek(action));
}
@ -204,7 +205,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
* @return 截取后的流
*/
@Override
default I limit(long maxSize) {
default S limit(long maxSize) {
return wrapping(stream().limit(maxSize));
}
@ -216,7 +217,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
* @return 丢弃前面n个元素后的剩余元素组成的流
*/
@Override
default I skip(long n) {
default S skip(long n) {
return wrapping(stream().skip(n));
}
@ -228,6 +229,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
stream().forEach(action);
}
@ -239,6 +241,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default void forEachOrdered(Consumer<? super T> action) {
Objects.requireNonNull(action);
stream().forEachOrdered(action);
}
@ -264,6 +267,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default <A> A[] toArray(IntFunction<A[]> generator) {
Objects.requireNonNull(generator);
return stream().toArray(generator);
}
@ -289,6 +293,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default T reduce(T identity, BinaryOperator<T> accumulator) {
Objects.requireNonNull(accumulator);
return stream().reduce(identity, accumulator);
}
@ -324,6 +329,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default Optional<T> reduce(BinaryOperator<T> accumulator) {
Objects.requireNonNull(accumulator);
return stream().reduce(accumulator);
}
@ -341,6 +347,8 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
Objects.requireNonNull(accumulator);
Objects.requireNonNull(combiner);
return stream().reduce(identity, accumulator, combiner);
}
@ -359,6 +367,9 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
Objects.requireNonNull(supplier);
Objects.requireNonNull(accumulator);
Objects.requireNonNull(combiner);
return stream().collect(supplier, accumulator, combiner);
}
@ -373,6 +384,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default <R, A> R collect(Collector<? super T, A, R> collector) {
Objects.requireNonNull(collector);
return stream().collect(collector);
}
@ -384,6 +396,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default Optional<T> min(Comparator<? super T> comparator) {
Objects.requireNonNull(comparator);
return stream().min(comparator);
}
@ -395,6 +408,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default Optional<T> max(Comparator<? super T> comparator) {
Objects.requireNonNull(comparator);
return stream().max(comparator);
}
@ -416,6 +430,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default boolean anyMatch(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
return stream().anyMatch(predicate);
}
@ -427,6 +442,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default boolean allMatch(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
return stream().allMatch(predicate);
}
@ -438,6 +454,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
*/
@Override
default boolean noneMatch(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
return stream().noneMatch(predicate);
}
@ -497,7 +514,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
* @return 串行流
*/
@Override
default I sequential() {
default S sequential() {
return wrapping(stream().sequential());
}
@ -507,7 +524,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
* @return 并行流
*/
@Override
default I parallel() {
default S parallel() {
return wrapping(stream().parallel());
}
@ -518,7 +535,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
* @return 无序流
*/
@Override
default I unordered() {
default S unordered() {
return wrapping(stream().unordered());
}
@ -529,7 +546,7 @@ public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<
* @return
*/
@Override
default I onClose(Runnable closeHandler) {
default S onClose(Runnable closeHandler) {
return wrapping(stream().onClose(closeHandler));
}

View File

@ -0,0 +1,705 @@
package cn.hutool.core.stream;
import cn.hutool.core.collection.ListUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.junit.Assert;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.*;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
/**
* {@link AbstractEnhancedWrappedStream}{@link TerminableWrappedStream}{@link TransformableWrappedStream}的测试用例
* 此用例用于保证通过{@link AbstractEnhancedWrappedStream}获得的默认方法在子类不重写的情况下能够按照预期效果生效
*
* @author huangchengxing
*/
public class AbstractEnhancedWrappedStreamTest {
@Test
public void testToList() {
List<Integer> list = asList(1, 2, 3);
List<Integer> toList = wrap(list).toList();
Assert.assertEquals(list, toList);
}
@Test
public void testToUnmodifiableList() {
List<Integer> list = wrap(1, 2, 3)
.toUnmodifiableList();
Assert.assertThrows(UnsupportedOperationException.class, () -> list.remove(0));
}
@Test
public void testToSet() {
List<Integer> list = asList(1, 2, 3);
Set<String> toSet = wrap(list).map(String::valueOf).toSet();
Assert.assertEquals(new HashSet<>(asList("1", "2", "3")), toSet);
}
@Test
public void testToUnmodifiableSet() {
Set<Integer> set = wrap(1, 2, 3)
.toUnmodifiableSet();
Assert.assertThrows(UnsupportedOperationException.class, () -> set.remove(0));
}
@Test
public void testToCollection() {
List<Integer> list = asList(1, 2, 3);
List<String> toCollection = wrap(list).map(String::valueOf).toColl(LinkedList::new);
Assert.assertEquals(asList("1", "2", "3"), toCollection);
}
@Test
public void testToMap() {
List<Integer> list = asList(1, 2, 3);
Map<String, Integer> identityMap = wrap(list).toMap(String::valueOf);
Assert.assertEquals(new HashMap<String, Integer>() {{
put("1", 1);
put("2", 2);
put("3", 3);
}}, identityMap);
}
@Test
public void testToUnmodifiableMap() {
Map<Integer, Integer> map1 = wrap(1, 2, 3).toUnmodifiableMap(Function.identity(), Function.identity());
Assert.assertThrows(UnsupportedOperationException.class, () -> map1.remove(1));
Map<Integer, Integer> map2 = wrap(1, 2, 3).toUnmodifiableMap(Function.identity(), Function.identity(), (t1, t2) -> t1);
Assert.assertThrows(UnsupportedOperationException.class, () -> map2.remove(1));
}
@Test
public void testToZip() {
List<Integer> orders = asList(1, 2, 3);
List<String> list = asList("dromara", "hutool", "sweet");
Map<Integer, String> toZip = wrap(orders).toZip(list);
Assert.assertEquals(new HashMap<Integer, String>() {{
put(1, "dromara");
put(2, "hutool");
put(3, "sweet");
}}, toZip);
}
@Test
public void testTransform() {
List<Integer> list = wrap(1, 2, 3).transform(Wrapper::toList).orElse(null);
Assert.assertEquals(asList(1, 2, 3), list);
}
@Test
public void testFindFirst() {
List<Integer> list = asList(1, 2, 3);
Assert.assertEquals((Integer)1, wrap(list).findFirst(t -> (t & 1) == 1).orElse(null));
Assert.assertEquals((Integer)1, wrap(list).filter(t -> (t & 1) == 1).findFirst().orElse(null));
}
@Test
public void testFindFirstIdx() {
List<Integer> list = asList(1, 2, 3);
Assert.assertEquals(1, wrap(list).findFirstIdx(t -> (t & 1) == 0));
}
@Test
public void testFindLast() {
List<Integer> list = asList(1, 2, 3);
Assert.assertEquals((Integer)3, wrap(list).findLast(t -> (t & 1) == 1).orElse(null));
}
@Test
public void testFindLastIdx() {
List<Integer> list = asList(1, 2, 3);
Assert.assertEquals(1, wrap(list).findLastIdx(t -> (t & 1) == 0));
}
@Test
public void testAt() {
List<Integer> list = asList(1, 2, 3);
Assert.assertEquals((Integer)3, wrap(list).at(2).orElse(null));
}
@Test
public void testIsEmpty() {
Assert.assertTrue(wrap(Collections.emptyList()).isEmpty());
Assert.assertFalse(wrap(asList(1, 2, 3)).isEmpty());
}
@Test
public void testIsNotEmpty() {
Assert.assertFalse(wrap(Collections.emptyList()).isNotEmpty());
Assert.assertTrue(wrap(asList(1, 2, 3)).isNotEmpty());
}
@Test
public void testJoining() {
List<Integer> list = asList(1, 2, 3);
String joining = wrap(list).join();
Assert.assertEquals("123", joining);
Assert.assertEquals("1,2,3", wrap(list).join(","));
Assert.assertEquals("(1,2,3)", wrap(list).join(",", "(", ")"));
}
@Test
public void testGrouping() {
List<Integer> list = asList(1, 2, 3);
Map<String, List<Integer>> map = new HashMap<String, List<Integer>>() {{
put("1", singletonList(1));
put("2", singletonList(2));
put("3", singletonList(3));
}};
Map<String, List<Integer>> group = wrap(list).group(String::valueOf, HashMap::new, Collectors.toList());
Assert.assertEquals(map, group);
group = wrap(list).group(String::valueOf, Collectors.toList());
Assert.assertEquals(map, group);
group = wrap(list).group(String::valueOf);
Assert.assertEquals(map, group);
}
@Test
public void testPartitioning() {
List<Integer> list = asList(1, 2, 3);
Map<Boolean, List<Integer>> map = new HashMap<Boolean, List<Integer>>() {{
put(Boolean.TRUE, singletonList(2));
put(Boolean.FALSE, asList(1, 3));
}};
Map<Boolean, List<Integer>> partition = wrap(list).partitioning(t -> (t & 1) == 0, Collectors.toList());
Assert.assertEquals(map, partition);
partition = wrap(list).partitioning(t -> (t & 1) == 0);
Assert.assertEquals(map, partition);
}
@Test
public void testForEachIdx() {
List<Integer> elements = new ArrayList<>();
List<Integer> indexes = new ArrayList<>();
wrap(1, 2, 3).forEachIdx((t, i) -> {
elements.add(t);
indexes.add(i);
});
Assert.assertEquals(asList(1, 2, 3), elements);
Assert.assertEquals(asList(0, 1, 2), indexes);
}
@Test
public void testForEachOrderedIdx() {
List<Integer> elements = new ArrayList<>();
List<Integer> indexes = new ArrayList<>();
wrap(1, 2, 3).forEachOrderedIdx((t, i) -> {
elements.add(t);
indexes.add(i);
});
Assert.assertEquals(asList(1, 2, 3), elements);
Assert.assertEquals(asList(0, 1, 2), indexes);
}
@Test
public void testForEachOrdered() {
List<Integer> elements = new ArrayList<>();
wrap(1, 2, 3).forEachOrdered(elements::add);
Assert.assertEquals(asList(1, 2, 3), elements);
}
@Test
public void testForEach() {
List<Integer> elements = new ArrayList<>();
wrap(1, 2, 3).forEach(elements::add);
Assert.assertEquals(asList(1, 2, 3), elements);
}
@Test
public void testMapToInt() {
int[] array = wrap(1, 2, 3).mapToInt(Integer::intValue).toArray();
Assert.assertArrayEquals(new int[] {1, 2, 3}, array);
}
@Test
public void testMapToLong() {
long[] array = wrap(1L, 2L, 3L).mapToLong(Long::intValue).toArray();
Assert.assertArrayEquals(new long[] {1L, 2L, 3L}, array);
}
@Test
public void testMapToDouble() {
double[] array = wrap(1d, 2d, 3d).mapToDouble(Double::intValue).toArray();
Assert.assertEquals(1d, array[0], 0.01);
Assert.assertEquals(2d, array[1], 0.01);
Assert.assertEquals(3d, array[2], 0.01);
}
@Test
public void testFlatMapToInt() {
int[] array = wrap(1, 2, 3).flatMapToInt(IntStream::of).toArray();
Assert.assertArrayEquals(new int[] {1, 2, 3}, array);
}
@Test
public void testFlatMapToLong() {
long[] array = wrap(1L, 2L, 3L).flatMapToLong(LongStream::of).toArray();
Assert.assertArrayEquals(new long[] {1L, 2L, 3L}, array);
}
@Test
public void testFlatMapToDouble() {
double[] array = wrap(1d, 2d, 3d).flatMapToDouble(DoubleStream::of).toArray();
Assert.assertEquals(1d, array[0], 0.01);
Assert.assertEquals(2d, array[1], 0.01);
Assert.assertEquals(3d, array[2], 0.01);
}
@Test
public void testSorted() {
List<Integer> list = wrap(3, 1, 2).sorted().toList();
Assert.assertEquals(asList(1, 2, 3), list);
}
@Test
public void testPeek() {
List<Integer> elements = new ArrayList<>();
wrap(1, 2, 3).peek(elements::add).exec();
Assert.assertEquals(asList(1, 2, 3), elements);
}
@Test
public void testPeekIdx() {
List<Integer> elements = new ArrayList<>();
List<Integer> indexes = new ArrayList<>();
wrap(1, 2, 3).peekIdx((t, i) -> {
elements.add(t);
indexes.add(i);
}).exec();
Assert.assertEquals(asList(1, 2, 3), elements);
Assert.assertEquals(asList(0, 1, 2), indexes);
Set<Integer> elements2 = new HashSet<>();
Set<Integer> indexes2 = new HashSet<>();
wrap(1, 2, null).parallel().peekIdx((t, i) -> {
elements2.add(t);
indexes2.add(i);
}).exec();
Assert.assertEquals(new HashSet<>(asList(1, null, 2)), elements2);
Assert.assertEquals(new HashSet<>(asList(-1, -1, -1)), indexes2);
}
@Test
public void testLimit() {
List<Integer> list = wrap(1, 2, 3).limit(2L).toList();
Assert.assertEquals(asList(1, 2), list);
}
@Test
public void testSkip() {
List<Integer> list = wrap(1, 2, 3).skip(1L).toList();
Assert.assertEquals(asList(2, 3), list);
}
@Test
public void testToArray() {
Object[] array1 = wrap(1, 2, 3).toArray();
Assert.assertArrayEquals(new Object[]{1, 2, 3}, array1);
array1 = wrap(1, 2, 3).toArray(Object[]::new);
Assert.assertArrayEquals(new Object[]{1, 2, 3}, array1);
}
@Test
public void testReduce() {
Assert.assertEquals((Integer)6, wrap(1, 2, 3).reduce(Integer::sum).orElse(null));
Assert.assertEquals((Integer)6, wrap(1, 2, 3).reduce(0, Integer::sum));
Assert.assertEquals((Integer)6, wrap(1, 2, 3).reduce(0, Integer::sum, Integer::sum));
}
@Test
public void testCollect() {
Assert.assertEquals(asList(1, 2, 3), wrap(1, 2, 3).collect(Collectors.toList()));
Assert.assertEquals(
asList(1, 2, 3),
wrap(1, 2, 3).collect(ArrayList::new, List::add, List::addAll)
);
}
@Test
public void testMin() {
Assert.assertEquals((Integer)1, wrap(1, 2, 3).min(Comparator.comparingInt(Integer::intValue)).orElse(null));
}
@Test
public void testMax() {
Assert.assertEquals((Integer)3, wrap(1, 2, 3).max(Comparator.comparingInt(Integer::intValue)).orElse(null));
}
@Test
public void testCount() {
Assert.assertEquals(3, wrap(1, 2, 3).count());
}
@Test
public void testAnyMatch() {
Assert.assertTrue(wrap(1, 2, 3).anyMatch(t -> (t & 1) == 0));
Assert.assertFalse(wrap(1, 3).anyMatch(t -> (t & 1) == 0));
}
@Test
public void testAllMatch() {
Assert.assertFalse(wrap(1, 2, 3).allMatch(t -> (t & 1) == 0));
Assert.assertTrue(wrap(2, 4).anyMatch(t -> (t & 1) == 0));
}
@Test
public void testNoneMatch() {
Assert.assertFalse(wrap(1, 2, 3).noneMatch(t -> (t & 1) == 0));
Assert.assertTrue(wrap(1, 3).noneMatch(t -> (t & 1) == 0));
}
@Test
public void testFindAny() {
Assert.assertNotNull(wrap(1, 2, 3).findAny());
}
@Test
public void testIterator() {
Iterator<Integer> iter1 = Stream.of(1, 2, 3).iterator();
Iterator<Integer> iter2 = wrap(1, 2, 3).iterator();
while (iter1.hasNext() && iter2.hasNext()) {
Assert.assertEquals(iter1.next(), iter2.next());
}
}
@Test
public void testSpliterator() {
Spliterator<Integer> iter1 = Stream.of(1, 2, 3).spliterator();
Spliterator<Integer> iter2 = wrap(1, 2, 3).spliterator();
Assert.assertEquals(iter1.trySplit().estimateSize(), iter2.trySplit().estimateSize());
}
@Test
public void testIsParallel() {
Assert.assertTrue(wrap(Stream.of(1, 2, 3).parallel()).isParallel());
}
@Test
public void testSequential() {
Assert.assertFalse(wrap(Stream.of(1, 2, 3).parallel()).sequential().isParallel());
}
@Test
public void testUnordered() {
Assert.assertNotNull(wrap(Stream.of(1, 2, 3)).unordered());
}
@Test
public void testOnClose() {
AtomicBoolean atomicBoolean = new AtomicBoolean(false);
wrap(Stream.of(1, 2, 3).onClose(() -> atomicBoolean.set(true))).close();
Assert.assertTrue(atomicBoolean.get());
}
@Test
public void testClose() {
Wrapper<Integer> stream = wrap(Stream.of(1, 2, 3));
stream.close();
Assert.assertThrows(IllegalStateException.class, stream::exec);
}
@Test
public void testReverse() {
Assert.assertEquals(
asList(3, 2, 1), wrap(1, 2, 3).reverse().toList()
);
}
@Test
public void testParallel() {
Assert.assertTrue(wrap(1, 2, 3).parallel().isParallel());
}
@Test
public void testSplice() {
Assert.assertEquals(
asList(1, 4, 5), wrap(1, 2, 3).splice(1, 2, 4, 5).toList()
);
}
@Test
public void testTakeWhile() {
Assert.assertEquals(
asList(1, 2),
wrap(1, 2, 3, 4).takeWhile(i -> !Objects.equals(i, 3)).toList()
);
}
@Test
public void testDropWhile() {
Assert.assertEquals(
asList(3, 4),
wrap(1, 2, 3, 4).dropWhile(i -> !Objects.equals(i, 3)).toList()
);
}
@Test
public void testDistinct() {
Assert.assertEquals(
asList(1, 2, 3), wrap(1, 1, 2, 3).distinct().toList()
);
}
@Test
public void testLog() {
Assert.assertNotNull(wrap(1, 2, 3).log().toList());
}
@Test
public void testPush() {
Assert.assertEquals(
asList(1, 2, 3), wrap(1).push(2, 3).toList()
);
}
@Test
public void testUnshift() {
Assert.assertEquals(
asList(1, 2, 3), wrap(3).unshift(1, 2).toList()
);
}
@Test
public void testAppend() {
Assert.assertEquals(
asList(1, 2, 3), wrap(1).append(asList(2, 3)).toList()
);
Assert.assertEquals(
asList(1, 2, 3), wrap(1, 2, 3).append(null).toList()
);
}
@Test
public void testPrepend() {
Assert.assertEquals(
asList(1, 2, 3), wrap(3).prepend(asList(1, 2)).toList()
);
Assert.assertEquals(
asList(1, 2, 3), wrap(1, 2, 3).prepend(null).toList()
);
}
@Test
public void testNonNull() {
Assert.assertEquals(
asList(1, 3), wrap(1, null, 3).nonNull().toList()
);
}
@Test
public void testFilterIdx() {
List<Integer> indexes = new ArrayList<>();
Assert.assertEquals(
asList(1, 3),
wrap(1, 2, 3).filterIdx((t, i) -> {
indexes.add(i);
return (t & 1) == 1;
}).toList()
);
Assert.assertEquals(asList(0, 1, 2), indexes);
}
@Test
public void testFilter() {
Assert.assertEquals(
asList(1, 3), wrap(1, 2, 3).filter(i -> (i & 1) == 1).toList()
);
}
@Test
public void testFlatMap() {
Assert.assertEquals(
asList(1, 2, 3), wrap(1, 2, 3).flatMap(Stream::of).toList()
);
}
@Test
public void testFlatMapIdx() {
List<Integer> indexes = new ArrayList<>();
Assert.assertEquals(
asList(1, 2, 3), wrap(1, 2, 3).flatMapIdx((t, i) -> {
indexes.add(i);
return Stream.of(t);
}).toList()
);
Assert.assertEquals(asList(0, 1, 2), indexes);
}
@Test
public void testFlat() {
Assert.assertEquals(
asList(1, 2, 3), wrap(1, 2, 3).flat(Collections::singletonList).toList()
);
}
@Test
public void testFlatNonNull() {
Assert.assertEquals(
asList(2, 3), wrap(null, 2, 3).flatNonNull(Collections::singletonList).toList()
);
}
@Test
public void testFlatTree() {
Tree root = new Tree(1, asList(new Tree(2, asList(new Tree(3, Collections.emptyList())))));
Assert.assertEquals(3L, wrap(root).flatTree(Tree::getChildren, Tree::setChildren).count());
}
@Test
public void testMap() {
Assert.assertEquals(
asList("1", "2", "3"), wrap(1, 2, 3).map(String::valueOf).toList()
);
}
@Test
public void testMapNonNull() {
Assert.assertEquals(
asList("3"), wrap(null, 2, 3, 4).mapNonNull(t -> ((t & 1) == 0) ? null : String.valueOf(t)).toList()
);
}
@Test
public void testMapIdx() {
List<Integer> indexes = new ArrayList<>();
Assert.assertEquals(
asList("1", "2", "3"), wrap(1, 2, 3).mapIdx((t, i) -> {
indexes.add(i);
return String.valueOf(t);
}).toList()
);
Assert.assertEquals(asList(0, 1, 2), indexes);
}
@Test
public void testMapMulti() {
Assert.assertEquals(
asList(1, 2, 3),
wrap(1, 2, 3).mapMulti((t, builder) -> {
builder.accept(t);
}).toList()
);
}
@Test
public void testHashCode() {
Stream<Integer> stream = Stream.of(1, 2, 3);
Assert.assertEquals(stream.hashCode(), wrap(stream).hashCode());
}
@Test
public void testEquals() {
Stream<Integer> stream = Stream.of(1, 2, 3);
Assert.assertEquals(wrap(stream), stream);
}
@Test
public void testToString() {
Stream<Integer> stream = Stream.of(1, 2, 3);
Assert.assertEquals(stream.toString(), wrap(stream).toString());
}
@Test
public void testToEntries() {
Map<Integer, Integer> expect = new HashMap<Integer, Integer>(){{
put(1, 1);
put(2, 2);
put(3, 3);
}};
Map<Integer, Integer> map = EasyStream.of(1, 2, 3)
.toEntries(Function.identity(), Function.identity())
.toMap();
Assert.assertEquals(expect, map);
map = EasyStream.of(1, 2, 3)
.toEntries(Function.identity())
.toMap();
Assert.assertEquals(expect, map);
}
@Test
public void testZip() {
final List<Integer> orders = Arrays.asList(1, 2, 3);
final List<String> list = Arrays.asList("dromara", "hutool", "sweet");
List<String> zip = wrap(orders).zip(list, (e1, e2) -> e1 + "." + e2).toList();
Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), zip);
zip = wrap((Stream<? extends Object>)EasyStream.iterate(1, i -> i + 1)).zip(list, (e1, e2) -> e1 + "." + e2).toList();
Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), zip);
}
@Test
public void testListSplit() {
final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
List<List<Integer>> lists = wrap(list).split(2).map(TerminableWrappedStream::toList).toList();
Assert.assertEquals(ListUtil.split(list, 2), lists);
// 指定长度 大于等于 列表长度
lists = wrap(list).split(list.size()).map(TerminableWrappedStream::toList).toList();
Assert.assertEquals(singletonList(list), lists);
}
@Test
public void testSplitList() {
final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
List<List<Integer>> lists = wrap(list).splitList(2).toList();
Assert.assertEquals(ListUtil.split(list, 2), lists);
// 指定长度 大于等于 列表长度
lists = wrap(list).splitList(list.size()).toList();
Assert.assertEquals(singletonList(list), lists);
}
@SafeVarargs
private static <T> Wrapper<T> wrap(T... array) {
return new Wrapper<>(Stream.of(array));
}
private static <T> Wrapper<T> wrap(Iterable<T> iterable) {
return new Wrapper<>(StreamSupport.stream(iterable.spliterator(), false));
}
private static <T> Wrapper<T> wrap(Stream<T> stream) {
return new Wrapper<>(stream);
}
private static class Wrapper<T> extends AbstractEnhancedWrappedStream<T, Wrapper<T>> {
/**
* 创建一个流包装器
*
* @param stream 包装的流对象
* @throws NullPointerException {@code stream}{@code null}时抛出
*/
protected Wrapper(Stream<T> stream) {
super(stream);
}
@Override
public Wrapper<T> wrapping(Stream<T> source) {
return new Wrapper<>(source);
}
}
@Setter
@Getter
@AllArgsConstructor
private static class Tree {
private final Integer id;
private List<Tree> children;
}
}

View File

@ -1,9 +1,7 @@
package cn.hutool.core.stream;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import lombok.Builder;
import lombok.Data;
import lombok.experimental.Tolerate;
import org.junit.Assert;
@ -22,6 +20,13 @@ import static java.util.Collections.singletonList;
*/
public class EasyStreamTest {
@Test
public void testConcat() {
Stream<Integer> stream1 = Stream.of(1, 2);
Stream<Integer> stream2 = Stream.of(3, 4);
Assert.assertEquals(4, EasyStream.concat(stream1, stream2).count());
}
@Test
public void testBuilder() {
final List<Integer> list = EasyStream.<Integer>builder().add(1).add(2).add(3).build().toList();
@ -54,7 +59,7 @@ public class EasyStreamTest {
}
@Test
public void testToCollection() {
public void testToColl() {
final List<Integer> list = Arrays.asList(1, 2, 3);
final List<String> toCollection = EasyStream.of(list).map(String::valueOf).toColl(LinkedList::new);
Assert.assertEquals(Arrays.asList("1", "2", "3"), toCollection);
@ -168,18 +173,22 @@ public class EasyStreamTest {
final List<Integer> collect1 = list.stream().distinct().collect(Collectors.toList());
final List<Integer> collect2 = list.stream().parallel().distinct().collect(Collectors.toList());
// 使用FastStream去重
// 使用EasyStream去重
final List<Integer> distinctBy1 = EasyStream.of(list).distinct().toList();
final List<Integer> distinctBy2 = EasyStream.of(list).parallel().distinct(String::valueOf).toList();
Assert.assertEquals(collect1, distinctBy1);
Assert.assertEquals(collect2, distinctBy2);
Assert.assertEquals(
4, EasyStream.of(1, 2, 2, null, 3, null).parallel(true).distinct(t -> Objects.isNull(t) ? null : t.toString()).count()
);
}
@Test
public void testForeachIdx() {
final List<String> list = Arrays.asList("dromara", "hutool", "sweet");
final EasyStream.FastStreamBuilder<String> builder = EasyStream.builder();
final EasyStream.Builder<String> builder = EasyStream.builder();
EasyStream.of(list).forEachIdx((e, i) -> builder.accept(i + 1 + "." + e));
Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), builder.build().toList());
// 并行流时为-1
@ -189,11 +198,11 @@ public class EasyStreamTest {
@Test
public void testForEachOrderedIdx() {
final List<String> list = Arrays.asList("dromara", "hutool", "sweet");
final EasyStream.FastStreamBuilder<String> builder = EasyStream.builder();
final EasyStream.Builder<String> builder = EasyStream.builder();
EasyStream.of(list).forEachOrderedIdx((e, i) -> builder.accept(i + 1 + "." + e));
Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), builder.build().toList());
final EasyStream.FastStreamBuilder<String> streamBuilder = EasyStream.builder();
final EasyStream.Builder<String> streamBuilder = EasyStream.builder();
EasyStream.of(list).parallel().forEachOrderedIdx((e, i) -> streamBuilder.accept(i + 1 + "." + e));
Assert.assertEquals(Arrays.asList("0.dromara", "0.hutool", "0.sweet"), streamBuilder.build().toList());
@ -360,39 +369,6 @@ public class EasyStreamTest {
Assert.assertEquals(ListUtil.of((Object) null), EasyStream.of((Object) null).reverse().toList());
}
@Test
public void testZip() {
final List<Integer> orders = Arrays.asList(1, 2, 3);
final List<String> list = Arrays.asList("dromara", "hutool", "sweet");
List<String> zip = EasyStream.of(orders).zip(list, (e1, e2) -> e1 + "." + e2).toList();
Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), zip);
zip = EasyStream.iterate(1, i -> i + 1).zip(list, (e1, e2) -> e1 + "." + e2).toList();
Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), zip);
}
@Test
public void testListSplit() {
final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
List<List<Integer>> lists = EasyStream.of(list).split(2).map(EasyStream::toList).toList();
Assert.assertEquals(ListUtil.split(list, 2), lists);
// 指定长度 大于等于 列表长度
lists = EasyStream.of(list).split(list.size()).map(EasyStream::toList).toList();
Assert.assertEquals(singletonList(list), lists);
}
@Test
public void testSplitList() {
final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
List<List<Integer>> lists = EasyStream.of(list).splitList(2).toList();
Assert.assertEquals(ListUtil.split(list, 2), lists);
// 指定长度 大于等于 列表长度
lists = EasyStream.of(list).splitList(list.size()).toList();
Assert.assertEquals(singletonList(list), lists);
}
@Test
public void testTakeWhile() {
// 1 10
@ -557,7 +533,7 @@ public class EasyStreamTest {
}
@Data
@Builder
@lombok.Builder
public static class Student {
private String name;
private Integer age;

View File

@ -31,6 +31,17 @@ public class EntryStreamTest {
EntryStream.merge(Arrays.asList(1, 2), Arrays.asList(1, 2, 3))
.collectKeys(Collectors.toList())
);
Assert.assertEquals(
Arrays.asList(1, 2),
EntryStream.merge(null, Arrays.asList(1, 2))
.collectValues(Collectors.toList())
);
Assert.assertEquals(
Arrays.asList(1, 2),
EntryStream.merge(Arrays.asList(1, 2), null)
.collectKeys(Collectors.toList())
);
}
@Test
@ -38,15 +49,21 @@ public class EntryStreamTest {
Map<String, String> map = new HashMap<>();
map.put("1", "1");
Assert.assertEquals(1, EntryStream.of(map).count());
Assert.assertEquals(0, EntryStream.of((Map<String, String>)null).count());
Set<Map.Entry<Integer, Integer>> entries = new HashSet<>();
entries.add(new Entry<>(1, 1));
entries.add(null);
Assert.assertEquals(2, EntryStream.of(entries).count());
Assert.assertEquals(0, EntryStream.of((Set<Map.Entry<Integer, Integer>>)null).count());
Assert.assertEquals(2, EntryStream.of(entries.stream()).count());
Assert.assertEquals(0, EntryStream.of((Stream<Map.Entry<Integer, Integer>>)null).count());
Assert.assertEquals(2, new EntryStream<>(entries.stream()).count());
Assert.assertThrows(NullPointerException.class, () -> new EntryStream<>(null));
Iterable<Integer> iterable = Arrays.asList(1, 2, null);
Assert.assertEquals(3, EntryStream.of(iterable, Function.identity(), Function.identity()).count());
Assert.assertEquals(0, EntryStream.of(null, Function.identity(), Function.identity()).count());
}
@Test
@ -117,11 +134,65 @@ public class EntryStreamTest {
Assert.assertEquals(
5,
EntryStream.of(Arrays.asList(1, 2, 3), Function.identity(), Function.identity())
.append(4, 4)
.append(5, 5)
.push(4, 4)
.push(5, 5)
.count()
);
}
@Test
public void testUnshift() {
Assert.assertEquals(
5,
EntryStream.of(Arrays.asList(1, 2, 3), Function.identity(), Function.identity())
.unshift(4, 4)
.unshift(5, 5)
.count()
);
}
@Test
public void testAppend() {
Map<Integer, Integer> map1 = new HashMap<Integer, Integer>(){{
put(1, 1);
put(2, 2);
}};
Map<Integer, Integer> map2 = new HashMap<Integer, Integer>(){{
put(3, 3);
put(4, 4);
}};
Assert.assertEquals(
new ArrayList<Map.Entry<Integer, Integer>>(){{
addAll(map1.entrySet());
addAll(map2.entrySet());
}},
EntryStream.of(map1).append(map2.entrySet()).toList()
);
Assert.assertEquals(
new ArrayList<>(map1.entrySet()), EntryStream.of(map1).append(null).toList()
);
}
@Test
public void testPrepend() {
Map<Integer, Integer> map1 = new HashMap<Integer, Integer>(){{
put(1, 1);
put(2, 2);
}};
Map<Integer, Integer> map2 = new HashMap<Integer, Integer>(){{
put(3, 3);
put(4, 4);
}};
Assert.assertEquals(
new ArrayList<Map.Entry<Integer, Integer>>(){{
addAll(map2.entrySet());
addAll(map1.entrySet());
}},
EntryStream.of(map1).prepend(map2.entrySet()).toList()
);
Assert.assertEquals(
new ArrayList<>(map1.entrySet()), EntryStream.of(map1).prepend(null).toList()
);
}
@Test
@ -230,6 +301,12 @@ public class EntryStreamTest {
.map((k, v) -> k.toString() + v.toString())
.collect(Collectors.toList())
);
Assert.assertEquals(
Arrays.asList("11", "22", "33"),
EntryStream.of(map)
.map(e -> e.getKey().toString() + e.getValue().toString())
.collect(Collectors.toList())
);
}
@Test
@ -457,7 +534,7 @@ public class EntryStreamTest {
Map<Integer, Integer> map = new HashMap<>();
map.put(1, null);
map.put(null, 1);
Assert.assertEquals(0, EntryStream.of(map).nonNull().count());
Assert.assertEquals(0, EntryStream.of(map).nonNullKeyValue().count());
}
@Test
@ -465,7 +542,7 @@ public class EntryStreamTest {
Map<Integer, Integer> map = new HashMap<>();
map.put(1, null);
map.put(null, 1);
Assert.assertEquals(1, EntryStream.of(map).keyNonNull().count());
Assert.assertEquals(1, EntryStream.of(map).nonNullKey().count());
}
@Test
@ -473,7 +550,7 @@ public class EntryStreamTest {
Map<Integer, Integer> map = new HashMap<>();
map.put(1, null);
map.put(null, 1);
Assert.assertEquals(1, EntryStream.of(map).valueNonNull().count());
Assert.assertEquals(1, EntryStream.of(map).nonNullValue().count());
}
private static class Entry<K, V> implements Map.Entry<K, V> {