searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Java线程池应用与原理

2023-05-19 05:49:45
39
0

java中线程池使用时,常用类结构

正确使用线程池姿势

线程池根据不同业务场景设置不同的线程池。不要整个系统用一个线程池。因为其他业务场景占用线程过多,影响其他业务处理

线程池方法

Executor有一个重要子接口ExecutorService,其中定义了线程池的具体行为

1.execute(Runnable command):履行Ruannable类型的任务,

2.submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象

3.shutdown():在完成已提交的任务后关闭线程池,不再接管新任务,

4.shutdownNow():停止所有正在履行的任务并立刻关闭线程池

5.isTerminated():测试所有任务是否都执行完毕

6.isShutdown():测试是否该ExecutorService已被关闭。

一、线程与线程池性能对比

线程池:线程缓存的实现,线程是稀缺资源,避免线程被频繁的创建销毁。

例如:在web应用中,服务器接受处理请求,并为请求分配一个线程进行处理,每个新请求创建一个线程,实现简单,但存在问题。

线程池优势

复用存在的线程,减少线程创建、消亡的开销,提高性能。

可以提高响应速度。(当任务到达时,不需要等待线程创建,使用现有线程,就可以立即执行)

提高线程的可管理性。(线程池统一管理、分配线程;避免无限创建)

二、JDK自带线程池

Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

问题:高并发时,可能导致大量创建线程,导致cup达到100%。因为队列不缓存任务,所有任务都需要对应一个线程。

Executors.newFixedThreadPool(10)
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
Executors.newScheduledThreadPool(5)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
ScheduledThreadPoolExecutor继承ThreadPoolExecutor,内部使用DelayedWorkQueue延迟队列
//利用ThreadPoolExecutor初始化
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
//任务被封装为ScheduledFutureTask,底层执行任务依靠addWorker方法,定时依赖延迟队列

线程池中的队列

SynchronousQueue(无缓冲)

SynchronousQueue:无缓冲等待队列,是一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,

拥有公平(FIFO)和非公平(LIFO)策略,非公平策略会导致一些数据永远无法被消费的情况?

使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为Integer.MAX_VALUE,避免线程拒绝执行操作

LinkedBlockingQueue(无界)

按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene

LinkedBlockingQueue:无界队列,当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes参数就相当于无效),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。

可指定队列大小,默认队列大小Integer.MAX_VALUE

ArrayBlockingQueue(有界)

ArrayBlockingQueue:有界队列,可以指定缓存队列的大小,

1.当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,

2.当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行

3.当线程数达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会执行拒绝策略

priorityBlockingQuene

具有优先级的无界阻塞队列

DelayedWorkQueue

底层为RunnableScheduledFuture类型的数组,初始容量为16

三、线程池应用

具体实现:ThreadPoolExecutor、ScheduledThreadPoolExecutor

ThreadPoolExecutor

核心参数

corePoolSize(核心线程数)

1.线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;

2.如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;

3.如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize(最大线程数)

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,(如果当前线程数小于maximumPoolSize)则创建新的线程执行任务,

keepAliveTime(允许的空闲时间)

线程池非核心线程允许的空闲时间。当线程池中的线程数量大于corePoolSize的时,且队列中任务时,核心线程外的线程会等待超过了keepAliveTime,对额外的线程进行销毁;

unit(单位)

keepAliveTime的单位;

workQueue(阻塞队列)

用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:

threadFactory(线程工厂)

继承ThreadFactory接口的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。

使用默认的ThreadFactory来创建线程时,新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置线程的名称。

handler

线程池的饱和策略阻塞队列放满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

1、AbortPolicy:直接抛出异常,默认策略

2、CallerRunsPolicy:用调用者所在的线程来执行任务;

3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

4、DiscardPolicy:直接丢弃任务;

上面的4种策略都是ThreadPoolExecutor的内部类。

也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

四、源码解析

源码重点属性

//ctl=111000000000000000000000000000000

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//Interge.size = 32;故COUNT_BITS=29

private static final int COUNT_BITS = Integer.SIZE - 3;

//向左移动29位,0010 0000 0000 0000 0000 0000 0000 0000 - 1 = 0001 1111 1111 1111 1111 1111 1111 1111(29位1)

private static final int CAPACITY = (1 << COUNT_BITS) - 1;

