深入理解线程池!

为什么要使用线程池

线程池可以解决线程生命周期的系统开销问题,同时还可以加快响应速度:

  • 因为线程池中的线程是可以复用的,我们只用少量的线程去执行大量的任务,不用频繁创建线程。

线程池可以管理内存和 CPU 的使用,避免资源使用不当:

  • 线程池会根据配置和任务数量灵活地控制线程数量。

线程池可以统一管理资源:

  • 比如线程池可以统一管理任务队列和线程。

线程池参数

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

corePoolSize:

  • 线程池的核心线程数量,即线程池中可以保持活动状态的最小线程数。
  • 当提交的任务数超过核心线程数量时,线程池可以创建更多的线程来执行任务。

maximumPoolSize:

  • 线程池中允许同时存在的最大线程数量,即当任务数超过核心线程数量时,线程池可以创建的最大线程数量。
  • 当线程池中的线程数量已经达到最大值且任务队列已满时,可以采取拒绝策略处理过多的任务。
  • 可以在创建线程池时指定最大线程数,也可以在运行时动态地修改。

keepAliveTime:

  • 线程空闲后的存活时间。
  • 当线程池中的线程数量超过核心线程数时,空闲的线程会在等待新任务到来的过程中等待一段时间。
  • 如果等待时间超过了 keepAliveTime,则该线程会被销毁,以释放资源。

unit:

  • keepAliveTime 的单位,可以是毫秒、微秒、纳秒等等。

workQueue:

  • 线程池中用于保存等待中任务的阻塞队列。

threadFactory:

  • 线程工厂,用于创建线程的工厂,可以自定义实现。
  • 可以选择使用默认的线程工厂,创建的线程都会在同一个线程组,并拥有一样的优先级,且都不是守护线程。
  • 也可以选择自己定制线程工厂,以方便给线程自定义命名,不同的线程池内的线程通常会根据具体业务来定制不同的线程名。

handler:

  • 当线程池已经关闭或已满时,新任务的处理策略。

image-20231018135610476

在创建了线程池后,线程池中的线程数为零。

当调用 execute() 方法添加一个请求任务时,线程池会做出如下判断:

  • 如果正在运行的线程数量 小于 corePoolSize,那么马上创建线程运行这个任务。

  • 如果正在运行的线程数量 大于或等于 corePoolSize,那么将这个任务放入队列。

  • 如果这个时候队列满了且正在运行的线程数量还 小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务。

  • 如果队列满了且正在运行的线程数量 大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。

  • 当一个线程完成任务时,它会从队列中取下一个任务来执行。

当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:

  • 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
  • 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。

拒绝策略

线程池会在以下两种情况下会拒绝新提交的任务:

当我们调用 shutdown 等方法关闭线程池后:

  • 即便此时可能线程池内部依然有没执行完的任务正在执行,但由于线程池已经关闭。
  • 此时如果再向线程池内提交任务,就会遭到拒绝。

线程池没有能力继续处理新提交的任务,也就是工作已经非常饱和的时候。

4种默认的拒绝策略:

第一种拒绝策略是 AbortPolicy

  • 在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException。
  • 让你感知到任务被拒绝了,可以根据业务逻辑选择重试或者放弃提交等策略。

第二种拒绝策略是 DiscardPolicy

  • 当新任务被提交后直接被丢弃掉,也不会给你任何的通知,存在一定的风险。
  • 因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失。

第三种拒绝策略是 DiscardOldestPolicy

  • 如果线程池没被关闭且没有能力执行,会丢弃任务队列中的头结点。
  • 通常是存活时间最长的任务,这样可以腾出空间给新提交的任务。

第四种拒绝策略是 CallerRunsPolicy

  • 当有新任务提交后,如果线程池没被关闭且没有能力执行。
  • 则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务,这样做主要有两点好处:
  • 第一点:新提交的任务不会被丢弃,这样也就不会造成业务损失。
  • 第二点:由于谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时的。
  • 在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度。

以上内置的策略均实现了 RejectedExecutionHandler 接口

可以自己扩展 RejectedExecutionHandler 接口,定义自己的拒绝策略。

使用CallerRunsPolicy顺序问题:

线程数线程池的最大线程数并且阻塞队列已满的情况下,后到的数据会执行拒绝策略。

让调用线程(提交任务的线程)直接执行此任务,导致数据处理顺序不一致。

