优化FastStream:

1.新增takeWhile、dropWhile方法;
2.优化部分写法, 希望进一步提高效率;
3.删除多余的代码, 移动部分代码到其他类中;
This commit is contained in:
Zjp 2022-08-05 17:06:21 +08:00
parent d8e63ab2f0
commit fc8923b2e9
10 changed files with 660 additions and 163 deletions

View File

@ -13,6 +13,7 @@ import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.PageUtil; import cn.hutool.core.util.PageUtil;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
@ -692,4 +693,51 @@ public class ListUtil {
} }
return list; return list;
} }
/**
* 类似js的<a href="https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Array/splice">splice</a>函数
*
* @param start 起始下标, 可以为负数, -1代表最后一个元素
* @param deleteCount 删除个数必须是正整数
* @param items 放入值
* @return 操作后的流
* @since 6.0.0
*/
@SafeVarargs
public static <T> List<T> splice(List<T> list, int start, int deleteCount, T... items) {
if (CollUtil.isEmpty(list)) {
return zero();
}
final int size = list.size();
// 从后往前查找
if (start < 0) {
start += size;
} else if (start >= size) {
// 直接在尾部追加不删除
start = size;
deleteCount = 0;
}
// 起始位置 加上 删除的数量 超过 数据长度需要重新计算需要删除的数量
if (start + deleteCount > size) {
deleteCount = size - start;
}
// 新列表的长度
final int newSize = size - deleteCount + items.length;
List<T> resList = list;
// 新列表的长度 大于 旧列表创建新列表
if (newSize > size) {
resList = new ArrayList<>(newSize);
resList.addAll(list);
}
// 需要删除的部分
if (deleteCount > 0) {
resList.subList(start, start + deleteCount).clear();
}
// 新增的部分
if (ArrayUtil.isNotEmpty(items)) {
resList.addAll(start, Arrays.asList(items));
}
return resList;
}
} }

View File

@ -127,6 +127,19 @@ public class Opt<T> {
} }
} }
/**
* 根据 {@link Optional} 构造 {@code Opt}
*
* @param optional optional
* @param <T> 包裹的元素类型
* @return 一个包裹里元素可能为空的 {@code Opt}
* @since 6.0.0
*/
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public static <T> Opt<T> of(Optional<T> optional) {
return ofNullable(optional).flattedMap(Function.identity());
}
/** /**
* 包裹里实际的元素 * 包裹里实际的元素
*/ */

View File

