[fix] 处理并行流下zip等依赖于toIdxMap函数时没有保证顺序问题导致的数据顺序错乱的问题

This commit is contained in:
VampireAchao 2023-06-13 11:18:30 +08:00
parent add4a44039
commit 169c625f3b
2 changed files with 29 additions and 27 deletions

View File

@ -12,10 +12,10 @@
package org.dromara.hutool.core.stream;
import org.dromara.hutool.core.array.ArrayUtil;
import org.dromara.hutool.core.lang.Opt;
import org.dromara.hutool.core.lang.mutable.MutableInt;
import org.dromara.hutool.core.lang.mutable.MutableObj;
import org.dromara.hutool.core.array.ArrayUtil;
import java.util.*;
import java.util.function.*;
@ -214,7 +214,7 @@ public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T,
*/
default <U> Map<Integer, U> toIdxMap(final Function<? super T, ? extends U> valueMapper) {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
return EasyStream.of(toList()).toMap(e -> index.incrementAndGet(), valueMapper, (l, r) -> r);
return EasyStream.of(parallel().toList()).toMap(e -> index.incrementAndGet(), valueMapper, (l, r) -> r);
}
// region ============ to zip ============
@ -505,9 +505,10 @@ public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T,
*/
default void forEachIdx(final BiConsumer<? super T, Integer> action) {
Objects.requireNonNull(action);
if (isParallel()) {
EasyStream.of(toIdxMap().entrySet()).parallel(isParallel())
.forEach(e -> action.accept(e.getValue(), e.getKey()));
final boolean isParallel = isParallel();
if (isParallel) {
EasyStream.of(toIdxMap().entrySet()).parallel()
.forEach(e -> action.accept(e.getValue(), e.getKey()));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
unwrap().forEach(e -> action.accept(e, index.incrementAndGet()));
@ -522,10 +523,10 @@ public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T,
*/
default void forEachOrderedIdx(final BiConsumer<? super T, Integer> action) {
Objects.requireNonNull(action);
if (isParallel()) {
EasyStream.of(toIdxMap().entrySet())
.parallel(isParallel())
.forEachOrdered(e -> action.accept(e.getValue(), e.getKey()));
final boolean isParallel = isParallel();
if (isParallel) {
EasyStream.of(toIdxMap().entrySet()).parallel()
.forEachOrdered(e -> action.accept(e.getValue(), e.getKey()));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
unwrap().forEachOrdered(e -> action.accept(e, index.incrementAndGet()));

View File

@ -12,6 +12,7 @@
package org.dromara.hutool.core.stream;
import org.dromara.hutool.core.array.ArrayUtil;
import org.dromara.hutool.core.collection.ListUtil;
import org.dromara.hutool.core.collection.iter.IterUtil;
import org.dromara.hutool.core.lang.Console;
@ -19,7 +20,6 @@ import org.dromara.hutool.core.lang.mutable.MutableInt;
import org.dromara.hutool.core.lang.mutable.MutableObj;
import org.dromara.hutool.core.map.MapUtil;
import org.dromara.hutool.core.map.SafeConcurrentHashMap;
import org.dromara.hutool.core.array.ArrayUtil;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
@ -279,12 +279,13 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
*/
default S peekIdx(final BiConsumer<? super T, Integer> action) {
Objects.requireNonNull(action);
if (isParallel()) {
final boolean isParallel = isParallel();
if (isParallel) {
final Map<Integer, T> idxMap = easyStream().toIdxMap();
return wrap(EasyStream.of(idxMap.entrySet())
.parallel(isParallel())
.peek(e -> action.accept(e.getValue(), e.getKey()))
.map(Map.Entry::getValue));
.parallel()
.peek(e -> action.accept(e.getValue(), e.getKey()))
.map(Map.Entry::getValue));
} else {
final AtomicInteger index = new AtomicInteger(NOT_FOUND_ELEMENT_INDEX);
return peek(e -> action.accept(e, index.incrementAndGet()));
@ -383,12 +384,12 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
*/
default S filterIdx(final BiPredicate<? super T, Integer> predicate) {
Objects.requireNonNull(predicate);
if (isParallel()) {
final boolean isParallel = isParallel();
if (isParallel) {
final Map<Integer, T> idxMap = easyStream().toIdxMap();
return wrap(EasyStream.of(idxMap.entrySet())
.parallel(isParallel())
.filter(e -> predicate.test(e.getValue(), e.getKey()))
.map(Map.Entry::getValue));
return wrap(EasyStream.of(idxMap.entrySet()).parallel()
.filter(e -> predicate.test(e.getValue(), e.getKey()))
.map(Map.Entry::getValue));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
return filter(e -> predicate.test(e, index.incrementAndGet()));
@ -440,11 +441,11 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
*/
default <R> EasyStream<R> flatMapIdx(final BiFunction<? super T, Integer, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
if (isParallel()) {
final boolean isParallel = isParallel();
if (isParallel) {
final Map<Integer, T> idxMap = easyStream().toIdxMap();
return EasyStream.of(idxMap.entrySet())
.parallel(isParallel())
.flatMap(e -> mapper.apply(e.getValue(), e.getKey()));
return EasyStream.of(idxMap.entrySet()).parallel()
.flatMap(e -> mapper.apply(e.getValue(), e.getKey()));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
return flatMap(e -> mapper.apply(e, index.incrementAndGet()));
@ -553,11 +554,11 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
*/
default <R> EasyStream<R> mapIdx(final BiFunction<? super T, Integer, ? extends R> mapper) {
Objects.requireNonNull(mapper);
if (isParallel()) {
final boolean isParallel = isParallel();
if (isParallel) {
final Map<Integer, T> idxMap = easyStream().toIdxMap();
return EasyStream.of(idxMap.entrySet())
.parallel(isParallel())
.map(e -> mapper.apply(e.getValue(), e.getKey()));
return EasyStream.of(idxMap.entrySet()).parallel()
.map(e -> mapper.apply(e.getValue(), e.getKey()));
} else {
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
return map(e -> mapper.apply(e, index.incrementAndGet()));