当在多线程中数据处理时需要强关联数据时间顺序时,最好考虑一下其他的处理方式,避免踩坑。

设置线程数

公式:

最佳线程数目 = ((线程等待时间+线程CPU时间)/ 线程CPU时间 )* CPU数目

举例:

服务器CPU核数为4核,一个任务线程CPU耗时为20ms,线程等待(网络IO、磁盘IO)耗时80ms。

最佳线程数目:( 80 + 20 )/20 * 4 = 20

  • 线程的等待时间越大,线程数就要设置越大。

线程数设置多大,是根据我们自身的业务的,需要自己去压力测试,设置一个合理的数值。

CPU密集型:

  • 操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2,核数为4的话,一般设置 5 或 8
  • CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上。
  • 无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些。

IO密集型:

  • 会导致浪费大量的CPU运算能力浪费在等待。

  • 文件操作,网络操作,数据库操作,一般线程设置为:CPU核数 / (1-0.9),核数为4的话,一般设置 40

常见线程池

FixedThreadPool:

它的核心线程数和最大线程数是一样的,线程池中的线程数除了初始阶段需要从 0 开始增加外,之后的线程数量就是固定的。

就算任务数超过线程数,线程池也不会再创建更多的线程来处理任务,而是会把超出线程处理能力的任务放到任务队列中进行等待。

而且就算任务队列满了,也无法再增加新的线程了。

BlockingQueue选取的是LinkedBlockingQueue。

CachedThreadPool:

它的特点在于线程数是几乎可以无限增加的(实际最大可以达到 Integer.MAX_VALUE,为 2^31-1)。

  • 当线程闲置时还可以对线程进行回收。

它有一个用于存储提交任务的队列,但这个队列是 SynchronousQueue,队列的容量为0。

实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。

ScheduledThreadPool:

ScheduledThreadPool,它支持定时或周期性执行任务。

BlockingQueue选取的是延迟队列DelayedWorkQueue。

SingleThreadExecutor:

它会使用唯一的一个线程去执行任务,如果线程在执行任务的过程中发生异常,线程池也会重新创建一个线程来执行后续的任务。

这种线程池由于只有一个线程,所以非常适合用于所有任务都需要按被提交的顺序依次执行的场景。

BlockingQueue选取的是LinkedBlockingQueue。

ForkJoinPool

它非常适合执行可以产生子任务的任务。

适合用于递归的场景,例如树的遍历、最优路径搜索等场景

关闭线程池

shutdown():

调用 shutdown() 方法之后线程池并不是立刻就被关闭,线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭。

调用 shutdown() 方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务。

shutdownNow():

首先会给所有线程池中的线程发送 interrupt 中断信号,尝试中断这些任务的执行。

队列中正在等待的所有任务转移到一个 List 中并返回,可根据返回的任务 List 来进行一些补救的操作。

  • 例如记录在案并在后期重试。

线程异常

当一个线程池里面的线程异常后:

  • 当执行方式是execute时:
    • 会抛出异常,线程池会把这个线程移除掉,并创建一个新的线程放到线程池中。
  • 当执行方式是submit时:
    • 会抛出异常,可以捕获到异常,不会把这个线程移除掉,也不会创建新的线程放入到线程池中。

以上俩种执行方式,都不会影响线程池里面其他线程的正常执行。

线程复用原理

在线程池中,同一个线程可以从 BlockingQueue 中不断提取新任务来执行。

线程池对 Thread 进行了封装,并不是每次执行任务都会调用 Thread.start() 来创建新线程

  • 而是让每个线程去执行一个 循环任务

    • 在这个 循环任务 中,不停地检查是否还有任务等待被执行
  • 如果有则直接去执行这个任务,也就是调用任务的 run 方法

    • run 方法当作和普通方法一样的地位去调用
    • 相当于把每个任务的 run() 方法串联了起来
  • 所以线程数量并不增加。

实现线程复用的逻辑主要在一个不停循环的 while 循环体中

  • 通过取 Worker 的 firstTask 或者通过 getTask 方法从 workQueue 中获取待执行的任务。

  • 直接调用 task 的 run 方法来执行具体的任务(而不是新建线程)。