ctl是对线程池的运行状态、线程池有效线程数量(能记录2^29-1.约5亿)进行记录的一个字段。

使用Integer类型保存

高3位保存线程状态(runState)低29位保存线程数量(workerCount)

线程池状态

RUNNING[-536870912]

//111000000000000000000000000000000。

//状态说明:接受新任务或者处理队列中任务。

private static final int RUNNING = -1 << COUNT_BITS;

SHUTDOWN[0]

//000000000000000000000000000000000。

//状态切换:调用线程池的shutdown()方法。(RUNNING-->SHUTDOWN)

//状态说明:不再接受新任务仅处理队列中的任务

private static final int SHUTDOWN = 0 << COUNT_BITS;

STOP[536870912]

//001000000000000000000000000000000。

//状态切换:调用线程池的shutdownNow()方法。(RUNNING、SHUTDOWN-->STOP)

//状态说明:不再接受新任务,不再处理队列中的任务,并且中断当前执行的任务

private static final int STOP = 1 << COUNT_BITS;

TIDYING[1073741824]

//01000000000000000000000000000000。

//状态切换:

当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中工作线程数为0时,就会由 SHUTDOWN -> TIDYING

线程池在STOP状态下,线程池中工作线程数为0时,就会由STOP -> TIDYING

//状态说明:所有任务都已终止,workerCount(任务数)为0,状态为TIDYING的线程将运行钩子函数terminated(),(terminated()方法为空,需要用户可自己实现)。

private static final int TIDYING = 2 << COUNT_BITS;

TERMINATED[1610612736]

//01100000000000000000000000000000。

//状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

//状态说明:线程池彻底终止,就变成TERMINATED状态。

private static final int TERMINATED = 3 << COUNT_BITS;

ThreadPoolExecutor源码注释

execute()方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //ctl记录线程池状态(runState)、线程数(workCount)
    int c = ctl.get();

    //1.计算当前线程数
    //如果当前线程数【 < 核心线程数】,则创建新的线程。并把任务添加到线程中。
    if (workerCountOf(c) < corePoolSize) {
        //创建新线程成功后直接返回。
        //第二个参数:(ture)标示添加线程与corePoolSize比较,否则与maximumPoolSize比较
        if (addWorker(command, true))
            return;
        c = ctl.get();//添加失败,则重新获取ctl数据
    }
    //2.当线程数【 >= 核心线程数】,则执行到此处
    //a.校验线程池状态,是否为运行状态
    //b.尝试将任务加入队列,
    //c.再次校验线程池状态(防止线程加入队列时发生变化),如果线程池shutdown,则丢弃任务,如果线程池无工作线程,则添加线程
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))//再次判断线程池运行状态,如果不是RUNNING状态,则进行移除操作。并执行拒绝策略
            reject(command);
        else if (workerCountOf(recheck) == 0)//获取线程池中有效线程数,如果为0,则执行addWorker方法。
            addWorker(null, false);//第一个参数为null时,表示仅在线程池创建一个线程,用于消费队列中任务(task != null || (task = getTask()) != null)task==null,则从队列中获取任务。
    }
    //3.当队列放满,则执行到此处,有两种情况。
    //a.线程池已经不是RUNNING状态
    //b.是RUNNING状态,但线程数>核心线程数,且队列已满。(故addWorker方法,第二个参数出入false与maximumPoolSize比较)
    else if (!addWorker(command, false))
        reject(command);
}

execute()执行总结。

1.线程池未达到核心线程时(workerCount < corePoolSize),当有任务添加时,不断创建新线程作为核心线程;

2.当线程数达到核心线程时(workerCount >= corePoolSize)队列不满,则将任务放入队列中.

3.当队列满时(workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满),尝试创建非核心线程

4.如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常

通过上面步骤可以看出。如果开始创建非核心线程时,队列任务被消费(即不满状态),新任务添加时,将任务添加到队列中。等队列满时,再次开始创建非核心线程

addWorker()方法

//校验是否满足增加的条件


