publicclassTaskQueueextendsLinkedBlockingQueue<Runnable> { privatetransientvolatileThreadPoolExecutorparent=null; @Override publicbooleanoffer(Runnable o) { //we can't do any checks if (parent==null) { returnsuper.offer(o); } //we are maxed out on threads, simply queue the object // 核心线程数=最大线程数,无脑添加 if (parent.getPoolSize() == parent.getMaximumPoolSize()) { returnsuper.offer(o); } //we have idle threads, just add it to the queue // 有空闲线程(即核心线程有空闲的),则添加,核心线程自己会去取任务执行 if (parent.getSubmittedCount()<=(parent.getPoolSize())) { returnsuper.offer(o); } //if we have less threads than maximum force creation of a new thread // 核心线程数小于最大线程数,这里改写了JDK线程池的第4步逻辑,不是等队列满了再新建线程,而是优先新建线程 if (parent.getPoolSize()<parent.getMaximumPoolSize()) { returnfalse; } //if we reached here, we need to add it to the queue // 其他情况,无脑添加 returnsuper.offer(o); } }
@Override publicvoidexecute(Runnable command) { submittedCount.incrementAndGet(); try { executeInternal(command); } catch (RejectedExecutionException rx) { if (getQueue() instanceof TaskQueue) { // If the Executor is close to maximum pool size, concurrent // calls to execute() may result (due to Tomcat's use of // TaskQueue) in some tasks being rejected rather than queued. // If this happens, add them to the queue. finalTaskQueuequeue= (TaskQueue) getQueue(); if (!queue.force(command)) { submittedCount.decrementAndGet(); thrownewRejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } else { submittedCount.decrementAndGet(); throw rx; } } }
@Override public Thread newThread(Runnable r) { TaskThreadt=newTaskThread(group, r, namePrefix + threadNumber.getAndIncrement()); t.setDaemon(daemon); t.setPriority(threadPriority); // Set the context class loader of newly created threads to be the // class loader that loaded this factory. This avoids retaining // references to web application class loaders and similar. t.setContextClassLoader(getClass().getClassLoader()); return t; }