为什么要使用线程池
线程池可以解决线程生命周期的系统开销问题,同时还可以加快响应速度:
因为线程池中的线程是可以复用的,我们只用少量的线程去执行大量的任务,不用频繁创建线程。
线程池可以管理内存和 CPU 的使用,避免资源使用不当:
线程池可以统一管理资源:
线程池参数
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize:
线程池的核心线程数量,即线程池中可以保持活动状态的最小线程数。
当提交的任务数超过核心线程数量时,线程池可以创建更多的线程来执行任务。
maximumPoolSize:
线程池中允许同时存在的最大线程数量,即当任务数超过核心线程数量时,线程池可以创建的最大线程数量。
当线程池中的线程数量已经达到最大值且任务队列已满时,可以采取拒绝策略处理过多的任务。
可以在创建线程池时指定最大线程数,也可以在运行时动态地修改。
keepAliveTime:
线程空闲后的存活时间。
当线程池中的线程数量超过核心线程数时,空闲的线程会在等待新任务到来的过程中等待一段时间。
如果等待时间超过了 keepAliveTime
,则该线程会被销毁,以释放资源。
unit:
keepAliveTime 的单位,可以是毫秒、微秒、纳秒等等。
workQueue:
threadFactory:
线程工厂,用于创建线程的工厂,可以自定义实现。
可以选择使用默认的线程工厂,创建的线程都会在同一个线程组,并拥有一样的优先级,且都不是守护线程。
也可以选择自己定制线程工厂,以方便给线程自定义命名,不同的线程池内的线程通常会根据具体业务来定制不同的线程名。
handler:
在创建了线程池后,线程池中的线程数为零。
当调用 execute()
方法添加一个请求任务时,线程池会做出如下判断:
如果正在运行的线程数量 小于 corePoolSize
,那么马上创建线程运行这个任务。
如果正在运行的线程数量 大于或等于 corePoolSize
,那么将这个任务放入队列。
如果这个时候队列满了且正在运行的线程数量还 小于 maximumPoolSize
,那么还是要创建非核心线程立刻运行这个任务。
如果队列满了且正在运行的线程数量 大于或等于 maximumPoolSize
,那么线程池会启动饱和拒绝策略来执行。
当一个线程完成任务时,它会从队列中取下一个任务来执行。
当一个线程无事可做超过一定的时间(keepAliveTime
)时,线程会判断:
如果当前运行的线程数大于 corePoolSize
,那么这个线程就被停掉。
所以线程池的所有任务完成后,它最终会收缩到corePoolSize
的大小。
拒绝策略
线程池会在以下两种情况下会拒绝新提交的任务:
当我们调用 shutdown 等方法关闭线程池后:
即便此时可能线程池内部依然有没执行完的任务正在执行,但由于线程池已经关闭。
此时如果再向线程池内提交任务,就会遭到拒绝。
线程池没有能力继续处理新提交的任务,也就是工作已经非常饱和的时候。
4种默认的拒绝策略:
第一种拒绝策略是 AbortPolicy :
在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException。
让你感知到任务被拒绝了,可以根据业务逻辑选择重试或者放弃提交等策略。
第二种拒绝策略是 DiscardPolicy :
当新任务被提交后直接被丢弃掉,也不会给你任何的通知,存在一定的风险。
因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失。
第三种拒绝策略是 DiscardOldestPolicy :
如果线程池没被关闭且没有能力执行,会丢弃任务队列中的头结点。
通常是存活时间最长的任务,这样可以腾出空间给新提交的任务。
第四种拒绝策略是 CallerRunsPolicy :
当有新任务提交后,如果线程池没被关闭且没有能力执行。
则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务,这样做主要有两点好处:
第一点:新提交的任务不会被丢弃,这样也就不会造成业务损失。
第二点:由于谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时的。
在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度。
以上内置的策略均实现了 RejectedExecutionHandler 接口 。
可以自己扩展 RejectedExecutionHandler
接口,定义自己的拒绝策略。
使用CallerRunsPolicy顺序问题:
当线程数 线程池的最大线程数 并且阻塞队列已满 的情况下,后到的数据会执行拒绝策略。
让调用线程(提交任务的线程)直接执行此任务,导致数据处理顺序不一致。
当在多线程中数据处理时需要强关联数据时间顺序时,最好考虑一下其他的处理方式,避免踩坑。
设置线程数
公式:
最佳线程数目 = ((线程等待时间+线程CPU时间)/ 线程CPU时间 )* CPU数目
。
举例:
服务器CPU
核数为4核,一个任务线程CPU
耗时为20ms,线程等待(网络IO、磁盘IO)耗时80ms。
最佳线程数目:( 80 + 20 )/20 * 4 = 20
线程数设置多大,是根据我们自身的业务的,需要自己去压力测试,设置一个合理的数值。
CPU密集型:
操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2
,核数为4的话,一般设置 5 或 8
CPU
密集任务只有在真正的多核CPU
上才可能得到加速(通过多线程),而在单核CPU
上。
无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU
总的运算能力就那些。
IO密集型:
常见线程池
FixedThreadPool:
它的核心线程数和最大线程数是一样的,线程池中的线程数除了初始阶段需要从 0 开始增加外,之后的线程数量就是固定的。
就算任务数超过线程数,线程池也不会再创建更多的线程来处理任务,而是会把超出线程处理能力的任务放到任务队列中进行等待。
而且就算任务队列满了,也无法再增加新的线程了。
BlockingQueue选取的是LinkedBlockingQueue。
CachedThreadPool:
它的特点在于线程数是几乎可以无限增加的(实际最大可以达到 Integer.MAX_VALUE
,为 2^31-1
)。
它有一个用于存储提交任务的队列,但这个队列是 SynchronousQueue,队列的容量为0。
实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。
ScheduledThreadPool:
ScheduledThreadPool,它支持定时或周期性执行任务。
BlockingQueue选取的是延迟队列DelayedWorkQueue。
SingleThreadExecutor:
它会使用唯一的一个线程去执行任务,如果线程在执行任务的过程中发生异常,线程池也会重新创建一个线程来执行后续的任务。
这种线程池由于只有一个线程,所以非常适合用于所有任务都需要按被提交的顺序依次执行的场景。
BlockingQueue选取的是LinkedBlockingQueue。
ForkJoinPool
它非常适合执行可以产生子任务的任务。
适合用于递归的场景,例如树的遍历、最优路径搜索等场景 。
关闭线程池
shutdown():
调用 shutdown()
方法之后线程池并不是立刻就被关闭,线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭。
调用 shutdown()
方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务。
shutdownNow():
首先会给所有线程池中的线程发送 interrupt 中断信号,尝试中断这些任务的执行。
队列中正在等待的所有任务转移到一个 List 中并返回,可根据返回的任务 List 来进行一些补救的操作。
线程异常
当一个线程池里面的线程异常后:
当执行方式是execute时:
会抛出异常,线程池会把这个线程移除掉,并创建一个新的线程放到线程池中。
当执行方式是submit时:
会抛出异常,可以捕获到异常,不会把这个线程移除掉,也不会创建新的线程放入到线程池中。
以上俩种执行方式,都不会影响线程池里面其他线程的正常执行。
线程复用原理
在线程池中,同一个线程可以从 BlockingQueue
中不断提取新任务来执行。
线程池对 Thread 进行了封装,并不是每次执行任务都会调用 Thread.start()
来创建新线程
实现线程复用 的逻辑主要在一个不停循环的 while
循环体中
ThreadPoolExecutor重要属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int CAPACITY = (1 << COUNT_BITS) - 1 ; private static int ctlOf (int rs, int wc) { return rs | wc; }private static int workerCountOf (int c) { return c & CAPACITY; }private static int runStateOf (int c) { return c & ~CAPACITY; } private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS; volatile long completedTasks;private int largestPoolSize;private long completedTaskCount;private volatile ThreadFactory threadFactory;private volatile RejectedExecutionHandler handler;private volatile long keepAliveTime;private volatile boolean allowCoreThreadTimeOut;private volatile int corePoolSize;private volatile int maximumPoolSize;private static final RejectedExecutionHandler defaultHandler = new AbortPolicy (); private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock ();private final Condition termination = mainLock.newCondition(); private final HashSet<Worker> workers = new HashSet <Worker>();
Worker可以理解成线程池中任务运行的最小单元:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } private static final long serialVersionUID = 6138294804551838833L ; protected boolean isHeldExclusively () { return getState() != 0 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
在线程池中,最小的执行单位就是 Worker,所以 Worker 实现了 Runnable 接口,实现了 run 方法。
Worker 本身也实现了 AQS,所以其本身也是一个锁,其在执行任务的时候,会锁住自己,任务执行完成之后,会释放自己。
线程池的任务提交
线程池的任务提交从 submit 方法 开始,主要做了两件事情:
把 Runnable 和 Callable 都转化成 FutureTask。
使用 execute 方法执行 FutureTask。
在 execute 方法中,多次调用 addWorker 方法把任务传入,Worker 内部有一个 Thread 对象。
它正是最终真正执行任务的线程,所以一个 Worker 就对应线程池中的一个线程,addWorker 就代表增加线程。
线程复用的逻辑实现主要在 Worker 类中的 run 方法里执行的 runWorker 方法中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
1 2 3 public void run () { runWorker(this ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public void run () { if (state != NEW || !UNSAFE.compareAndSwapObject(this , runnerOffset, null , Thread.currentThread())) return ; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true ; } catch (Throwable ex) { result = null ; ran = false ; setException(ex); } if (ran) set(result); } } finally { runner = null ; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
线程执行完任务之后:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
常见问题
父子任务使用一个线程池:
向线程池中提交了一个任务,然后在这个任务的内部实现中又往同一个线程池中再次提交了一个任务。
相当于父子任务在同一个线程池中执行,出现线程死锁也就是循环等待的情况。
父任务全部处于执行状态,这时候子任务想要执行需要等父任务执行完成,但是父任务都执行不完。
因为还有个子任务没完成,即父任务等待子任务执行完成,而子任务等待父任务释放线程池资源,这也就造成了死锁。