Java并发编程系列34|深入理解线程池(下)

酷扯儿

发布时间: 20-12-0817:12《酷扯儿》官方帐号

本文转载自【微信公众号:java进阶架构师,ID:java_jiagoushi】经微信公众号授权转载,如需转载与原文作者联系

本文是深入理解线程池下篇:

线程池介绍Executor框架接口线程池状态线程池参数线程池创建执行过程关闭线程池其他问题

任务拒绝策略线程池中的线程初始化线程池容量的动态调整线程池的监控

6. 执行过程

这一节详细分析任务提交到线程池之后,线程池是如何处理和执行任务的。

6.1 execute()方法

execute()方法执行过程如下:

如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务,即使有空闲线程,也要创建一个新线程如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;如果workerCount >= corePoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;如果workerCount>=maximumPoolSize,且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务,默认的处理方式是直接抛异常。

public void execute(Runnable command) {if (command == null) throw new NullPointerException(); int c = ctl.get(); /* * 当前核心线程数小于corePoolSize,则新建一个线程放入线程池中。 * 注意这里不管核心线程有没有空闲,都会创建线程 */ if (workerCountOf(c) < corePoolSize) { // 创建线程,并执行command。addWorker方法后面详细讲解。 if (addWorker(command, true)) return; c = ctl.get();// 如果添加失败,则重新获取ctl值 } // 当前核心线程数大于等于corePoolSize,将任务添加到队列 if (isRunning(c) && workQueue.offer(command)) { /* * 再次检查线程池的运行状态 * 如果不是运行状态,将command从workQueue中移除,使用拒绝策略处理command */ int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); // 如果有效线程数为0,创建一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } /* * 当前核心线程数大于等于corePoolSize,且workQueue队列添加任务失败,尝试创建maximumPoolSize中的线程来执行任务 */ else if (!addWorker(command, false)) reject(command);}

6.2 addWorker方法

addWorkerr(Runnable firstTask, boolean core)方法的主要工作是在线程池中创建一个新的线程并执行:

增加线程数量ctl;创建Worker对象来执行任务,每一个Worker对象都会创建一个线程;worker添加成功后,启动这个worker中的线程

参数firstTask:这个新创建的线程需要第一个执行的任务;firstTask==null,表示创建线程,到workQueue中取任务执行;参数core:true代表使用corePoolSize作为创建线程的界限;false代表使用maximumPoolSize作为界限

/*** 在线程池中创建一个新的线程并执行 * @param firstTask 这个新创建的线程需要第一个执行的任务;firstTask==null,表示创建线程,到workQueue中取任务执行 * @param core true代表使用corePoolSize作为创建线程的界限;false代表使用maximumPoolSize作为界限 * @return */private boolean addWorker(Runnable firstTask, boolean core) { // 1. 增加线程数量ctl retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);// 获取运行状态 /* * 不能创建线程的几种情况: * 1. 线程池已关闭且rs == SHUTDOWN,不允许提交任务,且中断正在执行的任务 * 2. 线程池已关闭且firstTask!=null, * 3. 线程池已关闭且workQueue为空 */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c);// 获取线程数 // 判断线程数上限 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 尝试增加workerCount,如果成功,则跳出外层for循环 if (compareAndIncrementWorkerCount(c)) break retry; // CAS失败,循环尝试 c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { /* * 2. 创建Worker对象来执行任务,每一个Worker对象都会创建一个线程 * Worker类下文详细讲解 */ w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); /* * 判断状态: * 小于 SHUTTDOWN 那就是 RUNNING,最正常的情况 * 等于 SHUTDOWN,不接受新的任务但是会继续执行等待队列中的任务,所以要求firstTask == null */ if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);// 添加worker int s = workers.size(); // largestPoolSize记录着线程池中出现过的最大线程数量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 3. worker添加成功,启动这个worker中的线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;}

6.3 Worker类

线程池中的每一个线程被封装成一个Worker对象,线程池维护的其实就是一组Worker对象。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {// 线程被封装成Worker final Thread thread; /* * 在创建线程的时候,如果同时指定的需要执行的第一个任务。 * 可以为 null,线程自己到任务队列中取任务执行 */ Runnable firstTask; // 线程完成的任务数 volatile long completedTasks; // Worker 只有这一个构造方法 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);// 调用 ThreadFactory 来创建一个新的线程 } /** * worker工作,调用外部类的 runWorker 方法,循环等待队列中获取任务并执行,下文详细介绍 */ public void run() { runWorker(this); } // ... 其他几个方法用AQS及锁的操作,不关注}

6.4 runWorker方法

循环从等待队列中获取任务并执行:

获取到新任务就执行;获取不到就阻塞等待新任务;队列中没有任务或空闲线程超时,销毁线程。

/*** 循环从等待队列中获取任务并执行 */final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { /* * 循环调用 getTask() 获取任务,getTask()下文详细讲解 * 获取到任务就执行, * 获取不到就阻塞等待新任务, * 返回null任务就销毁当前线程 */ while (task != null || (task = getTask()) != null) { w.lock(); // 如果线程池状态大于等于 STOP,中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast( ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task);// 钩子方法,留给需要的子类实现 Throwable thrown = null; try { task.run();// 真正执行任务,执行execute()中传入任务的run方法 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);// 钩子方法,留给需要的子类实现 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 如果到这里,需要销毁线程: // 1. getTask 返回 null退出while循环,队列中没有任务或空闲线程超时 // 2. 任务执行过程中发生了异常 processWorkerExit(w, completedAbruptly); }}

6.5 getTask方法

获取workQueue中的任务

正常情况,直接workQueue.take()获取到任务返回;workQueue中没有任务,当前线程阻塞直到获取到任务;getTask()返回 null, runWorker()方法会销毁当前线程,如下情况返回null:

状态为SHUTDOWN && workQueue.isEmpty(),任务队列没有任务,且即将关闭线程池,销毁当前线程状态 >= STOP,关闭线程池,销毁当前线程当前线程数超过最大maximumPoolSize,销毁当前线程空闲线程超时keepAliveTime,需要销毁线程

/*** 获取workQueue中的任务 * 1. 正常情况,直接workQueue.take()获取到任务返回; * 2. workQueue中没有任务,当前线程阻塞直达获取到任务; * 3. getTask()返回 null, runWorker()方法会销毁当前线程,如下情况返回null: * 状态为SHUTDOWN && workQueue.isEmpty() * 状态 >= STOP * 当前线程数 wc > maximumPoolSize * 空闲线程超时keepAliveTime */private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); /* * 两种返回null的情况: * 1. rs == SHUTDOWN && workQueue.isEmpty() * 2. rs >= STOP */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount();// CAS 操作,减少工作线程数 return null; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /* * 两种返回null的情况: * 1. 当前线程数 wc > maximumPoolSize,return null * 2. 空闲线程超时,return null */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } /* * 到 workQueue 中获取任务并返回 */ try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }}

6.6 总结

如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;如果workerCount >= corePoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;如果workerCount >= maximumPoolSize,且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。线程池中的线程执行完当前任务后,会循环到任务队列中取任务继续执行;线程获取队列中任务时会阻塞,直到获取到任务返回;

7. 关闭线程池

关闭线程池使用shutdown方法或shutdownNow方法,最终目的是将线程池状态设置成TERMINATED。

7.1 shutdown方法

shutdown方法过程:

将线程池切换到SHUTDOWN状态;调用interruptIdleWorkers方法请求中断所有空闲的worker;调用tryTerminate尝试结束线程池。

public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess();// 安全策略判断 advanceRunState(SHUTDOWN);// CAS设置线程池状态为SHUTDOWN interruptIdleWorkers();// 中断空闲线程 onShutdown(); // 钩子方法,用于ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试结束线程池,下文详细讲解 tryTerminate(); }

7.2 tryTerminate方法

结束线程池,最终将线程池状态设置为TERMINATED。

/*** 结束线程池,最终将线程池状态设置为TERMINATED */final void tryTerminate() { for (;;) { int c = ctl.get(); /* * 当前线程池的状态为以下几种情况时,直接返回: * 1. RUNNING,因为还在运行中,不能停止; * 2. TIDYING或TERMINATED,已经关闭了; * 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 如果线程数量不为0,则中断一个空闲的工作线程,并返回 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 尝试设置状态为TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated();// 钩子方法,留给子类实现 } finally { // 设置状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS }}

7.3 shutdownNow方法

shutdownNow方法过程:

将线程池切换到STOP状态;中断所有工作线程,无论是否空闲;取出阻塞队列中没有被执行的任务并返回;调用tryTerminate尝试结束线程池。

public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess();// 安全策略判断 advanceRunState(STOP);// CAS设置线程池状态为STOP interruptWorkers();// 中断所有工作线程,无论是否空闲 tasks = drainQueue();// 取出阻塞队列中没有被执行的任务并返回 } finally { mainLock.unlock(); } tryTerminate();// 结束线程池,最终将线程池状态设置为TERMINATED return tasks; }

shutdown方法 VS shutdownNow方法shutdown方法设置线程池状态为SHUTDOWN,SHUTDOWN状态不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。shutdownNow方法设置线程池状态为STOP,STOP状态不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。

8. 其他问题

一些不需要长篇大论介绍的知识点,这里简单说下。

8.1 任务拒绝策略

构造线程时传入的 RejectedExecutionHandler 类型参数 handler 就是拒绝策略。

RejectedExecutionHandler只有一个钩子方法,执行拒绝策略。

public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor);}

ThreadPoolExecutor 中有四个已经定义好的RejectedExecutionHandler实现类可供我们直接使用。(我们也可以实现自己的策略)

/** * 由提交任务的线程自己来执行这个任务 */ public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } /** * 默认的策略:接抛出 RejectedExecutionException 异常 */ public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } /** * 不做任何处理,直接忽略掉这个任务 */ public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } /** * 把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中 */ public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }

8.2 线程池的监控

我们可以通过线程池提供的参数和方法对线程池进行监控:

getTaskCount:线程池已经执行的和未执行的任务总数;getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;getPoolSize:线程池当前的线程数量;getActiveCount:当前线程池中正在执行任务的线程数量;实现钩子方法beforeExecute方法,afterExecute方法和terminated方法,增加一些新操作。

8.3 线程池中的线程初始化

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。如果需要线程池创建之后立即创建线程,可以通过以下两个方法实现:

prestartCoreThread():初始化一个核心线程;prestartAllCoreThreads():初始化所有核心线程。

8.4 线程池容量的动态调整

ThreadPoolExecutor提供了动态调整线程池容量大小的方法:

setCorePoolSize:设置核心池大小setMaximumPoolSize:设置线程池最大能创建的线程数目大小

举报/反馈