ThreadPoolExecutor重要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
//ctl 线程池状态控制字段,由两部分组成:
//1:workerCount wc 工作线程数,我们限制 workerCount 最大到(2^29)-1,大概 5 亿个线程
//2:runState rs 线程池的状态,提供了生命周期的控制,源码中有很多关于状态的校验,状态枚举如下:
//RUNNING(-536870912):接受新任务或者处理队列里的任务。
//SHUTDOWN(0):不接受新任务,但仍在处理已经在队列里面的任务。
//STOP(-536870912):不接受新任务,也不处理队列中的任务,对正在执行的任务进行中断。
//TIDYING(1073741824): 所以任务都被中断,workerCount 是 0,整理状态
//TERMINATED(1610612736): terminated() 已经完成的时候

//runState 之间的转变过程:
//RUNNING -> SHUTDOWN:调用 shudown(),finalize()
//(RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()
//SHUTDOWN -> TIDYING -> workerCount ==0
//STOP -> TIDYING -> workerCount ==0
//TIDYING -> TERMINATED -> terminated() 执行完成之后
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;// 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;// =(2^29)-1=536870911

// Packing and unpacking ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int runStateOf(int c) { return c & ~CAPACITY; }

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;//-536870912
private static final int SHUTDOWN = 0 << COUNT_BITS;//0
private static final int STOP = 1 << COUNT_BITS;//-536870912
private static final int TIDYING = 2 << COUNT_BITS;//1073741824
private static final int TERMINATED = 3 << COUNT_BITS;//1610612736

// 已完成任务的计数
volatile long completedTasks;
// 线程池最大容量
private int largestPoolSize;
// 已经完成的任务数
private long completedTaskCount;
// 用户可控制的参数都是 volatile 修饰的
// 可以使用 threadFactory 创建 thread
// 创建失败一般不抛出异常,只有在 OutOfMemoryError 时候才会
private volatile ThreadFactory threadFactory;
// 饱和或者运行中拒绝任务的 handler 处理类
private volatile RejectedExecutionHandler handler;
// 线程存活时间设置
private volatile long keepAliveTime;
// 设置 true 的话,核心线程空闲 keepAliveTime 时间后,也会被回收
private volatile boolean allowCoreThreadTimeOut;
// coreSize
private volatile int corePoolSize;
// maxSize 最大限制 (2^29)-1
private volatile int maximumPoolSize;
// 默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();

// 队列会 hold 住任务,并且利用队列的阻塞的特性,来保持线程的存活周期
private final BlockingQueue<Runnable> workQueue;

// 大多数情况下是控制对 workers 的访问权限
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();

// 包含线程池中所有的工作线程
private final HashSet<Worker> workers = new HashSet<Worker>();

Worker可以理解成线程池中任务运行的最小单元:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// 线程池中任务执行的最小单元
// Worker 继承 AQS,具有锁功能
// Worker 实现 Runnable,本身是一个可执行的任务
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 任务运行的线程
final Thread thread;

// 需要执行的任务
Runnable firstTask;

// 非常巧妙的设计,Worker本身是个 Runnable,把自己作为任务传递给 thread
// 内部有个属性又设置了 Runnable
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 把 Worker 自己作为 thread 运行的任务
this.thread = getThreadFactory().newThread(this);
}

/** Worker 本身是 Runnable,run 方法是 Worker 执行的入口, runWorker 是外部的方法 */
public void run() {
runWorker(this);
}

private static final long serialVersionUID = 6138294804551838833L;

// Lock methods
// 0 代表没有锁住,1 代表锁住
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 尝试加锁,CAS 赋值为 1,表示锁住
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试释放锁,释放锁没有 CAS 校验,可以任意的释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

在线程池中,最小的执行单位就是 Worker,所以 Worker 实现了 Runnable 接口,实现了 run 方法。

Worker 本身也实现了 AQS,所以其本身也是一个锁,其在执行任务的时候,会锁住自己,任务执行完成之后,会释放自己。

线程池的任务提交

线程池的任务提交从 submit 方法 开始,主要做了两件事情:

把 Runnable 和 Callable 都转化成 FutureTask。

使用 execute 方法执行 FutureTask。

在 execute 方法中,多次调用 addWorker 方法把任务传入,Worker 内部有一个 Thread 对象。

它正是最终真正执行任务的线程,所以一个 Worker 就对应线程池中的一个线程,addWorker 就代表增加线程。

