!1330 增加分段锁实现 (issue#IBYLR1@Gitee)

Merge pull request !1330 from 大阔/v5-dev
This commit is contained in:
Looly 2025-04-15 03:05:35 +00:00 committed by Gitee
commit 54d6b99b98
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
3 changed files with 812 additions and 0 deletions

View File

@ -1,5 +1,8 @@
package cn.hutool.core.thread.lock; package cn.hutool.core.thread.lock;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.StampedLock; import java.util.concurrent.locks.StampedLock;
@ -40,4 +43,101 @@ public class LockUtil {
public static NoLock getNoLock(){ public static NoLock getNoLock(){
return NO_LOCK; return NO_LOCK;
} }
/**
* 创建分段锁强引用使用 ReentrantLock
*
* @param segments 分段数量必须大于 0
* @return 分段锁实例
*/
public static SegmentLock<Lock> createSegmentLock(int segments) {
return SegmentLock.lock(segments);
}
/**
* 创建分段读写锁强引用使用 ReentrantReadWriteLock
*
* @param segments 分段数量必须大于 0
* @return 分段读写锁实例
*/
public static SegmentLock<ReadWriteLock> createSegmentReadWriteLock(int segments) {
return SegmentLock.readWriteLock(segments);
}
/**
* 创建分段信号量强引用
*
* @param segments 分段数量必须大于 0
* @param permits 每个信号量的许可数
* @return 分段信号量实例
*/
public static SegmentLock<Semaphore> createSegmentSemaphore(int segments, int permits) {
return SegmentLock.semaphore(segments, permits);
}
/**
* 创建弱引用分段锁使用 ReentrantLock懒加载
*
* @param segments 分段数量必须大于 0
* @return 弱引用分段锁实例
*/
public static SegmentLock<Lock> createLazySegmentLock(int segments) {
return SegmentLock.lazyWeakLock(segments);
}
/**
* 根据 key 获取分段锁强引用
*
* @param segments 分段数量必须大于 0
* @param key 用于映射分段的 key
* @return 对应的 Lock 实例
*/
public static Lock getSegmentLock(int segments, Object key) {
return SegmentLock.lock(segments).get(key);
}
/**
* 根据 key 获取分段读锁强引用
*
* @param segments 分段数量必须大于 0
* @param key 用于映射分段的 key
* @return 对应的读锁实例
*/
public static Lock getSegmentReadLock(int segments, Object key) {
return SegmentLock.readWriteLock(segments).get(key).readLock();
}
/**
* 根据 key 获取分段写锁强引用
*
* @param segments 分段数量必须大于 0
* @param key 用于映射分段的 key
* @return 对应的写锁实例
*/
public static Lock getSegmentWriteLock(int segments, Object key) {
return SegmentLock.readWriteLock(segments).get(key).writeLock();
}
/**
* 根据 key 获取分段信号量强引用
*
* @param segments 分段数量必须大于 0
* @param permits 每个信号量的许可数
* @param key 用于映射分段的 key
* @return 对应的 Semaphore 实例
*/
public static Semaphore getSegmentSemaphore(int segments, int permits, Object key) {
return SegmentLock.semaphore(segments, permits).get(key);
}
/**
* 根据 key 获取弱引用分段锁懒加载
*
* @param segments 分段数量必须大于 0
* @param key 用于映射分段的 key
* @return 对应的 Lock 实例
*/
public static Lock getLazySegmentLock(int segments, Object key) {
return SegmentLock.lazyWeakLock(segments).get(key);
}
} }

View File

