[fix] 移除xxxIdx对并行流的支持

This commit is contained in:
VampireAchao 2023-06-13 11:45:55 +08:00
parent 5b8bbe4943
commit 928bcc52e3
3 changed files with 25 additions and 93 deletions

View File

@ -269,9 +269,6 @@ public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T,
@SuppressWarnings("ResultOfMethodCallIgnored") @SuppressWarnings("ResultOfMethodCallIgnored")
default int findFirstIdx(final Predicate<? super T> predicate) { default int findFirstIdx(final Predicate<? super T> predicate) {
Objects.requireNonNull(predicate); Objects.requireNonNull(predicate);
if (isParallel()) {
return NOT_FOUND_ELEMENT_INDEX;
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX); final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
unwrap().filter(e -> { unwrap().filter(e -> {
index.increment(); index.increment();
@ -279,7 +276,6 @@ public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T,
}).findFirst();// 此处只做计数不需要值 }).findFirst();// 此处只做计数不需要值
return index.get(); return index.get();
} }
}
/** /**
* 获取最后一个元素 * 获取最后一个元素
@ -317,9 +313,6 @@ public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T,
*/ */
default int findLastIdx(final Predicate<? super T> predicate) { default int findLastIdx(final Predicate<? super T> predicate) {
Objects.requireNonNull(predicate); Objects.requireNonNull(predicate);
if (isParallel()) {
return NOT_FOUND_ELEMENT_INDEX;
} else {
final MutableInt idxRef = new MutableInt(NOT_FOUND_ELEMENT_INDEX); final MutableInt idxRef = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
forEachIdx((e, i) -> { forEachIdx((e, i) -> {
if (predicate.test(e)) { if (predicate.test(e)) {
@ -328,7 +321,6 @@ public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T,
}); });
return idxRef.get(); return idxRef.get();
} }
}
/** /**
* 获取流中指定下标的元素如果是负数则从最后一个开始数起 * 获取流中指定下标的元素如果是负数则从最后一个开始数起
@ -505,15 +497,9 @@ public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T,
*/ */
default void forEachIdx(final BiConsumer<? super T, Integer> action) { default void forEachIdx(final BiConsumer<? super T, Integer> action) {
Objects.requireNonNull(action); Objects.requireNonNull(action);
final boolean isParallel = isParallel();
if (isParallel) {
EasyStream.of(toIdxMap().entrySet()).parallel()
.forEach(e -> action.accept(e.getValue(), e.getKey()));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX); final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
unwrap().forEach(e -> action.accept(e, index.incrementAndGet())); unwrap().forEach(e -> action.accept(e, index.incrementAndGet()));
} }
}
/** /**
* 对流里面的每一个元素按照顺序执行一个操作操作带下标并行流时下标永远为-1 * 对流里面的每一个元素按照顺序执行一个操作操作带下标并行流时下标永远为-1
@ -523,15 +509,9 @@ public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T,
*/ */
default void forEachOrderedIdx(final BiConsumer<? super T, Integer> action) { default void forEachOrderedIdx(final BiConsumer<? super T, Integer> action) {
Objects.requireNonNull(action); Objects.requireNonNull(action);
final boolean isParallel = isParallel();
if (isParallel) {
EasyStream.of(toIdxMap().entrySet()).parallel()
.forEachOrdered(e -> action.accept(e.getValue(), e.getKey()));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX); final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
unwrap().forEachOrdered(e -> action.accept(e, index.incrementAndGet())); unwrap().forEachOrdered(e -> action.accept(e, index.incrementAndGet()));
} }
}
// endregion // endregion

View File

@ -279,18 +279,9 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
*/ */
default S peekIdx(final BiConsumer<? super T, Integer> action) { default S peekIdx(final BiConsumer<? super T, Integer> action) {
Objects.requireNonNull(action); Objects.requireNonNull(action);
final boolean isParallel = isParallel();
if (isParallel) {
final Map<Integer, T> idxMap = easyStream().toIdxMap();
return wrap(EasyStream.of(idxMap.entrySet())
.parallel()
.peek(e -> action.accept(e.getValue(), e.getKey()))
.map(Map.Entry::getValue));
} else {
final AtomicInteger index = new AtomicInteger(NOT_FOUND_ELEMENT_INDEX); final AtomicInteger index = new AtomicInteger(NOT_FOUND_ELEMENT_INDEX);
return peek(e -> action.accept(e, index.incrementAndGet())); return peek(e -> action.accept(e, index.incrementAndGet()));
} }
}
/** /**
* 返回叠加调用{@link Console#log(Object)}打印出结果的流 * 返回叠加调用{@link Console#log(Object)}打印出结果的流
@ -384,17 +375,9 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
*/ */
default S filterIdx(final BiPredicate<? super T, Integer> predicate) { default S filterIdx(final BiPredicate<? super T, Integer> predicate) {
Objects.requireNonNull(predicate); Objects.requireNonNull(predicate);
final boolean isParallel = isParallel();
if (isParallel) {
final Map<Integer, T> idxMap = easyStream().toIdxMap();
return wrap(EasyStream.of(idxMap.entrySet()).parallel()
.filter(e -> predicate.test(e.getValue(), e.getKey()))
.map(Map.Entry::getValue));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX); final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
return filter(e -> predicate.test(e, index.incrementAndGet())); return filter(e -> predicate.test(e, index.incrementAndGet()));
} }
}
/** /**
* 过滤元素返回与 指定操作结果 匹配 指定值 的元素组成的流 * 过滤元素返回与 指定操作结果 匹配 指定值 的元素组成的流
@ -441,16 +424,9 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
*/ */
default <R> EasyStream<R> flatMapIdx(final BiFunction<? super T, Integer, ? extends Stream<? extends R>> mapper) { default <R> EasyStream<R> flatMapIdx(final BiFunction<? super T, Integer, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper); Objects.requireNonNull(mapper);
final boolean isParallel = isParallel();
if (isParallel) {
final Map<Integer, T> idxMap = easyStream().toIdxMap();
return EasyStream.of(idxMap.entrySet()).parallel()
.flatMap(e -> mapper.apply(e.getValue(), e.getKey()));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX); final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
return flatMap(e -> mapper.apply(e, index.incrementAndGet())); return flatMap(e -> mapper.apply(e, index.incrementAndGet()));
} }
}
/** /**
* 扩散流操作可能影响流元素个数将原有流元素执行mapper操作, 转换为迭代器元素, * 扩散流操作可能影响流元素个数将原有流元素执行mapper操作, 转换为迭代器元素,
@ -554,16 +530,9 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
*/ */
default <R> EasyStream<R> mapIdx(final BiFunction<? super T, Integer, ? extends R> mapper) { default <R> EasyStream<R> mapIdx(final BiFunction<? super T, Integer, ? extends R> mapper) {
Objects.requireNonNull(mapper); Objects.requireNonNull(mapper);
final boolean isParallel = isParallel();
if (isParallel) {
final Map<Integer, T> idxMap = easyStream().toIdxMap();
return EasyStream.of(idxMap.entrySet()).parallel()
.map(e -> mapper.apply(e.getValue(), e.getKey()));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX); final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
return map(e -> mapper.apply(e, index.incrementAndGet())); return map(e -> mapper.apply(e, index.incrementAndGet()));
} }
}
/** /**
* 扩散流操作可能影响流元素个数将原有流元素执行mapper操作返回多个流所有元素组成的流操作带一个方法调用该方法可增加元素 * 扩散流操作可能影响流元素个数将原有流元素执行mapper操作返回多个流所有元素组成的流操作带一个方法调用该方法可增加元素

View File

@ -12,7 +12,6 @@ import org.junit.jupiter.api.Test;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.RoundingMode; import java.math.RoundingMode;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -145,9 +144,6 @@ public class EasyStreamTest {
final List<String> list = Arrays.asList("dromara", "hutool", "sweet"); final List<String> list = Arrays.asList("dromara", "hutool", "sweet");
final List<String> mapIndex = EasyStream.of(list).mapIdx((e, i) -> i + 1 + "." + e).toList(); final List<String> mapIndex = EasyStream.of(list).mapIdx((e, i) -> i + 1 + "." + e).toList();
Assertions.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), mapIndex); Assertions.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), mapIndex);
// 并行流时正常
Assertions.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"),
EasyStream.of("dromara", "hutool", "sweet").parallel().mapIdx((e, i) -> i + 1 + "." + e).toList());
} }
@Test @Test
@ -199,10 +195,6 @@ public class EasyStreamTest {
final EasyStream.Builder<String> builder = EasyStream.builder(); final EasyStream.Builder<String> builder = EasyStream.builder();
EasyStream.of(list).forEachIdx((e, i) -> builder.accept(i + 1 + "." + e)); EasyStream.of(list).forEachIdx((e, i) -> builder.accept(i + 1 + "." + e));
Assertions.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), builder.build().toList()); Assertions.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), builder.build().toList());
// 并行流时正常
final AtomicInteger total = new AtomicInteger(0);
EasyStream.of("dromara", "hutool", "sweet").parallel().forEachIdx((e, i) -> total.addAndGet(i));
Assertions.assertEquals(3, total.get());
} }
@Test @Test
@ -223,10 +215,6 @@ public class EasyStreamTest {
final List<String> list = Arrays.asList("dromara", "hutool", "sweet"); final List<String> list = Arrays.asList("dromara", "hutool", "sweet");
final List<String> mapIndex = EasyStream.of(list).flatMapIdx((e, i) -> EasyStream.of(i + 1 + "." + e)).toList(); final List<String> mapIndex = EasyStream.of(list).flatMapIdx((e, i) -> EasyStream.of(i + 1 + "." + e)).toList();
Assertions.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), mapIndex); Assertions.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), mapIndex);
// 并行流时正常
Assertions.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"),
EasyStream.of("dromara", "hutool", "sweet").parallel()
.flatMapIdx((e, i) -> EasyStream.of(i + 1 + "." + e)).toList());
} }
@Test @Test
@ -280,9 +268,6 @@ public class EasyStreamTest {
final List<String> list = Arrays.asList("dromara", "hutool", "sweet"); final List<String> list = Arrays.asList("dromara", "hutool", "sweet");
final List<String> filterIndex = EasyStream.of(list).filterIdx((e, i) -> i < 2).toList(); final List<String> filterIndex = EasyStream.of(list).filterIdx((e, i) -> i < 2).toList();
Assertions.assertEquals(Arrays.asList("dromara", "hutool"), filterIndex); Assertions.assertEquals(Arrays.asList("dromara", "hutool"), filterIndex);
// 并行流时正常
Assertions.assertEquals(Arrays.asList("dromara", "hutool"),
EasyStream.of("dromara", "hutool", "sweet").parallel().filterIdx((e, i) -> i < 2).toList());
} }
@Test @Test
@ -350,7 +335,6 @@ public class EasyStreamTest {
public void testFindFirstIdx() { public void testFindFirstIdx() {
final List<Integer> list = Arrays.asList(null, 2, 3); final List<Integer> list = Arrays.asList(null, 2, 3);
Assertions.assertEquals(1, EasyStream.of(list).findFirstIdx(Objects::nonNull)); Assertions.assertEquals(1, EasyStream.of(list).findFirstIdx(Objects::nonNull));
Assertions.assertEquals(-1, (Object) EasyStream.of(list).parallel().findFirstIdx(Objects::nonNull));
} }
@Test @Test
@ -370,7 +354,6 @@ public class EasyStreamTest {
public void testFindLastIdx() { public void testFindLastIdx() {
final List<Integer> list = Arrays.asList(1, null, 3); final List<Integer> list = Arrays.asList(1, null, 3);
Assertions.assertEquals(2, (Object) EasyStream.of(list).findLastIdx(Objects::nonNull)); Assertions.assertEquals(2, (Object) EasyStream.of(list).findLastIdx(Objects::nonNull));
Assertions.assertEquals(-1, (Object) EasyStream.of(list).parallel().findLastIdx(Objects::nonNull));
} }
@Test @Test