mirror of
https://gitee.com/chinabugotech/hutool.git
synced 2025-04-19 03:01:48 +08:00
增加分段锁实现SegmentLock
(pr#1330@Gitee)
This commit is contained in:
parent
aa13f98934
commit
b0e37e3ef3
@ -16,6 +16,9 @@
|
||||
|
||||
package org.dromara.hutool.core.thread.lock;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.StampedLock;
|
||||
|
||||
@ -48,6 +51,105 @@ public class LockUtil {
|
||||
return new ReentrantReadWriteLock(fair);
|
||||
}
|
||||
|
||||
// region ----- SegmentLock
|
||||
/**
|
||||
* 创建分段锁(强引用),使用 ReentrantLock
|
||||
*
|
||||
* @param segments 分段数量,必须大于 0
|
||||
* @return 分段锁实例
|
||||
*/
|
||||
public static SegmentLock<Lock> createSegmentLock(final int segments) {
|
||||
return SegmentLock.lock(segments);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建分段读写锁(强引用),使用 ReentrantReadWriteLock
|
||||
*
|
||||
* @param segments 分段数量,必须大于 0
|
||||
* @return 分段读写锁实例
|
||||
*/
|
||||
public static SegmentLock<ReadWriteLock> createSegmentReadWriteLock(final int segments) {
|
||||
return SegmentLock.readWriteLock(segments);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建分段信号量(强引用)
|
||||
*
|
||||
* @param segments 分段数量,必须大于 0
|
||||
* @param permits 每个信号量的许可数
|
||||
* @return 分段信号量实例
|
||||
*/
|
||||
public static SegmentLock<Semaphore> createSegmentSemaphore(final int segments, final int permits) {
|
||||
return SegmentLock.semaphore(segments, permits);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建弱引用分段锁,使用 ReentrantLock,懒加载
|
||||
*
|
||||
* @param segments 分段数量,必须大于 0
|
||||
* @return 弱引用分段锁实例
|
||||
*/
|
||||
public static SegmentLock<Lock> createLazySegmentLock(final int segments) {
|
||||
return SegmentLock.lazyWeakLock(segments);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 key 获取分段锁(强引用)
|
||||
*
|
||||
* @param segments 分段数量,必须大于 0
|
||||
* @param key 用于映射分段的 key
|
||||
* @return 对应的 Lock 实例
|
||||
*/
|
||||
public static Lock getSegmentLock(final int segments, final Object key) {
|
||||
return SegmentLock.lock(segments).get(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 key 获取分段读锁(强引用)
|
||||
*
|
||||
* @param segments 分段数量,必须大于 0
|
||||
* @param key 用于映射分段的 key
|
||||
* @return 对应的读锁实例
|
||||
*/
|
||||
public static Lock getSegmentReadLock(final int segments, final Object key) {
|
||||
return SegmentLock.readWriteLock(segments).get(key).readLock();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 key 获取分段写锁(强引用)
|
||||
*
|
||||
* @param segments 分段数量,必须大于 0
|
||||
* @param key 用于映射分段的 key
|
||||
* @return 对应的写锁实例
|
||||
*/
|
||||
public static Lock getSegmentWriteLock(final int segments, final Object key) {
|
||||
return SegmentLock.readWriteLock(segments).get(key).writeLock();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 key 获取分段信号量(强引用)
|
||||
*
|
||||
* @param segments 分段数量,必须大于 0
|
||||
* @param permits 每个信号量的许可数
|
||||
* @param key 用于映射分段的 key
|
||||
* @return 对应的 Semaphore 实例
|
||||
*/
|
||||
public static Semaphore getSegmentSemaphore(final int segments, final int permits, final Object key) {
|
||||
return SegmentLock.semaphore(segments, permits).get(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 key 获取弱引用分段锁,懒加载
|
||||
*
|
||||
* @param segments 分段数量,必须大于 0
|
||||
* @param key 用于映射分段的 key
|
||||
* @return 对应的 Lock 实例
|
||||
*/
|
||||
public static Lock getLazySegmentLock(final int segments, final Object key) {
|
||||
return SegmentLock.lazyWeakLock(segments).get(key);
|
||||
}
|
||||
// endregion
|
||||
|
||||
/**
|
||||
* 获取单例的无锁对象
|
||||
*
|
||||
|
@ -0,0 +1,523 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Hutool Team and hutool.cn
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.dromara.hutool.core.thread.lock;
|
||||
|
||||
import org.dromara.hutool.core.collection.CollUtil;
|
||||
import org.dromara.hutool.core.collection.ListUtil;
|
||||
import org.dromara.hutool.core.lang.Assert;
|
||||
|
||||
import java.lang.ref.Reference;
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
import java.util.concurrent.locks.*;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* 分段锁工具类,支持 Lock、Semaphore 和 ReadWriteLock 的分段实现。
|
||||
* <p>
|
||||
* 通过将锁分成多个段(segments),不同的操作可以并发使用不同的段,避免所有线程竞争同一把锁。
|
||||
* 相等的 key 保证映射到同一段锁(如 key1.equals(key2) 时,get(key1) 和 get(key2) 返回相同对象)。
|
||||
* 但不同 key 可能因哈希冲突映射到同一段,段数越少冲突概率越高。
|
||||
* <p>
|
||||
* 支持两种实现:
|
||||
* <ul>
|
||||
* <li>强引用:创建时初始化所有段,内存占用稳定。</li>
|
||||
* <li>弱引用:懒加载,首次使用时创建段,未使用时可被垃圾回收,适合大量段但使用较少的场景。</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param <L>
|
||||
* @author Guava,dakuo
|
||||
* @since 5.8.38
|
||||
*/
|
||||
public abstract class SegmentLock<L> {
|
||||
|
||||
/** 当段数大于此阈值时,使用 ConcurrentMap 替代大数组以节省内存(适用于懒加载场景) */
|
||||
private static final int LARGE_LAZY_CUTOFF = 1024;
|
||||
|
||||
private SegmentLock() {}
|
||||
|
||||
/**
|
||||
* 根据 key 获取对应的锁段,保证相同 key 返回相同对象。
|
||||
*
|
||||
* @param key 非空 key
|
||||
* @return 对应的锁段
|
||||
*/
|
||||
public abstract L get(Object key);
|
||||
|
||||
/**
|
||||
* 根据索引获取锁段,索引范围为 [0, size())。
|
||||
*
|
||||
* @param index 索引
|
||||
* @return 指定索引的锁段
|
||||
*/
|
||||
public abstract L getAt(int index);
|
||||
|
||||
/**
|
||||
* 计算 key 对应的段索引。
|
||||
*
|
||||
* @param key 非空 key
|
||||
* @return 段索引
|
||||
*/
|
||||
abstract int indexFor(Object key);
|
||||
|
||||
/**
|
||||
* 获取总段数。
|
||||
*
|
||||
* @return 段数
|
||||
*/
|
||||
public abstract int size();
|
||||
|
||||
/**
|
||||
* 批量获取多个 key 对应的锁段列表,按索引升序排列,避免死锁。
|
||||
*
|
||||
* @param keys 非空 key 集合
|
||||
* @return 锁段列表(可能有重复)
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public Iterable<L> bulkGet(final Iterable<?> keys) {
|
||||
final List<Object> result = (List<Object>) ListUtil.of(keys);
|
||||
if (CollUtil.isEmpty(result)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
final int[] stripes = new int[result.size()];
|
||||
for (int i = 0; i < result.size(); i++) {
|
||||
stripes[i] = indexFor(result.get(i));
|
||||
}
|
||||
Arrays.sort(stripes);
|
||||
int previousStripe = stripes[0];
|
||||
result.set(0, getAt(previousStripe));
|
||||
for (int i = 1; i < result.size(); i++) {
|
||||
final int currentStripe = stripes[i];
|
||||
if (currentStripe == previousStripe) {
|
||||
result.set(i, result.get(i - 1));
|
||||
} else {
|
||||
result.set(i, getAt(currentStripe));
|
||||
previousStripe = currentStripe;
|
||||
}
|
||||
}
|
||||
final List<L> asStripes = (List<L>) result;
|
||||
return Collections.unmodifiableList(asStripes);
|
||||
}
|
||||
|
||||
// 静态工厂方法
|
||||
|
||||
/**
|
||||
* 创建强引用的分段锁,所有段在创建时初始化。
|
||||
*
|
||||
* @param stripes 段数
|
||||
* @param supplier 锁提供者
|
||||
* @param <L> 锁类型
|
||||
* @return 分段锁实例
|
||||
*/
|
||||
public static <L> SegmentLock<L> custom(final int stripes, final Supplier<L> supplier) {
|
||||
return new CompactSegmentLock<>(stripes, supplier);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建强引用的可重入锁分段实例。
|
||||
*
|
||||
* @param stripes 段数
|
||||
* @return 分段锁实例
|
||||
*/
|
||||
public static SegmentLock<Lock> lock(final int stripes) {
|
||||
return custom(stripes, PaddedLock::new);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建弱引用的可重入锁分段实例,懒加载。
|
||||
*
|
||||
* @param stripes 段数
|
||||
* @return 分段锁实例
|
||||
*/
|
||||
public static SegmentLock<Lock> lazyWeakLock(final int stripes) {
|
||||
return lazyWeakCustom(stripes, () -> new ReentrantLock(false));
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建弱引用的分段锁,懒加载。
|
||||
*
|
||||
* @param stripes 段数
|
||||
* @param supplier 锁提供者
|
||||
* @param <L> 锁类型
|
||||
* @return 分段锁实例
|
||||
*/
|
||||
private static <L> SegmentLock<L> lazyWeakCustom(final int stripes, final Supplier<L> supplier) {
|
||||
return stripes < LARGE_LAZY_CUTOFF
|
||||
? new SmallLazySegmentLock<>(stripes, supplier)
|
||||
: new LargeLazySegmentLock<>(stripes, supplier);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建强引用的信号量分段实例。
|
||||
*
|
||||
* @param stripes 段数
|
||||
* @param permits 每个信号量的许可数
|
||||
* @return 分段信号量实例
|
||||
*/
|
||||
public static SegmentLock<Semaphore> semaphore(final int stripes, final int permits) {
|
||||
return custom(stripes, () -> new PaddedSemaphore(permits));
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建弱引用的信号量分段实例,懒加载。
|
||||
*
|
||||
* @param stripes 段数
|
||||
* @param permits 每个信号量的许可数
|
||||
* @return 分段信号量实例
|
||||
*/
|
||||
public static SegmentLock<Semaphore> lazyWeakSemaphore(final int stripes, final int permits) {
|
||||
return lazyWeakCustom(stripes, () -> new Semaphore(permits, false));
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建强引用的读写锁分段实例。
|
||||
*
|
||||
* @param stripes 段数
|
||||
* @return 分段读写锁实例
|
||||
*/
|
||||
public static SegmentLock<ReadWriteLock> readWriteLock(final int stripes) {
|
||||
return custom(stripes, ReentrantReadWriteLock::new);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建弱引用的读写锁分段实例,懒加载。
|
||||
*
|
||||
* @param stripes 段数
|
||||
* @return 分段读写锁实例
|
||||
*/
|
||||
public static SegmentLock<ReadWriteLock> lazyWeakReadWriteLock(final int stripes) {
|
||||
return lazyWeakCustom(stripes, WeakSafeReadWriteLock::new);
|
||||
}
|
||||
|
||||
// 内部实现类
|
||||
|
||||
/**
|
||||
* 弱引用安全的读写锁实现,确保读锁和写锁持有对自身的强引用。
|
||||
*/
|
||||
private static final class WeakSafeReadWriteLock implements ReadWriteLock {
|
||||
private final ReadWriteLock delegate;
|
||||
|
||||
WeakSafeReadWriteLock() {
|
||||
this.delegate = new ReentrantReadWriteLock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Lock readLock() {
|
||||
return new WeakSafeLock(delegate.readLock(), this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Lock writeLock() {
|
||||
return new WeakSafeLock(delegate.writeLock(), this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 弱引用安全的锁包装类,确保持有强引用。
|
||||
*/
|
||||
private static final class WeakSafeLock implements Lock {
|
||||
private final Lock delegate;
|
||||
private final WeakSafeReadWriteLock strongReference;
|
||||
|
||||
WeakSafeLock(final Lock delegate, final WeakSafeReadWriteLock strongReference) {
|
||||
this.delegate = delegate;
|
||||
this.strongReference = strongReference;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lock() {
|
||||
delegate.lock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lockInterruptibly() throws InterruptedException {
|
||||
delegate.lockInterruptibly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryLock() {
|
||||
return delegate.tryLock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryLock(final long time, final java.util.concurrent.TimeUnit unit) throws InterruptedException {
|
||||
return delegate.tryLock(time, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unlock() {
|
||||
delegate.unlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Condition newCondition() {
|
||||
return new WeakSafeCondition(delegate.newCondition(), strongReference);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 弱引用安全的条件包装类。
|
||||
*/
|
||||
private static final class WeakSafeCondition implements Condition {
|
||||
private final Condition delegate;
|
||||
|
||||
/** 防止垃圾回收 */
|
||||
@SuppressWarnings("FieldCanBeLocal")
|
||||
private final WeakSafeReadWriteLock strongReference;
|
||||
|
||||
WeakSafeCondition(final Condition delegate, final WeakSafeReadWriteLock strongReference) {
|
||||
this.delegate = delegate;
|
||||
this.strongReference = strongReference;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void await() throws InterruptedException {
|
||||
delegate.await();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitUninterruptibly() {
|
||||
delegate.awaitUninterruptibly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long awaitNanos(final long nanosTimeout) throws InterruptedException {
|
||||
return delegate.awaitNanos(nanosTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean await(final long time, final TimeUnit unit) throws InterruptedException {
|
||||
return delegate.await(time, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean awaitUntil(final Date deadline) throws InterruptedException {
|
||||
return delegate.awaitUntil(deadline);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void signal() {
|
||||
delegate.signal();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void signalAll() {
|
||||
delegate.signalAll();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 抽象基类,确保段数为 2 的幂。
|
||||
*/
|
||||
private abstract static class PowerOfTwoSegmentLock<L> extends SegmentLock<L> {
|
||||
final int mask;
|
||||
|
||||
PowerOfTwoSegmentLock(final int stripes) {
|
||||
Assert.isTrue(stripes > 0, "Segment count must be positive");
|
||||
this.mask = stripes > Integer.MAX_VALUE / 2 ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
final int indexFor(final Object key) {
|
||||
final int hash = smear(key.hashCode());
|
||||
return hash & mask;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final L get(final Object key) {
|
||||
return getAt(indexFor(key));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 强引用实现,使用固定数组存储段。
|
||||
*/
|
||||
private static class CompactSegmentLock<L> extends PowerOfTwoSegmentLock<L> {
|
||||
private final Object[] array;
|
||||
|
||||
CompactSegmentLock(final int stripes, final Supplier<L> supplier) {
|
||||
super(stripes);
|
||||
Assert.isTrue(stripes <= Integer.MAX_VALUE / 2, "Segment count must be <= 2^30");
|
||||
this.array = new Object[mask + 1];
|
||||
for (int i = 0; i < array.length; i++) {
|
||||
array[i] = supplier.get();
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public L getAt(final int index) {
|
||||
if (index < 0 || index >= array.length) {
|
||||
throw new IllegalArgumentException("Index " + index + " out of bounds for size " + array.length);
|
||||
}
|
||||
return (L) array[index];
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return array.length;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 小规模弱引用实现,使用 AtomicReferenceArray 存储段。
|
||||
*/
|
||||
private static class SmallLazySegmentLock<L> extends PowerOfTwoSegmentLock<L> {
|
||||
final AtomicReferenceArray<ArrayReference<? extends L>> locks;
|
||||
final Supplier<L> supplier;
|
||||
final int size;
|
||||
final ReferenceQueue<L> queue = new ReferenceQueue<>();
|
||||
|
||||
SmallLazySegmentLock(final int stripes, final Supplier<L> supplier) {
|
||||
super(stripes);
|
||||
this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
|
||||
this.locks = new AtomicReferenceArray<>(size);
|
||||
this.supplier = supplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public L getAt(final int index) {
|
||||
if (size != Integer.MAX_VALUE) {
|
||||
Assert.isTrue(index >= 0 && index < size, "Index out of bounds");
|
||||
}
|
||||
ArrayReference<? extends L> existingRef = locks.get(index);
|
||||
L existing = existingRef == null ? null : existingRef.get();
|
||||
if (existing != null) {
|
||||
return existing;
|
||||
}
|
||||
final L created = supplier.get();
|
||||
final ArrayReference<L> newRef = new ArrayReference<>(created, index, queue);
|
||||
while (!locks.compareAndSet(index, existingRef, newRef)) {
|
||||
existingRef = locks.get(index);
|
||||
existing = existingRef == null ? null : existingRef.get();
|
||||
if (existing != null) {
|
||||
return existing;
|
||||
}
|
||||
}
|
||||
drainQueue();
|
||||
return created;
|
||||
}
|
||||
|
||||
private void drainQueue() {
|
||||
Reference<? extends L> ref;
|
||||
while ((ref = queue.poll()) != null) {
|
||||
final ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref;
|
||||
locks.compareAndSet(arrayRef.index, arrayRef, null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return size;
|
||||
}
|
||||
|
||||
private static final class ArrayReference<L> extends WeakReference<L> {
|
||||
final int index;
|
||||
|
||||
ArrayReference(final L referent, final int index, final ReferenceQueue<L> queue) {
|
||||
super(referent, queue);
|
||||
this.index = index;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 大规模弱引用实现,使用 ConcurrentMap 存储段。
|
||||
*/
|
||||
private static class LargeLazySegmentLock<L> extends PowerOfTwoSegmentLock<L> {
|
||||
final ConcurrentMap<Integer, L> locks;
|
||||
final Supplier<L> supplier;
|
||||
final int size;
|
||||
|
||||
LargeLazySegmentLock(final int stripes, final Supplier<L> supplier) {
|
||||
super(stripes);
|
||||
this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
|
||||
this.locks = new ConcurrentHashMap<>();
|
||||
this.supplier = supplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public L getAt(final int index) {
|
||||
if (size != Integer.MAX_VALUE) {
|
||||
Assert.isTrue(index >= 0 && index < size, "Index out of bounds");
|
||||
}
|
||||
L existing = locks.get(index);
|
||||
if (existing != null) {
|
||||
return existing;
|
||||
}
|
||||
final L created = supplier.get();
|
||||
existing = locks.putIfAbsent(index, created);
|
||||
return existing != null ? existing : created;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return size;
|
||||
}
|
||||
}
|
||||
|
||||
private static final int ALL_SET = ~0;
|
||||
|
||||
private static int ceilToPowerOfTwo(final int x) {
|
||||
return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(x - 1));
|
||||
}
|
||||
|
||||
private static int smear(int hashCode) {
|
||||
hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
|
||||
return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
|
||||
}
|
||||
|
||||
/**
|
||||
* 填充锁,避免缓存行干扰。
|
||||
*/
|
||||
private static class PaddedLock extends ReentrantLock {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
long unused1;
|
||||
long unused2;
|
||||
long unused3;
|
||||
|
||||
PaddedLock() {
|
||||
super(false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 填充信号量,避免缓存行干扰。
|
||||
*/
|
||||
private static class PaddedSemaphore extends Semaphore {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
long unused1;
|
||||
long unused2;
|
||||
long unused3;
|
||||
|
||||
PaddedSemaphore(final int permits) {
|
||||
super(permits, false);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,203 @@
|
||||
package org.dromara.hutool.core.thread.lock;
|
||||
|
||||
import org.dromara.hutool.core.collection.ListUtil;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
public class SegmentLockTest {
|
||||
private static final int SEGMENT_COUNT = 4;
|
||||
private SegmentLock<Lock> strongLock;
|
||||
private SegmentLock<Lock> weakLock;
|
||||
private SegmentLock<Semaphore> semaphore;
|
||||
private SegmentLock<ReadWriteLock> readWriteLock;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
strongLock = SegmentLock.lock(SEGMENT_COUNT);
|
||||
weakLock = SegmentLock.lazyWeakLock(SEGMENT_COUNT);
|
||||
semaphore = SegmentLock.semaphore(SEGMENT_COUNT, 2);
|
||||
readWriteLock = SegmentLock.readWriteLock(SEGMENT_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSize() {
|
||||
assertEquals(SEGMENT_COUNT, strongLock.size());
|
||||
assertEquals(SEGMENT_COUNT, weakLock.size());
|
||||
assertEquals(SEGMENT_COUNT, semaphore.size());
|
||||
assertEquals(SEGMENT_COUNT, readWriteLock.size());
|
||||
}
|
||||
|
||||
@SuppressWarnings("StringOperationCanBeSimplified")
|
||||
@Test
|
||||
public void testGetWithSameKey() {
|
||||
// 相同 key 应返回相同锁
|
||||
final String key1 = "testKey";
|
||||
final String key2 = new String("testKey"); // equals 但不同对象
|
||||
final Lock lock1 = strongLock.get(key1);
|
||||
final Lock lock2 = strongLock.get(key2);
|
||||
assertSame(lock1, lock2, "相同 key 应返回同一锁对象");
|
||||
|
||||
final Lock weakLock1 = weakLock.get(key1);
|
||||
final Lock weakLock2 = weakLock.get(key2);
|
||||
assertSame(weakLock1, weakLock2, "弱引用锁相同 key 应返回同一锁对象");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAt() {
|
||||
for (int i = 0; i < SEGMENT_COUNT; i++) {
|
||||
final Lock lock = strongLock.getAt(i);
|
||||
assertNotNull(lock, "getAt 返回的锁不应为 null");
|
||||
}
|
||||
assertThrows(IllegalArgumentException.class, () -> strongLock.getAt(SEGMENT_COUNT),
|
||||
"超出段数的索引应抛出异常");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBulkGet() {
|
||||
final List<String> keys = ListUtil.of("key1", "key2", "key3");
|
||||
final Iterable<Lock> locks = strongLock.bulkGet(keys);
|
||||
final List<Lock> lockList = ListUtil.of(locks);
|
||||
|
||||
assertEquals(3, lockList.size(), "bulkGet 返回的锁数量应与 key 数量一致");
|
||||
|
||||
// 检查顺序性
|
||||
int prevIndex = -1;
|
||||
for (final Lock lock : lockList) {
|
||||
final int index = findIndex(strongLock, lock);
|
||||
assertTrue(index >= prevIndex, "bulkGet 返回的锁应按索引升序");
|
||||
prevIndex = index;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLockConcurrency() throws InterruptedException {
|
||||
final int threadCount = SEGMENT_COUNT * 2;
|
||||
final CountDownLatch startLatch = new CountDownLatch(1);
|
||||
final CountDownLatch endLatch = new CountDownLatch(threadCount);
|
||||
final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
|
||||
final List<String> keys = new ArrayList<>();
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
keys.add("key" + i);
|
||||
}
|
||||
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
final String key = keys.get(i);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
startLatch.await();
|
||||
final Lock lock = strongLock.get(key);
|
||||
lock.lock();
|
||||
try {
|
||||
Thread.sleep(100); // 模拟工作
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
} catch (final InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} finally {
|
||||
endLatch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
startLatch.countDown();
|
||||
assertTrue(endLatch.await(2000, java.util.concurrent.TimeUnit.MILLISECONDS),
|
||||
"并发锁测试应在 2 秒内完成");
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSemaphore() {
|
||||
final Semaphore sem = semaphore.get("testKey");
|
||||
assertEquals(2, sem.availablePermits(), "信号量初始许可应为 2");
|
||||
|
||||
sem.acquireUninterruptibly(2);
|
||||
assertEquals(0, sem.availablePermits(), "获取所有许可后应为 0");
|
||||
|
||||
sem.release(1);
|
||||
assertEquals(1, sem.availablePermits(), "释放一个许可后应为 1");
|
||||
}
|
||||
|
||||
@SuppressWarnings("ResultOfMethodCallIgnored")
|
||||
@Test
|
||||
public void testReadWriteLock() throws InterruptedException {
|
||||
final ReadWriteLock rwLock = readWriteLock.get("testKey");
|
||||
final Lock readLock = rwLock.readLock();
|
||||
final Lock writeLock = rwLock.writeLock();
|
||||
|
||||
// 测试读锁可重入
|
||||
readLock.lock();
|
||||
assertTrue(readLock.tryLock(), "读锁应允许多个线程同时持有");
|
||||
readLock.unlock();
|
||||
readLock.unlock();
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
final AtomicBoolean readLockAcquired = new AtomicBoolean(false);
|
||||
|
||||
writeLock.lock();
|
||||
executor.submit(() -> {
|
||||
readLockAcquired.set(readLock.tryLock());
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
latch.await(500, TimeUnit.MILLISECONDS);
|
||||
assertFalse(readLockAcquired.get(), "写锁持有时读锁应失败");
|
||||
writeLock.unlock();
|
||||
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(1, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWeakReferenceCleanup() throws InterruptedException {
|
||||
final SegmentLock<Lock> weakLockLarge = SegmentLock.lazyWeakLock(1024); // 超过 LARGE_LAZY_CUTOFF
|
||||
final Lock lock = weakLockLarge.get("testKey");
|
||||
|
||||
System.gc();
|
||||
Thread.sleep(100);
|
||||
|
||||
// 弱引用锁未被其他引用,应仍可获取
|
||||
final Lock lockAgain = weakLockLarge.get("testKey");
|
||||
assertSame(lock, lockAgain, "弱引用锁未被回收时应返回同一对象");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidSegmentCount() {
|
||||
assertThrows(IllegalArgumentException.class, () -> SegmentLock.lock(0),
|
||||
"段数为 0 应抛出异常");
|
||||
assertThrows(IllegalArgumentException.class, () -> SegmentLock.lock(-1),
|
||||
"负段数应抛出异常");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHashDistribution() {
|
||||
final SegmentLock<Lock> lock = SegmentLock.lock(4);
|
||||
final int[] counts = new int[4];
|
||||
for (int i = 0; i < 100; i++) {
|
||||
final int index = findIndex(lock, lock.get("key" + i));
|
||||
counts[index]++;
|
||||
}
|
||||
for (final int count : counts) {
|
||||
assertTrue(count > 0, "每个段都应至少被分配到一个 key");
|
||||
}
|
||||
}
|
||||
|
||||
private int findIndex(final SegmentLock<Lock> lock, final Lock target) {
|
||||
for (int i = 0; i < lock.size(); i++) {
|
||||
if (lock.getAt(i) == target) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user