mirror of
https://gitee.com/chinabugotech/hutool.git
synced 2025-05-09 23:51:34 +08:00
!840 【6.x】:trollface: 调整mapIdx、filterIdx、peekIdx、forEachIdx的并行流逻辑,添加toIdxMap函数,添加transform接口方法
Merge pull request !840 from 阿超/v6-dev
This commit is contained in:
commit
7c8881eb54
@ -19,7 +19,7 @@ public abstract class AbstractEnhancedWrappedStream<T, S extends AbstractEnhance
|
||||
/**
|
||||
* 原始流实例
|
||||
*/
|
||||
protected final Stream<T> stream;
|
||||
protected Stream<T> stream;
|
||||
|
||||
/**
|
||||
* 获取被包装的元素流实例
|
||||
|
@ -263,6 +263,18 @@ public class EasyStream<T> extends AbstractEnhancedWrappedStream<T, EasyStream<T
|
||||
return new EasyStream<>(stream);
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为子类实现
|
||||
*
|
||||
* @param stream 流
|
||||
* @return 子类实现
|
||||
*/
|
||||
@Override
|
||||
public EasyStream<T> transform(Stream<T> stream) {
|
||||
this.stream = stream;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 建造者
|
||||
*
|
||||
|
@ -756,6 +756,18 @@ public class EntryStream<K, V> extends AbstractEnhancedWrappedStream<Map.Entry<K
|
||||
return new EntryStream<>(stream);
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为子类实现
|
||||
*
|
||||
* @param stream 流
|
||||
* @return 子类实现
|
||||
*/
|
||||
@Override
|
||||
public EntryStream<K, V> transform(Stream<Map.Entry<K, V>> stream) {
|
||||
this.stream = stream;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* key重复时直接抛出异常
|
||||
*/
|
||||
|
@ -181,14 +181,36 @@ public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T,
|
||||
Objects.requireNonNull(valueMapper);
|
||||
Objects.requireNonNull(mergeFunction);
|
||||
Objects.requireNonNull(mapSupplier);
|
||||
return unwrap().collect(Collectors.toMap(keyMapper, valueMapper, mergeFunction, mapSupplier));
|
||||
return unwrap().collect(CollectorUtil.toMap(keyMapper, valueMapper, mergeFunction, mapSupplier));
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为map,key为下标,value为元素
|
||||
*
|
||||
* @return map
|
||||
*/
|
||||
default Map<Integer, T> toIdxMap() {
|
||||
return toIdxMap(Function.identity());
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为map,key为下标,value为给定操作执行后的返回值
|
||||
*
|
||||
* @param valueMapper 指定value操作
|
||||
* @param <U> value类型
|
||||
* @return map
|
||||
*/
|
||||
default <U> Map<Integer, U> toIdxMap(Function<? super T, ? extends U> valueMapper) {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
|
||||
return EasyStream.of(toList()).toMap(e -> index.incrementAndGet(), valueMapper, (l, r) -> r);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* <p>将集合转换为树,默认用 {@code parentId == null} 来判断树的根节点
|
||||
* 因为需要在当前传入数据里查找,所以这是一个结束操作 <br>
|
||||
*
|
||||
* @param idGetter id的getter对应的lambda,可以写作 {@code Student::getId} 会过滤掉id为null的元素
|
||||
* @param idGetter id的getter对应的lambda,可以写作 {@code Student::getId}
|
||||
* @param pIdGetter parentId的getter对应的lambda,可以写作 {@code Student::getParentId}
|
||||
* @param childrenSetter children的setter对应的lambda,可以写作{ @code Student::setChildren}
|
||||
* @param <R> 此处是id、parentId的泛型限制
|
||||
@ -211,7 +233,7 @@ public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T,
|
||||
* 将集合转换为树,自定义根节点的判断条件
|
||||
* 因为需要在当前传入数据里查找,所以这是一个结束操作
|
||||
*
|
||||
* @param idGetter id的getter对应的lambda,可以写作 {@code Student::getId} 会过滤掉id为null的元素
|
||||
* @param idGetter id的getter对应的lambda,可以写作 {@code Student::getId}
|
||||
* @param pIdGetter parentId的getter对应的lambda,可以写作 {@code Student::getParentId}
|
||||
* @param childrenSetter children的setter对应的lambda,可以写作 {@code Student::setChildren}
|
||||
* @param parentPredicate 树顶部的判断条件,可以写作 {@code s -> Objects.equals(s.getParentId(),0L) }
|
||||
@ -535,7 +557,8 @@ public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T,
|
||||
default void forEachIdx(final BiConsumer<? super T, Integer> action) {
|
||||
Objects.requireNonNull(action);
|
||||
if (isParallel()) {
|
||||
unwrap().forEach(e -> action.accept(e, NOT_FOUND_ELEMENT_INDEX));
|
||||
EasyStream.of(toIdxMap().entrySet()).parallel(isParallel())
|
||||
.forEach(e -> action.accept(e.getValue(), e.getKey()));
|
||||
} else {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
|
||||
unwrap().forEach(e -> action.accept(e, index.incrementAndGet()));
|
||||
@ -551,7 +574,9 @@ public interface TerminableWrappedStream<T, S extends TerminableWrappedStream<T,
|
||||
default void forEachOrderedIdx(final BiConsumer<? super T, Integer> action) {
|
||||
Objects.requireNonNull(action);
|
||||
if (isParallel()) {
|
||||
unwrap().forEachOrdered(e -> action.accept(e, NOT_FOUND_ELEMENT_INDEX));
|
||||
EasyStream.of(toIdxMap().entrySet())
|
||||
.parallel(isParallel())
|
||||
.forEachOrdered(e -> action.accept(e.getValue(), e.getKey()));
|
||||
} else {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
|
||||
unwrap().forEachOrdered(e -> action.accept(e, index.incrementAndGet()));
|
||||
|
@ -9,19 +9,10 @@ import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.map.SafeConcurrentHashMap;
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.BiPredicate;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
@ -51,8 +42,8 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
|
||||
final Iterable<U> other,
|
||||
final BiFunction<? super T, ? super U, ? extends R> zipper) {
|
||||
Objects.requireNonNull(zipper);
|
||||
final Map<Integer, T> idxIdentityMap = mapIdx((e, idx) -> MapUtil.entry(idx, e)).collect(CollectorUtil.entryToMap());
|
||||
final Map<Integer, U> idxOtherMap = EasyStream.of(other).mapIdx((e, idx) -> MapUtil.entry(idx, e)).collect(CollectorUtil.entryToMap());
|
||||
Map<Integer, T> idxIdentityMap = mapIdx((e, idx) -> MapUtil.entry(idx, e)).collect(CollectorUtil.entryToMap());
|
||||
Map<Integer, U> idxOtherMap = EasyStream.of(other).mapIdx((e, idx) -> MapUtil.entry(idx, e)).collect(CollectorUtil.entryToMap());
|
||||
if (idxIdentityMap.size() <= idxOtherMap.size()) {
|
||||
return EasyStream.of(idxIdentityMap.keySet(), isParallel()).map(k -> zipper.apply(idxIdentityMap.get(k), idxOtherMap.get(k)));
|
||||
}
|
||||
@ -259,8 +250,7 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
|
||||
// region ============ peek ============
|
||||
|
||||
/**
|
||||
* 返回与指定函数将元素作为参数执行后组成的流。操作带下标,并行流时下标永远为-1
|
||||
* 这是一个无状态中间操作
|
||||
* 返回与指定函数将元素作为参数执行后组成的流。操作带下标
|
||||
*
|
||||
* @param action 指定的函数
|
||||
* @return 返回叠加操作后的FastStream
|
||||
@ -278,7 +268,11 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
|
||||
default S peekIdx(final BiConsumer<? super T, Integer> action) {
|
||||
Objects.requireNonNull(action);
|
||||
if (isParallel()) {
|
||||
return peek(e -> action.accept(e, NOT_FOUND_ELEMENT_INDEX));
|
||||
final Map<Integer, T> idxMap = easyStream().toIdxMap();
|
||||
return transform(EasyStream.of(idxMap.entrySet())
|
||||
.parallel(isParallel())
|
||||
.peek(e -> action.accept(e.getValue(), e.getKey()))
|
||||
.map(Map.Entry::getValue));
|
||||
} else {
|
||||
final AtomicInteger index = new AtomicInteger(NOT_FOUND_ELEMENT_INDEX);
|
||||
return peek(e -> action.accept(e, index.incrementAndGet()));
|
||||
@ -370,8 +364,7 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
|
||||
}
|
||||
|
||||
/**
|
||||
* 过滤元素,返回与指定断言匹配的元素组成的流,断言带下标,并行流时下标永远为-1
|
||||
* 这是一个无状态中间操作
|
||||
* 过滤元素,返回与指定断言匹配的元素组成的流,断言带下标
|
||||
*
|
||||
* @param predicate 断言
|
||||
* @return 返回叠加过滤操作后的流
|
||||
@ -379,7 +372,11 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
|
||||
default S filterIdx(final BiPredicate<? super T, Integer> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
if (isParallel()) {
|
||||
return filter(e -> predicate.test(e, NOT_FOUND_ELEMENT_INDEX));
|
||||
final Map<Integer, T> idxMap = easyStream().toIdxMap();
|
||||
return transform(EasyStream.of(idxMap.entrySet())
|
||||
.parallel(isParallel())
|
||||
.filter(e -> predicate.test(e.getValue(), e.getKey()))
|
||||
.map(Map.Entry::getValue));
|
||||
} else {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
|
||||
return filter(e -> predicate.test(e, index.incrementAndGet()));
|
||||
@ -423,8 +420,7 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
|
||||
}
|
||||
|
||||
/**
|
||||
* 扩散流操作,可能影响流元素个数,将原有流元素执行mapper操作,返回多个流所有元素组成的流,操作带下标,并行流时下标永远为-1
|
||||
* 这是一个无状态中间操作
|
||||
* 扩散流操作,可能影响流元素个数,将原有流元素执行mapper操作,返回多个流所有元素组成的流,操作带下标
|
||||
*
|
||||
* @param mapper 操作,返回流
|
||||
* @param <R> 拆分后流的元素类型
|
||||
@ -433,7 +429,10 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
|
||||
default <R> EasyStream<R> flatMapIdx(final BiFunction<? super T, Integer, ? extends Stream<? extends R>> mapper) {
|
||||
Objects.requireNonNull(mapper);
|
||||
if (isParallel()) {
|
||||
return flatMap(e -> mapper.apply(e, NOT_FOUND_ELEMENT_INDEX));
|
||||
final Map<Integer, T> idxMap = easyStream().toIdxMap();
|
||||
return EasyStream.of(idxMap.entrySet())
|
||||
.parallel(isParallel())
|
||||
.flatMap(e -> mapper.apply(e.getValue(), e.getKey()));
|
||||
} else {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
|
||||
return flatMap(e -> mapper.apply(e, index.incrementAndGet()));
|
||||
@ -534,8 +533,7 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回与指定函数将元素作为参数执行的结果组成的流,操作带下标,并行流时下标永远为-1
|
||||
* 这是一个无状态中间操作
|
||||
* 返回与指定函数将元素作为参数执行的结果组成的流,操作带下标
|
||||
*
|
||||
* @param mapper 指定的函数
|
||||
* @param <R> 函数执行后返回的类型
|
||||
@ -544,7 +542,10 @@ public interface TransformableWrappedStream<T, S extends TransformableWrappedStr
|
||||
default <R> EasyStream<R> mapIdx(final BiFunction<? super T, Integer, ? extends R> mapper) {
|
||||
Objects.requireNonNull(mapper);
|
||||
if (isParallel()) {
|
||||
return map(e -> mapper.apply(e, NOT_FOUND_ELEMENT_INDEX));
|
||||
final Map<Integer, T> idxMap = easyStream().toIdxMap();
|
||||
return EasyStream.of(idxMap.entrySet())
|
||||
.parallel(isParallel())
|
||||
.map(e -> mapper.apply(e.getValue(), e.getKey()));
|
||||
} else {
|
||||
final MutableInt index = new MutableInt(NOT_FOUND_ELEMENT_INDEX);
|
||||
return map(e -> mapper.apply(e, index.incrementAndGet()));
|
||||
|
@ -25,7 +25,7 @@ import java.util.stream.*;
|
||||
public interface WrappedStream<T, S extends WrappedStream<T, S>> extends Stream<T>, Iterable<T> {
|
||||
|
||||
/**
|
||||
* 代表不存在的下标, 一般用于并行流的下标, 或者未找到元素时的下标
|
||||
* 代表不存在的下标, 或者未找到元素时的下标
|
||||
*/
|
||||
int NOT_FOUND_ELEMENT_INDEX = -1;
|
||||
|
||||
@ -587,4 +587,28 @@ public interface WrappedStream<T, S extends WrappedStream<T, S>> extends Stream<
|
||||
@Override
|
||||
String toString();
|
||||
|
||||
/**
|
||||
* 转换为EasyStream
|
||||
*
|
||||
* @return 转换后的EasyStream
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
default EasyStream<T> easyStream() {
|
||||
if (this instanceof EasyStream) {
|
||||
return (EasyStream<T>) this;
|
||||
} else if (this instanceof Iterator) {
|
||||
return (EasyStream<T>) EasyStream.of((Iterator<T>) this);
|
||||
} else {
|
||||
return EasyStream.of(collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为子类实现
|
||||
*
|
||||
* @param stream 流
|
||||
* @return 子类实现
|
||||
*/
|
||||
S transform(Stream<T> stream);
|
||||
|
||||
}
|
||||
|
@ -295,14 +295,8 @@ public class AbstractEnhancedWrappedStreamTest {
|
||||
Assert.assertEquals(asList(1, 2, 3), elements);
|
||||
Assert.assertEquals(asList(0, 1, 2), indexes);
|
||||
|
||||
final Set<Integer> elements2 = Collections.synchronizedSet(new HashSet<>());
|
||||
final Set<Integer> indexes2 = Collections.synchronizedSet(new HashSet<>());
|
||||
wrap(1, 2, null).parallel().peekIdx((t, i) -> {
|
||||
elements2.add(t);
|
||||
indexes2.add(i);
|
||||
}).exec();
|
||||
Assert.assertEquals(new HashSet<>(asList(1, null, 2)), elements2);
|
||||
Assert.assertEquals(new HashSet<>(asList(-1, -1, -1)), indexes2);
|
||||
wrap("one", "two", "three", "four").parallel().filter(e -> e.length() == 4)
|
||||
.peekIdx((e, i) -> Assert.assertEquals("four:0", e + ":" + i)).exec();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -631,10 +625,11 @@ public class AbstractEnhancedWrappedStreamTest {
|
||||
final Map<Integer, Integer> expect = new HashMap<Integer, Integer>() {
|
||||
private static final long serialVersionUID = 1L;
|
||||
{
|
||||
put(1, 1);
|
||||
put(2, 2);
|
||||
put(3, 3);
|
||||
}};
|
||||
put(1, 1);
|
||||
put(2, 2);
|
||||
put(3, 3);
|
||||
}
|
||||
};
|
||||
Map<Integer, Integer> map = EasyStream.of(1, 2, 3)
|
||||
.toEntries(Function.identity(), Function.identity())
|
||||
.toMap();
|
||||
@ -709,6 +704,18 @@ public class AbstractEnhancedWrappedStreamTest {
|
||||
return new Wrapper<>(source);
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为子类实现
|
||||
*
|
||||
* @param stream 流
|
||||
* @return 子类实现
|
||||
*/
|
||||
@Override
|
||||
public Wrapper<T> transform(final Stream<T> stream) {
|
||||
this.stream = stream;
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Setter
|
||||
|
@ -9,6 +9,7 @@ import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
@ -142,8 +143,9 @@ public class EasyStreamTest {
|
||||
final List<String> list = Arrays.asList("dromara", "hutool", "sweet");
|
||||
final List<String> mapIndex = EasyStream.of(list).mapIdx((e, i) -> i + 1 + "." + e).toList();
|
||||
Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), mapIndex);
|
||||
// 并行流时为-1
|
||||
Assert.assertEquals(Arrays.asList(-1, -1, -1), EasyStream.of(1, 2, 3).parallel().mapIdx((e, i) -> i).toList());
|
||||
// 并行流时正常
|
||||
Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"),
|
||||
EasyStream.of("dromara", "hutool", "sweet").parallel().mapIdx((e, i) -> i + 1 + "." + e).toList());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -192,8 +194,10 @@ public class EasyStreamTest {
|
||||
final EasyStream.Builder<String> builder = EasyStream.builder();
|
||||
EasyStream.of(list).forEachIdx((e, i) -> builder.accept(i + 1 + "." + e));
|
||||
Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), builder.build().toList());
|
||||
// 并行流时为-1
|
||||
EasyStream.of(1, 2, 3).parallel().forEachIdx((e, i) -> Assert.assertEquals(-1, (Object) i));
|
||||
// 并行流时正常
|
||||
final AtomicInteger total = new AtomicInteger(0);
|
||||
EasyStream.of("dromara", "hutool", "sweet").parallel().forEachIdx((e, i) -> total.addAndGet(i));
|
||||
Assert.assertEquals(3, total.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -205,7 +209,7 @@ public class EasyStreamTest {
|
||||
|
||||
final EasyStream.Builder<String> streamBuilder = EasyStream.builder();
|
||||
EasyStream.of(list).parallel().forEachOrderedIdx((e, i) -> streamBuilder.accept(i + 1 + "." + e));
|
||||
Assert.assertEquals(Arrays.asList("0.dromara", "0.hutool", "0.sweet"), streamBuilder.build().toList());
|
||||
Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), streamBuilder.build().toList());
|
||||
|
||||
}
|
||||
|
||||
@ -214,8 +218,10 @@ public class EasyStreamTest {
|
||||
final List<String> list = Arrays.asList("dromara", "hutool", "sweet");
|
||||
final List<String> mapIndex = EasyStream.of(list).flatMapIdx((e, i) -> EasyStream.of(i + 1 + "." + e)).toList();
|
||||
Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"), mapIndex);
|
||||
// 并行流时为-1
|
||||
Assert.assertEquals(Arrays.asList(-1, -1, -1), EasyStream.of(1, 2, 3).parallel().flatMapIdx((e, i) -> EasyStream.of(i)).toList());
|
||||
// 并行流时正常
|
||||
Assert.assertEquals(Arrays.asList("1.dromara", "2.hutool", "3.sweet"),
|
||||
EasyStream.of("dromara", "hutool", "sweet").parallel()
|
||||
.flatMapIdx((e, i) -> EasyStream.of(i + 1 + "." + e)).toList());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -269,8 +275,9 @@ public class EasyStreamTest {
|
||||
final List<String> list = Arrays.asList("dromara", "hutool", "sweet");
|
||||
final List<String> filterIndex = EasyStream.of(list).filterIdx((e, i) -> i < 2).toList();
|
||||
Assert.assertEquals(Arrays.asList("dromara", "hutool"), filterIndex);
|
||||
// 并行流时为-1
|
||||
Assert.assertEquals(3L, EasyStream.of(1, 2, 3).parallel().filterIdx((e, i) -> i == -1).count());
|
||||
// 并行流时正常
|
||||
Assert.assertEquals(Arrays.asList("dromara", "hutool"),
|
||||
EasyStream.of("dromara", "hutool", "sweet").parallel().filterIdx((e, i) -> i < 2).toList());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -434,8 +441,6 @@ public class EasyStreamTest {
|
||||
Consumer<Object> test = o -> {
|
||||
final List<Student> studentTree = EasyStream
|
||||
.of(
|
||||
// 会过滤掉id为null的元素
|
||||
Student.builder().name("top").build(),
|
||||
Student.builder().id(1L).name("dromara").build(),
|
||||
Student.builder().id(2L).name("baomidou").build(),
|
||||
Student.builder().id(3L).name("hutool").parentId(1L).build(),
|
||||
|
Loading…
x
Reference in New Issue
Block a user