ThreadPoolExecutor Source Code Analysis


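Introduction: the class Javadoc

The JDK source carries extensive comments that explain the intended behavior of the code, so when studying a class it is usually worth reading its Javadoc first. The source quoted in this article is from JDK 1.8; other versions differ slightly.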

An {@link ExecutorService} that executes each submitted task using one of possibly several pooled threads, normally configured using {@link Executors} factory methods.

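In other words, ThreadPoolExecutor is an ExecutorService implementation that runs each submitted task on an available pooled thread; instances are usually obtained through the Executors factory methods.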

Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks,due to reduced per-task invocation overhead,and they provide a means of bounding and managing the resources,including threads, consumed when executing a collection of tasks.

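Thread pools solve two different problems. They usually perform better when running large numbers of asynchronous tasks, because the per-task invocation overhead drops (no new thread has to be created for every task). They also give you a way to bound and manage the resources, threads included, that a collection of tasks consumes.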

Each {@code ThreadPoolExecutor} also maintains some basic statistics, such as the number of completed tasks.

Every ThreadPoolExecutor also keeps a few basic statistics, for example the completed task count.

To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks.

To fit as many environments as possible, the class exposes many tunable parameters and extension hooks.

However, programmers are urged to use the more convenient {@link Executors} factory methods {@link Executors#newCachedThreadPool} (unbounded thread pool, with automatic thread reclamation), {@link Executors#newFixedThreadPool} (fixed size thread pool) and {@link Executors#newSingleThreadExecutor} (single background thread), that preconfigure settings for the most common usage scenarios.

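Programmers are nevertheless encouraged to use the more convenient Executors factory methods, which come preconfigured for the most common scenarios: Executors#newCachedThreadPool (unbounded pool with automatic thread reclamation), Executors#newFixedThreadPool (fixed-size pool) and Executors#newSingleThreadExecutor (single background thread).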

Otherwise, use the following guide when manually configuring and tuning this class:

Otherwise, configure and tune the class manually according to the guide below.

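This passage explains what ThreadPoolExecutor is: a class that executes tasks submitted by users. It solves two problems. First, it reduces per-task overhead, because the pool can create threads ahead of time and reuse existing ones. Second, it manages all of its threads centrally, which makes it easy to control their number and behavior and to collect statistics. These are hard to achieve when business code creates its own threads to handle tasks.

It also tells us that three kinds of thread pool are available through factory methods, and recommends them. If they do not meet your needs, you can create a thread pool instance yourself using the configuration options described below.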
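As a rough illustration of the two routes just described, the sketch below creates a fixed-size pool once through the factory method and once through the constructor with the same settings that `Executors.newFixedThreadPool` uses in JDK 8. The class and method names (`PoolCreationSketch`, `viaFactory`, `viaConstructor`, `describe`) are made up for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the JDK.
class PoolCreationSketch {
    /** Factory route: a fixed-size pool backed by an unbounded queue. */
    static ExecutorService viaFactory(int threads) {
        return Executors.newFixedThreadPool(threads);
    }

    /** Manual route: the same settings spelled out through the constructor. */
    static ThreadPoolExecutor viaConstructor(int threads) {
        return new ThreadPoolExecutor(
                threads, threads,                      // corePoolSize == maximumPoolSize
                0L, TimeUnit.MILLISECONDS,             // no keep-alive for excess threads
                new LinkedBlockingQueue<Runnable>());  // unbounded work queue
    }

    /** Returns {corePoolSize, maximumPoolSize} and shuts the pool down. */
    static int[] describe(ThreadPoolExecutor pool) {
        int[] sizes = {pool.getCorePoolSize(), pool.getMaximumPoolSize()};
        pool.shutdown();
        return sizes;
    }
}
```

The manual route is only worth the extra typing when one of the defaults (queue type, thread factory, rejection handler) has to change.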

Core and maximum pool sizes

A {@code ThreadPoolExecutor} will automatically adjust the pool size (see {@link #getPoolSize}) according to the bounds set by corePoolSize (see {@link #getCorePoolSize}) and maximumPoolSize (see {@link #getMaximumPoolSize}).

ThreadPoolExecutor adjusts the pool size automatically within the bounds set by corePoolSize and maximumPoolSize.

When a new task is submitted in method {@link #execute(Runnable)}, and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle.

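When a task is submitted through #execute(Runnable) while fewer than corePoolSize threads are running, a new thread is created to handle it, even if other worker threads are idle.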

If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.

If more than corePoolSize but fewer than maximumPoolSize threads are running, a new thread is created only when the queue is full.

By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool.

Giving corePoolSize and maximumPoolSize the same value yields a fixed-size pool.

By setting maximumPoolSize to an essentially unbounded value such as {@code Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary number of concurrent tasks.

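Setting maximumPoolSize to an effectively unbounded value such as Integer.MAX_VALUE lets the pool accommodate any number of concurrent tasks. (If the work queue is unbounded, maximumPoolSize has no effect.)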

Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using {@link #setCorePoolSize} and {@link #setMaximumPoolSize}.

The core and maximum sizes are usually set only in the constructor, but they can also be changed at runtime with setCorePoolSize and setMaximumPoolSize.

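Creating a thread pool instance takes seven parameters (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler). Two of them have been introduced so far: the core pool size and the maximum pool size. They are best explained together with the way ThreadPoolExecutor handles a newly submitted task.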

/**
 * Executes the given task sometime in the future.
 * The task may execute in a new thread or in an existing pooled thread.
 * If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *                                    {@code RejectedExecutionHandler}, if the task
 *                                    cannot be accepted for execution
 * @throws NullPointerException       if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) { // fewer workers than corePoolSize
        if (addWorker(command, true)) // add a worker, bounded by corePoolSize (a core thread)
            return; // worker added, done
        c = ctl.get(); // adding failed: re-read the latest state
    }
    if (isRunning(c) && workQueue.offer(command)) { // pool is running and the task fits in the queue
        int recheck = ctl.get(); // queued successfully: re-read the latest state
        // The pool stopped running while the task was being queued and the task is still in the queue: reject it.
        // If remove() fails, the task has already been taken for execution.
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0) // no worker threads are left
            addWorker(null, false); // start one without a first task so the queue gets drained
    } else if (!addWorker(command, false)) // not running or queue full: add a worker, bounded by maximumPoolSize
        reject(command); // pool is shut down or already at maximumPoolSize: apply the rejection policy
}

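As the code above shows, when a new task is submitted and the worker count is below corePoolSize, the pool tries to add a thread with corePoolSize as the bound. If that fails, or the pool is already at corePoolSize, it tries to put the task into the waiting queue. If queuing succeeds, the pool state is checked again to decide whether to reject the task or to create a thread to consume the queue. If queuing fails, it tries to create a thread with maximumPoolSize as the bound, and if that also fails the rejection policy runs.

The flow is as follows: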

graph TD
   start[Submit task] --> spare{Thread count < corePoolSize}
   spare -- YES --> createCore{Create core thread}
   spare -- NO --> queue{Try to enqueue}
   createCore -- YES --> stop[End]
   createCore -- NO --> queue{Try to enqueue}
   queue -- YES --> isStop{Pool state}
   isStop -- Stopped --> reject[Reject]
   isStop -- No workers --> addWorker[Create worker without first task] --> stop[End]
   isStop -- Running with workers --> stop[End]
   queue -- NO --> createMax{Create thread up to max}
   createMax -- YES --> stop[End]
   createMax -- NO --> reject[Reject]
   reject --> stop[End]
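The flow can be observed from the outside with a deliberately tiny pool. The following is a minimal sketch, assuming corePoolSize 1, maximumPoolSize 2 and a queue capacity of 1; the class `ExecuteFlowSketch` and its methods are invented for this illustration. Each submitted task blocks on a latch, so the pool size and queue size after every `execute` call are stable.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ExecuteFlowSketch {
    /** Formats the pool as "poolSize/queueSize". */
    private static String snapshot(ThreadPoolExecutor pool) {
        return pool.getPoolSize() + "/" + pool.getQueue().size();
    }

    /** Submits four blocking tasks and records what the pool did with each. */
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder trace = new StringBuilder();
        try {
            pool.execute(blocker); // below corePoolSize: a core thread is created
            trace.append(snapshot(pool)).append(' ');
            pool.execute(blocker); // core thread busy: the task is queued
            trace.append(snapshot(pool)).append(' ');
            pool.execute(blocker); // queue full: a second thread is created
            trace.append(snapshot(pool)).append(' ');
            try {
                pool.execute(blocker); // queue full and at maximumPoolSize
                trace.append("accepted");
            } catch (RejectedExecutionException e) {
                trace.append("rejected"); // default AbortPolicy throws
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return trace.toString();
    }
}
```

Calling `ExecuteFlowSketch.run()` returns the string `1/0 1/1 2/1 rejected`: one core thread, then one queued task, then a second thread, then a rejection.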
   

On-demand construction

By default, even core threads are initially created and started only when new tasks arrive, but this can be overridden dynamically using method {@link #prestartCoreThread} or {@link #prestartAllCoreThreads}. You probably want to prestart threads if you construct the pool with a non-empty queue.

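By default, even core threads are created and started only when new tasks arrive, but this can be overridden with #prestartCoreThread or #prestartAllCoreThreads. If you construct the pool with a non-empty queue, you will probably want to prestart threads.

If the queue passed to the constructor already contains tasks, those tasks are not executed right away: ThreadPoolExecutor only tries to create a worker thread when a new task is submitted, and a worker only polls the queue after finishing the task it was created for.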

    /**
     * Checks if a new worker can be added with respect to current pool state and the given bound (either core or maximum).
     * If so, the worker count is adjusted accordingly, and, if possible, a new worker is created and started, running firstTask as its first task.
     * This method returns false if the pool is stopped or eligible to shut down.
     * It also returns false if the thread factory fails to create a thread when asked.
     * If the thread creation fails, either due to the thread factory returning null,
     * or due to an exception (typically OutOfMemoryError in Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or null if none).
     *                  Workers are created with an initial first task (in method execute()) to bypass queuing when there are fewer
     *                  than corePoolSize threads (in which case we always start one),
     *                  or when the queue is full (in which case we must bypass queue).
     *                  Initially idle threads are usually created via prestartCoreThread or to replace other dying workers.
     * @param core      if true use corePoolSize as bound, else maximumPoolSize.
     *                  (A boolean indicator is used here rather than a value to ensure reads of fresh values after checking other pool state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (; ; ) {
            int c = ctl.get();
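            int rs = runStateOf(c); // current run state

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN // SHUTDOWN or later, i.e. the pool is no longer RUNNING
                    && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                // Refuse to add a worker once the pool is not RUNNING, with one exception:
                // the state is exactly SHUTDOWN, no first task is supplied and the queue is non-empty,
                // in which case a worker is still needed to drain the queued tasks.
                // "Only if necessary": because of short-circuit evaluation the queue is inspected
                // only in that last case; while the pool is RUNNING it is never checked here.
                return false;

            for (; ; ) {
                int wc = workerCountOf(c); // current worker count, 0 for a fresh pool
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    // at CAPACITY (536870911; the constructor does not cap its arguments at this value)
                    // or at the chosen bound: no more workers can be added
                    return false;
                if (compareAndIncrementWorkerCount(c)) // try to CAS the worker count up by one
                    break retry; // success: leave both loops and create the worker
                // CAS failed: the worker count or the pool state has changed
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) // recheck the run state
                    continue retry; // state changed: restart the outer loop (re-read and re-check)
                // else CAS failed due to workerCount change; retry inner loop
                // state unchanged, only the worker count moved: loop, re-read the count and increment again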
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); // create the worker and hand it the task
            final Thread t = w.thread;
            if (t != null) { // the ThreadFactory may have returned null
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    // pool is RUNNING, or SHUTDOWN with no first task (a worker added only to drain the queue)
                    if (rs < SHUTDOWN || // state is RUNNING
                            (rs == SHUTDOWN && firstTask == null)) { // shutdown() was called but queued tasks still have to run
                        if (t.isAlive()) // precheck that t is startable (it must not have been started already)
                            throw new IllegalThreadStateException();
                        workers.add(w); // register the worker
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s; // track the largest pool size seen so far
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start(); // start the worker thread, which runs the submitted task
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

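When a task is submitted, a series of checks is performed, then a new thread is created and started to run the submitted task.

Here is how a Worker executes tasks: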
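The helpers `runStateOf`, `workerCountOf` and the constant `CAPACITY` used in addWorker all operate on the single `ctl` integer, which packs the run state into the top 3 bits and the worker count into the low 29 bits. The following standalone sketch mirrors the JDK 8 definitions so the arithmetic can be tried in isolation; the wrapper class `CtlPackingSketch` is made up for this example.

```java
// Hypothetical standalone copy of the bit-packing helpers, for experimentation only.
class CtlPackingSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;     // largest representable worker count

    // Run states live in the high-order 3 bits and are ordered numerically.
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    /** Extracts the run state (high 3 bits). */
    static int runStateOf(int c) {
        return c & ~CAPACITY;
    }

    /** Extracts the worker count (low 29 bits). */
    static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    /** Packs a run state and a worker count into one int. */
    static int ctlOf(int rs, int wc) {
        return rs | wc;
    }
}
```

Because the states are ordered (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED), comparisons such as `rs >= SHUTDOWN` in addWorker read as "the pool is no longer running".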

	private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        /**
         * Creates with given first task and thread from ThreadFactory.
         *
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /**
         * Delegates main run loop to outer runWorker
         */
        public void run() {
            runWorker(this);
        }
    }
    /**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     * <p>
     * 1. We may start out with an initial task, in which case we don't need to get the first one.
     * Otherwise, as long as pool is running, we get tasks from getTask.
     * If it returns null then the worker exits due to changed pool state or configuration parameters.
     * Other exits result from exception throws in external code, in which case completedAbruptly holds,
     * which usually leads processWorkerExit to replace this thread.
     * <p>
     * 2. Before running any task, the lock is acquired to prevent other pool interrupts while the task is executing,
     * and then we ensure that unless pool is stopping, this thread does not have its interrupt set.
     * <p>
     * 3. Each task run is preceded by a call to beforeExecute, which might throw an exception,
     * in which case we cause thread to die (breaking loop with completedAbruptly true) without processing the task.
     * <p>
     * 4. Assuming beforeExecute completes normally, we run the task, gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we wrap them within Errors on the way out (to the thread's UncaughtExceptionHandler).
     * Any thrown exception also conservatively causes thread to die.
     * <p>
     * 5. After task.run completes, we call afterExecute, which may also throw an exception, which will also cause thread to die.
     * According to JLS Sec 14.20, this exception is the one that will be in effect even if task.run throws.
     * <p>
     * The net effect of the exception mechanics is that afterExecute and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {    // keep fetching tasks until getTask() returns null
                w.lock();
                // If pool is stopping (STOP, TIDYING or TERMINATED), ensure thread is interrupted;
                // if not, ensure thread is not interrupted.
                // This requires a recheck in second case to deal with shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run(); // run the task
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // clean up after the worker exits
            processWorkerExit(w, completedAbruptly);
        }
    }

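Once started, a Worker first runs its firstTask and then fetches tasks from the waiting queue.

So if a pool is created with a non-empty work queue, threads have to be started manually (for example with prestartAllCoreThreads); otherwise, as long as no new task is submitted, the tasks already in the queue are never executed.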
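A minimal sketch of that situation, assuming a pool of two core threads built on a queue that already holds three tasks. The class name `PrestartSketch` is invented for this example; `prestartAllCoreThreads` is the real ThreadPoolExecutor method and returns the number of threads it started.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class PrestartSketch {
    /** Returns {pool size before prestart, threads started, 1 if the queue was drained else 0}. */
    static int[] run() {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            queue.add(done::countDown); // tasks placed in the queue before the pool exists
        }
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, queue);
        int before = pool.getPoolSize();             // no thread exists yet, nothing runs
        int started = pool.prestartAllCoreThreads(); // start the core threads explicitly
        boolean drained = false;
        try {
            drained = done.await(5, TimeUnit.SECONDS); // wait for the pre-queued tasks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new int[] {before, started, drained ? 1 : 0};
    }
}
```

Without the `prestartAllCoreThreads()` call the latch would only be released after some later `execute` call created a worker.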

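Creating new threads

New threads are created using a ThreadFactory.

If none is specified, Executors#defaultThreadFactory is used, which creates threads that all belong to the same ThreadGroup and share the same NORM_PRIORITY priority and non-daemon status.

By supplying a different ThreadFactory you can change the thread's name, thread group, priority, daemon status and so on.

If a ThreadFactory fails to create a thread when asked, by returning null from newThread, the executor will continue but might not be able to execute any tasks.

Threads should possess the "modifyThread" RuntimePermission. If worker threads or other threads using the pool lack this permission, service may be degraded: configuration changes may not take effect in a timely manner, and a shut-down pool may remain in a state in which termination is possible but not completed.

ThreadFactory is one of the constructor parameters: the factory instance used to create threads. If none is given, the default implementation Executors.defaultThreadFactory() is used.

In most cases you should supply a custom ThreadFactory that sets a thread name pattern, and configure the logging framework to print thread names; this makes debugging and monitoring easier.

You should also make sure newThread() never returns null. A null return makes addWorker() return false, and depending on where in execute() that happens the task either ends up in the waiting queue or triggers the rejection policy (see the execute source above).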
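A custom factory of the kind recommended above can be very small. This is a sketch only: the class name `NamedThreadFactory` and the `prefix-N` naming scheme are assumptions made for illustration, not something the JDK prescribes.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that names threads "<prefix>-<sequence>".
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger sequence = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Always return a thread; returning null would make addWorker() fail.
        Thread t = new Thread(r, prefix + "-" + sequence.getAndIncrement());
        t.setDaemon(false);                     // same defaults as the JDK default factory
        t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
```

An instance is passed as the `threadFactory` constructor argument, for example `new NamedThreadFactory("order-worker")`, after which log lines that include the thread name show which pool produced them.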

Keep-alive times

If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).

If the pool currently holds more than corePoolSize threads, the excess threads are terminated once they have been idle longer than keepAliveTime (see #getKeepAliveTime(TimeUnit)).

This provides a means of reducing resource consumption when the pool is not being actively used.

This cuts resource consumption while the pool is not being actively used.

If the pool becomes more active later,new threads will be constructed.

If the pool gets busier again later, new threads are created.

This parameter can also be changed dynamically using method {@link #setKeepAliveTime(long, TimeUnit)}.

The parameter can also be changed at runtime with #setKeepAliveTime(long, TimeUnit).

Using a value of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively disables idle threads from ever terminating prior to shut down.

Passing Long.MAX_VALUE together with TimeUnit#NANOSECONDS effectively keeps idle threads from ever terminating before the pool is shut down.

By default, the keep-alive policy applies only when there are more than corePoolSize threads.

By default the keep-alive policy applies only while there are more than corePoolSize threads.

But method {@link #allowCoreThreadTimeOut(boolean)} can be used to apply this time-out policy to core threads as well,so long as the keepAliveTime value is non-zero.

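However, #allowCoreThreadTimeOut(boolean) extends the time-out policy to core threads as well, provided keepAliveTime is non-zero.

The constructor parameters long keepAliveTime and TimeUnit unit mean that threads beyond corePoolSize are terminated after sitting idle for keepAliveTime, in order to save resources.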

    /**
     * Performs blocking or timed wait for a task, depending on current configuration settings, or returns null if this worker must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     * workers are subject to termination (that is, {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     * both before and after the timed wait, and if the queue is non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (; ; ) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // Exit when the pool is stopped, or shut down with an empty queue.
            // Equivalent to rs >= STOP || (rs >= SHUTDOWN && workQueue.isEmpty()), which may be easier to read.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // Workers may time out when core threads are allowed to time out or the worker count exceeds corePoolSize.
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

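            // Exit if (worker count exceeds maximumPoolSize, or this worker has timed out) and (more than one worker, or the queue is empty).
            if ((wc > maximumPoolSize || (timed && timedOut)) // a timed-out worker must go; above maximumPoolSize it must go even without a timeout
                    && (wc > 1 || workQueue.isEmpty())) { // the last worker may not exit while the queue is non-empty (the state cannot be STOP here); with more than one worker the queue need not be checked because another worker will handle it
                // try to decrement the worker count
                if (compareAndDecrementWorkerCount(c))
                    return null; // success: return null so the worker terminates
                continue; // CAS failed because ctl changed concurrently: loop and check again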
            }

            try {
                // with timeouts enabled, wait at most keepAliveTime; otherwise block until a task arrives
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                // null means the poll timed out; the next iteration decides whether to exit
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

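So when fetching a task, the worker first determines whether timeouts apply. If they do, it fetches the task with poll, and the queue returns null once the specified time has elapsed. The next loop iteration then checks the remaining conditions, and if they are met it decrements the worker count and returns null. Back in runWorker() (see above), the loop ends and processWorkerExit() is executed.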

    /**
     * Performs cleanup and bookkeeping for a dying worker.
     * Called only from worker threads.
     * Unless completedAbruptly is set, assumes that workerCount has already been adjusted to account for exit.
     * This method removes thread from worker set,
     * and possibly terminates the pool or replaces the worker if either it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but there are no workers.
     *
     * @param w                 the worker
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted; on a normal exit getTask() has already decremented it
            decrementWorkerCount(); // workerCount--

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w); // remove the worker from the set
        } finally {
            mainLock.unlock();
        }

        tryTerminate(); // try to terminate the pool; while it is RUNNING this does nothing

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) { // below STOP, i.e. RUNNING or SHUTDOWN

            if (!completedAbruptly) { // the worker exited normally
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
                // In other words, no replacement is created when:
                // allowCoreThreadTimeOut=true, queue empty: workerCount >= 0 (always true)
                // allowCoreThreadTimeOut=true, queue non-empty: workerCount >= 1
                // allowCoreThreadTimeOut=false: workerCount >= corePoolSize
            }
            // otherwise add a worker so the thread count meets the requirement
            addWorker(null, false);
        }
    }
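The keep-alive behavior traced through getTask() and processWorkerExit() can be exercised from the outside. The sketch below is an illustration under assumptions: the class `KeepAliveSketch`, the 50 ms keep-alive and the 5 second polling deadline are all chosen for the demo. It checks two things: that `allowCoreThreadTimeOut(true)` is refused when keepAliveTime is zero, and that with a non-zero keep-alive even core threads eventually exit when idle.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class KeepAliveSketch {
    /** Returns true if enabling core time-out with a zero keep-alive is rejected. */
    static boolean zeroKeepAliveRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            pool.allowCoreThreadTimeOut(true);
            return false;
        } catch (IllegalArgumentException expected) {
            return true; // core threads must have a non-zero keep-alive time
        } finally {
            pool.shutdown();
        }
    }

    /** Runs one task, then waits for the idle core thread to time out; returns the final pool size. */
    static int poolSizeAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);
        CountDownLatch ran = new CountDownLatch(1);
        pool.execute(ran::countDown);
        try {
            ran.await(5, TimeUnit.SECONDS);
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            // Poll until the worker's timed poll() expires and getTask() returns null.
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }
}
```

Since the queue is empty and core time-out is enabled, processWorkerExit() computes a minimum of zero workers and does not create a replacement.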

Queuing

Any {@link BlockingQueue} may be used to transfer and hold submitted tasks.

Any BlockingQueue implementation can serve to hand over and buffer the tasks that are submitted.

The use of this queue interacts with pool sizing:

How this queue is used interacts with pool sizing:

  • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.

  • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.

  • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

There are three general strategies for queuing:

  • Direct handoffs

    A good default choice for a work queue is a {@link SynchronousQueue} that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.

    (With a SynchronousQueue every insert must wait for a matching remove by another thread and vice versa; when no thread is waiting to take, offer returns false.)

  • Unbounded queues

    Using an unbounded queue (for example a {@link LinkedBlockingQueue} without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each others execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.

  • Bounded queues

    A bounded queue (for example, an {@link ArrayBlockingQueue}) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.

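The constructor parameter BlockingQueue<Runnable> workQueue holds the tasks that cannot be executed yet.

How the queue and the pool size interact has already been shown in the flow chart above.

The Javadoc explains the three queue types clearly; you only need to pick the one that fits your scenario.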
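The three strategies map onto three constructor calls. The sketch below is illustrative only: the class `QueueStrategySketch`, its method names and the concrete sizes are assumptions. The direct-handoff configuration uses the same settings as `Executors.newCachedThreadPool` in JDK 8.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the JDK.
class QueueStrategySketch {
    /** Direct handoff: nothing is buffered, so the thread count must be allowed to grow. */
    static ThreadPoolExecutor directHandoff() {
        return new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    /** Unbounded queue: at most corePoolSize threads, maximumPoolSize has no effect. */
    static ThreadPoolExecutor unboundedQueue(int threads) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    /** Bounded queue: both the backlog and the thread count are capped. */
    static ThreadPoolExecutor boundedQueue(int core, int max, int capacity) {
        return new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(capacity));
    }
}
```

With the bounded variant a rejection handler matters, because once the queue and the pool are both full every further `execute` call is rejected.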

Rejected tasks

New tasks submitted in method {@link #execute(Runnable)} will be <em>rejected</em> when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the {@code execute} method invokes the {@link RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)} method of its {@link RejectedExecutionHandler}. Four predefined handler policies are provided:

That is, a task submitted through execute is rejected either when the executor has been shut down, or when both the maximum thread count and the work queue capacity are finite and exhausted. Either way, execute calls rejectedExecution on the configured RejectedExecutionHandler. Four handler policies are provided out of the box:

  • In the default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a runtime {@link RejectedExecutionException} upon rejection.

    ThreadPoolExecutor.AbortPolicy, the default: when a task is rejected the handler throws a RejectedExecutionException.

  • In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes {@code execute} itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.

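    ThreadPoolExecutor.CallerRunsPolicy: the thread that called execute runs the task itself. This acts as a simple feedback control mechanism that slows down the rate at which new tasks are submitted.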

  • In {@link ThreadPoolExecutor.DiscardPolicy}, a task that cannot be executed is simply dropped.

    ThreadPoolExecutor.DiscardPolicy: the task is not executed and is silently dropped.

  • In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)

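    ThreadPoolExecutor.DiscardOldestPolicy: if the executor has not been shut down, the task at the head of the queue is dropped and execution is retried (the retry may fail again, in which case the process repeats).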

It is possible to define and use other kinds of {@link RejectedExecutionHandler} classes. Doing so requires some care especially when policies are designed to work only under particular capacity or queuing policies.

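Other kinds of RejectedExecutionHandler can be defined and used. This requires some care, especially when a policy is designed to work only under particular capacity or queuing policies.

The constructor parameter RejectedExecutionHandler handler is invoked through RejectedExecutionHandler.rejectedExecution() when the pool has been shut down, or when the thread count has reached its maximum and the waiting queue is full as well. The four built-in implementations cover most scenarios; in practice, projects often supply a custom RejectedExecutionHandler in order to log the rejection or raise an alert.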

    public void execute(Runnable command) {
        ...
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command))
                reject(command); // the pool is no longer running: reject the task
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        } else if (!addWorker(command, false))
            reject(command); // pool is shut down, or worker threads and waiting queue are both full: reject the task
    }

    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
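A custom handler for logging or alerting, as mentioned above, only needs to implement one method. The following is a sketch under assumptions: the class name `CountingAbortPolicy` is invented, and the counter stands in for whatever logging or alerting a real project would do. It keeps the AbortPolicy behavior of throwing, so callers still notice the rejection.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical handler: counts rejections, then aborts like the default policy.
class CountingAbortPolicy implements RejectedExecutionHandler {
    private final AtomicLong rejected = new AtomicLong();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        rejected.incrementAndGet(); // a real handler might log or raise an alert here
        throw new RejectedExecutionException(
                "Task " + r + " rejected from " + executor);
    }

    long rejectedCount() {
        return rejected.get();
    }

    /** Submits one task to an already shut-down pool and returns the rejection count. */
    static long demo() {
        CountingAbortPolicy handler = new CountingAbortPolicy();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), handler);
        pool.shutdown(); // after shutdown every execute() ends in reject()
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException expected) {
            // thrown by the handler above
        }
        return handler.rejectedCount();
    }
}
```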

Hook methods

This class provides {@code protected} overridable {@link #beforeExecute(Thread, Runnable)} and {@link #afterExecute(Runnable, Throwable)} methods that are called before and after execution of each task.

The class offers the protected, overridable methods #beforeExecute(Thread, Runnable) and #afterExecute(Runnable, Throwable), which are invoked before and after each task runs.

These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries.

They can be used to manipulate the execution environment, for example to reinitialize ThreadLocals, gather statistics or add log entries.

Additionally, method {@link #terminated} can be overridden to perform any special processing that needs to be done once the Executor has fully terminated.

In addition, #terminated can be overridden to perform any special processing required once the Executor has fully terminated.

If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.

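If a hook or callback method throws an exception, the internal worker thread may fail in turn and terminate abruptly.

ThreadPoolExecutor provides several protected methods that users can override to add functionality. beforeExecute and afterExecute, for instance, run before and after each task and can be used for logging, adjusting the execution context, collecting statistics and so on. Note that these hooks are not wrapped in their own exception handling: if beforeExecute throws, the task is never run and the worker thread dies, and if afterExecute throws, the worker thread dies as well.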

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // pre-execution hook; an exception here skips the task and kills the worker
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run(); // run the task
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        // post-execution hook
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
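
To make the hook behavior concrete, the following sketch subclasses ThreadPoolExecutor and overrides all three hooks. HookedPoolDemo, HookedPool and the counters are hypothetical names for this example. One of the three tasks throws on purpose; because it is submitted with execute(), the exception reaches afterExecute as its Throwable argument and then kills the worker, whose stack trace is printed to stderr by the default uncaught-exception handler.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HookedPoolDemo {

    /** Hypothetical subclass that counts started and failed tasks via the hooks. */
    static class HookedPool extends ThreadPoolExecutor {
        final AtomicInteger started = new AtomicInteger();
        final AtomicInteger failed = new AtomicInteger();
        volatile boolean terminatedCalled;

        HookedPool() {
            super(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            started.incrementAndGet();
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t != null) {
                failed.incrementAndGet(); // t is the exception thrown by task.run()
            }
        }

        @Override
        protected void terminated() {
            super.terminated();
            terminatedCalled = true;
        }
    }

    /** Runs two normal tasks and one failing task; returns "started/failed/terminatedCalled". */
    static String runThreeTasks() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.execute(() -> { });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.started.get() + "/" + pool.failed.get() + "/" + pool.terminatedCalled;
    }

    public static void main(String[] args) {
        System.out.println(runThreeTasks());
    }
}
```

The third task still runs even though the second one killed its worker, because processWorkerExit replaces a worker that died abruptly.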

Queue maintenance

Method {@link #getQueue()} allows access to the work queue for purposes of monitoring and debugging.

Use of this method for any other purpose is strongly discouraged.

Two supplied methods, {@link #remove(Runnable)} and {@link #purge} are available to assist in storage reclamation when large numbers of queued tasks become cancelled.

ThreadPoolExecutor provides getQueue() so that callers can inspect the tasks that are currently waiting.

    /**
     * Returns the task queue used by this executor.
     * Access to the task queue is intended primarily for debugging and monitoring.
     * This queue may be in active use.
     * Retrieving the task queue does not prevent queued tasks from executing.
     *
     * @return the task queue
     */
    public BlockingQueue<Runnable> getQueue() {
        return workQueue;
    }
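
The storage-reclamation point can be seen in a small experiment. This is a sketch under the assumption of a single-threaded pool whose only worker is kept busy; QueueMaintenanceDemo and cancelThenPurge are names invented here. Cancelling a queued Future does not remove it from the work queue of a plain ThreadPoolExecutor; purge() does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueMaintenanceDemo {

    /** Returns the queue size after submitting, after cancelling, and after purge(). */
    static int[] cancelThenPurge() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        // Occupy the only worker so that later submissions stay in the queue.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            futures.add(pool.submit(() -> { }));
        }
        int queued = pool.getQueue().size();      // monitoring use of getQueue()
        for (Future<?> f : futures) {
            f.cancel(false);
        }
        int afterCancel = pool.getQueue().size(); // cancelled tasks are still queued
        pool.purge();                             // drops cancelled Future tasks
        int afterPurge = pool.getQueue().size();
        release.countDown();
        pool.shutdown();
        return new int[] {queued, afterCancel, afterPurge};
    }

    public static void main(String[] args) {
        int[] sizes = cancelThenPurge();
        System.out.println(sizes[0] + " -> " + sizes[1] + " -> " + sizes[2]);
    }
}
```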

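The Javadoc advises against using the work queue for any other purpose. That said, when the number of worker threads is fixed and the core threads have been started manually, tasks can be added to the queue directly: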

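import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DirectOfferDemo {
    public static void main(String[] args) {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(2);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.SECONDS, queue);
        executor.prestartAllCoreThreads(); // start both core threads; they block waiting on the queue
        Runnable task = () -> {
            while (true) {
                // spins forever, so a worker stays occupied by the first task it takes
            }
        };
        // Offer five tasks straight into the queue, bypassing execute().
        // How many offers succeed depends on how quickly the two workers drain the queue.
        // Note: because the tasks never finish, this program does not exit on its own.
        for (int i = 0; i < 5; i++) {
            System.out.println(queue.offer(task));
        }
    }
}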

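Doing so skips the state checks and thread-creation logic in execute(), which makes submission slightly cheaper and more flexible. The trade-off is that nothing is rechecked or rejected: a task offered after shutdown, or while no worker is alive, may simply sit in the queue. The approach is therefore only dependable while the pool is running and its workers have already been started.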

Finalization

A pool that is no longer referenced in a program <em>AND</em> has no remaining threads will be {@code shutdown} automatically.

If you would like to ensure that unreferenced pools are reclaimed even if users forget to call {@link #shutdown}, then you must arrange that unused threads eventually die, by setting appropriate keep-alive times, using a lower bound of zero core threads and/or setting {@link #allowCoreThreadTimeOut(boolean)}.

A pool instance that is no longer referenced ends up being shut down because ThreadPoolExecutor overrides finalize() to call shutdown().

    /**
     * Invokes {@code shutdown} when this executor is no longer referenced and it has no threads.
     */
    protected void finalize() {
        shutdown();
    }

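The wording of this comment is easy to misread. finalize() itself checks nothing: once the JVM finds the object unreachable it may invoke finalize(), which moves the pool to SHUTDOWN. The "has no threads" part is not a condition tested in the code; it follows from reachability, because each Worker is an inner class that references its pool, so a pool with live worker threads never becomes unreachable in the first place.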

First, let's look at what shutdown() does:

    /**
     * Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     * <p>
     * This method does not wait for previously submitted tasks to complete execution.
     * Use {@link #awaitTermination awaitTermination} to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess(); // check that the caller is allowed to shut the pool down
            advanceRunState(SHUTDOWN); // advance the state to SHUTDOWN
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

    /**
     * Interrupts threads that might be waiting for tasks (as indicated by not being locked) so they can check for termination or configuration changes.
     * Ignores SecurityExceptions (in which case some threads may remain uninterrupted).
     *
     * @param onlyOne If true, interrupt at most one worker.
     *                This is called only from tryTerminate when termination is otherwise enabled but there are still other workers.
     *                In this case, at most one waiting worker is interrupted to propagate shutdown signals in case all threads are currently waiting.
     *                Interrupting any arbitrary thread ensures that newly arriving workers since shutdown began will also eventually exit.
     *                To guarantee eventual termination, it suffices to always interrupt only one idle worker,
     *                but shutdown() interrupts all idle workers so that redundant workers exit promptly, not waiting for a straggler task to finish.
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
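                if (!t.isInterrupted() && w.tryLock()) { // a worker holds its lock while running a task, so tryLock succeeds only for idle workers
                    try {
                        t.interrupt();
                        // interrupt() only sets the interrupt flag; it does not force the thread to stop
                        // if the thread is blocked, it wakes up with an InterruptedException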
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break; // interrupt at most one
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Common form of interruptIdleWorkers, to avoid having to remember what the boolean argument means.
     */
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

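shutdown() does three things: it sets the pool state to SHUTDOWN, interrupts all idle workers, and attempts to terminate the pool. The state change is a CAS on the ctl field. Interrupting an idle worker means calling interrupt() on its thread, which only sets the thread's interrupt flag (or wakes it with an InterruptedException if it is blocked); what happens next depends on the code the thread is running.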
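
The effect of interrupt() on a thread blocked on a queue can be checked in isolation. The sketch below uses a plain Thread and a LinkedBlockingQueue rather than a pool; InterruptDemo and its method name are invented for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo {

    /** Returns true if a thread waiting in take() observed an InterruptedException. */
    static boolean interruptWakesBlockedTake() {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicBoolean sawInterrupt = new AtomicBoolean();
        Thread waiter = new Thread(() -> {
            try {
                queue.take(); // nothing is ever offered, so this blocks
            } catch (InterruptedException e) {
                sawInterrupt.set(true); // the only way out of take() here
            }
        });
        waiter.start();
        waiter.interrupt(); // sets the flag; take() throws whether it is already blocked or not
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptWakesBlockedTake());
    }
}
```

An idle worker is in the same position as the waiter thread here: it is parked in take() or poll(), and the interrupt is what sends it back around the loop to recheck the pool state.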

    private Runnable getTask() {
        boolean timedOut = false;

        for (; ; ) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // second pass after the interrupt: the pool state is now SHUTDOWN
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // decrement the worker count and return null
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut)) 
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null; // CAS succeeded: return null so that the worker exits
                continue;
            }

            try {
                // a worker that is not running a task is blocked here, waiting for one
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                // null means poll() timed out; the next pass may retire this worker
                timedOut = true;
            } catch (InterruptedException retry) {
                // interrupt() raises the exception here and ends the first pass
                timedOut = false;
            }
        }
    }
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                ...
            }
            // after interrupt(), getTask() returns null and the loop ends
            completedAbruptly = false;
        } finally {
            // clean up after the worker exits
            processWorkerExit(w, completedAbruptly);
        }
    }
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        ...
        tryTerminate(); // once the pool is shut down, the last worker to exit moves it to TERMINATED

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            // skipped for that last worker: the state is already TERMINATED
            ...
        }
    }
    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool and queue empty) or (STOP and pool empty).
     * If otherwise eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate.
     * This method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown.
     * The method is non-private to allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (; ; ) {
            int c = ctl.get();
            if (isRunning(c) || // still RUNNING
                    runStateAtLeast(c, TIDYING) || // already TIDYING or TERMINATED
                    (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) // SHUTDOWN, but the queue is not empty
                return; // nothing to terminate yet, or termination is already under way
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE); // interrupt at most one idle worker to propagate the shutdown signal
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // CAS ctl to TIDYING (010 followed by 29 zero bits) so that only one thread proceeds; workerCount is known to be 0 here
                    try {
                        terminated(); // hook
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0)); // hook finished: move to TERMINATED
                        termination.signalAll(); // wake threads blocked in awaitTermination()
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

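When interrupt() is called on a worker that is waiting for a task, the blocking call, whether poll(keepAliveTime, TimeUnit.NANOSECONDS) or take(), throws InterruptedException and the worker goes around the loop in getTask() again. By then the pool state is SHUTDOWN, so getTask() returns null (provided the queue is empty) and the worker eventually reaches tryTerminate(). Both shutdown() and every exiting worker therefore end up in tryTerminate(), but with different results. Handling the interrupt takes time, so shutdown() usually gets there first; at that point the workers may not yet have executed decrementWorkerCount(), and tryTerminate() simply returns. Only when the last worker has exited and reaches tryTerminate() is the pool actually terminated. The visible effect is that isTerminated() does not return true immediately after shutdown() returns. The sequence is: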

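Main thread: set state to SHUTDOWN -> interrupt idle workers -> tryTerminate() -> worker count is not 0 -> return

Worker: receives the interrupt -> checks the pool state -> workerCount - 1 -> processWorkerExit -> tryTerminate() -> pool state becomes TERMINATED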

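If the queue still holds tasks, the workers keep running them until the queue is empty; only then is the condition rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()) satisfied and the workers exit. This matches the contract of shutdown(): an orderly shutdown in which previously submitted tasks are still executed.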
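
The lag between shutdown() and termination can be observed directly. In this sketch (ShutdownLagDemo and observe are hypothetical names) a single worker is held inside a task by a latch, so the pool cannot terminate until the latch is released.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ShutdownLagDemo {

    /** Returns {isShutdown, isTerminated right after shutdown(), result of awaitTermination}. */
    static boolean[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                release.await(); // keeps the worker busy; shutdown() does not interrupt busy workers
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.shutdown();
        boolean shutdown = pool.isShutdown();
        boolean terminatedEarly = pool.isTerminated(); // the worker count is still 1 here
        release.countDown();
        boolean terminatedLater = false;
        try {
            terminatedLater = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {shutdown, terminatedEarly, terminatedLater};
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

Code that needs to know when the pool has really finished should therefore wait with awaitTermination() rather than poll isTerminated() right after shutdown().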

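Now back to the Javadoc's advice: to make sure a pool is reclaimed even if the user forgets to call shutdown(), all of its threads must be able to terminate, for example by setting corePoolSize to 0 together with a suitable keepAliveTime, or by calling allowCoreThreadTimeOut(true).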

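If finalize() is already overridden to call shutdown(), why must the threads also be able to terminate? One reason is that finalization is not something a program can rely on: the JVM makes few promises about when, or even whether, finalize() runs before the process exits. The Object Javadoc, for instance, states:

The Java programming language does not guarantee which thread will invoke the finalize method for any given object.

The more fundamental reason is reachability. Idle worker threads stay blocked in BlockingQueue.take(), and a live thread is never reclaimed by the GC. Each worker in turn references its pool, so even when user code holds no reference to the pool, it is not collected, finalize() is never invoked, and the resources are never released. So when a pool is no longer needed, either call shutdown() explicitly or configure it so that all of its threads can time out.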
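
As a sketch of the second option, the pool below is never shut down, yet its threads go away on their own because core threads are allowed to time out. CoreTimeoutDemo, the 100 ms keep-alive and the 5 second polling deadline are arbitrary choices made for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {

    /** Returns true if all workers exit by themselves, without shutdown() being called. */
    static boolean workersDieWithoutShutdown() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true); // requires a keep-alive time greater than zero
        pool.execute(() -> { });
        pool.execute(() -> { });
        // Poll the pool size until the idle workers have timed out, with a generous deadline.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return pool.getPoolSize() == 0;
    }

    public static void main(String[] args) {
        System.out.println(workersDieWithoutShutdown());
    }
}
```

With no live workers left, nothing keeps the pool reachable any more, so it can be collected like any other object once user code drops its reference.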

Other points of interest

Pool control state

/**
 * The main pool control state, ctl, is an atomic integer packing two conceptual fields
 *   workerCount, indicating the effective number of threads
 *   runState,    indicating whether running, shutting down etc
 *
 * In order to pack them into one int, we limit workerCount to (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 billion) otherwise representable.
 * If this is ever an issue in the future, the variable can be changed to be an AtomicLong, and the shift/mask constants below adjusted.
 * But until the need arises, this code is a bit faster and simpler using an int.
 *
 * The workerCount is the number of workers that have been permitted to start and not permitted to stop.
 * The value may be transiently different from the actual number of live threads, for example when a ThreadFactory fails to create a thread when asked,
 * and when exiting threads are still performing bookkeeping before terminating.
 * The user-visible pool size is reported as the current size of the workers set.
 *
 * The runState provides the main lifecycle control, taking on values:
 *
 *   RUNNING:  Accept new tasks and process queued tasks
 *   SHUTDOWN: Don't accept new tasks, but process queued tasks
 *   STOP:     Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
 *   TIDYING:  All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
 *   TERMINATED: terminated() has completed
 *
 * The numerical order among these values matters, to allow ordered comparisons. The runState monotonically increases over time,
 * but need not hit each state (that is, some states may be skipped). The transitions are:
 *
 * RUNNING -> SHUTDOWN
 *    On invocation of shutdown(), perhaps implicitly in finalize()
 * (RUNNING or SHUTDOWN) -> STOP
 *    On invocation of shutdownNow()
 * SHUTDOWN -> TIDYING
 *    When both queue and pool are empty
 * STOP -> TIDYING
 *    When pool is empty
 * TIDYING -> TERMINATED
 *    When the terminated() hook method has completed
 *
 * Threads waiting in awaitTermination() will return when the state reaches TERMINATED.
 *
 * Detecting the transition from SHUTDOWN to TIDYING is less straightforward than you'd like because the queue may become empty after non-empty and vice versa during SHUTDOWN state,
 * but we can only terminate if, after seeing that it is empty, we see that workerCount is 0 (which sometimes entails a recheck -- see below).
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

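Many operations in the pool depend on both the run state and the worker count, and the two must be read and updated consistently. Keeping them in two separate variables would require locking around every such update, with the contention (and potential for deadlock) that implies. ThreadPoolExecutor therefore packs both into a single AtomicInteger, ctl: the high 3 bits of the 32-bit value hold runState and the low 29 bits hold workerCount. Bitwise operations make it easy to read and update either field.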

private static final int COUNT_BITS = Integer.SIZE - 3; //32-3=29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //000 11111111111111111111111111111
// the low 29 bits hold the worker count; its maximum is 29 one-bits

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;	//111 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;	//000 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;		//001 00000000000000000000000000000
// tidying up
private static final int TIDYING = 2 << COUNT_BITS;		//010 00000000000000000000000000000
// terminated
private static final int TERMINATED = 3 << COUNT_BITS;	//011 00000000000000000000000000000

Reading or setting either value takes only bitwise operators. For example:

If the pool state is RUNNING and there are 8 threads, ctl is:
	111 00000000000000000000000000000
|	000 00000000000000000000000001000
=	111 00000000000000000000000001000
To get the current state, AND with ~CAPACITY:
	111 00000000000000000000000001000
&	111 00000000000000000000000000000
=	111 00000000000000000000000000000
To get the thread count, AND with CAPACITY:
	111 00000000000000000000000001000
&	000 11111111111111111111111111111
=	000 00000000000000000000000001000
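
The same arithmetic can be run as code. The sketch below copies the constants shown above into a standalone class, together with three small helpers that mirror the JDK's private runStateOf, workerCountOf and ctlOf methods; the class name CtlPackingDemo is invented for this example.

```java
class CtlPackingDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;   // low 29 bits set

    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    /** Keeps only the high 3 bits (the run state). */
    static int runStateOf(int c) {
        return c & ~CAPACITY;
    }

    /** Keeps only the low 29 bits (the worker count). */
    static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    /** Packs a run state and a worker count into one int. */
    static int ctlOf(int rs, int wc) {
        return rs | wc;
    }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 8);
        System.out.println(Integer.toBinaryString(ctl));
        System.out.println(runStateOf(ctl) == RUNNING); // true
        System.out.println(workerCountOf(ctl));         // 8
        // RUNNING is negative, so the states compare in lifecycle order.
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED); // true
    }
}
```

Because RUNNING has the sign bit set, it is the smallest value, which is what lets the pool use checks such as runStateAtLeast(c, STOP) as plain integer comparisons.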

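The usual lifecycle of a ThreadPoolExecutor is RUNNING -> SHUTDOWN/STOP -> TIDYING -> TERMINATED, so the state value only ever increases, and much of the code decides what to do by comparing state values numerically. The possible transitions are:

  • RUNNING -> SHUTDOWN

    shutdown() is called on a running pool; the state becomes SHUTDOWN and idle threads are interrupted

  • (RUNNING or SHUTDOWN) -> STOP

    shutdownNow() is called; the state becomes STOP and all threads are interrupted

  • SHUTDOWN -> TIDYING

    Once the queued tasks have been processed and the workers have exited, tryTerminate() sets the state to TIDYING and then runs the terminated() hook

  • STOP -> TIDYING

    Once the queue has been emptied and the workers have exited, tryTerminate() sets the state to TIDYING and then runs the terminated() hook

  • TIDYING -> TERMINATED

    When the hook called from tryTerminate() has completed, the state becomes TERMINATED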

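Note the difference between shutdown() and shutdownNow(): shutdown() interrupts only idle threads and lets busy workers carry on until the queue is drained, whereas shutdownNow() interrupts all threads and empties the work queue, returning the tasks that never started. Both aim for the same end state: zero workers and an empty queue.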
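
The shutdownNow() side of that comparison can be sketched as follows. ShutdownNowDemo and its latches are illustrative names; the running task waits on a latch that is never released, so the only way it can finish is by being interrupted.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {

    /** Returns "drainedTasks/runningTaskInterrupted/terminated". */
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                never.await(); // only an interrupt can end this wait
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        try {
            started.await(); // make sure the worker is inside the task
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> { }); // these stay in the queue behind the blocked task
            }
            List<Runnable> neverStarted = pool.shutdownNow(); // interrupts the worker, drains the queue
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return neverStarted.size() + "/" + interrupted.get() + "/" + pool.isTerminated();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With shutdown() in place of shutdownNow(), the same program would never terminate, because the busy worker would not be interrupted and the three queued tasks would wait behind it.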