mirror of
https://gitee.com/chinabugotech/hutool.git
synced 2025-05-09 23:51:34 +08:00
将增强流中的部分通用方法分离为接口中的默认方法
This commit is contained in:
parent
3c5313f0a7
commit
2d1255cbff
@ -0,0 +1,40 @@
|
||||
package cn.hutool.core.stream;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* {@link StreamWrapper}的基本实现,用于包装一个已有的流实例,
|
||||
* 使其支持相对原生{@link Stream}更多的中间操作与终端操作。
|
||||
*
|
||||
* @author huangchengxing
|
||||
* @see EasyStream
|
||||
* @see EntryStream
|
||||
*/
|
||||
public abstract class AbstractEnhancedStreamWrapper<T, I extends AbstractEnhancedStreamWrapper<T, I>>
|
||||
implements TerminableStreamWrapper<T, I>, TransformableStreamWrapper<T, I> {
|
||||
|
||||
/**
|
||||
* 原始的流实例
|
||||
*/
|
||||
protected final Stream<T> stream;
|
||||
|
||||
/**
|
||||
* 获取被包装的元素流实例
|
||||
*/
|
||||
@Override
|
||||
public Stream<T> stream() {
|
||||
return stream;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建一个流包装器
|
||||
*
|
||||
* @param stream 包装的流对象
|
||||
* @throws NullPointerException 当{@code stream}为{@code null}时抛出
|
||||
*/
|
||||
protected AbstractEnhancedStreamWrapper(Stream<T> stream) {
|
||||
this.stream = Objects.requireNonNull(stream, "stream must not null");
|
||||
}
|
||||
|
||||
}
|
@ -1,23 +1,16 @@
|
||||
package cn.hutool.core.stream;
|
||||
|
||||
import cn.hutool.core.collection.ListUtil;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.lang.Console;
|
||||
import cn.hutool.core.lang.Opt;
|
||||
import cn.hutool.core.lang.mutable.MutableInt;
|
||||
import cn.hutool.core.lang.mutable.MutableObj;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.text.StrUtil;
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.hutool.core.util.ObjUtil;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.*;
|
||||
import java.util.stream.Collector;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
@ -56,11 +49,7 @@ import java.util.stream.StreamSupport;
|
||||
* @see java.util.stream.Stream
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public class EasyStream<T> extends StreamWrapper<T, EasyStream<T>> implements Stream<T>, Iterable<T> {
|
||||
/**
|
||||
* 代表不存在的下标, 一般用于并行流的下标, 或者未找到元素时的下标
|
||||
*/
|
||||
private static final int NOT_FOUND_INDEX = -1;
|
||||
public class EasyStream<T> extends AbstractEnhancedStreamWrapper<T, EasyStream<T>> {
|
||||
|
||||
/**
|
||||
* 构造
|
||||
@ -266,32 +255,6 @@ public class EasyStream<T> extends StreamWrapper<T, EasyStream<T>> implements St
|
||||
return filter(e -> Objects.equals(mapper.apply(e), value));
|
||||
}
|
||||
|
||||
/**
|
||||
* 过滤元素,返回与指定断言匹配的元素组成的流,断言带下标,并行流时下标永远为-1
|
||||
* 这是一个无状态中间操作
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 返回叠加过滤操作后的流
|
||||
*/
|
||||
public EasyStream<T> filterIdx(final BiPredicate<? super T, Integer> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
if (isParallel()) {
|
||||
return filter(e -> predicate.test(e, NOT_FOUND_INDEX));
|
||||
} else {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
|
||||
return filter(e -> predicate.test(e, index.incrementAndGet()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 过滤掉空元素
|
||||
*
|
||||
* @return 过滤后的流
|
||||
*/
|
||||
public EasyStream<T> nonNull() {
|
||||
return new EasyStream<>(stream.filter(Objects::nonNull));
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回与指定函数将元素作为参数执行的结果组成的流
|
||||
* 这是一个无状态中间操作
|
||||
@ -459,175 +422,6 @@ public class EasyStream<T> extends StreamWrapper<T, EasyStream<T>> implements St
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回与指定函数将元素作为参数执行后组成的流。操作带下标,并行流时下标永远为-1
|
||||
* 这是一个无状态中间操作
|
||||
* @param action 指定的函数
|
||||
* @return 返回叠加操作后的FastStream
|
||||
* @apiNote 该方法存在的意义主要是用来调试
|
||||
* 当你需要查看经过操作管道某处的元素和下标,可以执行以下操作:
|
||||
* <pre>{@code
|
||||
* .of("one", "two", "three", "four")
|
||||
* .filter(e -> e.length() > 3)
|
||||
* .peekIdx((e,i) -> System.out.println("Filtered value: " + e + " Filtered idx:" + i))
|
||||
* .map(String::toUpperCase)
|
||||
* .peekIdx((e,i) -> System.out.println("Mapped value: " + e + " Mapped idx:" + i))
|
||||
* .collect(Collectors.toList());
|
||||
* }</pre>
|
||||
*/
|
||||
public EasyStream<T> peekIdx(BiConsumer<? super T, Integer> action) {
|
||||
Objects.requireNonNull(action);
|
||||
if (isParallel()) {
|
||||
return peek(e -> action.accept(e, NOT_FOUND_INDEX));
|
||||
} else {
|
||||
AtomicInteger index = new AtomicInteger(NOT_FOUND_INDEX);
|
||||
return peek(e -> action.accept(e, index.incrementAndGet()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回叠加调用{@link Console#log(Object)}打印出结果的流
|
||||
*
|
||||
* @return 返回叠加操作后的FastStream
|
||||
*/
|
||||
public EasyStream<T> log() {
|
||||
return peek(Console::log);
|
||||
}
|
||||
|
||||
/**
|
||||
* 对流里面的每一个元素执行一个操作,操作带下标,并行流时下标永远为-1
|
||||
* 这是一个终端操作
|
||||
*
|
||||
* @param action 操作
|
||||
*/
|
||||
public void forEachIdx(final BiConsumer<? super T, Integer> action) {
|
||||
Objects.requireNonNull(action);
|
||||
if (isParallel()) {
|
||||
stream.forEach(e -> action.accept(e, NOT_FOUND_INDEX));
|
||||
} else {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
|
||||
stream.forEach(e -> action.accept(e, index.incrementAndGet()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 对流里面的每一个元素按照顺序执行一个操作,操作带下标,并行流时下标永远为-1
|
||||
* 这是一个终端操作
|
||||
*
|
||||
* @param action 操作
|
||||
*/
|
||||
public void forEachOrderedIdx(final BiConsumer<? super T, Integer> action) {
|
||||
Objects.requireNonNull(action);
|
||||
if (isParallel()) {
|
||||
stream.forEachOrdered(e -> action.accept(e, NOT_FOUND_INDEX));
|
||||
} else {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
|
||||
stream.forEachOrdered(e -> action.accept(e, index.incrementAndGet()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取与给定断言匹配的第一个元素
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 与给定断言匹配的第一个元素
|
||||
*/
|
||||
public Optional<T> findFirst(final Predicate<? super T> predicate) {
|
||||
return stream.filter(predicate).findFirst();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取与给定断言匹配的第一个元素的下标,并行流下标永远为-1
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 与给定断言匹配的第一个元素的下标,如果不存在则返回-1
|
||||
*/
|
||||
public int findFirstIdx(final Predicate<? super T> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
if (isParallel()) {
|
||||
return NOT_FOUND_INDEX;
|
||||
} else {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
|
||||
//noinspection ResultOfMethodCallIgnored
|
||||
stream.filter(e -> {
|
||||
index.increment();
|
||||
return predicate.test(e);
|
||||
}).findFirst();
|
||||
return index.get();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取最后一个元素
|
||||
*
|
||||
* @return 最后一个元素
|
||||
*/
|
||||
public Optional<T> findLast() {
|
||||
final MutableObj<T> last = new MutableObj<>(null);
|
||||
spliterator().forEachRemaining(last::set);
|
||||
return Optional.ofNullable(last.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取与给定断言匹配的最后一个元素
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 与给定断言匹配的最后一个元素
|
||||
*/
|
||||
public Optional<T> findLast(final Predicate<? super T> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
final MutableObj<T> last = new MutableObj<>(null);
|
||||
spliterator().forEachRemaining(e -> {
|
||||
if (predicate.test(e)) {
|
||||
last.set(e);
|
||||
}
|
||||
});
|
||||
return Optional.ofNullable(last.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取与给定断言匹配的最后一个元素的下标,并行流下标永远为-1
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 与给定断言匹配的最后一个元素的下标,如果不存在则返回-1
|
||||
*/
|
||||
public int findLastIdx(final Predicate<? super T> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
if (isParallel()) {
|
||||
return NOT_FOUND_INDEX;
|
||||
} else {
|
||||
final MutableInt idxRef = new MutableInt(NOT_FOUND_INDEX);
|
||||
forEachIdx((e, i) -> {
|
||||
if (predicate.test(e)) {
|
||||
idxRef.set(i);
|
||||
}
|
||||
});
|
||||
return idxRef.get();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 反转顺序
|
||||
*
|
||||
* @return 反转元素顺序
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public EasyStream<T> reverse() {
|
||||
final T[] array = (T[]) toArray();
|
||||
ArrayUtil.reverse(array);
|
||||
return of(array).parallel(isParallel()).onClose(stream::close);
|
||||
}
|
||||
|
||||
/**
|
||||
* 更改流的并行状态
|
||||
*
|
||||
* @param parallel 是否并行
|
||||
* @return 流
|
||||
*/
|
||||
public EasyStream<T> parallel(final boolean parallel) {
|
||||
return parallel ? parallel() : sequential();
|
||||
}
|
||||
|
||||
/**
|
||||
* 与给定元素组成的流合并,成为新的流
|
||||
*
|
||||
@ -638,16 +432,6 @@ public class EasyStream<T> extends StreamWrapper<T, EasyStream<T>> implements St
|
||||
return EasyStream.concat(this.stream, of(obj));
|
||||
}
|
||||
|
||||
/**
|
||||
* 与给定元素组成的流合并,成为新的流
|
||||
*
|
||||
* @param obj 元素
|
||||
* @return 流
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public EasyStream<T> push(final T... obj) {
|
||||
return EasyStream.concat(this.stream, of(obj));
|
||||
}
|
||||
|
||||
/**
|
||||
* 给定元素组成的流与当前流合并,成为新的流
|
||||
@ -659,27 +443,6 @@ public class EasyStream<T> extends StreamWrapper<T, EasyStream<T>> implements St
|
||||
return EasyStream.concat(of(obj), this.stream);
|
||||
}
|
||||
|
||||
/**
|
||||
* 给定元素组成的流与当前流合并,成为新的流
|
||||
*
|
||||
* @param obj 元素
|
||||
* @return 流
|
||||
*/
|
||||
@SafeVarargs
|
||||
public final EasyStream<T> unshift(final T... obj) {
|
||||
return EasyStream.concat(of(obj), this.stream);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取流中指定下标的元素,如果是负数,则从最后一个开始数起
|
||||
*
|
||||
* @param idx 下标
|
||||
* @return 指定下标的元素
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public Optional<T> at(final Integer idx) {
|
||||
return Opt.ofNullable(idx).map(i -> (T) ArrayUtil.get(toArray(), i)).toOptional();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据一个原始的流,返回一个新包装类实例
|
||||
@ -688,158 +451,10 @@ public class EasyStream<T> extends StreamWrapper<T, EasyStream<T>> implements St
|
||||
* @return 实现类
|
||||
*/
|
||||
@Override
|
||||
protected EasyStream<T> convertToStreamImpl(Stream<T> stream) {
|
||||
public EasyStream<T> wrapping(Stream<T> stream) {
|
||||
return new EasyStream<>(stream);
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换成集合
|
||||
*
|
||||
* @param collectionFactory 集合工厂(可以是集合构造器)
|
||||
* @param <C> 集合类型
|
||||
* @return 集合
|
||||
*/
|
||||
public <C extends Collection<T>> C toColl(final Supplier<C> collectionFactory) {
|
||||
return collect(Collectors.toCollection(collectionFactory));
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为ArrayList
|
||||
*
|
||||
* @return list
|
||||
*/
|
||||
public List<T> toList() {
|
||||
return collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为HashSet
|
||||
*
|
||||
* @return hashSet
|
||||
*/
|
||||
public Set<T> toSet() {
|
||||
return collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
/**
|
||||
* 与给定的可迭代对象转换成Map,key为现有元素,value为给定可迭代对象迭代的元素<br>
|
||||
* Map的大小与两个集合中较小的数量一致, 即, 只合并下标位置相同的部分
|
||||
*
|
||||
* @param other 可迭代对象
|
||||
* @param <R> 可迭代对象迭代的元素类型
|
||||
* @return map,key为现有元素,value为给定可迭代对象迭代的元素
|
||||
*/
|
||||
public <R> Map<T, R> toZip(final Iterable<R> other) {
|
||||
final Spliterator<T> keys = spliterator();
|
||||
final Spliterator<R> values = Opt.ofNullable(other).map(Iterable::spliterator).orElseGet(Spliterators::emptySpliterator);
|
||||
// 获取两个Spliterator的中较小的数量
|
||||
// 如果Spliterator经过流操作, getExactSizeIfKnown()可能会返回-1, 所以默认大小为 MapUtil.DEFAULT_INITIAL_CAPACITY
|
||||
final int sizeIfKnown = (int) Math.max(Math.min(keys.getExactSizeIfKnown(), values.getExactSizeIfKnown()), MapUtil.DEFAULT_INITIAL_CAPACITY);
|
||||
final Map<T, R> map = MapUtil.newHashMap(sizeIfKnown);
|
||||
// 保存第一个Spliterator的值
|
||||
final MutableObj<T> key = new MutableObj<>();
|
||||
// 保存第二个Spliterator的值
|
||||
final MutableObj<R> value = new MutableObj<>();
|
||||
// 当两个Spliterator中都还有剩余元素时
|
||||
while (keys.tryAdvance(key::set) && values.tryAdvance(value::set)) {
|
||||
map.put(key.get(), value.get());
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回拼接后的字符串
|
||||
*
|
||||
* @return 拼接后的字符串
|
||||
*/
|
||||
public String join() {
|
||||
return join(StrUtil.EMPTY);
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回拼接后的字符串
|
||||
*
|
||||
* @param delimiter 分隔符
|
||||
* @return 拼接后的字符串
|
||||
*/
|
||||
public String join(final CharSequence delimiter) {
|
||||
return join(delimiter, StrUtil.EMPTY, StrUtil.EMPTY);
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回拼接后的字符串
|
||||
*
|
||||
* @param delimiter 分隔符
|
||||
* @param prefix 前缀
|
||||
* @param suffix 后缀
|
||||
* @return 拼接后的字符串
|
||||
*/
|
||||
public String join(final CharSequence delimiter,
|
||||
final CharSequence prefix,
|
||||
final CharSequence suffix) {
|
||||
return map(String::valueOf).collect(Collectors.joining(delimiter, prefix, suffix));
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为map,key为给定操作执行后的返回值,value为当前元素
|
||||
*
|
||||
* @param keyMapper 指定的key操作
|
||||
* @param <K> key类型
|
||||
* @return map
|
||||
*/
|
||||
public <K> Map<K, T> toMap(final Function<? super T, ? extends K> keyMapper) {
|
||||
return toMap(keyMapper, Function.identity());
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为map,key,value为给定操作执行后的返回值
|
||||
*
|
||||
* @param keyMapper 指定的key操作
|
||||
* @param valueMapper 指定value操作
|
||||
* @param <K> key类型
|
||||
* @param <U> value类型
|
||||
* @return map
|
||||
*/
|
||||
public <K, U> Map<K, U> toMap(final Function<? super T, ? extends K> keyMapper,
|
||||
final Function<? super T, ? extends U> valueMapper) {
|
||||
return toMap(keyMapper, valueMapper, (l, r) -> r);
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为map,key,value为给定操作执行后的返回值
|
||||
*
|
||||
* @param keyMapper 指定的key操作
|
||||
* @param valueMapper 指定value操作
|
||||
* @param mergeFunction 合并操作
|
||||
* @param <K> key类型
|
||||
* @param <U> value类型
|
||||
* @return map
|
||||
*/
|
||||
public <K, U> Map<K, U> toMap(final Function<? super T, ? extends K> keyMapper,
|
||||
final Function<? super T, ? extends U> valueMapper,
|
||||
final BinaryOperator<U> mergeFunction) {
|
||||
return toMap(keyMapper, valueMapper, mergeFunction, HashMap::new);
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为map,key,value为给定操作执行后的返回值
|
||||
*
|
||||
* @param keyMapper 指定的key操作
|
||||
* @param valueMapper 指定value操作
|
||||
* @param mergeFunction 合并操作
|
||||
* @param mapSupplier map工厂
|
||||
* @param <K> key类型
|
||||
* @param <U> value类型
|
||||
* @param <M> map类型
|
||||
* @return map
|
||||
*/
|
||||
public <K, U, M extends Map<K, U>> M toMap(final Function<? super T, ? extends K> keyMapper,
|
||||
final Function<? super T, ? extends U> valueMapper,
|
||||
final BinaryOperator<U> mergeFunction,
|
||||
final Supplier<M> mapSupplier) {
|
||||
return collect(CollectorUtil.toMap(keyMapper, valueMapper, mergeFunction, mapSupplier));
|
||||
}
|
||||
|
||||
/**
|
||||
* 将集合转换为树,默认用 {@code parentId == null} 作为顶部,内置一个小递归
|
||||
* 因为需要在当前传入数据里查找,所以这是一个结束操作
|
||||
@ -929,51 +544,6 @@ public class EasyStream<T> extends StreamWrapper<T, EasyStream<T>> implements St
|
||||
return flat(recursive).peek(e -> childrenSetter.accept(e, null));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 通过给定分组依据进行分组
|
||||
*
|
||||
* @param classifier 分组依据
|
||||
* @param <K> 实体中的分组依据对应类型,也是Map中key的类型
|
||||
* @return {@link Collector}
|
||||
*/
|
||||
public <K> Map<K, List<T>> group(final Function<? super T, ? extends K> classifier) {
|
||||
return group(classifier, Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过给定分组依据进行分组
|
||||
*
|
||||
* @param classifier 分组依据
|
||||
* @param downstream 下游操作
|
||||
* @param <K> 实体中的分组依据对应类型,也是Map中key的类型
|
||||
* @param <D> 下游操作对应返回类型,也是Map中value的类型
|
||||
* @param <A> 下游操作在进行中间操作时对应类型
|
||||
* @return {@link Collector}
|
||||
*/
|
||||
public <K, A, D> Map<K, D> group(final Function<? super T, ? extends K> classifier,
|
||||
final Collector<? super T, A, D> downstream) {
|
||||
return group(classifier, HashMap::new, downstream);
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过给定分组依据进行分组
|
||||
*
|
||||
* @param classifier 分组依据
|
||||
* @param mapFactory 提供的map
|
||||
* @param downstream 下游操作
|
||||
* @param <K> 实体中的分组依据对应类型,也是Map中key的类型
|
||||
* @param <D> 下游操作对应返回类型,也是Map中value的类型
|
||||
* @param <A> 下游操作在进行中间操作时对应类型
|
||||
* @param <M> 最后返回结果Map类型
|
||||
* @return {@link Collector}
|
||||
*/
|
||||
public <K, D, A, M extends Map<K, D>> M group(final Function<? super T, ? extends K> classifier,
|
||||
final Supplier<M> mapFactory,
|
||||
final Collector<? super T, A, D> downstream) {
|
||||
return collect(CollectorUtil.groupingBy(classifier, mapFactory, downstream));
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 现有元素 与 给定迭代器中对应位置的元素 使用 zipper 转换为新的元素,并返回新元素组成的流<br>
|
||||
* 新流的数量为两个集合中较小的数量, 即, 只合并下标位置相同的部分<br>
|
||||
@ -1004,21 +574,6 @@ public class EasyStream<T> extends StreamWrapper<T, EasyStream<T>> implements St
|
||||
return of(list).parallel(isParallel()).onClose(stream::close);
|
||||
}
|
||||
|
||||
/**
|
||||
* 类似js的<a href="https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Array/splice">splice</a>函数
|
||||
*
|
||||
* @param start 起始下标
|
||||
* @param deleteCount 删除个数,正整数
|
||||
* @param items 放入值
|
||||
* @return 操作后的流
|
||||
*/
|
||||
@SafeVarargs
|
||||
public final EasyStream<T> splice(final int start, final int deleteCount, final T... items) {
|
||||
return of(ListUtil.splice(toList(), start, deleteCount, items))
|
||||
.parallel(isParallel())
|
||||
.onClose(stream::close);
|
||||
}
|
||||
|
||||
/**
|
||||
* 按指定长度切分为双层流
|
||||
* <p>
|
||||
@ -1055,85 +610,6 @@ public class EasyStream<T> extends StreamWrapper<T, EasyStream<T>> implements St
|
||||
return split(batchSize).map(EasyStream::toList);
|
||||
}
|
||||
|
||||
/**
|
||||
* 保留 与指定断言 匹配时的元素, 在第一次不匹配时终止, 抛弃当前(第一个不匹配元素)及后续所有元素
|
||||
* <p>与 jdk9 中的 takeWhile 方法不太一样, 这里的实现是个 顺序的、有状态的中间操作</p>
|
||||
* <pre>本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
|
||||
* FastStream.iterate(1, i -> i + 1)
|
||||
* .parallel()
|
||||
* // 顺序执行
|
||||
* .takeWhile(e -> e < 50)
|
||||
* // 并发
|
||||
* .map(e -> e + 1)
|
||||
* // 并发
|
||||
* .map(String::valueOf)
|
||||
* .toList();
|
||||
* }</pre>
|
||||
* <p>但是不建议在并行流中使用, 除非你确定 takeWhile 之后的操作能在并行流中受益很多</p>
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 与指定断言匹配的元素组成的流
|
||||
*/
|
||||
public EasyStream<T> takeWhile(final Predicate<? super T> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
return of(StreamUtil.takeWhile(stream, predicate));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 删除 与指定断言 匹配的元素, 在第一次不匹配时终止, 返回当前(第一个不匹配元素)及剩余元素组成的新流
|
||||
* <p>与 jdk9 中的 dropWhile 方法不太一样, 这里的实现是个 顺序的、有状态的中间操作</p>
|
||||
* <pre>本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
|
||||
* FastStream.iterate(1, i <= 100, i -> i + 1)
|
||||
* .parallel()
|
||||
* // 顺序执行
|
||||
* .dropWhile(e -> e < 50)
|
||||
* // 并发
|
||||
* .map(e -> e + 1)
|
||||
* // 并发
|
||||
* .map(String::valueOf)
|
||||
* .toList();
|
||||
* }</pre>
|
||||
* <p>但是不建议在并行流中使用, 除非你确定 dropWhile 之后的操作能在并行流中受益很多</p>
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 剩余元素组成的流
|
||||
*/
|
||||
public EasyStream<T> dropWhile(final Predicate<? super T> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
return of(StreamUtil.dropWhile(stream, predicate));
|
||||
}
|
||||
|
||||
/**
|
||||
* 流是否为空
|
||||
*
|
||||
* @return 流是否为空
|
||||
*/
|
||||
public boolean isEmpty() {
|
||||
return !findAny().isPresent();
|
||||
}
|
||||
|
||||
/**
|
||||
* 流是否不为空
|
||||
*
|
||||
* @return 流是否不为空
|
||||
*/
|
||||
public boolean isNotEmpty() {
|
||||
return !isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* 将当前流转为另一对象。用于提供针对流本身而非流中元素的操作
|
||||
*
|
||||
* @param <R> 转换类型
|
||||
* @param transform 转换
|
||||
* @return 转换后的流
|
||||
*/
|
||||
public <R> Optional<R> transform(final Function<EasyStream<T>, R> transform) {
|
||||
Assert.notNull(transform, "transform must not null");
|
||||
return Optional.ofNullable(transform.apply(this));
|
||||
}
|
||||
|
||||
public interface FastStreamBuilder<T> extends Consumer<T>, cn.hutool.core.builder.Builder<EasyStream<T>> {
|
||||
|
||||
/**
|
||||
|
@ -22,7 +22,7 @@ import java.util.stream.StreamSupport;
|
||||
* @param <V> 值类型
|
||||
* @author huangchengxing
|
||||
*/
|
||||
public class EntryStream<K, V> extends StreamWrapper<Map.Entry<K, V>, EntryStream<K, V>> {
|
||||
public class EntryStream<K, V> extends AbstractEnhancedStreamWrapper<Map.Entry<K, V>, EntryStream<K, V>> {
|
||||
|
||||
/**
|
||||
* 默认的空键值对
|
||||
@ -157,7 +157,7 @@ public class EntryStream<K, V> extends StreamWrapper<Map.Entry<K, V>, EntryStrea
|
||||
*/
|
||||
public EntryStream<K, V> distinctByKey() {
|
||||
Set<K> accessed = new ConcurrentHashSet<>(16);
|
||||
return convertToStreamImpl(stream.filter(e -> {
|
||||
return wrapping(stream.filter(e -> {
|
||||
K key = e.getKey();
|
||||
if (accessed.contains(key)) {
|
||||
return false;
|
||||
@ -174,7 +174,7 @@ public class EntryStream<K, V> extends StreamWrapper<Map.Entry<K, V>, EntryStrea
|
||||
*/
|
||||
public EntryStream<K, V> distinctByValue() {
|
||||
Set<V> accessed = new ConcurrentHashSet<>(16);
|
||||
return convertToStreamImpl(stream.filter(e -> {
|
||||
return wrapping(stream.filter(e -> {
|
||||
V val = e.getValue();
|
||||
if (accessed.contains(val)) {
|
||||
return false;
|
||||
@ -299,7 +299,7 @@ public class EntryStream<K, V> extends StreamWrapper<Map.Entry<K, V>, EntryStrea
|
||||
* @return {@link EntryStream}实例
|
||||
*/
|
||||
public EntryStream<K, V> append(K key, V value) {
|
||||
return convertToStreamImpl(Stream.concat(stream, Stream.of(ofEntry(key, value))));
|
||||
return wrapping(Stream.concat(stream, Stream.of(ofEntry(key, value))));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -310,7 +310,7 @@ public class EntryStream<K, V> extends StreamWrapper<Map.Entry<K, V>, EntryStrea
|
||||
* @return {@link EntryStream}实例
|
||||
*/
|
||||
public EntryStream<K, V> prepend(K key, V value) {
|
||||
return convertToStreamImpl(Stream.concat(Stream.of(ofEntry(key, value)), stream));
|
||||
return wrapping(Stream.concat(Stream.of(ofEntry(key, value)), stream));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -325,7 +325,7 @@ public class EntryStream<K, V> extends StreamWrapper<Map.Entry<K, V>, EntryStrea
|
||||
}
|
||||
final Stream<Map.Entry<K, V>> contacted = StreamSupport.stream(entries.spliterator(), isParallel())
|
||||
.map(EntryStream::ofEntry);
|
||||
return convertToStreamImpl(Stream.concat(stream, contacted));
|
||||
return wrapping(Stream.concat(stream, contacted));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -340,7 +340,7 @@ public class EntryStream<K, V> extends StreamWrapper<Map.Entry<K, V>, EntryStrea
|
||||
}
|
||||
final Stream<Map.Entry<K, V>> contacted = StreamSupport.stream(entries.spliterator(), isParallel())
|
||||
.map(EntryStream::ofEntry);
|
||||
return convertToStreamImpl(Stream.concat(contacted, stream));
|
||||
return wrapping(Stream.concat(contacted, stream));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -744,7 +744,7 @@ public class EntryStream<K, V> extends StreamWrapper<Map.Entry<K, V>, EntryStrea
|
||||
* @return 实现类
|
||||
*/
|
||||
@Override
|
||||
public EntryStream<K, V> convertToStreamImpl(Stream<Map.Entry<K, V>> stream) {
|
||||
public EntryStream<K, V> wrapping(Stream<Map.Entry<K, V>> stream) {
|
||||
return new EntryStream<>(stream);
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,55 @@
|
||||
package cn.hutool.core.stream;
|
||||
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* {@link AbstractEnhancedStreamWrapper}的基本实现
|
||||
*
|
||||
* @author huangchengxing
|
||||
*/
|
||||
public class SimpleStreamWrapper<T> extends AbstractEnhancedStreamWrapper<T, SimpleStreamWrapper<T>> {
|
||||
|
||||
/**
|
||||
* 创建一个流包装器
|
||||
*
|
||||
* @param stream 包装的流对象
|
||||
* @throws NullPointerException 当{@code stream}为{@code null}时抛出
|
||||
*/
|
||||
SimpleStreamWrapper(Stream<T> stream) {
|
||||
super(stream);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将一个流包装为{@link SimpleStreamWrapper}
|
||||
*
|
||||
* @param source 被包装的流
|
||||
* @return 包装后的流
|
||||
*/
|
||||
@Override
|
||||
public SimpleStreamWrapper<T> wrapping(Stream<T> source) {
|
||||
return new SimpleStreamWrapper<>(source);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将当前流中元素映射为另一类型
|
||||
*
|
||||
* @param mapper 映射方法
|
||||
* @return 映射后的流
|
||||
*/
|
||||
@Override
|
||||
public <R> SimpleStreamWrapper<R> map(Function<? super T, ? extends R> mapper) {
|
||||
return new SimpleStreamWrapper<>(stream().map(mapper));
|
||||
}
|
||||
|
||||
/**
|
||||
* 将当前流中元素展开为流,然后返回由这些新流中元素组成的流
|
||||
*
|
||||
* @param mapper 映射方法
|
||||
* @return 映射后的流
|
||||
*/
|
||||
@Override
|
||||
public <R> SimpleStreamWrapper<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
|
||||
return new SimpleStreamWrapper<>(stream().flatMap(mapper));
|
||||
}
|
||||
}
|
@ -1,32 +1,57 @@
|
||||
package cn.hutool.core.stream;
|
||||
|
||||
import java.util.*;
|
||||
import cn.hutool.core.util.ObjUtil;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.Optional;
|
||||
import java.util.Spliterator;
|
||||
import java.util.function.*;
|
||||
import java.util.stream.*;
|
||||
|
||||
/**
|
||||
* {@link Stream}的包装类,用于基于一个已有的流实例进行扩展
|
||||
* <p>表示一个用于增强原始{@link Stream}对象的包装器,当调用{@link Stream}中的方法时,
|
||||
* 将会代理到被包装的原始流对象,并返回指定的包装器实例。
|
||||
*
|
||||
* @param <T> 流中的元素类型
|
||||
* @param <I> 链式调用获得的实现类类型
|
||||
* @author huangchengxing
|
||||
* @see EasyStream
|
||||
* @see TerminableStreamWrapper
|
||||
* @see TransformableStreamWrapper
|
||||
* @see AbstractEnhancedStreamWrapper
|
||||
*/
|
||||
abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Iterable<T> {
|
||||
public interface StreamWrapper<T, I extends StreamWrapper<T, I>> extends Stream<T>, Iterable<T> {
|
||||
|
||||
/**
|
||||
* 原始的流实例
|
||||
* 代表不存在的下标, 一般用于并行流的下标, 或者未找到元素时的下标
|
||||
*/
|
||||
protected final Stream<T> stream;
|
||||
int NOT_FOUND_INDEX = -1;
|
||||
|
||||
/**
|
||||
* 创建一个流包装器
|
||||
* 将一个流包装为简单增强流,若{@code stream}为{@code null}则默认返回一个空串行流
|
||||
*
|
||||
* @param stream 包装的流对象
|
||||
* @param stream 被包装的流
|
||||
* @return {@link SimpleStreamWrapper}实例
|
||||
*/
|
||||
protected StreamWrapper(Stream<T> stream) {
|
||||
Objects.requireNonNull(stream, "stream must not null");
|
||||
this.stream = stream;
|
||||
static <T> SimpleStreamWrapper<T> create(Stream<T> stream) {
|
||||
return new SimpleStreamWrapper<>(ObjUtil.defaultIfNull(stream, Stream.empty()));
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取被包装的原始流
|
||||
*
|
||||
* @return 被包装的原始流
|
||||
*/
|
||||
Stream<T> stream();
|
||||
|
||||
/**
|
||||
* 将一个原始流包装为指定类型的增强流
|
||||
*
|
||||
* @param source 被包装的流
|
||||
* @return I
|
||||
*/
|
||||
I wrapping(Stream<T> source);
|
||||
|
||||
/**
|
||||
* 过滤元素,返回与指定断言匹配的元素组成的流
|
||||
* 这是一个无状态中间操作
|
||||
@ -35,8 +60,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 返回叠加过滤操作后的流
|
||||
*/
|
||||
@Override
|
||||
public I filter(Predicate<? super T> predicate) {
|
||||
return convertToStreamImpl(stream.filter(predicate));
|
||||
default I filter(Predicate<? super T> predicate) {
|
||||
return wrapping(stream().filter(predicate));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -47,8 +72,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 叠加操作后元素类型全为int的流
|
||||
*/
|
||||
@Override
|
||||
public IntStream mapToInt(ToIntFunction<? super T> mapper) {
|
||||
return stream.mapToInt(mapper);
|
||||
default IntStream mapToInt(ToIntFunction<? super T> mapper) {
|
||||
return stream().mapToInt(mapper);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -59,8 +84,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 叠加操作后元素类型全为long的流
|
||||
*/
|
||||
@Override
|
||||
public LongStream mapToLong(ToLongFunction<? super T> mapper) {
|
||||
return stream.mapToLong(mapper);
|
||||
default LongStream mapToLong(ToLongFunction<? super T> mapper) {
|
||||
return stream().mapToLong(mapper);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -71,8 +96,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 叠加操作后元素类型全为double的流
|
||||
*/
|
||||
@Override
|
||||
public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
|
||||
return stream.mapToDouble(mapper);
|
||||
default DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
|
||||
return stream().mapToDouble(mapper);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -83,8 +108,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 返回叠加拆分操作后的IntStream
|
||||
*/
|
||||
@Override
|
||||
public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
|
||||
return stream.flatMapToInt(mapper);
|
||||
default IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
|
||||
return stream().flatMapToInt(mapper);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -95,8 +120,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 返回叠加拆分操作后的LongStream
|
||||
*/
|
||||
@Override
|
||||
public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
|
||||
return stream.flatMapToLong(mapper);
|
||||
default LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
|
||||
return stream().flatMapToLong(mapper);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -107,8 +132,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 返回叠加拆分操作后的DoubleStream
|
||||
*/
|
||||
@Override
|
||||
public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
|
||||
return stream.flatMapToDouble(mapper);
|
||||
default DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
|
||||
return stream().flatMapToDouble(mapper);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -118,8 +143,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 一个具有去重特征的流
|
||||
*/
|
||||
@Override
|
||||
public I distinct() {
|
||||
return convertToStreamImpl(stream.distinct());
|
||||
default I distinct() {
|
||||
return wrapping(stream().distinct());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -131,8 +156,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 一个元素按自然顺序排序的流
|
||||
*/
|
||||
@Override
|
||||
public I sorted() {
|
||||
return convertToStreamImpl(stream.sorted());
|
||||
default I sorted() {
|
||||
return wrapping(stream().sorted());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -145,8 +170,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 一个元素按指定的Comparator排序的流
|
||||
*/
|
||||
@Override
|
||||
public I sorted(Comparator<? super T> comparator) {
|
||||
return convertToStreamImpl(stream.sorted(comparator));
|
||||
default I sorted(Comparator<? super T> comparator) {
|
||||
return wrapping(stream().sorted(comparator));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -167,8 +192,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* }</pre>
|
||||
*/
|
||||
@Override
|
||||
public I peek(Consumer<? super T> action) {
|
||||
return convertToStreamImpl(stream.peek(action));
|
||||
default I peek(Consumer<? super T> action) {
|
||||
return wrapping(stream().peek(action));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -179,8 +204,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 截取后的流
|
||||
*/
|
||||
@Override
|
||||
public I limit(long maxSize) {
|
||||
return convertToStreamImpl(stream.limit(maxSize));
|
||||
default I limit(long maxSize) {
|
||||
return wrapping(stream().limit(maxSize));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -191,8 +216,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 丢弃前面n个元素后的剩余元素组成的流
|
||||
*/
|
||||
@Override
|
||||
public I skip(long n) {
|
||||
return convertToStreamImpl(stream.skip(n));
|
||||
default I skip(long n) {
|
||||
return wrapping(stream().skip(n));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -202,8 +227,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @param action 操作
|
||||
*/
|
||||
@Override
|
||||
public void forEach(Consumer<? super T> action) {
|
||||
stream.forEach(action);
|
||||
default void forEach(Consumer<? super T> action) {
|
||||
stream().forEach(action);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -213,8 +238,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @param action 操作
|
||||
*/
|
||||
@Override
|
||||
public void forEachOrdered(Consumer<? super T> action) {
|
||||
stream.forEachOrdered(action);
|
||||
default void forEachOrdered(Consumer<? super T> action) {
|
||||
stream().forEachOrdered(action);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -224,8 +249,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 包含此流元素的数组
|
||||
*/
|
||||
@Override
|
||||
public Object[] toArray() {
|
||||
return stream.toArray();
|
||||
default Object[] toArray() {
|
||||
return stream().toArray();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -238,8 +263,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @throws ArrayStoreException 如果元素转换失败,例如不是该元素类型及其父类,则抛出该异常
|
||||
*/
|
||||
@Override
|
||||
public <A> A[] toArray(IntFunction<A[]> generator) {
|
||||
return stream.toArray(generator);
|
||||
default <A> A[] toArray(IntFunction<A[]> generator) {
|
||||
return stream().toArray(generator);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -263,8 +288,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 聚合计算后的值
|
||||
*/
|
||||
@Override
|
||||
public T reduce(T identity, BinaryOperator<T> accumulator) {
|
||||
return stream.reduce(identity, accumulator);
|
||||
default T reduce(T identity, BinaryOperator<T> accumulator) {
|
||||
return stream().reduce(identity, accumulator);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -298,8 +323,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @see #max(Comparator)
|
||||
*/
|
||||
@Override
|
||||
public Optional<T> reduce(BinaryOperator<T> accumulator) {
|
||||
return stream.reduce(accumulator);
|
||||
default Optional<T> reduce(BinaryOperator<T> accumulator) {
|
||||
return stream().reduce(accumulator);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -315,8 +340,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @see #reduce(Object, BinaryOperator)
|
||||
*/
|
||||
@Override
|
||||
public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
|
||||
return stream.reduce(identity, accumulator, combiner);
|
||||
default <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
|
||||
return stream().reduce(identity, accumulator, combiner);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -333,8 +358,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* }</pre>
|
||||
*/
|
||||
@Override
|
||||
public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
|
||||
return stream.collect(supplier, accumulator, combiner);
|
||||
default <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
|
||||
return stream().collect(supplier, accumulator, combiner);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -347,8 +372,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 收集后的容器
|
||||
*/
|
||||
@Override
|
||||
public <R, A> R collect(Collector<? super T, A, R> collector) {
|
||||
return stream.collect(collector);
|
||||
default <R, A> R collect(Collector<? super T, A, R> collector) {
|
||||
return stream().collect(collector);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -358,8 +383,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 最小值
|
||||
*/
|
||||
@Override
|
||||
public Optional<T> min(Comparator<? super T> comparator) {
|
||||
return stream.min(comparator);
|
||||
default Optional<T> min(Comparator<? super T> comparator) {
|
||||
return stream().min(comparator);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -369,8 +394,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 最大值
|
||||
*/
|
||||
@Override
|
||||
public Optional<T> max(Comparator<? super T> comparator) {
|
||||
return stream.max(comparator);
|
||||
default Optional<T> max(Comparator<? super T> comparator) {
|
||||
return stream().max(comparator);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -379,8 +404,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 流元素个数
|
||||
*/
|
||||
@Override
|
||||
public long count() {
|
||||
return stream.count();
|
||||
default long count() {
|
||||
return stream().count();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -390,8 +415,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 是否有任何一个元素满足给定断言
|
||||
*/
|
||||
@Override
|
||||
public boolean anyMatch(Predicate<? super T> predicate) {
|
||||
return stream.anyMatch(predicate);
|
||||
default boolean anyMatch(Predicate<? super T> predicate) {
|
||||
return stream().anyMatch(predicate);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -401,8 +426,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 是否所有元素满足给定断言
|
||||
*/
|
||||
@Override
|
||||
public boolean allMatch(Predicate<? super T> predicate) {
|
||||
return stream.allMatch(predicate);
|
||||
default boolean allMatch(Predicate<? super T> predicate) {
|
||||
return stream().allMatch(predicate);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -412,8 +437,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 是否没有元素满足给定断言
|
||||
*/
|
||||
@Override
|
||||
public boolean noneMatch(Predicate<? super T> predicate) {
|
||||
return stream.noneMatch(predicate);
|
||||
default boolean noneMatch(Predicate<? super T> predicate) {
|
||||
return stream().noneMatch(predicate);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -422,8 +447,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 第一个元素
|
||||
*/
|
||||
@Override
|
||||
public Optional<T> findFirst() {
|
||||
return stream.findFirst();
|
||||
default Optional<T> findFirst() {
|
||||
return stream().findFirst();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -432,8 +457,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 随便取一个
|
||||
*/
|
||||
@Override
|
||||
public Optional<T> findAny() {
|
||||
return stream.findAny();
|
||||
default Optional<T> findAny() {
|
||||
return stream().findAny();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -442,8 +467,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 流的迭代器
|
||||
*/
|
||||
@Override
|
||||
public Iterator<T> iterator() {
|
||||
return stream.iterator();
|
||||
default Iterator<T> iterator() {
|
||||
return stream().iterator();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -452,8 +477,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 流的拆分器
|
||||
*/
|
||||
@Override
|
||||
public Spliterator<T> spliterator() {
|
||||
return stream.spliterator();
|
||||
default Spliterator<T> spliterator() {
|
||||
return stream().spliterator();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -462,8 +487,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 流的并行状态
|
||||
*/
|
||||
@Override
|
||||
public boolean isParallel() {
|
||||
return stream.isParallel();
|
||||
default boolean isParallel() {
|
||||
return stream().isParallel();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -472,8 +497,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 串行流
|
||||
*/
|
||||
@Override
|
||||
public I sequential() {
|
||||
return convertToStreamImpl(stream.sequential());
|
||||
default I sequential() {
|
||||
return wrapping(stream().sequential());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -482,8 +507,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 并行流
|
||||
*/
|
||||
@Override
|
||||
public I parallel() {
|
||||
return convertToStreamImpl(stream.parallel());
|
||||
default I parallel() {
|
||||
return wrapping(stream().parallel());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -493,8 +518,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 无序流
|
||||
*/
|
||||
@Override
|
||||
public I unordered() {
|
||||
return convertToStreamImpl(stream.unordered());
|
||||
default I unordered() {
|
||||
return wrapping(stream().unordered());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -504,8 +529,8 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @return 流
|
||||
*/
|
||||
@Override
|
||||
public I onClose(Runnable closeHandler) {
|
||||
return convertToStreamImpl(stream.onClose(closeHandler));
|
||||
default I onClose(Runnable closeHandler) {
|
||||
return wrapping(stream().onClose(closeHandler));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -514,50 +539,33 @@ abstract class StreamWrapper<T, I extends Stream<T>> implements Stream<T>, Itera
|
||||
* @see AutoCloseable#close()
|
||||
*/
|
||||
@Override
|
||||
public void close() {
|
||||
stream.close();
|
||||
default void close() {
|
||||
stream().close();
|
||||
}
|
||||
|
||||
/**
|
||||
* hashcode
|
||||
* 获取当前实例的哈希值
|
||||
*
|
||||
* @return hashcode
|
||||
* @return 哈希值
|
||||
*/
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return stream.hashCode();
|
||||
}
|
||||
int hashCode();
|
||||
|
||||
/**
|
||||
* equals
|
||||
* 比较实例是否相等
|
||||
*
|
||||
* @param obj 对象
|
||||
* @return 结果
|
||||
* @return 是否相等
|
||||
*/
|
||||
@Override
|
||||
public boolean equals(final Object obj) {
|
||||
if (obj instanceof Stream) {
|
||||
return stream.equals(obj);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
boolean equals(final Object obj);
|
||||
|
||||
/**
|
||||
* toString
|
||||
* 将当前实例转为字符串
|
||||
*
|
||||
* @return string
|
||||
* @return 字符串
|
||||
*/
|
||||
@Override
|
||||
public String toString() {
|
||||
return stream.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据一个原始的流,返回一个新包装类实例
|
||||
*
|
||||
* @param stream 流
|
||||
* @return 实现类
|
||||
*/
|
||||
protected abstract I convertToStreamImpl(Stream<T> stream);
|
||||
String toString();
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,475 @@
|
||||
package cn.hutool.core.stream;
|
||||
|
||||
import cn.hutool.core.lang.Opt;
|
||||
import cn.hutool.core.lang.mutable.MutableInt;
|
||||
import cn.hutool.core.lang.mutable.MutableObj;
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.*;
|
||||
import java.util.stream.Collector;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* {@link StreamWrapper}的扩展,为实现类提供更多终端操作方法
|
||||
*
|
||||
* @param <T> 流中的元素类型
|
||||
* @param <I> 链式调用获得的实现类类型
|
||||
* @author huangchengxing
|
||||
*/
|
||||
public interface TerminableStreamWrapper<T, I extends TerminableStreamWrapper<T, I>> extends StreamWrapper<T, I> {
|
||||
|
||||
// region ============ join ============
|
||||
|
||||
/**
|
||||
* 返回拼接后的字符串
|
||||
*
|
||||
* @return 拼接后的字符串
|
||||
*/
|
||||
default String join() {
|
||||
return this.join("");
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回拼接后的字符串
|
||||
*
|
||||
* @param delimiter 分隔符
|
||||
* @return 拼接后的字符串
|
||||
*/
|
||||
default String join(CharSequence delimiter) {
|
||||
return this.join(delimiter, "", "");
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回拼接后的字符串
|
||||
*
|
||||
* @param delimiter 分隔符
|
||||
* @param prefix 前缀
|
||||
* @param suffix 后缀
|
||||
* @return 拼接后的字符串
|
||||
*/
|
||||
default String join(CharSequence delimiter,
|
||||
CharSequence prefix,
|
||||
CharSequence suffix) {
|
||||
return stream().map(String::valueOf).collect(Collectors.joining(delimiter, prefix, suffix));
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
||||
// region ============ to collection ============
|
||||
|
||||
/**
|
||||
* 转换成集合
|
||||
*
|
||||
* @param collectionFactory 集合工厂(可以是集合构造器)
|
||||
* @param <C> 集合类型
|
||||
* @return 集合
|
||||
*/
|
||||
default <C extends Collection<T>> C toColl(Supplier<C> collectionFactory) {
|
||||
return stream().collect(Collectors.toCollection(collectionFactory));
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为{@link ArrayList}
|
||||
*
|
||||
* @return 集合
|
||||
*/
|
||||
default List<T> toList() {
|
||||
return this.toColl(ArrayList::new);
|
||||
}
|
||||
|
||||
/**
|
||||
* 换为不可变集合
|
||||
*
|
||||
* @return 集合
|
||||
*/
|
||||
default List<T> toUnmodifiableList() {
|
||||
return Collections.unmodifiableList(this.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为HashSet
|
||||
*
|
||||
* @return 集合
|
||||
*/
|
||||
default Set<T> toSet() {
|
||||
return this.toColl(HashSet::new);
|
||||
}
|
||||
|
||||
/**
|
||||
* 换为不可变集合
|
||||
*
|
||||
* @return 集合
|
||||
*/
|
||||
default Set<T> toUnmodifiableSet() {
|
||||
return Collections.unmodifiableSet(this.toSet());
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
||||
// region ============ to map ============
|
||||
|
||||
/**
|
||||
* 转换为map,key为给定操作执行后的返回值,value为当前元素
|
||||
*
|
||||
* @param keyMapper 指定的key操作
|
||||
* @param <K> key类型
|
||||
* @return map
|
||||
*/
|
||||
default <K> Map<K, T> toMap(Function<? super T, ? extends K> keyMapper) {
|
||||
return this.toMap(keyMapper, Function.identity());
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为map,key,value为给定操作执行后的返回值
|
||||
*
|
||||
* @param keyMapper 指定的key操作
|
||||
* @param valueMapper 指定value操作
|
||||
* @param <K> key类型
|
||||
* @param <U> value类型
|
||||
* @return map
|
||||
*/
|
||||
default <K, U> Map<K, U> toMap(
|
||||
Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper) {
|
||||
return this.toMap(keyMapper, valueMapper, (l, r) -> r);
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为不可变map,key,value为给定操作执行后的返回值
|
||||
*
|
||||
* @param keyMapper 指定的key操作
|
||||
* @param valueMapper 指定value操作
|
||||
* @param <K> key类型
|
||||
* @param <U> value类型
|
||||
* @return map
|
||||
*/
|
||||
default <K, U> Map<K, U> toUnmodifiableMap(
|
||||
Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper) {
|
||||
return Collections.unmodifiableMap(this.toMap(keyMapper, valueMapper));
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为map,key,value为给定操作执行后的返回值
|
||||
*
|
||||
* @param keyMapper 指定的key操作
|
||||
* @param valueMapper 指定value操作
|
||||
* @param mergeFunction 合并操作
|
||||
* @param <K> key类型
|
||||
* @param <U> value类型
|
||||
* @return map
|
||||
*/
|
||||
default <K, U> Map<K, U> toMap(
|
||||
Function<? super T, ? extends K> keyMapper,
|
||||
Function<? super T, ? extends U> valueMapper,
|
||||
BinaryOperator<U> mergeFunction) {
|
||||
return this.toMap(keyMapper, valueMapper, mergeFunction, HashMap::new);
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为不可变map,key,value为给定操作执行后的返回值
|
||||
*
|
||||
* @param keyMapper 指定的key操作
|
||||
* @param valueMapper 指定value操作
|
||||
* @param mergeFunction 合并操作
|
||||
* @param <K> key类型
|
||||
* @param <U> value类型
|
||||
* @return map
|
||||
*/
|
||||
default <K, U> Map<K, U> toUnmodifiableMap(
|
||||
Function<? super T, ? extends K> keyMapper,
|
||||
Function<? super T, ? extends U> valueMapper,
|
||||
BinaryOperator<U> mergeFunction) {
|
||||
return Collections.unmodifiableMap(
|
||||
this.toMap(keyMapper, valueMapper, mergeFunction, HashMap::new)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为map,key,value为给定操作执行后的返回值
|
||||
*
|
||||
* @param keyMapper 指定的key操作
|
||||
* @param valueMapper 指定value操作
|
||||
* @param mergeFunction 合并操作
|
||||
* @param mapSupplier map工厂
|
||||
* @param <K> key类型
|
||||
* @param <U> value类型
|
||||
* @param <M> map类型
|
||||
* @return map
|
||||
*/
|
||||
default <K, U, M extends Map<K, U>> M toMap(
|
||||
Function<? super T, ? extends K> keyMapper,
|
||||
Function<? super T, ? extends U> valueMapper,
|
||||
BinaryOperator<U> mergeFunction,
|
||||
Supplier<M> mapSupplier) {
|
||||
return stream().collect(Collectors.toMap(keyMapper, valueMapper, mergeFunction, mapSupplier));
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
||||
// region ============ to zip ============
|
||||
|
||||
/**
|
||||
* 与给定的可迭代对象转换成map,key为现有元素,value为给定可迭代对象迭代的元素<br>
|
||||
* 至少包含全部的key,如果对应位置上的value不存在,则为null
|
||||
*
|
||||
* @param other 可迭代对象
|
||||
* @param <R> 可迭代对象迭代的元素类型
|
||||
* @return map,key为现有元素,value为给定可迭代对象迭代的元素;<br>
|
||||
* 至少包含全部的key,如果对应位置上的value不存在,则为null;<br>
|
||||
* 如果key重复, 则保留最后一个关联的value;<br>
|
||||
*/
|
||||
default <R> Map<T, R> toZip(Iterable<R> other) {
|
||||
// value对象迭代器
|
||||
final Iterator<R> iterator = Opt.ofNullable(other).map(Iterable::iterator).orElseGet(Collections::emptyIterator);
|
||||
if (this.isParallel()) {
|
||||
List<T> keyList = toList();
|
||||
final Map<T, R> map = new HashMap<>(keyList.size());
|
||||
for (T key : keyList) {
|
||||
map.put(key, iterator.hasNext() ? iterator.next() : null);
|
||||
}
|
||||
return map;
|
||||
} else {
|
||||
return this.toMap(Function.identity(), e -> iterator.hasNext() ? iterator.next() : null);
|
||||
}
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
||||
// region ============ group ============
|
||||
|
||||
/**
|
||||
* 通过给定分组依据进行分组
|
||||
*
|
||||
* @param classifier 分组依据,得到的键为{@code null}时不会抛出异常
|
||||
* @param <K> 实体中的分组依据对应类型,也是Map中key的类型
|
||||
* @return map
|
||||
*/
|
||||
default <K> Map<K, List<T>> group(Function<? super T, ? extends K> classifier) {
|
||||
return this.group(classifier, Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过给定分组依据进行分组
|
||||
*
|
||||
* @param classifier 分组依据,得到的键为{@code null}时不会抛出异常
|
||||
* @param downstream 下游操作
|
||||
* @param <K> 实体中的分组依据对应类型,也是Map中key的类型
|
||||
* @param <D> 下游操作对应返回类型,也是Map中value的类型
|
||||
* @param <A> 下游操作在进行中间操作时对应类型
|
||||
* @return map
|
||||
*/
|
||||
default <K, A, D> Map<K, D> group(
|
||||
Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream) {
|
||||
return this.group(classifier, HashMap::new, downstream);
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过给定分组依据进行分组
|
||||
*
|
||||
* @param classifier 分组依据,得到的键为{@code null}时不会抛出异常
|
||||
* @param mapFactory 提供的map
|
||||
* @param downstream 下游操作
|
||||
* @param <K> 实体中的分组依据对应类型,也是Map中key的类型
|
||||
* @param <D> 下游操作对应返回类型,也是Map中value的类型
|
||||
* @param <A> 下游操作在进行中间操作时对应类型
|
||||
* @param <M> 最后返回结果Map类型
|
||||
* @return map
|
||||
* @see CollectorUtil#groupingBy(Function, Supplier, Collector)
|
||||
*/
|
||||
default <K, D, A, M extends Map<K, D>> M group(
|
||||
Function<? super T, ? extends K> classifier,
|
||||
Supplier<M> mapFactory,
|
||||
Collector<? super T, A, D> downstream) {
|
||||
return stream().collect(CollectorUtil.groupingBy(classifier, mapFactory, downstream));
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据给定判断条件分组
|
||||
*
|
||||
* @param predicate 判断条件
|
||||
* @return map
|
||||
*/
|
||||
default Map<Boolean, List<T>> partition(Predicate<T> predicate) {
|
||||
return this.partition(predicate, ArrayList::new);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据给定判断条件分组
|
||||
*
|
||||
* @param predicate 判断条件
|
||||
* @param collFactory 提供的集合
|
||||
* @return map
|
||||
*/
|
||||
default <C extends Collection<T>> Map<Boolean, C> partition(Predicate<T> predicate, Supplier<C> collFactory) {
|
||||
return this.partition(predicate, Collectors.toCollection(collFactory));
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据给定判断条件分组
|
||||
*
|
||||
* @param predicate 判断条件
|
||||
* @param downstream 下游操作
|
||||
* @param <R> 返回值类型
|
||||
* @return map
|
||||
*/
|
||||
default <R> Map<Boolean, R> partition(Predicate<T> predicate, Collector<T, ?, R> downstream) {
|
||||
return stream().collect(Collectors.partitioningBy(predicate, downstream));
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
||||
// region ============ foreach ============
|
||||
|
||||
/**
|
||||
* 对流里面的每一个元素执行一个操作,操作带下标,并行流时下标永远为-1
|
||||
* 这是一个终端操作
|
||||
*
|
||||
* @param action 操作
|
||||
*/
|
||||
default void forEachIdx(final BiConsumer<? super T, Integer> action) {
|
||||
Objects.requireNonNull(action);
|
||||
if (isParallel()) {
|
||||
stream().forEach(e -> action.accept(e, NOT_FOUND_INDEX));
|
||||
} else {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
|
||||
stream().forEach(e -> action.accept(e, index.incrementAndGet()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 对流里面的每一个元素按照顺序执行一个操作,操作带下标,并行流时下标永远为-1
|
||||
* 这是一个终端操作
|
||||
*
|
||||
* @param action 操作
|
||||
*/
|
||||
default void forEachOrderedIdx(final BiConsumer<? super T, Integer> action) {
|
||||
Objects.requireNonNull(action);
|
||||
if (isParallel()) {
|
||||
stream().forEachOrdered(e -> action.accept(e, NOT_FOUND_INDEX));
|
||||
} else {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
|
||||
stream().forEachOrdered(e -> action.accept(e, index.incrementAndGet()));
|
||||
}
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
||||
// region ============ find & get ============
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 获取与给定断言匹配的第一个元素
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 与给定断言匹配的第一个元素
|
||||
*/
|
||||
default Optional<T> findFirst(final Predicate<? super T> predicate) {
|
||||
return stream().filter(predicate).findFirst();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取与给定断言匹配的第一个元素的下标,并行流下标永远为-1
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 与给定断言匹配的第一个元素的下标,如果不存在则返回-1
|
||||
*/
|
||||
default int findFirstIdx(final Predicate<? super T> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
if (isParallel()) {
|
||||
return NOT_FOUND_INDEX;
|
||||
} else {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
|
||||
stream().filter(e -> {
|
||||
index.increment();
|
||||
return predicate.test(e);
|
||||
}).findFirst();
|
||||
return index.get();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取最后一个元素
|
||||
*
|
||||
* @return 最后一个元素
|
||||
*/
|
||||
default Optional<T> findLast() {
|
||||
final MutableObj<T> last = new MutableObj<>(null);
|
||||
spliterator().forEachRemaining(last::set);
|
||||
return Optional.ofNullable(last.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取与给定断言匹配的最后一个元素
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 与给定断言匹配的最后一个元素
|
||||
*/
|
||||
default Optional<T> findLast(final Predicate<? super T> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
final MutableObj<T> last = new MutableObj<>(null);
|
||||
spliterator().forEachRemaining(e -> {
|
||||
if (predicate.test(e)) {
|
||||
last.set(e);
|
||||
}
|
||||
});
|
||||
return Optional.ofNullable(last.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取与给定断言匹配的最后一个元素的下标,并行流下标永远为-1
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 与给定断言匹配的最后一个元素的下标,如果不存在则返回-1
|
||||
*/
|
||||
default int findLastIdx(final Predicate<? super T> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
if (isParallel()) {
|
||||
return NOT_FOUND_INDEX;
|
||||
} else {
|
||||
final MutableInt idxRef = new MutableInt(NOT_FOUND_INDEX);
|
||||
forEachIdx((e, i) -> {
|
||||
if (predicate.test(e)) {
|
||||
idxRef.set(i);
|
||||
}
|
||||
});
|
||||
return idxRef.get();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取流中指定下标的元素,如果是负数,则从最后一个开始数起
|
||||
*
|
||||
* @param idx 下标
|
||||
* @return 指定下标的元素
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
default Optional<T> at(final Integer idx) {
|
||||
return Opt.ofNullable(idx).map(i -> (T) ArrayUtil.get(toArray(), i)).toOptional();
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
||||
// region ============ is ============
|
||||
|
||||
/**
|
||||
* 流是否为空
|
||||
*
|
||||
* @return 流是否为空
|
||||
*/
|
||||
default boolean isEmpty() {
|
||||
return !findAny().isPresent();
|
||||
}
|
||||
|
||||
/**
|
||||
* 流是否不为空
|
||||
*
|
||||
* @return 流是否不为空
|
||||
*/
|
||||
default boolean isNotEmpty() {
|
||||
return !isEmpty();
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
||||
}
|
@ -0,0 +1,216 @@
|
||||
package cn.hutool.core.stream;
|
||||
|
||||
import cn.hutool.core.collection.ListUtil;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.lang.Console;
|
||||
import cn.hutool.core.lang.mutable.MutableInt;
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiPredicate;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* {@link StreamWrapper}的扩展,为实现类提供更多中间操作方法
|
||||
*
|
||||
* @param <T> 流中的元素类型
|
||||
* @param <I> 链式调用获得的实现类类型
|
||||
* @author huangchengxing
|
||||
*/
|
||||
public interface TransformableStreamWrapper<T, I extends TransformableStreamWrapper<T, I>> extends StreamWrapper<T, I> {
|
||||
|
||||
/**
|
||||
* 过滤元素,返回与指定断言匹配的元素组成的流,断言带下标,并行流时下标永远为-1
|
||||
* 这是一个无状态中间操作
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 返回叠加过滤操作后的流
|
||||
*/
|
||||
default I filterIdx(final BiPredicate<? super T, Integer> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
if (isParallel()) {
|
||||
return filter(e -> predicate.test(e, NOT_FOUND_INDEX));
|
||||
} else {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_INDEX);
|
||||
return filter(e -> predicate.test(e, index.incrementAndGet()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 过滤掉空元素
|
||||
*
|
||||
* @return 过滤后的流
|
||||
*/
|
||||
default I nonNull() {
|
||||
return filter(Objects::nonNull);
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回与指定函数将元素作为参数执行后组成的流。操作带下标,并行流时下标永远为-1
|
||||
* 这是一个无状态中间操作
|
||||
* @param action 指定的函数
|
||||
* @return 返回叠加操作后的FastStream
|
||||
* @apiNote 该方法存在的意义主要是用来调试
|
||||
* 当你需要查看经过操作管道某处的元素和下标,可以执行以下操作:
|
||||
* <pre>{@code
|
||||
* Stream.of("one", "two", "three", "four")
|
||||
* .filter(e -> e.length() > 3)
|
||||
* .peekIdx((e,i) -> System.out.println("Filtered value: " + e + " Filtered idx:" + i))
|
||||
* .map(String::toUpperCase)
|
||||
* .peekIdx((e,i) -> System.out.println("Mapped value: " + e + " Mapped idx:" + i))
|
||||
* .collect(Collectors.toList());
|
||||
* }</pre>
|
||||
*/
|
||||
default I peekIdx(BiConsumer<? super T, Integer> action) {
|
||||
Objects.requireNonNull(action);
|
||||
if (isParallel()) {
|
||||
return peek(e -> action.accept(e, NOT_FOUND_INDEX));
|
||||
} else {
|
||||
AtomicInteger index = new AtomicInteger(NOT_FOUND_INDEX);
|
||||
return peek(e -> action.accept(e, index.incrementAndGet()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回叠加调用{@link Console#log(Object)}打印出结果的流
|
||||
*
|
||||
* @return 返回叠加操作后的FastStream
|
||||
*/
|
||||
default I log() {
|
||||
return peek(Console::log);
|
||||
}
|
||||
|
||||
/**
|
||||
* 反转顺序
|
||||
*
|
||||
* @return 反转元素顺序
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
default I reverse() {
|
||||
final T[] array = (T[]) toArray();
|
||||
ArrayUtil.reverse(array);
|
||||
return wrapping(Stream.of(array)).parallel(isParallel());
|
||||
}
|
||||
|
||||
/**
|
||||
* 更改流的并行状态
|
||||
*
|
||||
* @param parallel 是否并行
|
||||
* @return 流
|
||||
*/
|
||||
default I parallel(final boolean parallel) {
|
||||
return parallel ? parallel() : sequential();
|
||||
}
|
||||
|
||||
/**
|
||||
* 与给定元素组成的流合并,成为新的流
|
||||
*
|
||||
* @param obj 元素
|
||||
* @return 流
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
default I push(final T... obj) {
|
||||
Stream<T> result = stream();
|
||||
if (ArrayUtil.isNotEmpty(obj)) {
|
||||
result = Stream.concat(stream(), Stream.of(obj));
|
||||
}
|
||||
return wrapping(result);
|
||||
}
|
||||
|
||||
/**
|
||||
* 给定元素组成的流与当前流合并,成为新的流
|
||||
*
|
||||
* @param obj 元素
|
||||
* @return 流
|
||||
*/
|
||||
default I unshift(final T... obj) {
|
||||
Stream<T> result = stream();
|
||||
if (ArrayUtil.isNotEmpty(obj)) {
|
||||
result = Stream.concat(Stream.of(obj), stream());
|
||||
}
|
||||
return wrapping(result);
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过删除或替换现有元素或者原地添加新的元素来修改列表,并以列表形式返回被修改的内容。此方法不会改变原列表。
|
||||
* 类似js的<a href="https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Array/splice">splice</a>函数
|
||||
*
|
||||
* @param start 起始下标
|
||||
* @param deleteCount 删除个数,正整数
|
||||
* @param items 放入值
|
||||
* @return 操作后的流
|
||||
*/
|
||||
default I splice(final int start, final int deleteCount, final T... items) {
|
||||
final List<T> elements = stream().collect(Collectors.toList());
|
||||
return wrapping(ListUtil.splice(elements, start, deleteCount, items).stream())
|
||||
.parallel(isParallel());
|
||||
}
|
||||
|
||||
/**
|
||||
* 保留 与指定断言 匹配时的元素, 在第一次不匹配时终止, 抛弃当前(第一个不匹配元素)及后续所有元素
|
||||
* <p>与 jdk9 中的 takeWhile 方法不太一样, 这里的实现是个 顺序的、有状态的中间操作</p>
|
||||
* <pre>本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
|
||||
* FastStream.iterate(1, i -> i + 1)
|
||||
* .parallel()
|
||||
* // 顺序执行
|
||||
* .takeWhile(e -> e < 50)
|
||||
* // 并发
|
||||
* .map(e -> e + 1)
|
||||
* // 并发
|
||||
* .map(String::valueOf)
|
||||
* .toList();
|
||||
* }</pre>
|
||||
* <p>但是不建议在并行流中使用, 除非你确定 takeWhile 之后的操作能在并行流中受益很多</p>
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 与指定断言匹配的元素组成的流
|
||||
*/
|
||||
default I takeWhile(final Predicate<? super T> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
return wrapping(StreamUtil.takeWhile(stream(), predicate));
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除 与指定断言 匹配的元素, 在第一次不匹配时终止, 返回当前(第一个不匹配元素)及剩余元素组成的新流
|
||||
* <p>与 jdk9 中的 dropWhile 方法不太一样, 这里的实现是个 顺序的、有状态的中间操作</p>
|
||||
* <pre>本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
|
||||
* FastStream.iterate(1, i <= 100, i -> i + 1)
|
||||
* .parallel()
|
||||
* // 顺序执行
|
||||
* .dropWhile(e -> e < 50)
|
||||
* // 并发
|
||||
* .map(e -> e + 1)
|
||||
* // 并发
|
||||
* .map(String::valueOf)
|
||||
* .toList();
|
||||
* }</pre>
|
||||
* <p>但是不建议在并行流中使用, 除非你确定 dropWhile 之后的操作能在并行流中受益很多</p>
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 剩余元素组成的流
|
||||
*/
|
||||
default I dropWhile(final Predicate<? super T> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
return wrapping(StreamUtil.dropWhile(stream(), predicate));
|
||||
}
|
||||
|
||||
/**
|
||||
* 将当前流转为另一对象。用于提供针对流本身而非流中元素的操作
|
||||
*
|
||||
* @param <R> 转换类型
|
||||
* @param transform 转换
|
||||
* @return 转换后的流
|
||||
*/
|
||||
default <R> Optional<R> transform(final Function<? super I, R> transform) {
|
||||
Assert.notNull(transform, "transform must not null");
|
||||
return Optional.ofNullable(transform.apply(wrapping(this)));
|
||||
}
|
||||
|
||||
}
|
@ -6,6 +6,7 @@ import org.junit.Test;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ -327,7 +328,7 @@ public class EntryStreamTest {
|
||||
Map<Integer, Integer> result = EntryStream.of(map).toMap();
|
||||
Assert.assertEquals(map, result);
|
||||
|
||||
result = EntryStream.of(map).toMap(LinkedHashMap::new);
|
||||
result = EntryStream.of(map).toMap((Supplier<Map<Integer, Integer>>)LinkedHashMap::new);
|
||||
Assert.assertEquals(new LinkedHashMap<>(map), result);
|
||||
|
||||
result = EntryStream.of(map).toMap(LinkedHashMap::new, (t1, t2) -> t1);
|
||||
|
Loading…
x
Reference in New Issue
Block a user