java中线程池使用时,常用类结构
正确使用线程池姿势
线程池根据不同业务场景设置不同的线程池。不要整个系统用一个线程池。因为其他业务场景占用线程过多,影响其他业务处理
线程池方法
Executor有一个重要子接口ExecutorService,其中定义了线程池的具体行为
1.execute(Runnable command):履行Ruannable类型的任务,
2.submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
3.shutdown():在完成已提交的任务后关闭线程池,不再接管新任务,
4.shutdownNow():停止所有正在履行的任务并立刻关闭线程池。
5.isTerminated():测试所有任务是否都执行完毕。
6.isShutdown():测试是否该ExecutorService已被关闭。
一、线程与线程池性能对比
线程池:线程缓存的实现,线程是稀缺资源,避免线程被频繁的创建销毁。
例如:在web应用中,服务器接受处理请求,并为请求分配一个线程进行处理,每个新请求创建一个线程,实现简单,但存在问题。
线程池优势
复用存在的线程,减少线程创建、消亡的开销,提高性能。
可以提高响应速度。(当任务到达时,不需要等待线程创建,使用现有线程,就可以立即执行)
提高线程的可管理性。(线程池统一管理、分配线程;避免无限创建)
二、JDK自带线程池
Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
问题:高并发时,可能导致大量创建线程,导致cup达到100%。因为队列不缓存任务,所有任务都需要对应一个线程。
Executors.newFixedThreadPool(10)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Executors.newScheduledThreadPool(5)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
ScheduledThreadPoolExecutor继承ThreadPoolExecutor,内部使用DelayedWorkQueue延迟队列
//利用ThreadPoolExecutor初始化
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
//任务被封装为ScheduledFutureTask,底层执行任务依靠addWorker方法,定时依赖延迟队列
线程池中的队列
SynchronousQueue(无缓冲)
SynchronousQueue:无缓冲等待队列,是一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,
拥有公平(FIFO)和非公平(LIFO)策略,非公平策略会导致一些数据永远无法被消费的情况?
使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为Integer.MAX_VALUE,避免线程拒绝执行操作
LinkedBlockingQueue(无界)
按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene
LinkedBlockingQueue:无界队列,当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes参数就相当于无效),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。
可指定队列大小,默认队列大小Integer.MAX_VALUE
ArrayBlockingQueue(有界)
ArrayBlockingQueue:有界队列,可以指定缓存队列的大小,
1.当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,
2.当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行
3.当线程数达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会执行拒绝策略
priorityBlockingQuene
具有优先级的无界阻塞队列
DelayedWorkQueue
底层为RunnableScheduledFuture类型的数组,初始容量为16
三、线程池应用
具体实现:ThreadPoolExecutor、ScheduledThreadPoolExecutor
ThreadPoolExecutor
核心参数
corePoolSize(核心线程数)
1.线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;
2.如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
3.如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize(最大线程数)
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,(如果当前线程数小于maximumPoolSize)则创建新的线程执行任务,
keepAliveTime(允许的空闲时间)
线程池非核心线程允许的空闲时间。当线程池中的线程数量大于corePoolSize的时,且队列中任务时,核心线程外的线程会等待超过了keepAliveTime,对额外的线程进行销毁;
unit(单位)
keepAliveTime的单位;
workQueue(阻塞队列)
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
threadFactory(线程工厂)
继承ThreadFactory接口的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。
使用默认的ThreadFactory来创建线程时,新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置线程的名称。
handler
线程池的饱和策略,阻塞队列放满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
上面的4种策略都是ThreadPoolExecutor的内部类。
也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
四、源码解析
源码重点属性
//ctl=111000000000000000000000000000000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//Interge.size = 32;故COUNT_BITS=29
private static final int COUNT_BITS = Integer.SIZE - 3;
//向左移动29位,0010 0000 0000 0000 0000 0000 0000 0000 - 1 = 0001 1111 1111 1111 1111 1111 1111 1111(29位1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
ctl是对线程池的运行状态、线程池有效线程数量(能记录2^29-1.约5亿)进行记录的一个字段。
使用Integer类型保存
高3位保存线程状态(runState),低29位保存线程数量(workerCount)
线程池状态
RUNNING[-536870912]
//111000000000000000000000000000000。
//状态说明:接受新任务或者处理队列中任务。
private static final int RUNNING = -1 << COUNT_BITS;
SHUTDOWN[0]
//000000000000000000000000000000000。
//状态切换:调用线程池的shutdown()方法。(RUNNING-->SHUTDOWN)
//状态说明:不再接受新任务,仅处理队列中的任务。
private static final int SHUTDOWN = 0 << COUNT_BITS;
STOP[536870912]
//001000000000000000000000000000000。
//状态切换:调用线程池的shutdownNow()方法。(RUNNING、SHUTDOWN-->STOP)
//状态说明:不再接受新任务,不再处理队列中的任务,并且中断当前执行的任务。
private static final int STOP = 1 << COUNT_BITS;
TIDYING[1073741824]
//01000000000000000000000000000000。
//状态切换:
当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中工作线程数为0时,就会由 SHUTDOWN -> TIDYING。
线程池在STOP状态下,线程池中工作线程数为0时,就会由STOP -> TIDYING。
//状态说明:所有任务都已终止,workerCount(任务数)为0,状态为TIDYING的线程将运行钩子函数terminated(),(terminated()方法为空,需要用户可自己实现)。
private static final int TIDYING = 2 << COUNT_BITS;
TERMINATED[1610612736]
//01100000000000000000000000000000。
//状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
//状态说明:线程池彻底终止,就变成TERMINATED状态。
private static final int TERMINATED = 3 << COUNT_BITS;
ThreadPoolExecutor源码注释
execute()方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//ctl记录线程池状态(runState)、线程数(workCount)
int c = ctl.get();
//1.计算当前线程数
//如果当前线程数【 < 核心线程数】,则创建新的线程。并把任务添加到线程中。
if (workerCountOf(c) < corePoolSize) {
//创建新线程成功后直接返回。
//第二个参数:(ture)标示添加线程与corePoolSize比较,否则与maximumPoolSize比较
if (addWorker(command, true))
return;
c = ctl.get();//添加失败,则重新获取ctl数据
}
//2.当线程数【 >= 核心线程数】,则执行到此处
//a.校验线程池状态,是否为运行状态
//b.尝试将任务加入队列,
//c.再次校验线程池状态(防止线程加入队列时发生变化),如果线程池shutdown,则丢弃任务,如果线程池无工作线程,则添加线程
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//再次判断线程池运行状态,如果不是RUNNING状态,则进行移除操作。并执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)//获取线程池中有效线程数,如果为0,则执行addWorker方法。
addWorker(null, false);//第一个参数为null时,表示仅在线程池创建一个线程,用于消费队列中任务(task != null || (task = getTask()) != null)task==null,则从队列中获取任务。
}
//3.当队列放满,则执行到此处,有两种情况。
//a.线程池已经不是RUNNING状态
//b.是RUNNING状态,但线程数>核心线程数,且队列已满。(故addWorker方法,第二个参数出入false与maximumPoolSize比较)
else if (!addWorker(command, false))
reject(command);
}
execute()执行总结。
1.线程池未达到核心线程时(workerCount < corePoolSize),当有任务添加时,不断创建新线程作为核心线程;
2.当线程数达到核心线程时(workerCount >= corePoolSize)队列不满,则将任务放入队列中.
3.当队列满时(workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满),尝试创建非核心线程
4.如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常
通过上面步骤可以看出。如果开始创建非核心线程时,队列任务被消费(即不满状态),新任务添加时,将任务添加到队列中。等队列满时,再次开始创建非核心线程
addWorker()方法
//校验是否满足增加的条件
//1.firstTask:用于指定新增的线程执行的【第一个任务】,firstTask==null时,代表仅增加执行线程。
//2.core:(ture)添加线程时与corePoolSize比较,否则与maximumPoolSize比较
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//获取当前线程运行状态
int c = ctl.get();
int rs = runStateOf(c);
/*
* 如果rs >= SHUTDOWN(0),则表示此时不再接收新任务;
* 接着判断以下3个条件,只要有1个不满足,则返回false:
* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
* 2. firsTask为空
* 3. 阻塞队列不为空
*
* 首先考虑rs == SHUTDOWN的情况
* 这种情况下不接受新提交的任务,所以在firstTask不为空的时候会返回false;
* 然后,如果firstTask为空,并且workQueue也为空,则返回false;即:如果队列中已经没有任务了,不需要再添加线程
*
* 但是如果队列不会空,则可以继续执行
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
//终止添加新线程
return false;
for (;;) {
//获取线程池中线程数
int wc = workerCountOf(c);
//判断线程池中线程数是否大于最大限制,或者核心(或最大线程数)。如果大于直接返回。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加workerCount,如果成功,则跳出第一个for循环(为什么修改c值?因为满足增加线程的条件,而变量c记录线程池中线程的数量)
//cas修改数据可能失败,如果失败,则继续执行内循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态不等于rs,说明状态已被改变,返回外层for循环,继续执行外层for循环校验
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//通过校验,则执行增加逻辑
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//firstTask作为入参,创建Worker对象
w = new Worker(firstTask);
//每一个Worker对象都会创建一个线程
final Thread t = w.thread;
if (t != null) {
//防止并发问题
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// rs是RUNNING状态 || rs是SHUTDOWN状态 && firstTask为null,向线程池中添加线程。
// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 测试线程是否是活跃状态(即:调用了start方法,则为true)
throw new IllegalThreadStateException();
//使用set集合暂存worker对象
workers.add(w);
int s = workers.size();
// largestPoolSize记录当前线程池中最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动worker相关线程,通过run方法调用runWork方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker类
//Worker类继承AQS、实现Runnable接口
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** 具体执行任务的线程. */
final Thread thread;
/** 初始的待任务,可能为空 */
Runnable firstTask;
/** 记录每个线程完成任务数 */
volatile long completedTasks;
/**
*构造函数中,利用ThreadFactory产生新的线程,并给新线程第一个任务(任务可能为null)
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // 在运行worker前,禁止线程中断操作。(如果想对worker操作,必须获取锁,将state=0,改成state=1,设置为1,则无法获取锁)
this.firstTask = firstTask;
//创建新线程,并给新线程初始化执行逻辑(worker就是Runnable,在run方法中定义了处理逻辑)
this.thread = getThreadFactory().newThread(this);
}
/** runWorker为主要处理逻辑 */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
//尝试获取锁(cas方式0-->1)
//tryAcquire方法判断state是否为0,所以setState(-1);将state设置为-1,为了禁止在执行任务前(调用runWorker前)对线程进行中断。
//在runWorker方法中会先调用Worker对象的unlock方法将state设置为0。(当state=0时,worker线程可以被中断)
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//中断当前线程
//isInterrupted():判断线程对象中是否设置了中断标识,如果设置,则返回true,但不会清除中断标识,再次调用仍为true。
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
//没有设置中断,则执行中断操作
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
线程池中每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象(workers.add(w);上面代码显示),请参见JDK源码。
Worker类继承AQS、实现Runnable接口,重要的两个属性
1.firstTask用它来保存传入的任务;
2.thread是在调用构造方法时通过ThreadFactory来创建的线程,是处理任务的线程。
在调用构造方法时,传入需要执行的任务,通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程, 所以Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?
Worker中tryAcquire方法不允许重入的(当前线程同一时间仅能被一个任务占用),而ReentrantLock的tryAcquire允许重入:
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中;如果正在执行任务,则不应该中断线程;
- 如果线程不是独占锁的状态,即空闲的状态,说明它没有在处理任务,则可以对该线程进行中断;
- 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
- 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
- 所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。
Worker在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?
因为AQS中默认的state是0,如果创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,即防止Worker对象未调用runWorker方法,就被中断。
runWorker()方法
//每个线程都会循环获取队列中的任务,并调用任务的run()方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//获取第一个任务,可能为空
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // (将state从-1修改为0,允许中断操作)
//是否因为异常退出循环
boolean completedAbruptly = true;
try {
//如果task=null,则获取队列中任务,即:队列任务最后执行
while (task != null || (task = getTask()) != null) {
w.lock();
//如果线程池正在停止,那么要保证当前线程是中断状态;
//如果不是的话,则要保证当前线程不是中断状态;
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();//中断当前线程
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//当前线程调用传入线程池任务的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
STOP
//001
//状态切换:调用线程池的shutdownNow()方法。(RUNNING、SHUTDOWN-->STOP)
//状态说明:不再接受新任务,不再处理队列中的任务,并且中断当前执行的任务。
private static final int STOP = 1 << COUNT_BITS;
runWorker方法的执行过程:
- while循环不断地通过getTask()方法获取任务,getTask()方法从阻塞队列中取任务;
- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
- 调用task.run()执行任务;
- 如果task为null则跳出循环,执行processWorkerExit()方法;
- runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。
completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。
getTask()方法
private Runnable getTask() {
//上次从阻塞队列中(使用poll方法)获取任务,是否超时。
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/*
* 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
* 1. rs >= STOP,线程池是否正在stop;
* 2. 阻塞队列是否为空。
* 如果以上条件满足,则将workerCount减1并返回null。【返回null,调用getTask处,会跳出循环,并对线程销毁处理】
* 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//使用CAS方式操作,将Worker数量减1
//返回null,终止runWork方法中循环
return null;
}
//wc:线程池中线程数
int wc = workerCountOf(c);
// timed:判断是否需要进行超时控制。
// allowCoreThreadTimeOut默认是false,即:核心线程不进行超时回收;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
* timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
* 如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
* 如果减1失败,执行continue,进行重试。
* 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
/*
* 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
* 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。【保证核心线程,不会被销毁的方式】。
* keepAliveTime:线程池初始化时,设置的参数
*/
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
timedOut = false;
}
}
}
总结:
第二个if判断目的是控制线程池的有效线程数量。
在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,
但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。
什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。
getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法(进行线程回收)。
processWorkerExit()方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
// 如果线程执行时没有出现异常,说明在getTask()方法中已经完成对workerCount进行了减1操作。
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计完成的任务数
completedTaskCount += w.completedTasks;
//从线程池中移除工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 对线程池进行判断,尝试设置线程池为最终态
//如果(SHUTDOWN且线程池和队列为空)或(STOP且线程池为空),则转换为终止状态(TERMINATED)。
tryTerminate();
int c = ctl.get();
/*
* 保留线程池中线程操作
* 当线程池是RUNNING或SHUTDOWN状态时(c<STOP),如果worker是异常结束,直接通过addWorker方式想线程池添加线程;
* 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {//正常结束。completedAbruptly=false
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
tryTerminate()方法
如果(SHUTDOWN且线程池和队列为空)或(STOP且线程池为空),则转换为终止状态(TERMINATED)。
如果有资格终止,但workerCount不为零,则中断空闲工作进程,以确保停机信号传播。
必须在可能导致终止的任何操作之后调用此方法,这些操作包括减少worker计数或在关机期间从队列中删除任务。
该方法是非私有的,允许从ScheduledThreadPoolExecutor进行访问
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//校验是否进行终止线程池操作
//1.线程池为RUNNING状态。(不能终止)
//2.线程池为TERMINATED状态(已经终止)。
//3.线程池为SHUTDOWN状态且队列不为null(不能终止,需要处理完队列任务)
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//符合终止条件。判断线程池中是否还存在线程,如果存在,则设置中断标识(调用interrupt方法)
if (workerCountOf(c) != 0) { // Eligible to terminate
//中断线程池中线程
interruptIdleWorkers(ONLY_ONE);
return;
}
//设置线程池终止状态
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//设置线程池状态为终态。
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}