@ -0,0 +1,505 @@
package cn.hutool.core.thread.lock;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
/**
* 分段锁工具类支持 LockSemaphore ReadWriteLock 的分段实现
* <p>
* 通过将锁分成多个段segments不同的操作可以并发使用不同的段避免所有线程竞争同一把锁
* 相等的 key 保证映射到同一段锁 key1.equals(key2) get(key1) get(key2) 返回相同对象
* 但不同 key 可能因哈希冲突映射到同一段段数越少冲突概率越高
* <p>
* 支持两种实现
* <ul>
* <li>强引用创建时初始化所有段内存占用稳定</li>
* <li>弱引用懒加载首次使用时创建段未使用时可被垃圾回收适合大量段但使用较少的场景</li>
* </ul>
*
* @author Guava,dakuo
* @since 5.8.37
*/
public abstract class SegmentLock<L> {
/** 当段数大于此阈值时,使用 ConcurrentMap 替代大数组以节省内存(适用于懒加载场景) */
private static final int LARGE_LAZY_CUTOFF = 1024;
private SegmentLock() {}
/**
* 根据 key 获取对应的锁段保证相同 key 返回相同对象
*
* @param key 非空 key
* @return 对应的锁段
*/
public abstract L get(Object key);
/**
* 根据索引获取锁段索引范围为 [0, size())
*
* @param index 索引
* @return 指定索引的锁段
*/
public abstract L getAt(int index);
/**
* 计算 key 对应的段索引
*
* @param key 非空 key
* @return 段索引
*/
abstract int indexFor(Object key);
/**
* 获取总段数
*
* @return 段数
*/
public abstract int size();
/**
* 批量获取多个 key 对应的锁段列表按索引升序排列避免死锁
*
* @param keys 非空 key 集合
* @return 锁段列表可能有重复
*/
public Iterable<L> bulkGet(Iterable<? extends Object> keys) {
@SuppressWarnings("unchecked")
List<Object> result = (List<Object>) CollUtil.newArrayList(keys);
if (CollUtil.isEmpty(result)) {
return Collections.emptyList();
}
int[] stripes = new int[result.size()];
for (int i = 0; i < result.size(); i++) {
stripes[i] = indexFor(result.get(i));
}
Arrays.sort(stripes);
int previousStripe = stripes[0];
result.set(0, getAt(previousStripe));
for (int i = 1; i < result.size(); i++) {
int currentStripe = stripes[i];
if (currentStripe == previousStripe) {
result.set(i, result.get(i - 1));
} else {
result.set(i, getAt(currentStripe));
previousStripe = currentStripe;
}
}
@SuppressWarnings("unchecked")
List<L> asStripes = (List<L>) result;
return Collections.unmodifiableList(asStripes);
}
// 静态工厂方法
/**
* 创建强引用的分段锁所有段在创建时初始化
*
* @param stripes 段数
* @param supplier 锁提供者
* @param <L> 锁类型
* @return 分段锁实例
*/
public static <L> SegmentLock<L> custom(int stripes, Supplier<L> supplier) {
return new CompactSegmentLock<>(stripes, supplier);
}
/**
* 创建强引用的可重入锁分段实例
*
* @param stripes 段数
* @return 分段锁实例
*/
public static SegmentLock<Lock> lock(int stripes) {
return custom(stripes, PaddedLock::new);
}
/**
* 创建弱引用的可重入锁分段实例懒加载
*
* @param stripes 段数
* @return 分段锁实例
*/
public static SegmentLock<Lock> lazyWeakLock(int stripes) {
return lazyWeakCustom(stripes, () -> new ReentrantLock(false));
}
/**
* 创建弱引用的分段锁懒加载
*
* @param stripes 段数
* @param supplier 锁提供者
* @param <L> 锁类型
* @return 分段锁实例
*/
private static <L> SegmentLock<L> lazyWeakCustom(int stripes, Supplier<L> supplier) {
return stripes < LARGE_LAZY_CUTOFF
? new SmallLazySegmentLock<>(stripes, supplier)
: new LargeLazySegmentLock<>(stripes, supplier);
}
/**
* 创建强引用的信号量分段实例
*
* @param stripes 段数
* @param permits 每个信号量的许可数
* @return 分段信号量实例
*/
public static SegmentLock<Semaphore> semaphore(int stripes, int permits) {
return custom(stripes, () -> new PaddedSemaphore(permits));
}
/**
* 创建弱引用的信号量分段实例懒加载
*
* @param stripes 段数
* @param permits 每个信号量的许可数
* @return 分段信号量实例
*/
public static SegmentLock<Semaphore> lazyWeakSemaphore(int stripes, int permits) {
return lazyWeakCustom(stripes, () -> new Semaphore(permits, false));
}
/**
* 创建强引用的读写锁分段实例
*
* @param stripes 段数
* @return 分段读写锁实例
*/
public static SegmentLock<ReadWriteLock> readWriteLock(int stripes) {
return custom(stripes, ReentrantReadWriteLock::new);
}
/**
* 创建弱引用的读写锁分段实例懒加载
*
* @param stripes 段数
* @return 分段读写锁实例
*/
public static SegmentLock<ReadWriteLock> lazyWeakReadWriteLock(int stripes) {
return lazyWeakCustom(stripes, WeakSafeReadWriteLock::new);
}
// 内部实现类
/**
* 弱引用安全的读写锁实现确保读锁和写锁持有对自身的强引用
*/
private static final class WeakSafeReadWriteLock implements ReadWriteLock {
private final ReadWriteLock delegate;
WeakSafeReadWriteLock() {
this.delegate = new ReentrantReadWriteLock();
}
@Override
public Lock readLock() {
return new WeakSafeLock(delegate.readLock(), this);
}
@Override
public Lock writeLock() {
return new WeakSafeLock(delegate.writeLock(), this);
}
}
/**
* 弱引用安全的锁包装类确保持有强引用
*/
private static final class WeakSafeLock implements Lock {
private final Lock delegate;
private final WeakSafeReadWriteLock strongReference;
WeakSafeLock(Lock delegate, WeakSafeReadWriteLock strongReference) {
this.delegate = delegate;
this.strongReference = strongReference;
}
@Override
public void lock() {
delegate.lock();
}
@Override
public void lockInterruptibly() throws InterruptedException {
delegate.lockInterruptibly();
}
@Override
public boolean tryLock() {
return delegate.tryLock();
}
@Override
public boolean tryLock(long time, java.util.concurrent.TimeUnit unit) throws InterruptedException {
return delegate.tryLock(time, unit);
}
@Override
public void unlock() {
delegate.unlock();
}
@Override
public Condition newCondition() {
return new WeakSafeCondition(delegate.newCondition(), strongReference);
}
}
/**
* 弱引用安全的条件包装类
*/
private static final class WeakSafeCondition implements Condition {
private final Condition delegate;
/** 防止垃圾回收 */
private final WeakSafeReadWriteLock strongReference;
WeakSafeCondition(Condition delegate, WeakSafeReadWriteLock strongReference) {
this.delegate = delegate;
this.strongReference = strongReference;
}
@Override
public void await() throws InterruptedException {
delegate.await();
}
@Override
public void awaitUninterruptibly() {
delegate.awaitUninterruptibly();
}
@Override
public long awaitNanos(long nanosTimeout) throws InterruptedException {
return delegate.awaitNanos(nanosTimeout);
}
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
return delegate.await(time, unit);
}
@Override
public boolean awaitUntil(Date deadline) throws InterruptedException {
return delegate.awaitUntil(deadline);
}
@Override
public void signal() {
delegate.signal();
}
@Override
public void signalAll() {
delegate.signalAll();
}
}
/**
* 抽象基类确保段数为 2 的幂
*/
private abstract static class PowerOfTwoSegmentLock<L> extends SegmentLock<L> {
final int mask;
PowerOfTwoSegmentLock(int stripes) {
Assert.isTrue(stripes > 0, "Segment count must be positive");
this.mask = stripes > Integer.MAX_VALUE / 2 ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
}
@Override
final int indexFor(Object key) {
int hash = smear(key.hashCode());
return hash & mask;
}
@Override
public final L get(Object key) {
return getAt(indexFor(key));
}
}
/**
* 强引用实现使用固定数组存储段
*/
private static class CompactSegmentLock<L> extends PowerOfTwoSegmentLock<L> {
private final Object[] array;
CompactSegmentLock(int stripes, Supplier<L> supplier) {
super(stripes);
Assert.isTrue(stripes <= Integer.MAX_VALUE / 2, "Segment count must be <= 2^30");
this.array = new Object[mask + 1];
for (int i = 0; i < array.length; i++) {
array[i] = supplier.get();
}
}
@SuppressWarnings("unchecked")
@Override
public L getAt(int index) {
if (index < 0 || index >= array.length) {
throw new IllegalArgumentException("Index " + index + " out of bounds for size " + array.length);
}
return (L) array[index];
}
@Override
public int size() {
return array.length;
}
}
/**
* 小规模弱引用实现使用 AtomicReferenceArray 存储段
*/
private static class SmallLazySegmentLock<L> extends PowerOfTwoSegmentLock<L> {
final AtomicReferenceArray<ArrayReference<? extends L>> locks;
final Supplier<L> supplier;
final int size;
final ReferenceQueue<L> queue = new ReferenceQueue<>();
SmallLazySegmentLock(int stripes, Supplier<L> supplier) {
super(stripes);
this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
this.locks = new AtomicReferenceArray<>(size);
this.supplier = supplier;
}
@Override
public L getAt(int index) {
if (size != Integer.MAX_VALUE) {
Assert.isTrue(index >= 0 && index < size, "Index out of bounds");
}
ArrayReference<? extends L> existingRef = locks.get(index);
L existing = existingRef == null ? null : existingRef.get();
if (existing != null) {
return existing;
}
L created = supplier.get();
ArrayReference<L> newRef = new ArrayReference<>(created, index, queue);
while (!locks.compareAndSet(index, existingRef, newRef)) {
existingRef = locks.get(index);
existing = existingRef == null ? null : existingRef.get();
if (existing != null) {
return existing;
}
}
drainQueue();
return created;
}
private void drainQueue() {
Reference<? extends L> ref;
while ((ref = queue.poll()) != null) {
ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref;
locks.compareAndSet(arrayRef.index, arrayRef, null);
}
}
@Override
public int size() {
return size;
}
private static final class ArrayReference<L> extends WeakReference<L> {
final int index;
ArrayReference(L referent, int index, ReferenceQueue<L> queue) {
super(referent, queue);
this.index = index;
}
}
}
/**
* 大规模弱引用实现使用 ConcurrentMap 存储段
*/
private static class LargeLazySegmentLock<L> extends PowerOfTwoSegmentLock<L> {
final ConcurrentMap<Integer, L> locks;
final Supplier<L> supplier;
final int size;
LargeLazySegmentLock(int stripes, Supplier<L> supplier) {
super(stripes);
this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
this.locks = new ConcurrentHashMap<>();
this.supplier = supplier;
}
@Override
public L getAt(int index) {
if (size != Integer.MAX_VALUE) {
Assert.isTrue(index >= 0 && index < size, "Index out of bounds");
}
L existing = locks.get(index);
if (existing != null) {
return existing;
}
L created = supplier.get();
existing = locks.putIfAbsent(index, created);
return existing != null ? existing : created;
}
@Override
public int size() {
return size;
}
}
private static final int ALL_SET = ~0;
private static int ceilToPowerOfTwo(int x) {
return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(x - 1));
}
private static int smear(int hashCode) {
hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
}
/**
* 填充锁避免缓存行干扰
*/
private static class PaddedLock extends ReentrantLock {
long unused1;
long unused2;
long unused3;
PaddedLock() {
super(false);
}
}
/**
* 填充信号量避免缓存行干扰
*/
private static class PaddedSemaphore extends Semaphore {
long unused1;
long unused2;
long unused3;
PaddedSemaphore(int permits) {
super(permits, false);
}
}
}