//1.firstTask:用于指定新增的线程执行的【第一个任务】,firstTask==null时,代表仅增加执行线程。
//2.core:(ture)添加线程时与corePoolSize比较,否则与maximumPoolSize比较
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        //获取当前线程运行状态
        int c = ctl.get();
        int rs = runStateOf(c);

    /*
     * 如果rs >= SHUTDOWN(0),则表示此时不再接收新任务;
     * 接着判断以下3个条件,只要有1个不满足,则返回false:
     * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
     * 2. firsTask为空
     * 3. 阻塞队列不为空
     * 
     * 首先考虑rs == SHUTDOWN的情况
     * 这种情况下不接受新提交的任务,所以在firstTask不为空的时候会返回false;
     * 然后,如果firstTask为空,并且workQueue也为空,则返回false;即:如果队列中已经没有任务了,不需要再添加线程
     *
     * 但是如果队列不会空,则可以继续执行
     */

        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               !workQueue.isEmpty()))
               //终止添加新线程
            return false;


        for (;;) {
            //获取线程池中线程数
            int wc = workerCountOf(c);
            
            //判断线程池中线程数是否大于最大限制,或者核心(或最大线程数)。如果大于直接返回。
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 尝试增加workerCount,如果成功,则跳出第一个for循环(为什么修改c值?因为满足增加线程的条件,而变量c记录线程池中线程的数量)
            //cas修改数据可能失败,如果失败,则继续执行内循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl

            // 如果当前的运行状态不等于rs,说明状态已被改变,返回外层for循环,继续执行外层for循环校验
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    //通过校验,则执行增加逻辑
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //firstTask作为入参,创建Worker对象
        w = new Worker(firstTask);
        //每一个Worker对象都会创建一个线程
        final Thread t = w.thread;
        if (t != null) {
        		//防止并发问题
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // rs < SHUTDOWN表示是RUNNING状态;
                // rs是RUNNING状态 || rs是SHUTDOWN状态 && firstTask为null,向线程池中添加线程。
                // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // 测试线程是否是活跃状态(即:调用了start方法,则为true)
                        throw new IllegalThreadStateException();
                    //使用set集合暂存worker对象
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize记录当前线程池中最大线程数
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动worker相关线程,通过run方法调用runWork方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker类

//Worker类继承AQS、实现Runnable接口  
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** 具体执行任务的线程. */
        final Thread thread;
        
        /** 初始的待任务,可能为空 */
        Runnable firstTask;
        
        /** 记录每个线程完成任务数 */
        volatile long completedTasks;

        /**
         *构造函数中,利用ThreadFactory产生新的线程,并给新线程第一个任务(任务可能为null)
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // 在运行worker前,禁止线程中断操作。(如果想对worker操作,必须获取锁,将state=0,改成state=1,设置为1,则无法获取锁)
            this.firstTask = firstTask;
            //创建新线程,并给新线程初始化执行逻辑(worker就是Runnable,在run方法中定义了处理逻辑)
            this.thread = getThreadFactory().newThread(this);
        }

        /** runWorker为主要处理逻辑  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        //尝试获取锁(cas方式0-->1)
        //tryAcquire方法判断state是否为0,所以setState(-1);将state设置为-1,为了禁止在执行任务前(调用runWorker前)对线程进行中断。
        //在runWorker方法中会先调用Worker对象的unlock方法将state设置为0。(当state=0时,worker线程可以被中断)
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        
        //中断当前线程
        //isInterrupted():判断线程对象中是否设置了中断标识,如果设置,则返回true,但不会清除中断标识,再次调用仍为true。
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    //没有设置中断,则执行中断操作
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

线程池中每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象(workers.add(w);上面代码显示),请参见JDK源码。

Worker类继承AQS实现Runnable接口,重要的两个属性

1.firstTask用它来保存传入的任务;

2.thread是在调用构造方法时通过ThreadFactory来创建的线程,是处理任务的线程。

在调用构造方法时,传入需要执行的任务,通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程, 所以Worker对象在启动的时候会调用Worker类中的run方法。

Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?

Worker中tryAcquire方法不允许重入的(当前线程同一时间仅能被一个任务占用),而ReentrantLock的tryAcquire允许重入

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;如果正在执行任务,则不应该中断线程
  2. 如果线程不是独占锁的状态,即空闲的状态,说明它没有在处理任务,则可以对该线程进行中断
  3. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
  4. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
  5. 所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。

Worker在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?

因为AQS中默认的state是0,如果创建了一个Worker对象,还没有执行任务时,这时就不应该被中断即防止Worker对象未调用runWorker方法,就被中断

runWorker()方法

//每个线程都会循环获取队列中的任务,并调用任务的run()方法
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    //获取第一个任务,可能为空
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // (将state从-1修改为0,允许中断操作)
    //是否因为异常退出循环
    boolean completedAbruptly = true;
    try {
        //如果task=null,则获取队列中任务,即:队列任务最后执行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //如果线程池正在停止,那么要保证当前线程是中断状态;
            //如果不是的话,则要保证当前线程不是中断状态;
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();//中断当前线程
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //当前线程调用传入线程池任务的run方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

STOP
        //001
        //状态切换:调用线程池的shutdownNow()方法。(RUNNING、SHUTDOWN-->STOP)
        //状态说明:不再接受新任务,不再处理队列中的任务,并且中断当前执行的任务。
        private static final int STOP       =  1 << COUNT_BITS;

runWorker方法的执行过程:

  1. while循环不断地通过getTask()方法获取任务,getTask()方法从阻塞队列中取任务
  2. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  3. 调用task.run()执行任务;
  4. 如果task为null则跳出循环,执行processWorkerExit()方法;
  5. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。

completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。

getTask()方法

private Runnable getTask() {
    //上次从阻塞队列中(使用poll方法)获取任务,是否超时。
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

    /*
     * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
     * 1. rs >= STOP,线程池是否正在stop;
     * 2. 阻塞队列是否为空。
     * 如果以上条件满足,则将workerCount减1并返回null。【返回null,调用getTask处,会跳出循环,并对线程销毁处理】
     * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
     */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();//使用CAS方式操作,将Worker数量减1
            //返回null,终止runWork方法中循环
            return null;
        }

        //wc:线程池中线程数
        int wc = workerCountOf(c);

        // timed:判断是否需要进行超时控制。
        // allowCoreThreadTimeOut默认是false,即:核心线程不进行超时回收;
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    /*
     * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
     * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
     * 如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
     * 如果减1失败,执行continue,进行重试。
     * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
     */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        /*
         * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
         * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。【保证核心线程,不会被销毁的方式】。
         * keepAliveTime:线程池初始化时,设置的参数
         */
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
            timedOut = false;
        }
    }
}

