Tomcat的线程池Executor
除了实现Lifecycle
接口外,基本和JDK的ThreadPoolExecutor
一致。
以前是直接继承了JDK的
ThreadPoolExecutor
,并改写部分逻辑。在最新的代码上(
Tomcat 10,2021.7.22
以后),甚至是直接抄了一份,改写部分逻辑,然后再通过组合的方式使用。
主要区别是线程工厂、任务队列和拒绝策略上,先看看JDK线程池的执行策略,以及在这几个方面有什么缺陷。
JDK的线程池执行流程
首先需要知道线程池的几个基本概念:
核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、阻塞任务队列、拒绝策略。
JDK的线程池执行流程如下,当有新任务来临时:
首先检测线程池运行状态,如果不是
RUNNING
,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。如果
workerCount < corePoolSize
。
- 则创建并启动一个线程来执行新提交的任务。
如果
workerCount >= corePoolSize
,且线程池内的阻塞队列未满。
- 则将任务添加到该阻塞队列中。
如果
workerCount >= corePoolSize && workerCount < maximumPoolSize
,且线程池内的阻塞队列已满。
- 则创建并启动一个线程来执行新提交的任务。
如果
workerCount >= maximumPoolSize
,并且线程池内的阻塞队列已满。
- 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
原生线程池在达到核心线程数时,是优先添加队列,这样比较适合CPU密集型任务(认为新建线程不如让任务排队)。
Tomcat面临的问题
但是Tomcat是属于IO密集型任务,在Tomcat看来,原生的线程池主要有两个问题:
1. 阻塞任务队列
JDK可选的几个阻塞任务队列无非是:
LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue、LinkedTransferQueue
。
- 后两个显然不适合,候选只有前两个:链表和数组。
对于原生LinkedBlockingQueue,无界,那么线程池就不会满足上述第4步的条件,线程会在达到corePoolSize后不再新建,而是一直加入队列。
而作为IO密集型的Tomcat,显然是希望此时创建新线程。
对于原生ArrayBlockingQueue,有界,但是线程池在第5步的时候,就会执行拒绝策略。
2. 拒绝策略
当满足第5步的时候,原生线程池就会执行拒绝策略,具体来说是
j.u.c.RejectedExecutionHandler
接口。JDK默认了四个实现:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
- 抛出异常(默认)
- 直接丢弃
- 丢弃最早的任务
- 交给调用者执行
Tomcat的线程池
因为Tomcat的任务属于IO密集型,大概率不会长时间占用CPU资源。
即期望任务堆积时,优先创建线程来处理,而不是入队,但是又不想任务被丢弃或交给调用者处理(想始终交给线程池处理)。
所以做了如下改造:
阻塞任务队列继承了
LinkedBlockingQueue
(无界),但是自身持有ThreadPoolExecutor
的引用,又改写了插入方法:
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
private transient volatile ThreadPoolExecutor parent = null;
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) {
return super.offer(o);
}
//we are maxed out on threads, simply queue the object
// 核心线程数=最大线程数,无脑添加
if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
return super.offer(o);
}
//we have idle threads, just add it to the queue
// 有空闲线程(即核心线程有空闲的),则添加,核心线程自己会去取任务执行
if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
return super.offer(o);
}
//if we have less threads than maximum force creation of a new thread
// 核心线程数小于最大线程数,这里改写了JDK线程池的第4步逻辑,不是等队列满了再新建线程,而是优先新建线程
if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
return false;
}
//if we reached here, we need to add it to the queue
// 其他情况,无脑添加
return super.offer(o);
}
}
当活跃线程数达到最大线程数,即不能再创建新线程时,将执行拒绝策略。
这里对应原生JDK的第5步:
如果
workerCount >= maximumPoolSize
,并且线程池内的阻塞队列已满。
- 则根据拒绝策略来处理该任务,默认的处理方式是直接抛异常。
但此时队列其实是没满的,只是满足了上述最后一个
if (parent.getPoolSize()<parent.getMaximumPoolSize())
。
- 直接返回的false。
Tomcat的处理也是抛出异常,但是在异常处理时,又强制将任务插入队列:
@Override
public void execute(Runnable command) {
submittedCount.incrementAndGet();
try {
executeInternal(command);
} catch (RejectedExecutionException rx) {
if (getQueue() instanceof TaskQueue) {
// If the Executor is close to maximum pool size, concurrent
// calls to execute() may result (due to Tomcat's use of
// TaskQueue) in some tasks being rejected rather than queued.
// If this happens, add them to the queue.
final TaskQueue queue = (TaskQueue) getQueue();
if (!queue.force(command)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
所谓强制插入force其实就是直接
super.offer
而已。
总结下来就是Tomcat的线程池总是优先尝试新建线程,如果达到上限了,再尝试将任务放入阻塞队列。
- 由于是IO密集型任务,执行时间一般都不会太长,所以阻塞队列大概率不会排队太多造成
OOM
。
此外Tomcat还自定义了线程工厂,这个比较简单,只是在新建线程时,将调用工厂的类加载器传递给线程上下文加载器:
@Override
public Thread newThread(Runnable r) {
TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(daemon);
t.setPriority(threadPriority);
// Set the context class loader of newly created threads to be the
// class loader that loaded this factory. This avoids retaining
// references to web application class loaders and similar.
t.setContextClassLoader(getClass().getClassLoader());
return t;
}
测试验证
写个简单的程序测试下:线程池固定核心线程数1,最大线程数5,一次提交10个任务。
JDK线程池
public class ThreadPoolTest {
public static void main(String[] args) {
// 构造线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {}
});
}
System.out.println("active: " + executor.getActiveCount());
System.out.println("queue: " + executor.getQueue().size());
}
}
输出:
active: 1
queue: 9
在队列可用的前提下,JDK线程池优先让任务排队。
Tomcat线程池
public class ThreadPoolTest {
public static void main(String[] args) {
// 阻塞队列使用Tomcat的TaskQueue
TaskQueue queue = new TaskQueue();
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 5, TimeUnit.SECONDS, queue);
// 需要设置parent引用
queue.setParent(executor);
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {}
});
}
System.out.println("active: " + executor.getActiveCount());
System.out.println("queue: " + executor.getQueue().size());
}
}
输出:
active: 5
queue: 5
在线程数未达到最大线程数时,Tomcat线程池优先创建线程执行任务。
总结
简单来说:原生JDK的线程池是优先将任务添加到阻塞队列,等队列满再尝试创建线程,适合IO密集型任务。
Tomcat属于IO密集型,所以总是优先尝试新建线程,线程池满载了,再添加任务到阻塞队列里排队等待。
Dubbo中也有极其相似的处理:EagerThreadPoolExecutor。
不过个人感觉这种处理方式不太好,在catch块中执行逻辑总觉得不太合适,感觉单独写个reject接口实现来处理比较好,类似如下:
private static class ForceAdd implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
BlockingQueue<Runnable> queue = executor.getQueue();
if (queue instanceof TaskQueue) {
final TaskQueue q = (TaskQueue) queue;
if (!q.force(r)) {
executor.submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} else {
executor.submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
}
}
不过Tomcat和Dubbo都采用在catch块中处理,暂未找到这样做的相关原因描述。