View File

@ -0,0 +1,207 @@
package cn.hutool.core.thread;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.thread.lock.SegmentLock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import static org.junit.jupiter.api.Assertions.*;
/**
* SegmentLock 单元测试类
*/
public class SegmentLockTest {
private static final int SEGMENT_COUNT = 4;
private SegmentLock<Lock> strongLock;
private SegmentLock<Lock> weakLock;
private SegmentLock<Semaphore> semaphore;
private SegmentLock<ReadWriteLock> readWriteLock;
@BeforeEach
public void setUp() {
strongLock = SegmentLock.lock(SEGMENT_COUNT);
weakLock = SegmentLock.lazyWeakLock(SEGMENT_COUNT);
semaphore = SegmentLock.semaphore(SEGMENT_COUNT, 2);
readWriteLock = SegmentLock.readWriteLock(SEGMENT_COUNT);
}
@Test
public void testSize() {
assertEquals(SEGMENT_COUNT, strongLock.size());
assertEquals(SEGMENT_COUNT, weakLock.size());
assertEquals(SEGMENT_COUNT, semaphore.size());
assertEquals(SEGMENT_COUNT, readWriteLock.size());
}
@Test
public void testGetWithSameKey() {
// 相同 key 应返回相同锁
String key1 = "testKey";
String key2 = new String("testKey"); // equals 但不同对象
Lock lock1 = strongLock.get(key1);
Lock lock2 = strongLock.get(key2);
assertSame(lock1, lock2, "相同 key 应返回同一锁对象");
Lock weakLock1 = weakLock.get(key1);
Lock weakLock2 = weakLock.get(key2);
assertSame(weakLock1, weakLock2, "弱引用锁相同 key 应返回同一锁对象");
}
@Test
public void testGetAt() {
for (int i = 0; i < SEGMENT_COUNT; i++) {
Lock lock = strongLock.getAt(i);
assertNotNull(lock, "getAt 返回的锁不应为 null");
}
assertThrows(IllegalArgumentException.class, () -> strongLock.getAt(SEGMENT_COUNT),
"超出段数的索引应抛出异常");
}
@Test
public void testBulkGet() {
List<String> keys = CollUtil.newArrayList("key1", "key2", "key3");
Iterable<Lock> locks = strongLock.bulkGet(keys);
List<Lock> lockList = CollUtil.newArrayList(locks);
assertEquals(3, lockList.size(), "bulkGet 返回的锁数量应与 key 数量一致");
// 检查顺序性
int prevIndex = -1;
for (Lock lock : lockList) {
int index = findIndex(strongLock, lock);
assertTrue(index >= prevIndex, "bulkGet 返回的锁应按索引升序");
prevIndex = index;
}
}
@Test
public void testLockConcurrency() throws InterruptedException {
int threadCount = SEGMENT_COUNT * 2;
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(threadCount);
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
List<String> keys = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
keys.add("key" + i);
}
for (int i = 0; i < threadCount; i++) {
final String key = keys.get(i);
executor.submit(() -> {
try {
startLatch.await();
Lock lock = strongLock.get(key);
lock.lock();
try {
Thread.sleep(100); // 模拟工作
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
});
}
startLatch.countDown();
assertTrue(endLatch.await(2000, java.util.concurrent.TimeUnit.MILLISECONDS),
"并发锁测试应在 2 秒内完成");
executor.shutdown();
}
@Test
public void testSemaphore() {
Semaphore sem = semaphore.get("testKey");
assertEquals(2, sem.availablePermits(), "信号量初始许可应为 2");
sem.acquireUninterruptibly(2);
assertEquals(0, sem.availablePermits(), "获取所有许可后应为 0");
sem.release(1);
assertEquals(1, sem.availablePermits(), "释放一个许可后应为 1");
}
@Test
public void testReadWriteLock() throws InterruptedException {
ReadWriteLock rwLock = readWriteLock.get("testKey");
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();
// 测试读锁可重入
readLock.lock();
assertTrue(readLock.tryLock(), "读锁应允许多个线程同时持有");
readLock.unlock();
readLock.unlock();
CountDownLatch latch = new CountDownLatch(1);
ExecutorService executor = Executors.newSingleThreadExecutor();
AtomicBoolean readLockAcquired = new AtomicBoolean(false);
writeLock.lock();
executor.submit(() -> {
readLockAcquired.set(readLock.tryLock());
latch.countDown();
});
latch.await(500, TimeUnit.MILLISECONDS);
assertFalse(readLockAcquired.get(), "写锁持有时读锁应失败");
writeLock.unlock();
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
}
@Test
public void testWeakReferenceCleanup() throws InterruptedException {
SegmentLock<Lock> weakLockLarge = SegmentLock.lazyWeakLock(1024); // 超过 LARGE_LAZY_CUTOFF
Lock lock = weakLockLarge.get("testKey");
System.gc();
Thread.sleep(100);
// 弱引用锁未被其他引用应仍可获取
Lock lockAgain = weakLockLarge.get("testKey");
assertSame(lock, lockAgain, "弱引用锁未被回收时应返回同一对象");
}
@Test
public void testInvalidSegmentCount() {
assertThrows(IllegalArgumentException.class, () -> SegmentLock.lock(0),
"段数为 0 应抛出异常");
assertThrows(IllegalArgumentException.class, () -> SegmentLock.lock(-1),
"负段数应抛出异常");
}
@Test
public void testHashDistribution() {
SegmentLock<Lock> lock = SegmentLock.lock(4);
int[] counts = new int[4];
for (int i = 0; i < 100; i++) {
int index = findIndex(lock, lock.get("key" + i));
counts[index]++;
}
for (int count : counts) {
assertTrue(count > 0, "每个段都应至少被分配到一个 key");
}
}
private int findIndex(SegmentLock<Lock> lock, Lock target) {
for (int i = 0; i < lock.size(); i++) {
if (lock.getAt(i) == target) {
return i;
}
}
return -1;
}
}