diff --git a/hutool-core/src/main/java/cn/hutool/core/collection/MemorySafeLinkedBlockingQueue.java b/hutool-core/src/main/java/cn/hutool/core/collection/MemorySafeLinkedBlockingQueue.java new file mode 100644 index 000000000..a22f93097 --- /dev/null +++ b/hutool-core/src/main/java/cn/hutool/core/collection/MemorySafeLinkedBlockingQueue.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.hutool.core.collection; + +import cn.hutool.core.thread.SimpleScheduler; +import cn.hutool.core.util.RuntimeUtil; + +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * 内存安全的{@link LinkedBlockingQueue},可以解决OOM问题。
+ * 原理是通过Runtime#freeMemory()获取剩余内存,当剩余内存低于指定的阈值时,不再加入。 + * + *

+ * 此类来自于: + * Apache incubator-shenyu + *

+ * + * @author incubator-shenyu + * @since 6.0.0 + */ +public class MemorySafeLinkedBlockingQueue extends LinkedBlockingQueue { + private static final long serialVersionUID = 8032578371749960142L; + + private long maxFreeMemory; + + /** + * 构造 + * + * @param maxFreeMemory 最大剩余内存大小,当实际内存小于这个值时,不再加入元素 + */ + public MemorySafeLinkedBlockingQueue(final long maxFreeMemory) { + super(Integer.MAX_VALUE); + this.maxFreeMemory = maxFreeMemory; + } + + /** + * 构造 + * + * @param c 初始集合 + * @param maxFreeMemory 最大剩余内存大小,当实际内存小于这个值时,不再加入元素 + */ + public MemorySafeLinkedBlockingQueue(final Collection c, + final long maxFreeMemory) { + super(c); + this.maxFreeMemory = maxFreeMemory; + } + + /** + * set the max free memory. + * + * @param maxFreeMemory the max free memory + */ + public void setMaxFreeMemory(final int maxFreeMemory) { + this.maxFreeMemory = maxFreeMemory; + } + + /** + * get the max free memory. + * + * @return the max free memory limit + */ + public long getMaxFreeMemory() { + return maxFreeMemory; + } + + /** + * determine if there is any remaining free memory.
+ * 剩余内存是否大于给定阈值 + * + * @return {@code true}则表示大于阈值,可以加入元素,否则无法加入 + */ + public boolean hasRemainedMemory() { + return FreeMemoryCalculator.INSTANCE.getResult() > maxFreeMemory; + } + + @Override + public void put(final E e) throws InterruptedException { + if (hasRemainedMemory()) { + super.put(e); + } + } + + @Override + public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { + return hasRemainedMemory() && super.offer(e, timeout, unit); + } + + @Override + public boolean offer(final E e) { + return hasRemainedMemory() && super.offer(e); + } + + /** + * 获取内存剩余大小的定时任务 + */ + private static class FreeMemoryCalculator extends SimpleScheduler { + private static final FreeMemoryCalculator INSTANCE = new FreeMemoryCalculator(); + + FreeMemoryCalculator() { + super(new SimpleScheduler.Job() { + private volatile long maxAvailable; + + @Override + public Long getResult() { + return this.maxAvailable; + } + + @Override + public void run() { + this.maxAvailable = RuntimeUtil.getFreeMemory(); + } + }, 50); + } + } +} diff --git a/hutool-core/src/main/java/cn/hutool/core/date/SystemClock.java b/hutool-core/src/main/java/cn/hutool/core/date/SystemClock.java index ac5e37807..5f9cf30f7 100644 --- a/hutool-core/src/main/java/cn/hutool/core/date/SystemClock.java +++ b/hutool-core/src/main/java/cn/hutool/core/date/SystemClock.java @@ -12,8 +12,8 @@ import java.util.concurrent.TimeUnit; * System.currentTimeMillis()之所以慢是因为去跟系统打了一次交道 * 后台定时更新时钟,JVM退出时,线程自动回收 * - * see: http://git.oschina.net/yu120/sequence - * @author lry,looly + * see: http://git.oschina.net/yu120/sequence + * @author lry, looly */ public class SystemClock { diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/SimpleScheduler.java b/hutool-core/src/main/java/cn/hutool/core/thread/SimpleScheduler.java new file mode 100644 index 000000000..ee0cb0db4 --- /dev/null +++ b/hutool-core/src/main/java/cn/hutool/core/thread/SimpleScheduler.java @@ -0,0 +1,69 @@ +package cn.hutool.core.thread; + +import cn.hutool.core.util.RuntimeUtil; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +/** + * 简单单线程任务调度器
+ * 通过自定义Job定时执行任务,通过{@link #getResult()} 可以获取调取时的执行结果 + * + * @param 结果类型 + */ +public class SimpleScheduler { + private final Job job; + + /** + * 构造 + * + * @param job 任务 + * @param period 任务间隔,单位毫秒 + */ + public SimpleScheduler(final Job job, final long period) { + this(job, 0, period, true); + } + + /** + * 构造 + * + * @param job 任务 + * @param initialDelay 初始延迟,单位毫秒 + * @param period 执行周期,单位毫秒 + * @param fixedRateOrFixedDelay {@code true}表示fixedRate模式,{@code false}表示fixedDelay模式 + */ + public SimpleScheduler(final Job job, final long initialDelay, final long period, final boolean fixedRateOrFixedDelay) { + this.job = job; + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + // 启动定时任务 + ThreadUtil.schedule(scheduler, job, initialDelay, period, fixedRateOrFixedDelay); + // 定时任务在程序结束时结束 + RuntimeUtil.addShutdownHook(scheduler::shutdown); + } + + /** + * 获取执行任务的阶段性结果 + * + * @return 结果 + */ + public T getResult() { + return this.job.getResult(); + } + + /** + * 带有结果计算的任务
+ * 用户实现此接口,通过{@link #run()}实现定时任务的内容,定时任务每次执行或多次执行都可以产生一个结果
+ * 这个结果存储在一个volatile的对象属性中,通过{@link #getResult()}来读取这一阶段的结果。 + * + * @param 结果类型 + */ + public interface Job extends Runnable { + /** + * 获取执行结果 + * + * @return 执行结果 + */ + T getResult(); + } +} diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java b/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java index 00c14b411..9542dfa1f 100644 --- a/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java @@ -9,6 +9,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -114,7 +115,7 @@ public class ThreadUtil { * Blocking Coefficient(阻塞系数) = 阻塞时间/(阻塞时间+使用CPU的时间)
* 计算密集型任务的阻塞系数为0,而IO密集型任务的阻塞系数则接近于1。 *

- * see: http://blog.csdn.net/partner4java/article/details/9417663 + * see: http://blog.csdn.net/partner4java/article/details/9417663 * * @param blockingCoefficient 阻塞系数,阻塞因子介于0~1之间的数,阻塞因子越大,线程池中的线程数越多。 * @return {@link ThreadPoolExecutor} @@ -660,10 +661,10 @@ public class ThreadUtil { * @param initialDelay 初始延迟,单位毫秒 * @param period 执行周期,单位毫秒 * @param fixedRateOrFixedDelay {@code true}表示fixedRate模式,{@code false}表示fixedDelay模式 - * @return {@link ScheduledThreadPoolExecutor} + * @return {@link ScheduledExecutorService} * @since 5.5.8 */ - public static ScheduledThreadPoolExecutor schedule(final ScheduledThreadPoolExecutor executor, + public static ScheduledExecutorService schedule(final ScheduledExecutorService executor, final Runnable command, final long initialDelay, final long period, @@ -685,10 +686,10 @@ public class ThreadUtil { * @param period 执行周期 * @param timeUnit 时间单位 * @param fixedRateOrFixedDelay {@code true}表示fixedRate模式,{@code false}表示fixedDelay模式 - * @return {@link ScheduledThreadPoolExecutor} + * @return {@link ScheduledExecutorService} * @since 5.6.5 */ - public static ScheduledThreadPoolExecutor schedule(ScheduledThreadPoolExecutor executor, + public static ScheduledExecutorService schedule(ScheduledExecutorService executor, final Runnable command, final long initialDelay, final long period, diff --git a/hutool-core/src/test/java/cn/hutool/core/collection/MemorySafeLinkedBlockingQueueTest.java b/hutool-core/src/test/java/cn/hutool/core/collection/MemorySafeLinkedBlockingQueueTest.java new file mode 100644 index 000000000..8b1cbd148 --- /dev/null +++ b/hutool-core/src/test/java/cn/hutool/core/collection/MemorySafeLinkedBlockingQueueTest.java @@ -0,0 +1,13 @@ +package cn.hutool.core.collection; + +import org.junit.Assert; +import org.junit.Test; + +public class MemorySafeLinkedBlockingQueueTest { + + @Test + public void offerTest(){ + final MemorySafeLinkedBlockingQueue queue = new MemorySafeLinkedBlockingQueue<>(Integer.MAX_VALUE); + Assert.assertFalse(queue.offer("123")); + } +} diff --git a/hutool-core/src/test/java/cn/hutool/core/thread/SimpleSchedulerTest.java b/hutool-core/src/test/java/cn/hutool/core/thread/SimpleSchedulerTest.java new file mode 100644 index 000000000..76ff6b082 --- /dev/null +++ b/hutool-core/src/test/java/cn/hutool/core/thread/SimpleSchedulerTest.java @@ -0,0 +1,37 @@ +package cn.hutool.core.thread; + +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.lang.Console; +import cn.hutool.core.util.RuntimeUtil; + +/** + * 简单定时任务测试 + */ +public class SimpleSchedulerTest { + public static void main(final String[] args) { + + // 新建一个定时任务,定时获取内存信息 + final SimpleScheduler scheduler = new SimpleScheduler<>(new SimpleScheduler.Job() { + private volatile long maxAvailable; + + @Override + public Long getResult() { + return this.maxAvailable; + } + + @Override + public void run() { + this.maxAvailable = RuntimeUtil.getFreeMemory(); + } + }, 50); + + // 另一个线程不停获取内存结果计算值 + ThreadUtil.execAsync(() -> { + //noinspection InfiniteLoopStatement + while (true) { + Console.log(FileUtil.readableFileSize(scheduler.getResult())); + ThreadUtil.sleep(1000); + } + }); + } +}