Tomcat线程池

月伴飞鱼 2024-11-03 16:05:21
公众号文章
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!

Tomcat的线程池Executor除了实现Lifecycle接口外,基本和JDK的ThreadPoolExecutor一致。

以前是直接继承了JDK的ThreadPoolExecutor,并改写部分逻辑。

在最新的代码上(Tomcat 10,2021.7.22以后),甚至是直接抄了一份,改写部分逻辑,然后再通过组合的方式使用。

主要区别是线程工厂、任务队列和拒绝策略上,先看看JDK线程池的执行策略,以及在这几个方面有什么缺陷。

JDK的线程池执行流程

首先需要知道线程池的几个基本概念:

核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、阻塞任务队列、拒绝策略。

JDK的线程池执行流程如下,当有新任务来临时:

首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。

如果workerCount < corePoolSize

  • 则创建并启动一个线程来执行新提交的任务。

如果workerCount >= corePoolSize,且线程池内的阻塞队列未满。

  • 则将任务添加到该阻塞队列中。

如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满。

  • 则创建并启动一个线程来执行新提交的任务。

如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满。

  • 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

原生线程池在达到核心线程数时,是优先添加队列,这样比较适合CPU密集型任务(认为新建线程不如让任务排队)。

Tomcat面临的问题

但是Tomcat是属于IO密集型任务,在Tomcat看来,原生的线程池主要有两个问题:

1. 阻塞任务队列

JDK可选的几个阻塞任务队列无非是:

LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue、LinkedTransferQueue

  • 后两个显然不适合,候选只有前两个:链表和数组。

对于原生LinkedBlockingQueue,无界,那么线程池就不会满足上述第4步的条件,线程会在达到corePoolSize后不再新建,而是一直加入队列。

而作为IO密集型的Tomcat,显然是希望此时创建新线程。

对于原生ArrayBlockingQueue,有界,但是线程池在第5步的时候,就会执行拒绝策略。

2. 拒绝策略

当满足第5步的时候,原生线程池就会执行拒绝策略,具体来说是j.u.c.RejectedExecutionHandler接口。

JDK默认了四个实现:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
  • 抛出异常(默认)
  • 直接丢弃
  • 丢弃最早的任务
  • 交给调用者执行

Tomcat的线程池

因为Tomcat的任务属于IO密集型,大概率不会长时间占用CPU资源。

即期望任务堆积时,优先创建线程来处理,而不是入队,但是又不想任务被丢弃或交给调用者处理(想始终交给线程池处理)。

所以做了如下改造:

阻塞任务队列继承了LinkedBlockingQueue(无界),但是自身持有ThreadPoolExecutor的引用,又改写了插入方法:

public class TaskQueue extends LinkedBlockingQueue<Runnable> {
    private transient volatile ThreadPoolExecutor parent = null;   
 
    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) {
            return super.offer(o);
        }
        //we are maxed out on threads, simply queue the object
        // 核心线程数=最大线程数,无脑添加
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
            return super.offer(o);
        }
        //we have idle threads, just add it to the queue
        // 有空闲线程(即核心线程有空闲的),则添加,核心线程自己会去取任务执行
        if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
            return super.offer(o);
        }
        //if we have less threads than maximum force creation of a new thread
        // 核心线程数小于最大线程数,这里改写了JDK线程池的第4步逻辑,不是等队列满了再新建线程,而是优先新建线程
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
            return false;
        }
        //if we reached here, we need to add it to the queue
        // 其他情况,无脑添加
        return super.offer(o);
    }
}

当活跃线程数达到最大线程数,即不能再创建新线程时,将执行拒绝策略。

这里对应原生JDK的第5步:

如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满。

  • 则根据拒绝策略来处理该任务,默认的处理方式是直接抛异常。

但此时队列其实是没满的,只是满足了上述最后一个if (parent.getPoolSize()<parent.getMaximumPoolSize())

  • 直接返回的false。

Tomcat的处理也是抛出异常,但是在异常处理时,又强制将任务插入队列:

@Override
public void execute(Runnable command) {
    submittedCount.incrementAndGet();
    try {
        executeInternal(command);
    } catch (RejectedExecutionException rx) {
        if (getQueue() instanceof TaskQueue) {
            // If the Executor is close to maximum pool size, concurrent
            // calls to execute() may result (due to Tomcat's use of
            // TaskQueue) in some tasks being rejected rather than queued.
            // If this happens, add them to the queue.
            final TaskQueue queue = (TaskQueue) getQueue();
            if (!queue.force(command)) {
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
            }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }
    }
}

所谓强制插入force其实就是直接super.offer而已。

总结下来就是Tomcat的线程池总是优先尝试新建线程,如果达到上限了,再尝试将任务放入阻塞队列。

  • 由于是IO密集型任务,执行时间一般都不会太长,所以阻塞队列大概率不会排队太多造成OOM

此外Tomcat还自定义了线程工厂,这个比较简单,只是在新建线程时,将调用工厂的类加载器传递给线程上下文加载器:

@Override
public Thread newThread(Runnable r) {
    TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
    t.setDaemon(daemon);
    t.setPriority(threadPriority);
 
    // Set the context class loader of newly created threads to be the
    // class loader that loaded this factory. This avoids retaining
    // references to web application class loaders and similar.
     
    t.setContextClassLoader(getClass().getClassLoader());
    return t;
}

测试验证

写个简单的程序测试下:线程池固定核心线程数1,最大线程数5,一次提交10个任务。

JDK线程池

public class ThreadPoolTest {
    public static void main(String[] args) {
        // 构造线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {}
            });
        }
        System.out.println("active: " + executor.getActiveCount());
        System.out.println("queue: " + executor.getQueue().size());
    }
}

输出:

active: 1

queue: 9

在队列可用的前提下,JDK线程池优先让任务排队。

Tomcat线程池

public class ThreadPoolTest {
    public static void main(String[] args) {
        // 阻塞队列使用Tomcat的TaskQueue
        TaskQueue queue = new TaskQueue();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 5, TimeUnit.SECONDS, queue);
        // 需要设置parent引用
        queue.setParent(executor);
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {}
            });
        }
        System.out.println("active: " + executor.getActiveCount());
        System.out.println("queue: " + executor.getQueue().size());
    }
}

输出:

active: 5

queue: 5

在线程数未达到最大线程数时,Tomcat线程池优先创建线程执行任务。

总结

简单来说:原生JDK的线程池是优先将任务添加到阻塞队列,等队列满再尝试创建线程,适合IO密集型任务。

Tomcat属于IO密集型,所以总是优先尝试新建线程,线程池满载了,再添加任务到阻塞队列里排队等待。

Dubbo中也有极其相似的处理:EagerThreadPoolExecutor。

https://github.com/apache/dubbo/tree/master/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager

不过个人感觉这种处理方式不太好,在catch块中执行逻辑总觉得不太合适,感觉单独写个reject接口实现来处理比较好,类似如下:

private static class ForceAdd implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        BlockingQueue<Runnable> queue = executor.getQueue();
        if (queue instanceof TaskQueue) {
            final TaskQueue q = (TaskQueue) queue;
            if (!q.force(r)) {
                executor.submittedCount.decrementAndGet();
                throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
            }
        } else {
            executor.submittedCount.decrementAndGet();
            throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
        }
    }
}

不过Tomcat和Dubbo都采用在catch块中处理,暂未找到这样做的相关原因描述。

支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!