总结:

第二个if判断目的是控制线程池的有效线程数量

在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,

但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。

什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。

getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法(进行线程回收)。

processWorkerExit()方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {

    // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
    // 如果线程执行时没有出现异常,说明在getTask()方法中已经完成对workerCount进行了减1操作。
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();


    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //统计完成的任务数
        completedTaskCount += w.completedTasks;
        //从线程池中移除工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 对线程池进行判断,尝试设置线程池为最终态
    //如果(SHUTDOWN且线程池和队列为空)或(STOP且线程池为空),则转换为终止状态(TERMINATED)。
    tryTerminate();


    int c = ctl.get();

/*
* 保留线程池中线程操作
* 当线程池是RUNNING或SHUTDOWN状态时(c<STOP),如果worker是异常结束,直接通过addWorker方式想线程池添加线程;
* 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
*/
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {//正常结束。completedAbruptly=false
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

tryTerminate()方法

如果(SHUTDOWN且线程池和队列为空)或(STOP且线程池为空),则转换为终止状态(TERMINATED)。
如果有资格终止,但workerCount不为零,则中断空闲工作进程,以确保停机信号传播。
必须在可能导致终止的任何操作之后调用此方法,这些操作包括减少worker计数或在关机期间从队列中删除任务。

该方法是非私有的,允许从ScheduledThreadPoolExecutor进行访问    
final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            
            //校验是否进行终止线程池操作
            //1.线程池为RUNNING状态。(不能终止)
            //2.线程池为TERMINATED状态(已经终止)。
            //3.线程池为SHUTDOWN状态且队列不为null(不能终止,需要处理完队列任务)
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            
            //符合终止条件。判断线程池中是否还存在线程,如果存在,则设置中断标识(调用interrupt方法)
            if (workerCountOf(c) != 0) { // Eligible to terminate
                //中断线程池中线程
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            
            //设置线程池终止状态
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                       //设置线程池状态为终态。
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

 

 

0条评论
0 / 1000
彳亍的骆驼
2文章数
0粉丝数
彳亍的骆驼
2 文章 | 0 粉丝
彳亍的骆驼
2文章数
0粉丝数
彳亍的骆驼
2 文章 | 0 粉丝
原创

Java线程池应用与原理

2023-05-19 05:49:45
39
0

java中线程池使用时,常用类结构

正确使用线程池姿势

线程池根据不同业务场景设置不同的线程池。不要整个系统用一个线程池。因为其他业务场景占用线程过多,影响其他业务处理

线程池方法

Executor有一个重要子接口ExecutorService,其中定义了线程池的具体行为

1.execute(Runnable command):履行Ruannable类型的任务,

2.submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象

3.shutdown():在完成已提交的任务后关闭线程池,不再接管新任务,

4.shutdownNow():停止所有正在履行的任务并立刻关闭线程池

5.isTerminated():测试所有任务是否都执行完毕

6.isShutdown():测试是否该ExecutorService已被关闭。

一、线程与线程池性能对比

线程池:线程缓存的实现,线程是稀缺资源,避免线程被频繁的创建销毁。

例如:在web应用中,服务器接受处理请求,并为请求分配一个线程进行处理,每个新请求创建一个线程,实现简单,但存在问题。

线程池优势

复用存在的线程,减少线程创建、消亡的开销,提高性能。

可以提高响应速度。(当任务到达时,不需要等待线程创建,使用现有线程,就可以立即执行)

提高线程的可管理性。(线程池统一管理、分配线程;避免无限创建)

二、JDK自带线程池

Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

问题:高并发时,可能导致大量创建线程,导致cup达到100%。因为队列不缓存任务,所有任务都需要对应一个线程。

Executors.newFixedThreadPool(10)
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
Executors.newScheduledThreadPool(5)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
ScheduledThreadPoolExecutor继承ThreadPoolExecutor,内部使用DelayedWorkQueue延迟队列
//利用ThreadPoolExecutor初始化
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
//任务被封装为ScheduledFutureTask,底层执行任务依靠addWorker方法,定时依赖延迟队列

线程池中的队列

SynchronousQueue(无缓冲)

SynchronousQueue:无缓冲等待队列,是一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,

拥有公平(FIFO)和非公平(LIFO)策略,非公平策略会导致一些数据永远无法被消费的情况?

使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为Integer.MAX_VALUE,避免线程拒绝执行操作

LinkedBlockingQueue(无界)

按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene

LinkedBlockingQueue:无界队列,当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes参数就相当于无效),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。

可指定队列大小,默认队列大小Integer.MAX_VALUE

ArrayBlockingQueue(有界)

ArrayBlockingQueue:有界队列,可以指定缓存队列的大小,

1.当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,

2.当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行

3.当线程数达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会执行拒绝策略

priorityBlockingQuene

具有优先级的无界阻塞队列

DelayedWorkQueue

底层为RunnableScheduledFuture类型的数组,初始容量为16

三、线程池应用

具体实现:ThreadPoolExecutor、ScheduledThreadPoolExecutor

ThreadPoolExecutor

核心参数

corePoolSize(核心线程数)

1.线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;

2.如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;

3.如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize(最大线程数)

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,(如果当前线程数小于maximumPoolSize)则创建新的线程执行任务,

keepAliveTime(允许的空闲时间)

线程池非核心线程允许的空闲时间。当线程池中的线程数量大于corePoolSize的时,且队列中任务时,核心线程外的线程会等待超过了keepAliveTime,对额外的线程进行销毁;

unit(单位)

keepAliveTime的单位;

workQueue(阻塞队列)

用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:

threadFactory(线程工厂)

继承ThreadFactory接口的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。

使用默认的ThreadFactory来创建线程时,新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置线程的名称。

handler

线程池的饱和策略阻塞队列放满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

1、AbortPolicy:直接抛出异常,默认策略

2、CallerRunsPolicy:用调用者所在的线程来执行任务;

3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

4、DiscardPolicy:直接丢弃任务;

上面的4种策略都是ThreadPoolExecutor的内部类。

也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

四、源码解析

源码重点属性

//ctl=111000000000000000000000000000000

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//Interge.size = 32;故COUNT_BITS=29

private static final int COUNT_BITS = Integer.SIZE - 3;

//向左移动29位,0010 0000 0000 0000 0000 0000 0000 0000 - 1 = 0001 1111 1111 1111 1111 1111 1111 1111(29位1)

private static final int CAPACITY = (1 << COUNT_BITS) - 1;

ctl是对线程池的运行状态、线程池有效线程数量(能记录2^29-1.约5亿)进行记录的一个字段。

使用Integer类型保存

高3位保存线程状态(runState)低29位保存线程数量(workerCount)

线程池状态

RUNNING[-536870912]

//111000000000000000000000000000000。

//状态说明:接受新任务或者处理队列中任务。

private static final int RUNNING = -1 << COUNT_BITS;

SHUTDOWN[0]

//000000000000000000000000000000000。

//状态切换:调用线程池的shutdown()方法。(RUNNING-->SHUTDOWN)

//状态说明:不再接受新任务仅处理队列中的任务

private static final int SHUTDOWN = 0 << COUNT_BITS;

STOP[536870912]

//001000000000000000000000000000000。

//状态切换:调用线程池的shutdownNow()方法。(RUNNING、SHUTDOWN-->STOP)

//状态说明:不再接受新任务,不再处理队列中的任务,并且中断当前执行的任务

private static final int STOP = 1 << COUNT_BITS;

TIDYING[1073741824]

//01000000000000000000000000000000。

//状态切换:

当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中工作线程数为0时,就会由 SHUTDOWN -> TIDYING

线程池在STOP状态下,线程池中工作线程数为0时,就会由STOP -> TIDYING

//状态说明:所有任务都已终止,workerCount(任务数)为0,状态为TIDYING的线程将运行钩子函数terminated(),(terminated()方法为空,需要用户可自己实现)。

private static final int TIDYING = 2 << COUNT_BITS;

TERMINATED[1610612736]

//01100000000000000000000000000000。

//状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

//状态说明:线程池彻底终止,就变成TERMINATED状态。

private static final int TERMINATED = 3 << COUNT_BITS;

ThreadPoolExecutor源码注释

execute()方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //ctl记录线程池状态(runState)、线程数(workCount)
    int c = ctl.get();

    //1.计算当前线程数
    //如果当前线程数【 < 核心线程数】,则创建新的线程。并把任务添加到线程中。
    if (workerCountOf(c) < corePoolSize) {
        //创建新线程成功后直接返回。
        //第二个参数:(ture)标示添加线程与corePoolSize比较,否则与maximumPoolSize比较
        if (addWorker(command, true))
            return;
        c = ctl.get();//添加失败,则重新获取ctl数据
    }
    //2.当线程数【 >= 核心线程数】,则执行到此处
    //a.校验线程池状态,是否为运行状态
    //b.尝试将任务加入队列,
    //c.再次校验线程池状态(防止线程加入队列时发生变化),如果线程池shutdown,则丢弃任务,如果线程池无工作线程,则添加线程
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))//再次判断线程池运行状态,如果不是RUNNING状态,则进行移除操作。并执行拒绝策略
            reject(command);
        else if (workerCountOf(recheck) == 0)//获取线程池中有效线程数,如果为0,则执行addWorker方法。
            addWorker(null, false);//第一个参数为null时,表示仅在线程池创建一个线程,用于消费队列中任务(task != null || (task = getTask()) != null)task==null,则从队列中获取任务。
    }
    //3.当队列放满,则执行到此处,有两种情况。
    //a.线程池已经不是RUNNING状态
    //b.是RUNNING状态,但线程数>核心线程数,且队列已满。(故addWorker方法,第二个参数出入false与maximumPoolSize比较)
    else if (!addWorker(command, false))
        reject(command);
}