@ -81,6 +81,47 @@ public class MutableInt extends Number implements Comparable<MutableInt>, Mutabl
return this; return this;
} }
// -----------------------------------------------------------------------
/**
* 先加1, 再获取值
*
* @return +1后的值
* @since 6.0.0
*/
public int incrementAndGet() {
return ++value;
}
/**
* 先获取原来的值, 再加1
*
* @return 原始值
* @since 6.0.0
*/
public int getAndIncrement() {
return value++;
}
/**
* 先减1, 再获取值
*
* @return -1后的值
* @since 6.0.0
*/
public int decrementAndGet() {
return --value;
}
/**
* 先获取原来的值, 再减1
*
* @return 原始值
* @since 6.0.0
*/
public int getAndDecrement() {
return value--;
}
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
/** /**
* 增加值 * 增加值

View File

@ -1,6 +1,7 @@
package cn.hutool.core.lang.mutable; package cn.hutool.core.lang.mutable;
import java.io.Serializable; import java.io.Serializable;
import java.util.function.Consumer;
/** /**
* 可变{@code Object} * 可变{@code Object}
@ -8,7 +9,7 @@ import java.io.Serializable;
* @param <T> 可变的类型 * @param <T> 可变的类型
* @since 3.0.1 * @since 3.0.1
*/ */
public class MutableObj<T> implements Mutable<T>, Serializable { public class MutableObj<T> implements Mutable<T>, Serializable, Consumer<T> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
/** /**
@ -50,6 +51,17 @@ public class MutableObj<T> implements Mutable<T>, Serializable {
this.value = value; this.value = value;
} }
/**
* 消费元素
*
* @param t t
* @since 6.0.0
*/
@Override
public void accept(T t) {
this.value = t;
}
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
@Override @Override
public boolean equals(final Object obj) { public boolean equals(final Object obj) {

View File

@ -1,12 +1,12 @@
package cn.hutool.core.stream; package cn.hutool.core.stream;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.ListUtil; import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.lang.Console; import cn.hutool.core.lang.Console;
import cn.hutool.core.lang.Opt; import cn.hutool.core.lang.Opt;
import cn.hutool.core.lang.mutable.MutableInt; import cn.hutool.core.lang.mutable.MutableInt;
import cn.hutool.core.lang.mutable.MutableObj; import cn.hutool.core.lang.mutable.MutableObj;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.stream.support.StreamHelper;
import cn.hutool.core.text.StrUtil; import cn.hutool.core.text.StrUtil;
import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.ArrayUtil;
@ -61,6 +61,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
this.stream = stream; this.stream = stream;
} }
// region Static method
// --------------------------------------------------------------- Static method start // --------------------------------------------------------------- Static method start
/** /**
@ -157,51 +158,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
public static <T> FastStream<T> iterate(T seed, Predicate<? super T> hasNext, UnaryOperator<T> next) { public static <T> FastStream<T> iterate(T seed, Predicate<? super T> hasNext, UnaryOperator<T> next) {
Objects.requireNonNull(next); Objects.requireNonNull(next);
Objects.requireNonNull(hasNext); Objects.requireNonNull(hasNext);
Spliterator<T> spliterator = new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, return new FastStream<>(StreamHelper.iterate(seed, hasNext, next));
Spliterator.ORDERED | Spliterator.IMMUTABLE) {
T prev;
boolean started;
boolean finished;
@Override
public boolean tryAdvance(Consumer<? super T> action) {
Objects.requireNonNull(action);
if (finished) {
return false;
}
T t;
if (started) {
t = next.apply(prev);
} else {
t = seed;
started = true;
}
if (!hasNext.test(t)) {
prev = null;
finished = true;
return false;
}
prev = t;
action.accept(prev);
return true;
}
@Override
public void forEachRemaining(Consumer<? super T> action) {
Objects.requireNonNull(action);
if (finished) {
return;
}
finished = true;
T t = started ? next.apply(prev) : seed;
prev = null;
while (hasNext.test(t)) {
action.accept(t);
t = next.apply(t);
}
}
};
return new FastStream<>(StreamSupport.stream(spliterator, false));
} }
/** /**
@ -279,6 +236,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
} }
// --------------------------------------------------------------- Static method end // --------------------------------------------------------------- Static method end
// endregion
/** /**
* 过滤元素返回与指定断言匹配的元素组成的流 * 过滤元素返回与指定断言匹配的元素组成的流
@ -303,10 +261,9 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
*/ */
public <R> FastStream<T> filter(Function<? super T, ? extends R> mapper, R value) { public <R> FastStream<T> filter(Function<? super T, ? extends R> mapper, R value) {
Objects.requireNonNull(mapper); Objects.requireNonNull(mapper);
return filter(e -> Objects.equals(Opt.ofNullable(e).map(mapper).get(), value)); return filter(e -> Objects.equals(mapper.apply(e), value));
} }
/** /**
* 过滤元素返回与指定断言匹配的元素组成的流断言带下标并行流时下标永远为-1 * 过滤元素返回与指定断言匹配的元素组成的流断言带下标并行流时下标永远为-1
* 这是一个无状态中间操作 * 这是一个无状态中间操作
@ -320,7 +277,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
return filter(e -> predicate.test(e, NOT_FOUND_INDEX)); return filter(e -> predicate.test(e, NOT_FOUND_INDEX));
} else { } else {
MutableInt index = new MutableInt(NOT_FOUND_INDEX); MutableInt index = new MutableInt(NOT_FOUND_INDEX);
return filter(e -> predicate.test(e, index.increment().get())); return filter(e -> predicate.test(e, index.incrementAndGet()));
} }
} }
@ -360,7 +317,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
return map(e -> mapper.apply(e, NOT_FOUND_INDEX)); return map(e -> mapper.apply(e, NOT_FOUND_INDEX));
} else { } else {
MutableInt index = new MutableInt(NOT_FOUND_INDEX); MutableInt index = new MutableInt(NOT_FOUND_INDEX);
return map(e -> mapper.apply(e, index.increment().get())); return map(e -> mapper.apply(e, index.incrementAndGet()));
} }
} }
@ -395,7 +352,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
return flatMap(e -> mapper.apply(e, NOT_FOUND_INDEX)); return flatMap(e -> mapper.apply(e, NOT_FOUND_INDEX));
} else { } else {
MutableInt index = new MutableInt(NOT_FOUND_INDEX); MutableInt index = new MutableInt(NOT_FOUND_INDEX);
return flatMap(e -> mapper.apply(e, index.increment().get())); return flatMap(e -> mapper.apply(e, index.incrementAndGet()));
} }
} }
@ -449,7 +406,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
*/ */
public <R> FastStream<R> flatMapIter(Function<? super T, ? extends Iterable<? extends R>> mapper) { public <R> FastStream<R> flatMapIter(Function<? super T, ? extends Iterable<? extends R>> mapper) {
Objects.requireNonNull(mapper); Objects.requireNonNull(mapper);
return flatMap(w -> Opt.of(w).map(mapper).map(FastStream::of).orElseGet(FastStream::empty)); return flatMap(w -> of(mapper.apply(w)));
} }
/** /**
@ -668,7 +625,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
stream.forEach(e -> action.accept(e, NOT_FOUND_INDEX)); stream.forEach(e -> action.accept(e, NOT_FOUND_INDEX));
} else { } else {
MutableInt index = new MutableInt(NOT_FOUND_INDEX); MutableInt index = new MutableInt(NOT_FOUND_INDEX);
stream.forEach(e -> action.accept(e, index.increment().get())); stream.forEach(e -> action.accept(e, index.incrementAndGet()));
} }
} }
@ -695,7 +652,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
stream.forEachOrdered(e -> action.accept(e, NOT_FOUND_INDEX)); stream.forEachOrdered(e -> action.accept(e, NOT_FOUND_INDEX));
} else { } else {
MutableInt index = new MutableInt(NOT_FOUND_INDEX); MutableInt index = new MutableInt(NOT_FOUND_INDEX);
stream.forEachOrdered(e -> action.accept(e, index.increment().get())); stream.forEachOrdered(e -> action.accept(e, index.incrementAndGet()));
} }
} }
@ -918,8 +875,8 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @param predicate 断言 * @param predicate 断言
* @return 与给定断言匹配的第一个元素 * @return 与给定断言匹配的第一个元素
*/ */
public T findFirst(Predicate<? super T> predicate) { public Optional<T> findFirst(Predicate<? super T> predicate) {
return stream.filter(predicate).findFirst().orElse(null); return stream.filter(predicate).findFirst();
} }
/** /**
@ -928,7 +885,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @param predicate 断言 * @param predicate 断言
* @return 与给定断言匹配的第一个元素的下标如果不存在则返回-1 * @return 与给定断言匹配的第一个元素的下标如果不存在则返回-1
*/ */
public Integer findFirstIdx(Predicate<? super T> predicate) { public int findFirstIdx(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate); Objects.requireNonNull(predicate);
if (isParallel()) { if (isParallel()) {
return NOT_FOUND_INDEX; return NOT_FOUND_INDEX;
@ -949,14 +906,10 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return 最后一个元素 * @return 最后一个元素
*/ */
public Optional<T> findLast() { public Optional<T> findLast() {
if (isParallel()) {
return Optional.of(toList()).filter(CollUtil::isNotEmpty).map(l -> l.get(l.size() - 1));
} else {
MutableObj<T> last = new MutableObj<>(null); MutableObj<T> last = new MutableObj<>(null);
forEach(last::set); spliterator().forEachRemaining(last);
return Optional.ofNullable(last.get()); return Optional.ofNullable(last.get());
} }
}
/** /**
* 获取与给定断言匹配的最后一个元素 * 获取与给定断言匹配的最后一个元素
@ -964,21 +917,15 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @param predicate 断言 * @param predicate 断言
* @return 与给定断言匹配的最后一个元素 * @return 与给定断言匹配的最后一个元素
*/ */
public T findLast(Predicate<? super T> predicate) { public Optional<T> findLast(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate); Objects.requireNonNull(predicate);
if (isParallel()) {
List<T> list = toList();
final int index = ListUtil.lastIndexOf(list, predicate);
return index == NOT_FOUND_INDEX ? null : list.get(index);
} else {
MutableObj<T> last = new MutableObj<>(null); MutableObj<T> last = new MutableObj<>(null);
forEach(e -> { spliterator().forEachRemaining(e -> {
if (predicate.test(e)) { if (predicate.test(e)) {
last.set(e); last.set(e);
} }
}); });
return last.get(); return Optional.ofNullable(last.get());
}
} }
/** /**
@ -987,7 +934,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @param predicate 断言 * @param predicate 断言
* @return 与给定断言匹配的最后一个元素的下标如果不存在则返回-1 * @return 与给定断言匹配的最后一个元素的下标如果不存在则返回-1
*/ */
public Integer findLastIdx(Predicate<? super T> predicate) { public int findLastIdx(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate); Objects.requireNonNull(predicate);
if (isParallel()) { if (isParallel()) {
return NOT_FOUND_INDEX; return NOT_FOUND_INDEX;
@ -1008,9 +955,10 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @return 反转元素顺序 * @return 反转元素顺序
*/ */
public FastStream<T> reverse() { public FastStream<T> reverse() {
List<T> list = toList(); //noinspection unchecked
Collections.reverse(list); final T[] array = (T[]) toArray();
return of(list, isParallel()); ArrayUtil.reverse(array);
return of(array).parallel(isParallel()).onClose(stream::close);
} }
/** /**
@ -1067,6 +1015,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
/** /**
* 返回一个无序流(无手动排序) * 返回一个无序流(无手动排序)
* <p>标记一个流是不在意元素顺序的, 在并行流的某些情况下可以提高性能</p>
* *
* @return 无序流 * @return 无序流
*/ */
@ -1136,11 +1085,11 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @param idx 下标 * @param idx 下标
* @return 指定下标的元素 * @return 指定下标的元素
*/ */
public T at(Integer idx) { public Optional<T> at(Integer idx) {
if (Objects.isNull(idx)) { return Opt.ofNullable(idx).map(i -> {
return null; //noinspection unchecked
} return (T) ArrayUtil.get(toArray(), i);
return CollUtil.get(toList(), idx); }).toOptional();
} }
/** /**
@ -1227,28 +1176,29 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
} }
/** /**
* 与给定的可迭代对象转换成mapkey为现有元素value为给定可迭代对象迭代的元素<br> * 与给定的可迭代对象转换成Mapkey为现有元素value为给定可迭代对象迭代的元素<br>
* 至少包含全部的key如果对应位置上的value不存在则为null * Map的大小与两个集合中较小的数量一致, , 只合并下标位置相同的部分
* *
* @param other 可迭代对象 * @param other 可迭代对象
* @param <R> 可迭代对象迭代的元素类型 * @param <R> 可迭代对象迭代的元素类型
* @return mapkey为现有元素value为给定可迭代对象迭代的元素;<br> * @return mapkey为现有元素value为给定可迭代对象迭代的元素
* 至少包含全部的key如果对应位置上的value不存在则为null;<br>
* 如果key重复, 则保留最后一个关联的value;<br>
*/ */
public <R> Map<T, R> toZip(Iterable<R> other) { public <R> Map<T, R> toZip(Iterable<R> other) {
// value对象迭代器 final Spliterator<T> keys = spliterator();
final Iterator<R> iterator = Opt.ofNullable(other).map(Iterable::iterator).orElseGet(Collections::emptyIterator); final Spliterator<R> values = Opt.ofNullable(other).map(Iterable::spliterator).orElseGet(Spliterators::emptySpliterator);
if (isParallel()) { // 获取两个Spliterator的中较小的数量
List<T> keyList = toList(); // 如果Spliterator经过流操作, getExactSizeIfKnown()可能会返回-1, 所以默认大小为 MapUtil.DEFAULT_INITIAL_CAPACITY
final Map<T, R> map = MapUtil.newHashMap(keyList.size()); final int sizeIfKnown = (int) Math.max(Math.min(keys.getExactSizeIfKnown(), values.getExactSizeIfKnown()), MapUtil.DEFAULT_INITIAL_CAPACITY);
for (T key : keyList) { final Map<T, R> map = MapUtil.newHashMap(sizeIfKnown);
map.put(key, iterator.hasNext() ? iterator.next() : null); // 保存第一个Spliterator的值
MutableObj<T> key = new MutableObj<>();
// 保存第二个Spliterator的值
MutableObj<R> value = new MutableObj<>();
// 当两个Spliterator中都还有剩余元素时
while (keys.tryAdvance(key) && values.tryAdvance(value)) {
map.put(key.get(), value.get());
} }
return map; return map;
} else {
return toMap(Function.identity(), e -> iterator.hasNext() ? iterator.next() : null);
}
} }
/** /**
@ -1391,8 +1341,7 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
/** /**
* 现有元素 给定迭代器中对应位置的元素 使用 zipper 转换为新的元素并返回新元素组成的流<br> * 现有元素 给定迭代器中对应位置的元素 使用 zipper 转换为新的元素并返回新元素组成的流<br>
* 新流的数量等于旧流元素的数量<br> * 新流的数量为两个集合中较小的数量, , 只合并下标位置相同的部分<br>
* 使用 zipper 转换时, 如果对应位置上已经没有other元素则other元素为null<br>
* *
* @param other 给定的迭代器 * @param other 给定的迭代器
* @param zipper 两个元素的合并器 * @param zipper 两个元素的合并器
@ -1403,15 +1352,21 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
public <U, R> FastStream<R> zip(Iterable<U> other, public <U, R> FastStream<R> zip(Iterable<U> other,
BiFunction<? super T, ? super U, ? extends R> zipper) { BiFunction<? super T, ? super U, ? extends R> zipper) {
Objects.requireNonNull(zipper); Objects.requireNonNull(zipper);
// 给定对象迭代器 final Spliterator<T> keys = spliterator();
final Iterator<U> iterator = Opt.ofNullable(other).map(Iterable::iterator).orElseGet(Collections::emptyIterator); final Spliterator<U> values = Opt.ofNullable(other).map(Iterable::spliterator).orElseGet(Spliterators::emptySpliterator);
Stream<T> resStream = this.stream; // 获取两个Spliterator的中较小的数量
if (isParallel()) { // 如果Spliterator经过流操作, getExactSizeIfKnown()可能会返回-1, 所以默认大小为 ArrayList.DEFAULT_CAPACITY
resStream = toList().stream(); final int sizeIfKnown = (int) Math.max(Math.min(keys.getExactSizeIfKnown(), values.getExactSizeIfKnown()), 10);
final List<R> list = new ArrayList<>(sizeIfKnown);
// 保存第一个Spliterator的值
MutableObj<T> key = new MutableObj<>();
// 保存第二个Spliterator的值
MutableObj<U> value = new MutableObj<>();
// 当两个Spliterator中都还有剩余元素时
while (keys.tryAdvance(key) && values.tryAdvance(value)) {
list.add(zipper.apply(key.get(), value.get()));
} }
final FastStream<R> newStream = of(resStream.map(e -> zipper.apply(e, iterator.hasNext() ? iterator.next() : null))); return of(list).parallel(isParallel()).onClose(stream::close);
newStream.parallel(isParallel());
return newStream;
} }
/** /**
@ -1422,40 +1377,11 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
* @param items 放入值 * @param items 放入值
* @return 操作后的流 * @return 操作后的流
*/ */
@SuppressWarnings("unchecked") @SafeVarargs
public FastStream<T> splice(int start, int deleteCount, T... items) { public final FastStream<T> splice(int start, int deleteCount, T... items) {
List<T> list = toList(); return of(ListUtil.splice(toList(), start, deleteCount, items))
final int size = list.size(); .parallel(isParallel())
// 从后往前查找 .onClose(stream::close);
if (start < 0) {
start += size;
} else if (start >= size) {
// 直接在尾部追加不删除
start = size;
deleteCount = 0;
}
// 起始位置 加上 删除的数量 超过 数据长度需要重新计算需要删除的数量
if (start + deleteCount > size) {
deleteCount = size - start;
}
// 新列表的长度
final int newSize = size - deleteCount + items.length;
List<T> resList = list;
// 新列表的长度 大于 旧列表创建新列表
if (newSize > size) {
resList = new ArrayList<>(newSize);
resList.addAll(list);
}
// 需要删除的部分
if (deleteCount > 0) {
resList.subList(start, start + deleteCount).clear();
}
// 新增的部分
if (items.length > 0) {
resList.addAll(start, Arrays.asList(items));
}
return FastStream.of(resList);
} }
/** /**
@ -1475,9 +1401,10 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
// 返回第一层只有单个元素的双层流形如[[1,2,3,4,5]] // 返回第一层只有单个元素的双层流形如[[1,2,3,4,5]]
return FastStream.<FastStream<T>>of(of(list, isParallel())); return FastStream.<FastStream<T>>of(of(list, isParallel()));
} }
return FastStream.iterate(0, i -> i < size, i -> i + batchSize) return iterate(0, i -> i < size, i -> i + batchSize)
.map(skip -> of(list.subList(skip, Math.min(size, skip + batchSize)), isParallel())) .map(skip -> of(list.subList(skip, Math.min(size, skip + batchSize)), isParallel()))
.parallel(isParallel()); .parallel(isParallel())
.onClose(stream::close);
} }
/** /**
@ -1493,6 +1420,78 @@ public class FastStream<T> implements Stream<T>, Iterable<T> {
return split(batchSize).map(FastStream::toList); return split(batchSize).map(FastStream::toList);
} }
/**
* 保留 与指定断言 匹配时的元素, 在第一次不匹配时终止, 抛弃当前(第一个不匹配元素)及后续所有元素
* <p> jdk9 中的 takeWhile 方法不太一样, 这里的实现是个 顺序的有状态的中间操作</p>
* <pre>本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
* FastStream.iterate(1, i -> i + 1)
* .parallel()
* // 顺序执行
* .takeWhile(e -> e < 50)
* // 并发
* .map(e -> e + 1)
* // 并发
* .map(String::valueOf)
* .toList();
* }</pre>
* <p>但是不建议在并行流中使用, 除非你确定 takeWhile 之后的操作能在并行流中受益很多</p>
*
* @param predicate 断言
* @return 与指定断言匹配的元素组成的流
*/
public FastStream<T> takeWhile(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
return of(StreamHelper.takeWhile(stream, predicate));
}
/**
* 保留 与指定断言 匹配的元素, 在第一次不匹配时终止, 抛弃当前(第一个不匹配元素)及后续所有元素
* <p>takeWhile 的别名方法</p>
*
* @param predicate 断言
* @return 与指定断言匹配的元素组成的流
* @see #takeWhile(Predicate)
*/
public FastStream<T> limit(Predicate<? super T> predicate) {
return takeWhile(predicate);
}
/**
* 删除 与指定断言 匹配的元素, 在第一次不匹配时终止, 返回当前(第一个不匹配元素)及剩余元素组成的新流
* <p> jdk9 中的 dropWhile 方法不太一样, 这里的实现是个 顺序的有状态的中间操作</p>
* <pre>本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
* FastStream.iterate(1, i <= 100, i -> i + 1)
* .parallel()
* // 顺序执行
* .dropWhile(e -> e < 50)
* // 并发
* .map(e -> e + 1)
* // 并发
* .map(String::valueOf)
* .toList();
* }</pre>
* <p>但是不建议在并行流中使用, 除非你确定 dropWhile 之后的操作能在并行流中受益很多</p>
*
* @param predicate 断言
* @return 剩余元素组成的流
*/
public FastStream<T> dropWhile(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
return of(StreamHelper.dropWhile(stream, predicate));
}
/**
* 跳过 与断言匹配的元素, 在第一次不匹配时终止, 返回当前(第一个不匹配元素)及剩余元素组成的新流
* <p>dropWhile 的别名方法</p>
*
* @param predicate 断言
* @return 剩余元素组成的流
* @see #dropWhile(Predicate)
*/
public FastStream<T> skip(Predicate<? super T> predicate) {
return dropWhile(predicate);
}
public interface FastStreamBuilder<T> extends Consumer<T>, cn.hutool.core.builder.Builder<FastStream<T>> { public interface FastStreamBuilder<T> extends Consumer<T>, cn.hutool.core.builder.Builder<FastStream<T>> {
/** /**

View File

@ -0,0 +1,72 @@
package cn.hutool.core.stream.support;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
* dropWhile Spliterator
* <p>借鉴自StreamEx</p>
*
* @author emptypoint
* @since 6.0.0
*/
class DropWhileSpliterator<T> implements Spliterator<T> {
static <T> DropWhileSpliterator<T> create(Spliterator<T> source, Predicate<? super T> predicate) {
return new DropWhileSpliterator<>(source, predicate);
}
private final Spliterator<T> source;
private final Predicate<? super T> predicate;
private boolean isFound = false;
private DropWhileSpliterator(Spliterator<T> source, Predicate<? super T> predicate) {
this.source = source;
this.predicate = predicate;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
boolean hasNext = true;
// 如果 还没找到 并且 流中还有元素 继续找
while (!isFound && hasNext) {
hasNext = source.tryAdvance(e -> {
if (!predicate.test(e)) {
// 第一次不匹配
isFound = true;
action.accept(e);
}
});
}
// 对找到的元素进行后续处理
if (isFound) {
source.forEachRemaining(action);
}
// 该环节已经处理完成
return false;
}
@Override
public Spliterator<T> trySplit() {
return null;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
return source.characteristics() & ~Spliterator.SIZED;
}
@Override
public Comparator<? super T> getComparator() {
return source.getComparator();
}
}

View File

@ -0,0 +1,76 @@
package cn.hutool.core.stream.support;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
/**
* 无限有序流 的Spliterator
*
* @author VampireAchao
* @since 6.0.0
*/
class IterateSpliterator<T> extends Spliterators.AbstractSpliterator<T> {
public static <T> IterateSpliterator<T> create(T seed, Predicate<? super T> hasNext, UnaryOperator<T> next) {
return new IterateSpliterator<>(seed, hasNext, next);
}
/**
* Creates a spliterator reporting the given estimated size and
* additionalCharacteristics.
*/
IterateSpliterator(T seed, Predicate<? super T> hasNext, UnaryOperator<T> next) {
super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.IMMUTABLE);
this.seed = seed;
this.hasNext = hasNext;
this.next = next;
}
private final T seed;
private final Predicate<? super T> hasNext;
private final UnaryOperator<T> next;
private T prev;
private boolean started;
private boolean finished;
@Override
public boolean tryAdvance(Consumer<? super T> action) {
Objects.requireNonNull(action);
if (finished) {
return false;
}
T t;
if (started) {
t = next.apply(prev);
} else {
t = seed;
started = true;
}
if (!hasNext.test(t)) {
prev = null;
finished = true;
return false;
}
prev = t;
action.accept(prev);
return true;
}
@Override
public void forEachRemaining(Consumer<? super T> action) {
Objects.requireNonNull(action);
if (finished) {
return;
}
finished = true;
T t = started ? next.apply(prev) : seed;
prev = null;
while (hasNext.test(t)) {
action.accept(t);
t = next.apply(t);
}
}
}

View File

@ -0,0 +1,99 @@
package cn.hutool.core.stream.support;
import java.util.Spliterator;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.Objects.requireNonNull;
/**
* FastStream 辅助工具类
*
* @author emptypoint
* @since 6.0.0
*/
public final class StreamHelper {
private StreamHelper() {
}
/**
* 返回无限有序流
* 该流由 初始值 然后判断条件 以及执行 迭代函数 进行迭代获取到元素
*
* @param <T> 元素类型
* @param seed 初始值
* @param hasNext 条件值
* @param next 用上一个元素作为参数执行并返回一个新的元素
* @return 无限有序流
*/
public static <T> Stream<T> iterate(T seed, Predicate<? super T> hasNext, UnaryOperator<T> next) {
requireNonNull(next);
requireNonNull(hasNext);
return StreamSupport.stream(IterateSpliterator.create(seed, hasNext, next), false);
}
/**
* 保留 与指定断言 匹配时的元素, 在第一次不匹配时终止, 抛弃当前(第一个不匹配元素)及后续所有元素
* <p> jdk9 中的 takeWhile 方法不太一样, 这里的实现是个 顺序的有状态的中间操作</p>
* <p>本环节中是顺序执行的, 但是后续操作可以支持并行流</p>
* <p>但是不建议在并行流中使用, 除非你确定 takeWhile 之后的操作能在并行流中受益很多</p>
*
* @param source 源流
* @param <T> 元素类型
* @param predicate 断言
* @return 与指定断言匹配的元素组成的流
*/
public static <T> Stream<T> takeWhile(Stream<T> source, Predicate<? super T> predicate) {
requireNonNull(source);
requireNonNull(predicate);
return createStatefulNewStream(source, TakeWhileSpliterator.create(source.spliterator(), predicate));
}
/**
* 删除 与指定断言 匹配的元素, 在第一次不匹配时终止, 返回当前(第一个不匹配元素)及剩余元素组成的新流
* <p> jdk9 中的 dropWhile 方法不太一样, 这里的实现是个 顺序的有状态的中间操作</p>
* <p>本环节中是顺序执行的, 但是后续操作可以支持并行流</p>
* <p>但是不建议在并行流中使用, 除非你确定 dropWhile 之后的操作能在并行流中受益很多</p>
*
* @param source 源流
* @param <T> 元素类型
* @param predicate 断言
* @return 剩余元素组成的流
*/
public static <T> Stream<T> dropWhile(Stream<T> source, Predicate<? super T> predicate) {
requireNonNull(source);
requireNonNull(predicate);
return createStatefulNewStream(source, DropWhileSpliterator.create(source.spliterator(), predicate));
}
// region 私有方法
/* ================================================== 私有方法 =================================================== */
/**
* 根据 源流 新的Spliterator 生成新的流
* <p>这是一个 顺序的有状态的流</p>
* <p>在新流的第一个节点是顺序执行的, 但是后续操作可以支持并行流</p>
*
* @param source 源流
* @param newSpliterator 新流的Spliterator
* @param <T> 旧流的元素类型
* @param <R> 新流的元素类型
* @return 新流
*/
private static <T, R> Stream<R> createStatefulNewStream(Stream<T> source, Spliterator<R> newSpliterator) {
// 创建新流
Stream<R> newStream = StreamSupport.stream(newSpliterator, source.isParallel());
// 如果旧流是并行流, 新流主动调用一个有状态的操作, 虽然没有意义, 但是可以让后续的无状态节点正常并发
if (source.isParallel()) {
newStream = newStream.limit(Long.MAX_VALUE);
}
// 由于新流不与旧流的节点关联, 所以需要主动设置旧流的close方法, 哪怕几乎不可能有人在旧流上设置onClose操作
return newStream.onClose(source::close);
}
/* ============================================================================================================== */
// endregion
}

View File

@ -0,0 +1,68 @@
package cn.hutool.core.stream.support;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
* takeWhile Spliterator
* <p>借鉴自StreamEx</p>
*
* @author emptypoint
* @since 6.0.0
*/
class TakeWhileSpliterator<T> implements Spliterator<T> {
static <T> TakeWhileSpliterator<T> create(Spliterator<T> source, Predicate<? super T> predicate) {
return new TakeWhileSpliterator<>(source, predicate);
}
private final Spliterator<T> source;
private final Predicate<? super T> predicate;
private boolean isContinue = true;
TakeWhileSpliterator(Spliterator<T> source, Predicate<? super T> predicate) {
this.source = source;
this.predicate = predicate;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
boolean hasNext = true;
// 如果 还可以继续 并且 流中还有元素 则继续遍历
while (isContinue && hasNext) {
hasNext = source.tryAdvance(e -> {
if (predicate.test(e)) {
action.accept(e);
} else {
// 终止遍历剩下的元素
isContinue = false;
}
});
}
// 该环节已经处理完成
return false;
}
@Override
public Spliterator<T> trySplit() {
return null;
}
@Override
public long estimateSize() {
return isContinue ? source.estimateSize() : 0;
}
@Override
public int characteristics() {
return source.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
}
@Override
public Comparator<? super T> getComparator() {
return source.getComparator();
}
}

View File

@ -76,7 +76,7 @@ public class FastStreamTest {
Map<Integer, String> toZip = FastStream.of(orders).toZip(list); Map<Integer, String> toZip = FastStream.of(orders).toZip(list);
Assert.assertEquals(map, toZip); Assert.assertEquals(map, toZip);
Map<Integer, String> toZipParallel = FastStream.of(orders).parallel().toZip(list); Map<Integer, String> toZipParallel = FastStream.of(orders).parallel().nonNull().toZip(list);
Assert.assertEquals(map, toZipParallel); Assert.assertEquals(map, toZipParallel);
} }
@ -241,10 +241,12 @@ public class FastStreamTest {
@Test @Test
public void testAt() { public void testAt() {
List<Integer> list = Arrays.asList(1, 2, 3); List<Integer> list = Arrays.asList(1, 2, 3);
Assert.assertEquals(1, (Object) FastStream.of(list).at(0)); Assert.assertEquals(1, (Object) FastStream.of(list).at(0).orElse(null));
Assert.assertEquals(1, (Object) FastStream.of(list).at(-3)); Assert.assertEquals(2, (Object) FastStream.of(list).at(1).orElse(null));
Assert.assertEquals(3, (Object) FastStream.of(list).at(-1)); Assert.assertEquals(3, (Object) FastStream.of(list).at(2).orElse(null));
Assert.assertNull(FastStream.of(list).at(-4)); Assert.assertEquals(1, (Object) FastStream.of(list).at(-3).orElse(null));
Assert.assertEquals(3, (Object) FastStream.of(list).at(-1).orElse(null));
Assert.assertNull(FastStream.of(list).at(-4).orElse(null));
} }
@Test @Test
@ -262,34 +264,46 @@ public class FastStreamTest {
@Test @Test
public void testFindFirst() { public void testFindFirst() {
List<Integer> list = Arrays.asList(1, 2, 3); List<Integer> list = Arrays.asList(1, 2, 3);
Integer find = FastStream.of(list).findFirst(Objects::nonNull); Integer find = FastStream.of(list).findFirst(Objects::nonNull).orElse(null);
Assert.assertEquals(1, (Object) find); Assert.assertEquals(1, (Object) find);
} }
@Test @Test
public void testFindFirstIdx() { public void testFindFirstIdx() {
List<Integer> list = Arrays.asList(null, 2, 3); List<Integer> list = Arrays.asList(null, 2, 3);
Integer idx = FastStream.of(list).findFirstIdx(Objects::nonNull); Assert.assertEquals(1, FastStream.of(list).findFirstIdx(Objects::nonNull));
Assert.assertEquals(1, (Object) idx);
Assert.assertEquals(-1, (Object) FastStream.of(list).parallel().findFirstIdx(Objects::nonNull)); Assert.assertEquals(-1, (Object) FastStream.of(list).parallel().findFirstIdx(Objects::nonNull));
} }
@Test @Test
public void testFindLast() { public void testFindLast() {
List<Integer> list = ListUtil.of(1, null, 3); List<Integer> list = ListUtil.of(1, 2, 4, 5, 6, 7, 8, 9, 10, 3);
Integer find = FastStream.of(list).parallel().findLast(Objects::nonNull); Assert.assertEquals(3, (Object) FastStream.of(list).findLast().orElse(null));
Assert.assertEquals(3, (Object) find);
Assert.assertEquals(3, (Object) FastStream.of(list).parallel().findLast().orElse(null)); Assert.assertEquals(3, (Object) FastStream.of(list).parallel().findLast().orElse(null));
List<Integer> list2 = ListUtil.of(1, 2, 4, 5, 6, 7, 8, 9, 10, 3, null);
Assert.assertEquals(3, (Object) FastStream.of(list2).parallel().findLast(Objects::nonNull).orElse(null));
Assert.assertNull(FastStream.of().parallel().findLast(Objects::nonNull).orElse(null));
Assert.assertNull(FastStream.of((Object) null).parallel().findLast(Objects::nonNull).orElse(null));
} }
@Test @Test
public void testFindLastIdx() { public void testFindLastIdx() {
List<Integer> list = Arrays.asList(1, null, 3); List<Integer> list = Arrays.asList(1, null, 3);
Integer idx = FastStream.of(list).findLastIdx(Objects::nonNull); Assert.assertEquals(2, (Object) FastStream.of(list).findLastIdx(Objects::nonNull));
Assert.assertEquals(2, (Object) idx);
Assert.assertEquals(-1, (Object) FastStream.of(list).parallel().findLastIdx(Objects::nonNull)); Assert.assertEquals(-1, (Object) FastStream.of(list).parallel().findLastIdx(Objects::nonNull));
} }
@Test
public void testReverse() {
final List<Integer> list = ListUtil.of(Stream.iterate(1, i -> i + 1).limit(1000).collect(Collectors.toList()));
Assert.assertEquals(ListUtil.reverseNew(list), FastStream.of(list).reverse().toList());
Assert.assertEquals(ListUtil.empty(), FastStream.of().reverse().toList());
Assert.assertEquals(ListUtil.of((Object) null), FastStream.of((Object) null).reverse().toList());
}
@Test @Test
public void testZip() { public void testZip() {
List<Integer> orders = Arrays.asList(1, 2, 3); List<Integer> orders = Arrays.asList(1, 2, 3);
@ -319,4 +333,59 @@ public class FastStreamTest {
lists = FastStream.of(list).splitList(list.size()).toList(); lists = FastStream.of(list).splitList(list.size()).toList();
Assert.assertEquals(singletonList(list), lists); Assert.assertEquals(singletonList(list), lists);
} }
@Test
public void testTakeWhile() {
// 1 10
final List<Integer> list = FastStream.iterate(1, i -> i <= 10, i -> i + 1).toList();
final List<String> res1 = FastStream.of(list)
// 舍弃 5
.takeWhile(e -> e < 5)
// 过滤奇数
.filter(e -> (e & 1) == 0)
// 反序
.sorted(Comparator.reverseOrder())
.map(String::valueOf)
.toList();
Assert.assertEquals(Arrays.asList("4", "2"), res1);
final List<Integer> res2 = FastStream.iterate(1, i -> i + 1)
.parallel()
.takeWhile(e -> e < 5)
.map(String::valueOf)
.map(Integer::valueOf)
.sorted(Comparator.naturalOrder())
.toList();
Assert.assertEquals(Arrays.asList(1, 2, 3, 4), res2);
}
@Test
public void testDropWhile() {
// 1 10
final List<Integer> list = FastStream.iterate(1, i -> i <= 10, i -> i + 1).toList();
final List<String> res1 = FastStream.of(list)
// 舍弃 5之前的数字
.dropWhile(e -> e < 5)
// 过滤偶数
.filter(e -> (e & 1) == 1)
// 反序
.sorted(Comparator.reverseOrder())
.map(String::valueOf)
.toList();
Assert.assertEquals(Arrays.asList("9", "7", "5"), res1);
final List<Integer> res2 = FastStream.of(list)
.parallel()
.dropWhile(e -> e < 5)
// 过滤偶数
.filter(e -> (e & 1) == 1)
.map(String::valueOf)
.map(Integer::valueOf)
.sorted(Comparator.naturalOrder())
.toList();
Assert.assertEquals(Arrays.asList(5, 7, 9), res2);
}
} }