:trollface: 增强EasyStream#zip对并行流的支持,提供CollectorUtil.entryToMap对Entry转map提供支持,完善javadoc

This commit is contained in:
VampireAchao 2022-09-09 14:41:10 +08:00
parent 814b55cc87
commit fc532551af

View File

@ -3,7 +3,6 @@ package cn.hutool.core.stream;
import cn.hutool.core.collection.ListUtil; import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.collection.iter.IterUtil; import cn.hutool.core.collection.iter.IterUtil;
import cn.hutool.core.lang.Console; import cn.hutool.core.lang.Console;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.lang.mutable.MutableInt; import cn.hutool.core.lang.mutable.MutableInt;
import cn.hutool.core.lang.mutable.MutableObj; import cn.hutool.core.lang.mutable.MutableObj;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
@ -40,24 +39,15 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
* @return 合并后的结果对象的流 * @return 合并后的结果对象的流
*/ */
default <U, R> EasyStream<R> zip( default <U, R> EasyStream<R> zip(
final Iterable<U> other, final Iterable<U> other,
final BiFunction<? super T, ? super U, ? extends R> zipper) { final BiFunction<? super T, ? super U, ? extends R> zipper) {
Objects.requireNonNull(zipper); Objects.requireNonNull(zipper);
final Spliterator<T> keys = spliterator(); Map<Integer, T> idxIdentityMap = mapIdx((e, idx) -> MapUtil.entry(idx, e)).collect(CollectorUtil.entryToMap());
final Spliterator<U> values = Opt.ofNullable(other).map(Iterable::spliterator).orElseGet(Spliterators::emptySpliterator); Map<Integer, U> idxOtherMap = EasyStream.of(other).mapIdx((e, idx) -> MapUtil.entry(idx, e)).collect(CollectorUtil.entryToMap());
// 获取两个Spliterator的中较小的数量 if (idxIdentityMap.size() <= idxOtherMap.size()) {
// 如果Spliterator经过流操作, getExactSizeIfKnown()可能会返回-1, 所以默认大小为 ArrayList.DEFAULT_CAPACITY return EasyStream.of(idxIdentityMap.keySet(), isParallel()).map(k -> zipper.apply(idxIdentityMap.get(k), idxOtherMap.get(k)));
final int sizeIfKnown = (int) Math.max(Math.min(keys.getExactSizeIfKnown(), values.getExactSizeIfKnown()), 10);
final List<R> list = new ArrayList<>(sizeIfKnown);
// 保存第一个Spliterator的值
final MutableObj<T> key = new MutableObj<>();
// 保存第二个Spliterator的值
final MutableObj<U> value = new MutableObj<>();
// 当两个Spliterator中都还有剩余元素时
while (keys.tryAdvance(key::set) && values.tryAdvance(value::set)) {
list.add(zipper.apply(key.get(), value.get()));
} }
return EasyStream.of(list).parallel(isParallel()).onClose(unwrap()::close); return EasyStream.of(idxOtherMap.keySet(), isParallel()).map(k -> zipper.apply(idxIdentityMap.get(k), idxOtherMap.get(k)));
} }
/** /**
@ -78,9 +68,9 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
return EasyStream.<EasyStream<T>>of(EasyStream.of(list, isParallel())); return EasyStream.<EasyStream<T>>of(EasyStream.of(list, isParallel()));
} }
return EasyStream.iterate(0, i -> i < size, i -> i + batchSize) return EasyStream.iterate(0, i -> i < size, i -> i + batchSize)
.map(skip -> EasyStream.of(list.subList(skip, Math.min(size, skip + batchSize)), isParallel())) .map(skip -> EasyStream.of(list.subList(skip, Math.min(size, skip + batchSize)), isParallel()))
.parallel(isParallel()) .parallel(isParallel())
.onClose(unwrap()::close); .onClose(unwrap()::close);
} }
/** /**
@ -114,8 +104,8 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
/** /**
* 将当前流转为键值对流 * 将当前流转为键值对流
* *
* @param keyMapper 键的映射方法 * @param keyMapper 键的映射方法
* @param <K> 键类型 * @param <K> 键类型
* @return {@link EntryStream}实例 * @return {@link EntryStream}实例
*/ */
default <K> EntryStream<K, T> toEntries(final Function<T, K> keyMapper) { default <K> EntryStream<K, T> toEntries(final Function<T, K> keyMapper) {
@ -159,7 +149,7 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
default S splice(final int start, final int deleteCount, final T... items) { default S splice(final int start, final int deleteCount, final T... items) {
final List<T> elements = unwrap().collect(Collectors.toList()); final List<T> elements = unwrap().collect(Collectors.toList());
return wrap(ListUtil.splice(elements, start, deleteCount, items).stream()) return wrap(ListUtil.splice(elements, start, deleteCount, items).stream())
.parallel(isParallel()); .parallel(isParallel());
} }
/** /**
@ -262,6 +252,7 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
/** /**
* 返回与指定函数将元素作为参数执行后组成的流操作带下标并行流时下标永远为-1 * 返回与指定函数将元素作为参数执行后组成的流操作带下标并行流时下标永远为-1
* 这是一个无状态中间操作 * 这是一个无状态中间操作
*
* @param action 指定的函数 * @param action 指定的函数
* @return 返回叠加操作后的FastStream * @return 返回叠加操作后的FastStream
* @apiNote 该方法存在的意义主要是用来调试 * @apiNote 该方法存在的意义主要是用来调试
@ -491,10 +482,9 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
Objects.requireNonNull(childrenGetter); Objects.requireNonNull(childrenGetter);
Objects.requireNonNull(childrenSetter); Objects.requireNonNull(childrenSetter);
final MutableObj<Function<T, EasyStream<T>>> recursiveRef = new MutableObj<>(); final MutableObj<Function<T, EasyStream<T>>> recursiveRef = new MutableObj<>();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked") final Function<T, EasyStream<T>> recursive = e -> EasyStream.of(childrenGetter.apply(e))
final Function<T, EasyStream<T>> recursive = e -> EasyStream.of(childrenGetter.apply(e)) .flat(recursiveRef.get())
.flat(recursiveRef.get()) .unshift(e);
.unshift(e);
recursiveRef.set(recursive); recursiveRef.set(recursive);
return wrap(flatMap(recursive).peek(e -> childrenSetter.accept(e, null))); return wrap(flatMap(recursive).peek(e -> childrenSetter.accept(e, null)));
} }