execute()执行总结。

1.线程池未达到核心线程时(workerCount < corePoolSize),当有任务添加时,不断创建新线程作为核心线程;

2.当线程数达到核心线程时(workerCount >= corePoolSize)队列不满,则将任务放入队列中.

3.当队列满时(workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满),尝试创建非核心线程

4.如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常

通过上面步骤可以看出。如果开始创建非核心线程时,队列任务被消费(即不满状态),新任务添加时,将任务添加到队列中。等队列满时,再次开始创建非核心线程

addWorker()方法

//校验是否满足增加的条件


//1.firstTask:用于指定新增的线程执行的【第一个任务】,firstTask==null时,代表仅增加执行线程。
//2.core:(ture)添加线程时与corePoolSize比较,否则与maximumPoolSize比较
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        //获取当前线程运行状态
        int c = ctl.get();
        int rs = runStateOf(c);

    /*
     * 如果rs >= SHUTDOWN(0),则表示此时不再接收新任务;
     * 接着判断以下3个条件,只要有1个不满足,则返回false:
     * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
     * 2. firsTask为空
     * 3. 阻塞队列不为空
     * 
     * 首先考虑rs == SHUTDOWN的情况
     * 这种情况下不接受新提交的任务,所以在firstTask不为空的时候会返回false;
     * 然后,如果firstTask为空,并且workQueue也为空,则返回false;即:如果队列中已经没有任务了,不需要再添加线程
     *
     * 但是如果队列不会空,则可以继续执行
     */

        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               !workQueue.isEmpty()))
               //终止添加新线程
            return false;


        for (;;) {
            //获取线程池中线程数
            int wc = workerCountOf(c);
            
            //判断线程池中线程数是否大于最大限制,或者核心(或最大线程数)。如果大于直接返回。
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 尝试增加workerCount,如果成功,则跳出第一个for循环(为什么修改c值?因为满足增加线程的条件,而变量c记录线程池中线程的数量)
            //cas修改数据可能失败,如果失败,则继续执行内循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl

            // 如果当前的运行状态不等于rs,说明状态已被改变,返回外层for循环,继续执行外层for循环校验
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    //通过校验,则执行增加逻辑
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //firstTask作为入参,创建Worker对象
        w = new Worker(firstTask);
        //每一个Worker对象都会创建一个线程
        final Thread t = w.thread;
        if (t != null) {
        		//防止并发问题
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // rs < SHUTDOWN表示是RUNNING状态;
                // rs是RUNNING状态 || rs是SHUTDOWN状态 && firstTask为null,向线程池中添加线程。
                // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // 测试线程是否是活跃状态(即:调用了start方法,则为true)
                        throw new IllegalThreadStateException();
                    //使用set集合暂存worker对象
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize记录当前线程池中最大线程数
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动worker相关线程,通过run方法调用runWork方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker类

//Worker类继承AQS、实现Runnable接口  
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** 具体执行任务的线程. */
        final Thread thread;
        
        /** 初始的待任务,可能为空 */
        Runnable firstTask;
        
        /** 记录每个线程完成任务数 */
        volatile long completedTasks;

        /**
         *构造函数中,利用ThreadFactory产生新的线程,并给新线程第一个任务(任务可能为null)
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // 在运行worker前,禁止线程中断操作。(如果想对worker操作,必须获取锁,将state=0,改成state=1,设置为1,则无法获取锁)
            this.firstTask = firstTask;
            //创建新线程,并给新线程初始化执行逻辑(worker就是Runnable,在run方法中定义了处理逻辑)
            this.thread = getThreadFactory().newThread(this);
        }

        /** runWorker为主要处理逻辑  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        //尝试获取锁(cas方式0-->1)
        //tryAcquire方法判断state是否为0,所以setState(-1);将state设置为-1,为了禁止在执行任务前(调用runWorker前)对线程进行中断。
        //在runWorker方法中会先调用Worker对象的unlock方法将state设置为0。(当state=0时,worker线程可以被中断)
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        
        //中断当前线程
        //isInterrupted():判断线程对象中是否设置了中断标识,如果设置,则返回true,但不会清除中断标识,再次调用仍为true。
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    //没有设置中断,则执行中断操作
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

线程池中每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象(workers.add(w);上面代码显示),请参见JDK源码。

Worker类继承AQS实现Runnable接口,重要的两个属性

1.firstTask用它来保存传入的任务;

2.thread是在调用构造方法时通过ThreadFactory来创建的线程,是处理任务的线程。

在调用构造方法时,传入需要执行的任务,通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程, 所以Worker对象在启动的时候会调用Worker类中的run方法。

Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?

Worker中tryAcquire方法不允许重入的(当前线程同一时间仅能被一个任务占用),而ReentrantLock的tryAcquire允许重入

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;如果正在执行任务,则不应该中断线程
  2. 如果线程不是独占锁的状态,即空闲的状态,说明它没有在处理任务,则可以对该线程进行中断
  3. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
  4. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
  5. 所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。

Worker在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?

因为AQS中默认的state是0,如果创建了一个Worker对象,还没有执行任务时,这时就不应该被中断即防止Worker对象未调用runWorker方法,就被中断

runWorker()方法

//每个线程都会循环获取队列中的任务,并调用任务的run()方法
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    //获取第一个任务,可能为空
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // (将state从-1修改为0,允许中断操作)
    //是否因为异常退出循环
    boolean completedAbruptly = true;
    try {
        //如果task=null,则获取队列中任务,即:队列任务最后执行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //如果线程池正在停止,那么要保证当前线程是中断状态;
            //如果不是的话,则要保证当前线程不是中断状态;
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();//中断当前线程
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //当前线程调用传入线程池任务的run方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

STOP
        //001
        //状态切换:调用线程池的shutdownNow()方法。(RUNNING、SHUTDOWN-->STOP)
        //状态说明:不再接受新任务,不再处理队列中的任务,并且中断当前执行的任务。
        private static final int STOP       =  1 << COUNT_BITS;

runWorker方法的执行过程:

  1. while循环不断地通过getTask()方法获取任务,getTask()方法从阻塞队列中取任务
  2. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  3. 调用task.run()执行任务;
  4. 如果task为null则跳出循环,执行processWorkerExit()方法;
  5. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。

completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。

getTask()方法

private Runnable getTask() {
    //上次从阻塞队列中(使用poll方法)获取任务,是否超时。
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

    /*
     * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
     * 1. rs >= STOP,线程池是否正在stop;
     * 2. 阻塞队列是否为空。
     * 如果以上条件满足,则将workerCount减1并返回null。【返回null,调用getTask处,会跳出循环,并对线程销毁处理】
     * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
     */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();//使用CAS方式操作,将Worker数量减1
            //返回null,终止runWork方法中循环
            return null;
        }

        //wc:线程池中线程数
        int wc = workerCountOf(c);

        // timed:判断是否需要进行超时控制。
        // allowCoreThreadTimeOut默认是false,即:核心线程不进行超时回收;
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    /*
     * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
     * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
     * 如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
     * 如果减1失败,执行continue,进行重试。
     * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
     */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        /*
         * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
         * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。【保证核心线程,不会被销毁的方式】。
         * keepAliveTime:线程池初始化时,设置的参数
         */
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
            timedOut = false;
        }
    }
}

