This commit is contained in:
Looly 2022-06-18 14:04:20 +08:00
parent 4ebec434a1
commit 103738d9ef
6 changed files with 259 additions and 7 deletions

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.core.collection;
import cn.hutool.core.thread.SimpleScheduler;
import cn.hutool.core.util.RuntimeUtil;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 内存安全的{@link LinkedBlockingQueue}可以解决OOM问题<br>
* 原理是通过Runtime#freeMemory()获取剩余内存当剩余内存低于指定的阈值时不再加入
*
* <p>
* 此类来自于<a href="https://github.com/apache/incubator-shenyu/blob/master/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemorySafeLinkedBlockingQueue.java">
* Apache incubator-shenyu</a>
* </p>
*
* @author incubator-shenyu
* @since 6.0.0
*/
public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 8032578371749960142L;
private long maxFreeMemory;
/**
* 构造
*
* @param maxFreeMemory 最大剩余内存大小当实际内存小于这个值时不再加入元素
*/
public MemorySafeLinkedBlockingQueue(final long maxFreeMemory) {
super(Integer.MAX_VALUE);
this.maxFreeMemory = maxFreeMemory;
}
/**
* 构造
*
* @param c 初始集合
* @param maxFreeMemory 最大剩余内存大小当实际内存小于这个值时不再加入元素
*/
public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,
final long maxFreeMemory) {
super(c);
this.maxFreeMemory = maxFreeMemory;
}
/**
* set the max free memory.
*
* @param maxFreeMemory the max free memory
*/
public void setMaxFreeMemory(final int maxFreeMemory) {
this.maxFreeMemory = maxFreeMemory;
}
/**
* get the max free memory.
*
* @return the max free memory limit
*/
public long getMaxFreeMemory() {
return maxFreeMemory;
}
/**
* determine if there is any remaining free memory.<br>
* 剩余内存是否大于给定阈值
*
* @return {@code true}则表示大于阈值可以加入元素否则无法加入
*/
public boolean hasRemainedMemory() {
return FreeMemoryCalculator.INSTANCE.getResult() > maxFreeMemory;
}
@Override
public void put(final E e) throws InterruptedException {
if (hasRemainedMemory()) {
super.put(e);
}
}
@Override
public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
return hasRemainedMemory() && super.offer(e, timeout, unit);
}
@Override
public boolean offer(final E e) {
return hasRemainedMemory() && super.offer(e);
}
/**
* 获取内存剩余大小的定时任务
*/
private static class FreeMemoryCalculator extends SimpleScheduler<Long> {
private static final FreeMemoryCalculator INSTANCE = new FreeMemoryCalculator();
FreeMemoryCalculator() {
super(new SimpleScheduler.Job<Long>() {
private volatile long maxAvailable;
@Override
public Long getResult() {
return this.maxAvailable;
}
@Override
public void run() {
this.maxAvailable = RuntimeUtil.getFreeMemory();
}
}, 50);
}
}
}

View File

