优化FastStream实现细节, 使用高效一些的写法;

This commit is contained in:
Zjp 2022-08-01 16:25:07 +08:00
parent 4f32f20ea9
commit 9a7367ffe1
4 changed files with 298 additions and 135 deletions

View File

@ -1467,7 +1467,7 @@ public class CollUtil {
* @return 最后一个位置 * @return 最后一个位置
* @since 5.6.6 * @since 5.6.6
*/ */
public static <T> int lastIndexOf(final Collection<T> collection, final Predicate<T> predicate) { public static <T> int lastIndexOf(final Collection<T> collection, final Predicate<? super T> predicate) {
if (collection instanceof List) { if (collection instanceof List) {
// List的查找最后一个有优化算法 // List的查找最后一个有优化算法
return ListUtil.lastIndexOf((List<T>) collection, predicate); return ListUtil.lastIndexOf((List<T>) collection, predicate);

View File

@ -524,7 +524,7 @@ public class ListUtil {
* @return 最后一个位置 * @return 最后一个位置
* @since 5.6.6 * @since 5.6.6
*/ */
public static <T> int lastIndexOf(final List<T> list, final Predicate<T> matcher) { public static <T> int lastIndexOf(final List<T> list, final Predicate<? super T> matcher) {
if (null != list) { if (null != list) {
final int size = list.size(); final int size = list.size();
if (size > 0) { if (size > 0) {

View File

@ -1,12 +1,17 @@
package cn.hutool.core.stream; package cn.hutool.core.stream;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.lang.Console; import cn.hutool.core.lang.Console;
import cn.hutool.core.lang.Opt; import cn.hutool.core.lang.Opt;
import cn.hutool.core.lang.mutable.MutableInt;
import cn.hutool.core.lang.mutable.MutableObj;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.StrUtil; import cn.hutool.core.text.StrUtil;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.*; import java.util.function.*;
import java.util.stream.*; import java.util.stream.*;
@ -39,9 +44,15 @@ import java.util.stream.*;
* 这些异步任务默认使用{@link java.util.concurrent.ForkJoinPool}线程池进行管理 * 这些异步任务默认使用{@link java.util.concurrent.ForkJoinPool}线程池进行管理
* *
* @author VampireAchao * @author VampireAchao
* @author emptypoint
* @see java.util.stream.Stream * @see java.util.stream.Stream
* @since 6.0.0
*/ */
public class FastStream<T> implements Stream<T>, Iterable<T> { public class FastStream<T> implements Stream<T>, Iterable<T> {
/**
* 代表不存在的下标, 一般用于并行流的下标, 或者未找到元素时的下标
*/
private static final int NOT_FOUND_INDEX = -1;
protected final Stream<T> stream; protected final Stream<T> stream;
@ -215,7 +226,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @param b 第二个流 * @param b 第二个流
* @return 拼接两个流之后的流 * @return 拼接两个流之后的流
*/ */
public static <T> FastStream<T> concat(FastStream<? extends T> a, FastStream<? extends T> b) { public static <T> FastStream<T> concat(Stream<? extends T> a, Stream<? extends T> b) {
return new FastStream<>(Stream.concat(a, b)); return new FastStream<>(Stream.concat(a, b));
} }
@ -239,7 +250,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return * @return
*/ */
public static <T> FastStream<T> of(Iterable<T> iterable, boolean parallel) { public static <T> FastStream<T> of(Iterable<T> iterable, boolean parallel) {
return Optional.ofNullable(iterable).map(Iterable::spliterator).map(spliterator -> StreamSupport.stream(spliterator, parallel)).map(FastStream::new).orElseGet(FastStream::empty); return Opt.ofNullable(iterable).map(Iterable::spliterator).map(spliterator -> StreamSupport.stream(spliterator, parallel)).map(FastStream::new).orElseGet(FastStream::empty);
} }
/** /**
@ -250,7 +261,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return * @return
*/ */
public static <T> FastStream<T> of(Stream<T> stream) { public static <T> FastStream<T> of(Stream<T> stream) {
return new FastStream<>(stream); return new FastStream<>(Objects.requireNonNull(stream));
} }
/** /**
@ -261,7 +272,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return 拆分后元素组成的流 * @return 拆分后元素组成的流
*/ */
public static FastStream<String> split(CharSequence str, String regex) { public static FastStream<String> split(CharSequence str, String regex) {
return Opt.ofBlankAble(str).map(String::valueOf).map(s -> s.split(regex)).map(FastStream::of).orElseGet(FastStream::empty); return Opt.ofBlankAble(str).map(CharSequence::toString).map(s -> s.split(regex)).map(FastStream::of).orElseGet(FastStream::empty);
} }
/** /**
@ -286,6 +297,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return 指定操作结果 匹配 指定值 的元素组成的流 * @return 指定操作结果 匹配 指定值 的元素组成的流
*/ */
public <R> FastStream<T> filter(Function<? super T, ? extends R> mapper, R value) { public <R> FastStream<T> filter(Function<? super T, ? extends R> mapper, R value) {
Objects.requireNonNull(mapper);
return filter(e -> Objects.equals(Opt.ofNullable(e).map(mapper).get(), value)); return filter(e -> Objects.equals(Opt.ofNullable(e).map(mapper).get(), value));
} }
@ -298,8 +310,13 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return 返回叠加过滤操作后的流 * @return 返回叠加过滤操作后的流
*/ */
public FastStream<T> filterIdx(BiPredicate<? super T, Integer> predicate) { public FastStream<T> filterIdx(BiPredicate<? super T, Integer> predicate) {
AtomicInteger index = new AtomicInteger(-1); Objects.requireNonNull(predicate);
return filter(e -> predicate.test(e, isParallel() ? index.get() : index.incrementAndGet())); if (isParallel()) {
return filter(e -> predicate.test(e, NOT_FOUND_INDEX));
} else {
MutableInt index = new MutableInt(NOT_FOUND_INDEX);
return filter(e -> predicate.test(e, index.increment().get()));
}
} }
/** /**
@ -333,8 +350,13 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return 返回叠加操作后的流 * @return 返回叠加操作后的流
*/ */
public <R> FastStream<R> mapIdx(BiFunction<? super T, Integer, ? extends R> mapper) { public <R> FastStream<R> mapIdx(BiFunction<? super T, Integer, ? extends R> mapper) {
AtomicInteger index = new AtomicInteger(-1); Objects.requireNonNull(mapper);
return map(e -> mapper.apply(e, isParallel() ? index.get() : index.incrementAndGet())); if (isParallel()) {
return map(e -> mapper.apply(e, NOT_FOUND_INDEX));
} else {
MutableInt index = new MutableInt(NOT_FOUND_INDEX);
return map(e -> mapper.apply(e, index.increment().get()));
}
} }
/** /**
@ -363,8 +385,13 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return 返回叠加拆分操作后的流 * @return 返回叠加拆分操作后的流
*/ */
public <R> FastStream<R> flatMapIdx(BiFunction<? super T, Integer, ? extends Stream<? extends R>> mapper) { public <R> FastStream<R> flatMapIdx(BiFunction<? super T, Integer, ? extends Stream<? extends R>> mapper) {
AtomicInteger index = new AtomicInteger(-1); Objects.requireNonNull(mapper);
return flatMap(e -> mapper.apply(e, isParallel() ? index.get() : index.incrementAndGet())); if (isParallel()) {
return flatMap(e -> mapper.apply(e, NOT_FOUND_INDEX));
} else {
MutableInt index = new MutableInt(NOT_FOUND_INDEX);
return flatMap(e -> mapper.apply(e, index.increment().get()));
}
} }
/** /**
@ -416,6 +443,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return 返回叠加拆分操作后的流 * @return 返回叠加拆分操作后的流
*/ */
public <R> FastStream<R> flatMapIter(Function<? super T, ? extends Iterable<? extends R>> mapper) { public <R> FastStream<R> flatMapIter(Function<? super T, ? extends Iterable<? extends R>> mapper) {
Objects.requireNonNull(mapper);
return flatMap(w -> Opt.of(w).map(mapper).map(FastStream::of).orElseGet(FastStream::empty)); return flatMap(w -> Opt.of(w).map(mapper).map(FastStream::of).orElseGet(FastStream::empty));
} }
@ -463,7 +491,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @param <R> 拆分后流的元素类型 * @param <R> 拆分后流的元素类型
* @return 返回叠加拆分操作后的流 * @return 返回叠加拆分操作后的流
*/ */
public <R> FastStream<R> mapMulti(BiConsumer<? super T, ? super Consumer<R>> mapper) { public <R> FastStream<R> mapMulti(BiConsumer<? super T, ? super FastStreamBuilder<R>> mapper) {
Objects.requireNonNull(mapper); Objects.requireNonNull(mapper);
return flatMap(e -> { return flatMap(e -> {
FastStreamBuilder<R> buffer = FastStream.builder(); FastStreamBuilder<R> buffer = FastStream.builder();
@ -490,8 +518,31 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @param keyExtractor 去重依据 * @param keyExtractor 去重依据
* @return 一个具有去重特征的流 * @return 一个具有去重特征的流
*/ */
public FastStream<T> distinct(Function<? super T, ?> keyExtractor) { public <F> FastStream<T> distinct(Function<? super T, F> keyExtractor) {
return new FastStream<>(toMap(keyExtractor).entrySet().stream()).parallel(isParallel()).map(Map.Entry::getValue); Objects.requireNonNull(keyExtractor);
if (isParallel()) {
ConcurrentHashMap<F, Boolean> exists = MapUtil.newConcurrentHashMap();
// 标记是否出现过null值用于保留第一个出现的null
// 由于ConcurrentHashMap的key不能为null所以用此变量来标记
AtomicBoolean hasNull = new AtomicBoolean(false);
return of(stream.filter(e -> {
F key = keyExtractor.apply(e);
if (key == null) {
// 已经出现过null值跳过该值
if (hasNull.get()) {
return false;
}
hasNull.set(Boolean.TRUE);
return true;
} else {
// 第一次出现的key返回true
return null == exists.putIfAbsent(key, Boolean.TRUE);
}
})).parallel(isParallel());
} else {
Set<F> exists = new HashSet<>();
return of(stream.filter(e -> exists.add(keyExtractor.apply(e)))).parallel(isParallel());
}
} }
/** /**
@ -583,7 +634,9 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
*/ */
@Override @Override
public FastStream<T> sequential() { public FastStream<T> sequential() {
return new FastStream<>(stream.sequential()); //noinspection ResultOfMethodCallIgnored
stream.sequential();
return this;
} }
/** /**
@ -604,8 +657,13 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @param action 操作 * @param action 操作
*/ */
public void forEachIdx(BiConsumer<? super T, Integer> action) { public void forEachIdx(BiConsumer<? super T, Integer> action) {
AtomicInteger index = new AtomicInteger(-1); Objects.requireNonNull(action);
stream.forEach(e -> action.accept(e, isParallel() ? index.get() : index.incrementAndGet())); if (isParallel()) {
stream.forEach(e -> action.accept(e, NOT_FOUND_INDEX));
} else {
MutableInt index = new MutableInt(NOT_FOUND_INDEX);
stream.forEach(e -> action.accept(e, index.increment().get()));
}
} }
/** /**
@ -626,8 +684,13 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @param action 操作 * @param action 操作
*/ */
public void forEachOrderedIdx(BiConsumer<? super T, Integer> action) { public void forEachOrderedIdx(BiConsumer<? super T, Integer> action) {
AtomicInteger index = new AtomicInteger(-1); Objects.requireNonNull(action);
stream.forEachOrdered(e -> action.accept(e, isParallel() ? index.get() : index.incrementAndGet())); if (isParallel()) {
stream.forEachOrdered(e -> action.accept(e, NOT_FOUND_INDEX));
} else {
MutableInt index = new MutableInt(NOT_FOUND_INDEX);
stream.forEachOrdered(e -> action.accept(e, index.increment().get()));
}
} }
/** /**
@ -653,6 +716,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* String[] strings = Stream.<Integer>builder().add(1).build().toArray(String[]::new); * String[] strings = Stream.<Integer>builder().add(1).build().toArray(String[]::new);
* }</pre> * }</pre>
*/ */
@Override
public <A> A[] toArray(IntFunction<A[]> generator) { public <A> A[] toArray(IntFunction<A[]> generator) {
//noinspection SuspiciousToArrayCall //noinspection SuspiciousToArrayCall
return stream.toArray(generator); return stream.toArray(generator);
@ -849,23 +913,28 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return 与给定断言匹配的第一个元素 * @return 与给定断言匹配的第一个元素
*/ */
public T findFirst(Predicate<? super T> predicate) { public T findFirst(Predicate<? super T> predicate) {
return filter(predicate).findFirst().orElse(null); return stream.filter(predicate).findFirst().orElse(null);
} }
/** /**
* 获取与给定断言匹配的第一个元素的下标 * 获取与给定断言匹配的第一个元素的下标并行流下标永远为-1
* *
* @param predicate 断言 * @param predicate 断言
* @return 与给定断言匹配的第一个元素的下标 * @return 与给定断言匹配的第一个元素的下标如果不存在则返回-1
*/ */
public Integer findFirstIdx(Predicate<? super T> predicate) { public Integer findFirstIdx(Predicate<? super T> predicate) {
AtomicInteger idxRef = new AtomicInteger(-1); Objects.requireNonNull(predicate);
forEachIdx((e, i) -> { if (isParallel()) {
if (predicate.test(e) && idxRef.get() == -1) { return NOT_FOUND_INDEX;
idxRef.set(i); } else {
} MutableInt index = new MutableInt(NOT_FOUND_INDEX);
}); //noinspection ResultOfMethodCallIgnored
return idxRef.get(); stream.filter(e -> {
index.increment();
return predicate.test(e);
}).findFirst();
return index.get();
}
} }
/** /**
@ -874,7 +943,13 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return 最后一个元素 * @return 最后一个元素
*/ */
public Optional<T> findLast() { public Optional<T> findLast() {
return Optional.of(toList()).filter(l -> !l.isEmpty()).map(l -> l.get(l.size() - 1)); if (isParallel()) {
return Optional.of(toList()).filter(CollUtil::isNotEmpty).map(l -> l.get(l.size() - 1));
} else {
MutableObj<T> last = new MutableObj<>(null);
forEach(last::set);
return Optional.ofNullable(last.get());
}
} }
/** /**
@ -884,23 +959,41 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return 与给定断言匹配的最后一个元素 * @return 与给定断言匹配的最后一个元素
*/ */
public T findLast(Predicate<? super T> predicate) { public T findLast(Predicate<? super T> predicate) {
return reverse().filter(predicate).findFirst().orElse(null); Objects.requireNonNull(predicate);
if (isParallel()) {
List<T> list = toList();
final int index = ListUtil.lastIndexOf(list, predicate);
return index == NOT_FOUND_INDEX ? null : list.get(index);
} else {
MutableObj<T> last = new MutableObj<>(null);
forEach(e -> {
if (predicate.test(e)) {
last.set(e);
}
});
return last.get();
}
} }
/** /**
* 获取与给定断言匹配的最后一个元素的下标 * 获取与给定断言匹配的最后一个元素的下标并行流下标永远为-1
* *
* @param predicate 断言 * @param predicate 断言
* @return 与给定断言匹配的最后一个元素的下标 * @return 与给定断言匹配的最后一个元素的下标如果不存在则返回-1
*/ */
public Integer findLastIdx(Predicate<? super T> predicate) { public Integer findLastIdx(Predicate<? super T> predicate) {
AtomicInteger idxRef = new AtomicInteger(-1); Objects.requireNonNull(predicate);
forEachIdx((e, i) -> { if (isParallel()) {
if (predicate.test(e)) { return NOT_FOUND_INDEX;
idxRef.set(i); } else {
} MutableInt idxRef = new MutableInt(NOT_FOUND_INDEX);
}); forEachIdx((e, i) -> {
return idxRef.get(); if (predicate.test(e)) {
idxRef.set(i);
}
});
return idxRef.get();
}
} }
/** /**
@ -951,7 +1044,9 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
*/ */
@Override @Override
public FastStream<T> parallel() { public FastStream<T> parallel() {
return new FastStream<>(stream.parallel()); //noinspection ResultOfMethodCallIgnored
stream.parallel();
return this;
} }
/** /**
@ -961,7 +1056,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return * @return
*/ */
public FastStream<T> parallel(boolean parallel) { public FastStream<T> parallel(boolean parallel) {
return new FastStream<>(parallel ? stream.parallel() : stream.sequential()); return parallel ? parallel() : sequential();
} }
/** /**
@ -982,7 +1077,9 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
*/ */
@Override @Override
public FastStream<T> onClose(Runnable closeHandler) { public FastStream<T> onClose(Runnable closeHandler) {
return new FastStream<>(stream.onClose(closeHandler)); //noinspection ResultOfMethodCallIgnored
stream.onClose(closeHandler);
return this;
} }
/** /**
@ -992,7 +1089,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return * @return
*/ */
public FastStream<T> push(T obj) { public FastStream<T> push(T obj) {
return FastStream.concat(this, FastStream.of(obj)); return FastStream.concat(this.stream, Stream.of(obj));
} }
/** /**
@ -1003,7 +1100,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public FastStream<T> push(T... obj) { public FastStream<T> push(T... obj) {
return FastStream.concat(this, FastStream.of(obj)); return FastStream.concat(this.stream, Stream.of(obj));
} }
/** /**
@ -1012,8 +1109,8 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @param obj 元素 * @param obj 元素
* @return * @return
*/ */
public FastStream<T> unshift(T obj) { public FastStream<T> addFirst(T obj) {
return FastStream.concat(FastStream.of(obj), this); return FastStream.concat(Stream.of(obj), this.stream);
} }
/** /**
@ -1022,9 +1119,9 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @param obj 元素 * @param obj 元素
* @return * @return
*/ */
@SuppressWarnings("unchecked") @SafeVarargs
public FastStream<T> unshift(T... obj) { public final FastStream<T> addFirst(T... obj) {
return FastStream.concat(FastStream.of(obj), this); return FastStream.concat(Stream.of(obj), this.stream);
} }
/** /**
@ -1037,17 +1134,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
if (Objects.isNull(idx)) { if (Objects.isNull(idx)) {
return null; return null;
} }
List<T> list = toList(); return CollUtil.get(toList(), idx);
if (idx > -1) {
if (idx >= list.size()) {
return null;
}
return list.get(idx);
}
if (-idx > list.size()) {
return null;
}
return list.get(list.size() + idx);
} }
/** /**
@ -1134,15 +1221,28 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
} }
/** /**
* 与给定的可迭代对象转换成mapkey为现有元素value为给定可迭代对象迭代的元素 * 与给定的可迭代对象转换成mapkey为现有元素value为给定可迭代对象迭代的元素<br>
* 至少包含全部的key如果对应位置上的value不存在则为null
* *
* @param other 可迭代对象 * @param other 可迭代对象
* @param <R> 可迭代对象迭代的元素类型 * @param <R> 可迭代对象迭代的元素类型
* @return mapkey为现有元素value为给定可迭代对象迭代的元素 * @return mapkey为现有元素value为给定可迭代对象迭代的元素;<br>
* 至少包含全部的key如果对应位置上的value不存在则为null;<br>
* 如果key重复, 则保留最后一个关联的value;<br>
*/ */
public <R> Map<T, R> toZip(Iterable<R> other) { public <R> Map<T, R> toZip(Iterable<R> other) {
Iterator<R> iterator = other.iterator(); // value对象迭代器
return toMap(Function.identity(), e -> iterator.hasNext() ? iterator.next() : null); final Iterator<R> iterator = Opt.ofNullable(other).map(Iterable::iterator).orElseGet(Collections::emptyIterator);
if (isParallel()) {
List<T> keyList = toList();
final Map<T, R> map = MapUtil.newHashMap(keyList.size());
for (T key : keyList) {
map.put(key, iterator.hasNext() ? iterator.next() : null);
}
return map;
} else {
return toMap(Function.identity(), e -> iterator.hasNext() ? iterator.next() : null);
}
} }
/** /**
@ -1283,63 +1383,106 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
return collect(CollectorUtil.groupingBy(classifier, mapFactory, downstream)); return collect(CollectorUtil.groupingBy(classifier, mapFactory, downstream));
} }
/**
* 现有元素 给定迭代器中对应位置的元素 使用 zipper 转换为新的元素并返回新元素组成的流<br>
* 新流的数量等于旧流元素的数量<br>
* 使用 zipper 转换时, 如果对应位置上已经没有other元素则other元素为null<br>
*
* @param other 给定的迭代器
* @param zipper 两个元素的合并器
* @param <U> 给定的迭代对象类型
* @param <R> 合并后的结果对象类型
* @return 合并后的结果对象的流
*/
public <U, R> FastStream<R> zip(Iterable<U> other, public <U, R> FastStream<R> zip(Iterable<U> other,
BiFunction<? super T, ? super U, ? extends R> zipper) { BiFunction<? super T, ? super U, ? extends R> zipper) {
Iterator<U> iterator = other.iterator(); Objects.requireNonNull(zipper);
return new FastStream<>(stream.map(e -> zipper.apply(e, iterator.hasNext() ? iterator.next() : null))); // 给定对象迭代器
final Iterator<U> iterator = Opt.ofNullable(other).map(Iterable::iterator).orElseGet(Collections::emptyIterator);
Stream<T> resStream = this.stream;
if (isParallel()) {
resStream = toList().stream();
}
return of(resStream.map(e -> zipper.apply(e, iterator.hasNext() ? iterator.next() : null)));
} }
/** /**
* 类似js的<a href="https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Array/splice">splice</a>函数 * 类似js的<a href="https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Array/splice">splice</a>函数
* *
* @param start 起始下标 * @param start 起始下标
* @param deleteCount 删除个数 * @param deleteCount 删除个数正整数
* @param items 放入值 * @param items 放入值
* @return 操作后的流 * @return 操作后的流
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public FastStream<T> splice(int start, int deleteCount, T... items) { public FastStream<T> splice(int start, int deleteCount, T... items) {
List<T> list = toList(); List<T> list = toList();
if (start > -1) { final int size = list.size();
if (start >= list.size()) { // 从后往前查找
return FastStream.concat(FastStream.of(list), FastStream.of(items)); if (start < 0) {
} start += size;
list.removeAll(list.subList(start, start + deleteCount)); } else if (start >= size) {
list.addAll(start, Arrays.asList(items)); // 直接在尾部追加不删除
return FastStream.of(list); start = size;
deleteCount = 0;
} }
if (-start > list.size()) { // 起始位置 加上 删除的数量 超过 数据长度需要重新计算需要删除的数量
return FastStream.concat(FastStream.of(items), FastStream.of(list)); if (start + deleteCount > size) {
deleteCount = size - start;
} }
start = list.size() + start;
list.removeAll(list.subList(start, start + deleteCount)); // 新列表的长度
list.addAll(start, Arrays.asList(items)); final int newSize = size - deleteCount + items.length;
return FastStream.of(list); List<T> resList = list;
// 新列表的长度 大于 旧列表创建新列表
if (newSize > size) {
resList = new ArrayList<>(newSize);
resList.addAll(list);
}
// 需要删除的部分
if (deleteCount > 0) {
resList.subList(start, start + deleteCount).clear();
}
// 新增的部分
if (items.length > 0) {
resList.addAll(start, Arrays.asList(items));
}
return FastStream.of(resList);
} }
/** /**
* 按指定长度切分为双层流 * 按指定长度切分为双层流
* <p>
* 形如[1,2,3,4,5] -> [[1,2], [3,4], [5,6]]
* </p>
* *
* @param batchSize 指定长度 * @param batchSize 指定长度, 正整数
* @return 切好的流 * @return 切好的流
*/ */
public FastStream<FastStream<T>> sub(int batchSize) { public FastStream<FastStream<T>> split(final int batchSize) {
List<T> list = toList(); List<T> list = toList();
if (list.size() <= batchSize) { final int size = list.size();
return FastStream.<FastStream<T>>of(FastStream.of(list)).parallel(isParallel()); // 指定长度 大于等于 列表长度
if (size <= batchSize) {
// 返回第一层只有单个元素的双层流形如[[1,2,3,4,5]]
return FastStream.<FastStream<T>>of(of(list, isParallel()));
} }
return FastStream.iterate(0, i -> i < list.size(), i -> i + batchSize) return FastStream.iterate(0, i -> i < size, i -> i + batchSize)
.map(skip -> FastStream.of(list).skip(skip).limit(batchSize)).parallel(isParallel()); .map(skip -> of(list.subList(skip, Math.min(size, skip + batchSize)), isParallel()))
.parallel(isParallel());
} }
/** /**
* 按指定长度切分为元素为list的流 * 按指定长度切分为元素为list的流
* <p>
* 形如[1,2,3,4,5] -> [[1,2], [3,4], [5,6]]
* </p>
* *
* @param batchSize 指定长度 * @param batchSize 指定长度, 正整数
* @return 切好的流 * @return 切好的流
*/ */
public FastStream<List<T>> subList(int batchSize) { public FastStream<List<T>> splitList(final int batchSize) {
return sub(batchSize).map(FastStream::toList); return split(batchSize).map(FastStream::toList);
} }
public interface FastStreamBuilder<T> extends Consumer<T>, cn.hutool.core.builder.Builder<FastStream<T>> { public interface FastStreamBuilder<T> extends Consumer<T>, cn.hutool.core.builder.Builder<FastStream<T>> {

View File

@ -1,10 +1,13 @@
package cn.hutool.core.stream; package cn.hutool.core.stream;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
@ -62,17 +65,19 @@ public class FastStreamTest {
@Test @Test
public void testToZip() { public void testToZip() {
List<Integer> orders = Arrays.asList(1, 2, 3); List<Integer> orders = Arrays.asList(1, 2, 3, 2);
List<String> list = Arrays.asList("dromara", "hutool", "sweet"); List<String> list = Arrays.asList("dromara", "guava", "sweet", "hutool");
Map<Integer, String> toZip = FastStream.of(orders).toZip(list); final Map<Integer, String> map = MapUtil.<Integer, String>builder()
Assert.assertEquals(new HashMap<Integer, String>() { .put(1, "dromara")
private static final long serialVersionUID = 1L; .put(2, "hutool")
.put(3, "sweet")
.build();
{ Map<Integer, String> toZip = FastStream.of(orders).toZip(list);
put(1, "dromara"); Assert.assertEquals(map, toZip);
put(2, "hutool");
put(3, "sweet"); Map<Integer, String> toZipParallel = FastStream.of(orders).parallel().toZip(list);
}}, toZip); Assert.assertEquals(map, toZipParallel);
} }
@Test @Test
@ -92,10 +97,10 @@ public class FastStreamTest {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
{ {
put("1", 1); put("1", 1);
put("2", 2); put("2", 2);
put("3", 3); put("3", 3);
}}, identityMap); }}, identityMap);
} }
@Test @Test
@ -107,10 +112,10 @@ public class FastStreamTest {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
{ {
put("1", singletonList(1)); put("1", singletonList(1));
put("2", singletonList(2)); put("2", singletonList(2));
put("3", singletonList(3)); put("3", singletonList(3));
}}, group); }}, group);
} }
@Test @Test
@ -126,19 +131,29 @@ public class FastStreamTest {
public void testMapMulti() { public void testMapMulti() {
List<Integer> list = Arrays.asList(1, 2, 3); List<Integer> list = Arrays.asList(1, 2, 3);
List<Integer> mapMulti = FastStream.of(list).<Integer>mapMulti((e, buffer) -> { List<Integer> mapMulti = FastStream.of(list).<Integer>mapMulti((e, buffer) -> {
if (e % 2 == 0) { for (int i = 0; i < e; i++) {
buffer.accept(e); buffer.accept(e);
} }
buffer.accept(e);
}).toList(); }).toList();
Assert.assertEquals(Arrays.asList(1, 2, 2, 3), mapMulti); Assert.assertEquals(Arrays.asList(1, 2, 2, 3, 3, 3), mapMulti);
} }
@Test @Test
public void testDistinct() { public void testDistinct() {
List<Integer> list = Arrays.asList(1, 2, 2, 3); List<Integer> list = ListUtil.of(3, 2, 2, 1, null, null);
List<Integer> distinctBy = FastStream.of(list).distinct(String::valueOf).toList(); for (int i = 0; i < 1000; i++) {
Assert.assertEquals(Arrays.asList(1, 2, 3), distinctBy); list.add(i);
}
// 使用stream去重
List<Integer> collect1 = list.stream().distinct().collect(Collectors.toList());
List<Integer> collect2 = list.stream().parallel().distinct().collect(Collectors.toList());
// 使用FastStream去重
List<Integer> distinctBy1 = FastStream.of(list).distinct().toList();
List<Integer> distinctBy2 = FastStream.of(list).parallel().distinct(String::valueOf).toList();
Assert.assertEquals(collect1, distinctBy1);
Assert.assertEquals(collect2, distinctBy2);
} }
@Test @Test
@ -165,7 +180,7 @@ public class FastStreamTest {
List<String> mapIndex = FastStream.of(list).flatMapIdx((e, i) -> FastStream.of(i + 1 + "." + e)).toList(); List<String> mapIndex = FastStream.of(list).flatMapIdx((e, i) -> FastStream.of(i + 1 + "." + e)).toList();
Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), mapIndex); Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), mapIndex);
// 并行流时为-1 // 并行流时为-1
Assert.assertEquals(Arrays.asList(-1, -1, -1), FastStream.of(1, 2, 3).parallel().mapIdx((e, i) -> i).toList()); Assert.assertEquals(Arrays.asList(-1, -1, -1), FastStream.of(1, 2, 3).parallel().flatMapIdx((e, i) -> FastStream.of(i)).toList());
} }
@Test @Test
@ -173,6 +188,9 @@ public class FastStreamTest {
List<Integer> list = Arrays.asList(1, 2, 3); List<Integer> list = Arrays.asList(1, 2, 3);
List<Integer> flatMapIter = FastStream.of(list).<Integer>flatMapIter(e -> null).toList(); List<Integer> flatMapIter = FastStream.of(list).<Integer>flatMapIter(e -> null).toList();
Assert.assertEquals(Collections.emptyList(), flatMapIter); Assert.assertEquals(Collections.emptyList(), flatMapIter);
flatMapIter = FastStream.of(list).flatMapIter(e -> Arrays.asList(e, e * 10)).toList();
Assert.assertEquals(ListUtil.of(1, 10, 2, 20, 3, 30), flatMapIter);
} }
@Test @Test
@ -212,9 +230,9 @@ public class FastStreamTest {
} }
@Test @Test
public void testUnshift() { public void testAddFirst() {
List<Integer> list = Arrays.asList(2, 3); List<Integer> list = Arrays.asList(2, 3);
List<Integer> unshift = FastStream.of(list).unshift(1).toList(); List<Integer> unshift = FastStream.of(list).addFirst(1).toList();
Assert.assertEquals(Arrays.asList(1, 2, 3), unshift); Assert.assertEquals(Arrays.asList(1, 2, 3), unshift);
} }
@ -256,10 +274,10 @@ public class FastStreamTest {
@Test @Test
public void testFindLast() { public void testFindLast() {
List<Integer> list = Arrays.asList(1, null, 3); List<Integer> list = ListUtil.of(1, null, 3);
Integer find = FastStream.of(list).findLast(Objects::nonNull); Integer find = FastStream.of(list).parallel().findLast(Objects::nonNull);
Assert.assertEquals(3, (Object) find); Assert.assertEquals(3, (Object) find);
Assert.assertEquals(3, (Object) FastStream.of(list).findLast().orElse(null)); Assert.assertEquals(3, (Object) FastStream.of(list).parallel().findLast().orElse(null));
} }
@Test @Test
@ -279,22 +297,24 @@ public class FastStreamTest {
} }
@Test @Test
public void testSub() { public void testListSplit() {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5); List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
List<List<Integer>> lists = FastStream.of(list).sub(2).map(FastStream::toList).toList(); List<List<Integer>> lists = FastStream.of(list).split(2).map(FastStream::toList).toList();
Assert.assertEquals(Arrays.asList(Arrays.asList(1, 2), Assert.assertEquals(ListUtil.split(list, 2), lists);
Arrays.asList(3, 4),
singletonList(5) // 指定长度 大于等于 列表长度
), lists); lists = FastStream.of(list).split(list.size()).map(FastStream::toList).toList();
Assert.assertEquals(singletonList(list), lists);
} }
@Test @Test
public void testSubList() { public void testSplitList() {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5); List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
List<List<Integer>> lists = FastStream.of(list).subList(2).toList(); List<List<Integer>> lists = FastStream.of(list).splitList(2).toList();
Assert.assertEquals(Arrays.asList(Arrays.asList(1, 2), Assert.assertEquals(ListUtil.split(list, 2), lists);
Arrays.asList(3, 4),
singletonList(5) // 指定长度 大于等于 列表长度
), lists); lists = FastStream.of(list).splitList(list.size()).toList();
Assert.assertEquals(singletonList(list), lists);
} }
} }