总结:

第二个if判断目的是控制线程池的有效线程数量

在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,

但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。

什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。

getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法(进行线程回收)。

processWorkerExit()方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {

    // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
    // 如果线程执行时没有出现异常,说明在getTask()方法中已经完成对workerCount进行了减1操作。
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();


    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //统计完成的任务数
        completedTaskCount += w.completedTasks;
        //从线程池中移除工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 对线程池进行判断,尝试设置线程池为最终态
    //如果(SHUTDOWN且线程池和队列为空)或(STOP且线程池为空),则转换为终止状态(TERMINATED)。
    tryTerminate();


    int c = ctl.get();

/*
* 保留线程池中线程操作
* 当线程池是RUNNING或SHUTDOWN状态时(c<STOP),如果worker是异常结束,直接通过addWorker方式想线程池添加线程;
* 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
*/
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {//正常结束。completedAbruptly=false
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

tryTerminate()方法

如果(SHUTDOWN且线程池和队列为空)或(STOP且线程池为空),则转换为终止状态(TERMINATED)。
如果有资格终止,但workerCount不为零,则中断空闲工作进程,以确保停机信号传播。
必须在可能导致终止的任何操作之后调用此方法,这些操作包括减少worker计数或在关机期间从队列中删除任务。

该方法是非私有的,允许从ScheduledThreadPoolExecutor进行访问    
final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            
            //校验是否进行终止线程池操作
            //1.线程池为RUNNING状态。(不能终止)
            //2.线程池为TERMINATED状态(已经终止)。
            //3.线程池为SHUTDOWN状态且队列不为null(不能终止,需要处理完队列任务)
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            
            //符合终止条件。判断线程池中是否还存在线程,如果存在,则设置中断标识(调用interrupt方法)
            if (workerCountOf(c) != 0) { // Eligible to terminate
                //中断线程池中线程
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            
            //设置线程池终止状态
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                       //设置线程池状态为终态。
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

 

 

文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0