@ -12,8 +12,8 @@ import java.util.concurrent.TimeUnit;
* System.currentTimeMillis()之所以慢是因为去跟系统打了一次交道
* 后台定时更新时钟JVM退出时线程自动回收
*
* see http://git.oschina.net/yu120/sequence
* @author lry,looly
* see <a href="http://git.oschina.net/yu120/sequence">http://git.oschina.net/yu120/sequence</a>
* @author lry, looly
*/
public class SystemClock {

View File

@ -0,0 +1,69 @@
package cn.hutool.core.thread;
import cn.hutool.core.util.RuntimeUtil;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
* 简单单线程任务调度器<br>
* 通过自定义Job定时执行任务通过{@link #getResult()} 可以获取调取时的执行结果
*
* @param <T> 结果类型
*/
public class SimpleScheduler<T> {
private final Job<T> job;
/**
* 构造
*
* @param job 任务
* @param period 任务间隔单位毫秒
*/
public SimpleScheduler(final Job<T> job, final long period) {
this(job, 0, period, true);
}
/**
* 构造
*
* @param job 任务
* @param initialDelay 初始延迟单位毫秒
* @param period 执行周期单位毫秒
* @param fixedRateOrFixedDelay {@code true}表示fixedRate模式{@code false}表示fixedDelay模式
*/
public SimpleScheduler(final Job<T> job, final long initialDelay, final long period, final boolean fixedRateOrFixedDelay) {
this.job = job;
final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
// 启动定时任务
ThreadUtil.schedule(scheduler, job, initialDelay, period, fixedRateOrFixedDelay);
// 定时任务在程序结束时结束
RuntimeUtil.addShutdownHook(scheduler::shutdown);
}
/**
* 获取执行任务的阶段性结果
*
* @return 结果
*/
public T getResult() {
return this.job.getResult();
}
/**
* 带有结果计算的任务<br>
* 用户实现此接口通过{@link #run()}实现定时任务的内容定时任务每次执行或多次执行都可以产生一个结果<br>
* 这个结果存储在一个volatile的对象属性中通过{@link #getResult()}来读取这一阶段的结果
*
* @param <T> 结果类型
*/
public interface Job<T> extends Runnable {
/**
* 获取执行结果
*
* @return 执行结果
*/
T getResult();
}
}

View File

@ -9,6 +9,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@ -114,7 +115,7 @@ public class ThreadUtil {
* Blocking Coefficient(阻塞系数) = 阻塞时间阻塞时间+使用CPU的时间<br>
* 计算密集型任务的阻塞系数为0而IO密集型任务的阻塞系数则接近于1
* <p>
* see: http://blog.csdn.net/partner4java/article/details/9417663
* see: <a href="http://blog.csdn.net/partner4java/article/details/9417663">http://blog.csdn.net/partner4java/article/details/9417663</a>
*
* @param blockingCoefficient 阻塞系数阻塞因子介于0~1之间的数阻塞因子越大线程池中的线程数越多
* @return {@link ThreadPoolExecutor}
@ -660,10 +661,10 @@ public class ThreadUtil {
* @param initialDelay 初始延迟单位毫秒
* @param period 执行周期单位毫秒
* @param fixedRateOrFixedDelay {@code true}表示fixedRate模式{@code false}表示fixedDelay模式
* @return {@link ScheduledThreadPoolExecutor}
* @return {@link ScheduledExecutorService}
* @since 5.5.8
*/
public static ScheduledThreadPoolExecutor schedule(final ScheduledThreadPoolExecutor executor,
public static ScheduledExecutorService schedule(final ScheduledExecutorService executor,
final Runnable command,
final long initialDelay,
final long period,
@ -685,10 +686,10 @@ public class ThreadUtil {
* @param period 执行周期
* @param timeUnit 时间单位
* @param fixedRateOrFixedDelay {@code true}表示fixedRate模式{@code false}表示fixedDelay模式
* @return {@link ScheduledThreadPoolExecutor}
* @return {@link ScheduledExecutorService}
* @since 5.6.5
*/
public static ScheduledThreadPoolExecutor schedule(ScheduledThreadPoolExecutor executor,
public static ScheduledExecutorService schedule(ScheduledExecutorService executor,
final Runnable command,
final long initialDelay,
final long period,

View File

@ -0,0 +1,13 @@
package cn.hutool.core.collection;
import org.junit.Assert;
import org.junit.Test;
public class MemorySafeLinkedBlockingQueueTest {
@Test
public void offerTest(){
final MemorySafeLinkedBlockingQueue<String> queue = new MemorySafeLinkedBlockingQueue<>(Integer.MAX_VALUE);
Assert.assertFalse(queue.offer("123"));
}
}

View File

@ -0,0 +1,37 @@
package cn.hutool.core.thread;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Console;
import cn.hutool.core.util.RuntimeUtil;
/**
* 简单定时任务测试
*/
public class SimpleSchedulerTest {
public static void main(final String[] args) {
// 新建一个定时任务定时获取内存信息
final SimpleScheduler<Long> scheduler = new SimpleScheduler<>(new SimpleScheduler.Job<Long>() {
private volatile long maxAvailable;
@Override
public Long getResult() {
return this.maxAvailable;
}
@Override
public void run() {
this.maxAvailable = RuntimeUtil.getFreeMemory();
}
}, 50);
// 另一个线程不停获取内存结果计算值
ThreadUtil.execAsync(() -> {
//noinspection InfiniteLoopStatement
while (true) {
Console.log(FileUtil.readableFileSize(scheduler.getResult()));
ThreadUtil.sleep(1000);
}
});
}
}