线程复用的逻辑实现主要在 Worker 类中的 run 方法里执行的 runWorker 方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 工作的线程小于核心线程数,创建新的线程,成功返回,失败不抛异常
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 线程池状态可能发生变化
c = ctl.get();
}
// 工作的线程大于等于核心线程数,或者新建线程失败
// 线程池状态正常,并且可以入队的话,尝试入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池状态异常 尝试从队列中移除任务,可以移除的话就拒绝掉任务
if (!isRunning(recheck) && remove(command))
reject(command);
// 发现可运行的线程数是 0,就初始化一个线程,这里是个极限情况,入队的时候,突然发现
// 可用线程都被回收了
else if (workerCountOf(recheck) == 0)
// Runnable是空的,不会影响新增线程,但是线程在 start 的时候不会运行
// Thread.run() 里面有判断
addWorker(null, false);
}
// 队列满了,开启线程到 maxSize,如果失败直接拒绝,
else if (!addWorker(command, false))
reject(command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// 结合线程池的情况看是否可以添加新的 worker
// firstTask 不为空可以直接执行,为空执行不了,Thread.run()方法有判断,Runnable为空不执行
// core 为 true 表示线程最大新增个数是 coresize,false 表示最大新增个数是 maxsize
// 返回 true 代表成功,false 失败
// break retry 跳到retry处,且不再进入循环
// continue retry 跳到retry处,且再次进入循环
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 先是各种状态的校验
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// rs >= SHUTDOWN 说明线程池状态不正常
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
// 工作中的线程数大于等于容量,或者大于等于 coreSize or maxSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
// break 结束 retry 的 for 循环
break retry;
c = ctl.get(); // Re-read ctl
// 线程池状态被更改
if (runStateOf(c) != rs)
// 跳转到retry位置
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 巧妙的设计,Worker 本身是个 Runnable.
// 在初始化的过程中,会把 worker 丢给 thread 去初始化
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程,实际上去执行 Worker.run 方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
1
2
3
public void run() {
runWorker(this);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
//帮助gc回收
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task 为空的情况:
// 1:任务入队列了,极限情况下,发现没有运行的线程,于是新增一个线程;
// 2:线程执行完任务执行,再次回到 while 循环。
// 如果 task 为空,会使用 getTask 方法阻塞从队列中拿数据,如果拿不到数据,会阻塞住
while (task != null || (task = getTask()) != null) {
//锁住 worker
w.lock();
// 线程池 stop 中,但是线程没有到达中断状态,帮助线程中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行 before 钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
//同步执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行 after 钩子函数,如果这里抛出异常,会覆盖 catch 的异常
//所以这里异常最好不要抛出来
afterExecute(task, thrown);
}
} finally {
//任务执行完成,计算解锁
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//做一些抛出异常的善后工作
processWorkerExit(w, completedAbruptly);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* run 方法可以直接被调用
* 也可以由线程池进行调用
*/
public void run() {
// 状态不是任务创建,或者当前任务已经有线程在执行了
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// Callable 不为空,并且已经初始化完成
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 调用执行
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
// 给 outcome 赋值
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

线程执行完任务之后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 从阻塞队列中拿任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

//线程池关闭 && 队列为空,不需要在运行了,直接放回
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// true 运行的线程数大于 coreSize || 核心线程也可以被灭亡
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 队列以 LinkedBlockingQueue 为例,timedOut 为 true 的话说明下面 poll 方法执行返回的是 null
// 说明在等待 keepAliveTime 时间后,队列中仍然没有数据
// 说明此线程已经空闲了 keepAliveTime 了
// 再加上 wc > 1 || workQueue.isEmpty() 的判断
// 所以使用 compareAndDecrementWorkerCount 方法使线程池数量减少 1
// 并且直接 return,return 之后,此空闲的线程会自动被回收
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 从队列中阻塞拿 worker
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 设置已超时,说明此时队列没有数据
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

常见问题

父子任务使用一个线程池:

向线程池中提交了一个任务,然后在这个任务的内部实现中又往同一个线程池中再次提交了一个任务。

相当于父子任务在同一个线程池中执行,出现线程死锁也就是循环等待的情况。

父任务全部处于执行状态,这时候子任务想要执行需要等父任务执行完成,但是父任务都执行不完。

因为还有个子任务没完成,即父任务等待子任务执行完成,而子任务等待父任务释放线程池资源,这也就造成了死锁。