mirror of
https://gitee.com/chinabugotech/hutool.git
synced 2025-05-09 23:51:34 +08:00
!737 1、ThreadUtil 增加:newPhaser;2、修改不准确注释;3、增加:Phaser、CountDownLatch最常用示例
Merge pull request !737 from dazer007/v6-dev
This commit is contained in:
commit
65aecebd34
@ -10,6 +10,8 @@ import java.util.function.Consumer;
|
||||
* 如果阻塞过程中被中断,就会抛出{@link InterruptedException}异常<br>
|
||||
* 有时候在线程池内访问第三方接口,只希望固定并发数去访问,并且不希望丢弃任务时使用此策略,队列满的时候会处于阻塞状态(例如刷库的场景)
|
||||
*
|
||||
* 其他系统内置的拒绝策略,见hutool定义的枚举 {@link RejectPolicy} 线程拒绝策略枚举.
|
||||
*
|
||||
* @author luozongle
|
||||
* @since 5.8.0
|
||||
*/
|
||||
|
@ -20,7 +20,7 @@ public enum RejectPolicy {
|
||||
DISCARD(new ThreadPoolExecutor.DiscardPolicy()),
|
||||
/** 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程) */
|
||||
DISCARD_OLDEST(new ThreadPoolExecutor.DiscardOldestPolicy()),
|
||||
/** 由主线程来直接执行 */
|
||||
/** 调用者线程来执行被丢弃的任务;一般可能是由主线程来直接执行 */
|
||||
CALLER_RUNS(new ThreadPoolExecutor.CallerRunsPolicy()),
|
||||
/** 当任务队列过长时处于阻塞状态,直到添加到队列中,固定并发数去访问,并且不希望丢弃任务时使用此策略 */
|
||||
BLOCK(new BlockPolicy());
|
||||
|
@ -4,6 +4,8 @@ import java.lang.Thread.UncaughtExceptionHandler;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Phaser;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
@ -272,11 +274,116 @@ public class ThreadUtil {
|
||||
/**
|
||||
* 新建一个CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
|
||||
*
|
||||
* @param threadCount 线程数量
|
||||
* 示例:等6个同学都离开教室,班长才能锁门。
|
||||
* <pre>{@code
|
||||
* CountDownLatch latch = new CountDownLatch(6); // 总共任务是6
|
||||
* for (int x = 0; x < 6; x++) {
|
||||
* //具体生产任务,可以用线程池替换
|
||||
* new Thread(()->{
|
||||
* try {
|
||||
* //每个同学在教室待上几秒秒钟
|
||||
* int time = ThreadLocalRandom.current().nextInt(1, 5);
|
||||
* TimeUnit.SECONDS.sleep(time);
|
||||
* } catch (InterruptedException e) {
|
||||
* e.printStackTrace();
|
||||
* }
|
||||
* System.out.printf("同学【%s】,已经离开了教室%n", Thread.currentThread().getName());
|
||||
* latch.countDown(); // 减1(每离开一个同学,减去1),必须执行,可以放到 try...finally中
|
||||
* },"同学 - " + x).start();
|
||||
* }
|
||||
* latch.await(); // 等待计数为0后再解除阻塞;(等待所有同学离开)
|
||||
* System.out.println("【主线程】所有同学都离开了教室,开始锁教室大门了。");
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* 该示例,也可以用:{@link Phaser} 移相器 进行实现
|
||||
*
|
||||
* @param taskCount 任务数量
|
||||
* @return CountDownLatch
|
||||
*/
|
||||
public static CountDownLatch newCountDownLatch(final int threadCount) {
|
||||
return new CountDownLatch(threadCount);
|
||||
public static CountDownLatch newCountDownLatch(final int taskCount) {
|
||||
return new CountDownLatch(taskCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* 新建一个CycleBarrier 循环栅栏,一个同步辅助类
|
||||
*
|
||||
* 示例:7个同学,集齐7个龙珠,7个同学一起召唤神龙;前后集齐了2次
|
||||
* <pre>{@code
|
||||
* AtomicInteger times = new AtomicInteger();
|
||||
* CyclicBarrier barrier = new CyclicBarrier(7, ()->{
|
||||
* System.out.println("");
|
||||
* System.out.println("");
|
||||
* System.out.println("【循环栅栏业务处理】7个子线程 都收集了一颗龙珠,七颗龙珠已经收集齐全,开始召唤神龙。" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
|
||||
* times.getAndIncrement();
|
||||
* }); // 现在设置的栅栏的数量为2
|
||||
* for (int x = 0; x < 7; x++) { // 创建7个线程, 当然也可以使用线程池替换。
|
||||
* new Thread(() -> {
|
||||
* while (times.get() < 2) {
|
||||
* try {
|
||||
* System.out.printf("【Barrier - 收集龙珠】当前的线程名称:%s%n", Thread.currentThread().getName());
|
||||
* int time = ThreadLocalRandom.current().nextInt(1, 10); // 等待一段时间,模拟线程的执行时间
|
||||
* TimeUnit.SECONDS.sleep(time); // 模拟业务延迟,收集龙珠的时间
|
||||
* barrier.await(); // 等待,凑够了7个等待的线程
|
||||
* System.err.printf("〖Barrier - 举起龙珠召唤神龙〗当前的线程名称:%s\t%s%n", Thread.currentThread().getName(), LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
|
||||
* if (barrier.getParties() >= 7) {
|
||||
* barrier.reset(); // 重置栅栏,等待下一次的召唤。
|
||||
* }
|
||||
* } catch (Exception e) {
|
||||
* e.printStackTrace();
|
||||
* }
|
||||
* }
|
||||
* }, "线程 - " + x).start();
|
||||
* }
|
||||
* }</pre>
|
||||
*
|
||||
* 该示例,也可以用:{@link Phaser} 移相器 进行实现
|
||||
* @param taskCount 任务数量
|
||||
* @return 循环栅栏
|
||||
*/
|
||||
public static CyclicBarrier newCyclicBarrier(final int taskCount) {
|
||||
return new CyclicBarrier(taskCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* 新建一个Phaser,一个同步辅助类,jdk1.7提供,可以完全替代CountDownLatch;
|
||||
* @since 6.0.1
|
||||
* @author dazer
|
||||
*
|
||||
* Pharser: 移相器、相位器,可重用同步屏障;
|
||||
* 功能可以替换:{@link CyclicBarrier}(固定线程)循环栅栏、{@link CountDownLatch}(固定计数)倒数计数、加上分层功能
|
||||
*
|
||||
* 示例1:等6个同学都离开教室,班长才能锁门。
|
||||
* <pre>{@code
|
||||
* Phaser phaser = new Phaser(6); // 总共任务是6
|
||||
* for (int x = 0; x < 6; x++) {
|
||||
* //具体生产任务,可以用线程池替换
|
||||
* new Thread(()->{
|
||||
* try {
|
||||
* //每个同学在教室待上几秒秒钟
|
||||
* int time = ThreadLocalRandom.current().nextInt(1, 5);
|
||||
* TimeUnit.SECONDS.sleep(5);
|
||||
* } catch (InterruptedException e) {
|
||||
* e.printStackTrace();
|
||||
* }
|
||||
* System.out.printf("同学【%s】,已经离开了教室%n", Thread.currentThread().getName());
|
||||
* phaser.arrive(); // 减1 等价于countDown()方法(每离开一个同学,减去1),必须执行,可以放到 try...finally中
|
||||
* },"同学 - " + x).start();
|
||||
* }
|
||||
* phaser.awaitAdvance(phaser.getPhase()); // 等价于latch.await()方法 等待计数为0后再解除阻塞;(等待所有同学离开)
|
||||
* System.out.println("【主线程】所有同学都离开了教室,开始锁教室大门了。");
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* 示例2:7个同学,集齐7个龙珠,7个同学一起召唤神龙;
|
||||
* 只需要:phaser.arrive(); --> phaser.arriveAndAwaitAdvance() //等待其他的线程就位
|
||||
* 该示例,也可以用:{@link CyclicBarrier} 进行实现
|
||||
*
|
||||
* @param taskCount 任务数量
|
||||
* @return Phaser
|
||||
*/
|
||||
public static Phaser newPhaser(final int taskCount) {
|
||||
return new Phaser(taskCount);
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user