线程池相关
源码:
package java.util.concurrent;import java.security.AccessControlContext;import java.security.AccessController;import java.security.PrivilegedAction;import java.util.ArrayList;import java.util.ConcurrentModificationException;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;public class ThreadPoolExecutor extends AbstractExecutorService { // 线程池的控制状态(用来表示线程池的运行状态(整形的高3位)和运行的worker数量(低29位)) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 29位的偏移量 private static final int COUNT_BITS = Integer.SIZE - 3; // 最大容量(2^29 - 1) private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 线程运行状态,总共有5个状态,需要3位来表示(所以偏移量的29 = 32 - 3) /** * RUNNING:接受新任务并且处理已经进入阻塞队列的任务 * SHUTDOWN:不接受新任务,但是处理已经进入阻塞队列的任务 * STOP:不接受新任务,不处理已经进入阻塞队列的任务并且中断正在运行的任务 * TIDYING:所有的任务都已经终止,workerCount为0, 线程转化为TIDYING状态并且调用terminated钩子函数 * TERMINATED:terminated钩子函数已经运行完成 **/ private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 阻塞队列 private final BlockingQueueworkQueue; // 可重入锁 private final ReentrantLock mainLock = new ReentrantLock(); // 存放工作线程集合 private final HashSet workers = new HashSet (); // 终止条件 private final Condition termination = mainLock.newCondition(); // 最大线程池容量 private int largestPoolSize; // 已完成任务数量 private long completedTaskCount; // 线程工厂 private volatile ThreadFactory threadFactory; // 拒绝执行处理器 private volatile RejectedExecutionHandler handler; // 线程等待运行时间 private volatile long keepAliveTime; // 是否运行核心线程超时 private volatile boolean allowCoreThreadTimeOut; // 核心池的大小 private volatile int corePoolSize; // 最大线程池大小 private volatile int maximumPoolSize; // 默认拒绝执行处理器 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); private final AccessControlContext acc; //核心内部类 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L;// 版本号 final Thread thread;// worker 所对应的线程 Runnable firstTask;// worker所对应的第一个任务 volatile long completedTasks;// 已完成任务数量 //构造函数 Worker(Runnable firstTask) { setState(-1);// 设置AQS的state为-1 this.firstTask = firstTask;// 初始化第一个任务 this.thread = getThreadFactory().newThread(this);// 根据当前worker,初始化线程 } // 重写了Runnable的run方法 public void run() { runWorker(this); } // 是否被独占,0代表未被独占,1代表被独占 protected boolean isHeldExclusively() { return getState() != 0; } // 尝试获取 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) {// 比较并设置状态成功 setExclusiveOwnerThread(Thread.currentThread());// 设置独占线程 return true; } return false; } // 尝试释放 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null);// 设置独占线程为null setState(0);// 设置状态为0 return true; } // 获取锁 public void lock() { acquire(1); } // 尝试获取锁 public boolean tryLock() { return tryAcquire(1); } // 释放锁 public void unlock() { release(1); } // 是否被独占 public boolean isLocked() { return isHeldExclusively(); } //如果worker对应的线程已经启动,则中断该线程的执行 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {// AQS状态大于等于0并且worker对应的线程不为null并且该线程没有被中断 try { t.interrupt();// 中断线程 } catch (SecurityException ignore) { } } } } //拒绝策略继承接口RejectedExecutionHandler,实现了rejectedExecution方法: //rejectedExecution:当execute方法不能接受某个任务时,可以由 ThreadPoolExecutor 调用的方法。 //因为超出其界限而没有更多可用的线程或队列槽时,或者关闭 Executor 时就可能发生这种情况。 //在没有其他替代方法的情况下,该方法可能抛出未经检查的 RejectedExecutionException,而该异常将传播到 execute 的调用者。 public static class CallerRunsPolicy implements RejectedExecutionHandler { //构造器 public CallerRunsPolicy() { } //执行调用者线程中的任务r;如果执行程序已关闭,则会丢弃该任务 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {//执行程序未关闭 r.run();//执行任务r } } } //拒绝策略继承接口RejectedExecutionHandler,实现了rejectedExecution方法 public static class AbortPolicy implements RejectedExecutionHandler { //构造器 public AbortPolicy() { } //用于被拒绝任务的处理程序,它总是抛出 RejectedExecutionException public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } //拒绝策略继承接口RejectedExecutionHandler,实现了rejectedExecution方法 public static class DiscardPolicy implements RejectedExecutionHandler { //构造器 public DiscardPolicy() { } //用于被拒绝任务的处理程序,不执行任何操作:将丢弃被拒绝的任务 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } //拒绝策略继承接口RejectedExecutionHandler,实现了rejectedExecution方法 public static class DiscardOldestPolicy implements RejectedExecutionHandler { //构造器 public DiscardOldestPolicy() { } //用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试execute;如果执行程序已关闭,则会丢弃该任务。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {//执行程序未关闭 e.getQueue().poll();//放弃最旧的未处理请求 e.execute(r);//重试r } } } //构造器 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } //构造器 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler); } //构造器 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler); } //构造器 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) { if (corePoolSize < 0 ||// 核心大小不能小于0 maximumPoolSize <= 0 ||// 线程池的初始最大容量不能小于0 maximumPoolSize < corePoolSize ||// 初始最大容量不能小于核心大小 keepAliveTime < 0)// keepAliveTime不能小于0 throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null)//检查参数合法性 throw new NullPointerException(); // 初始化相应的域 this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } //当在客户端调用submit()时,之后会间接调用到execute函数,其在将来某个时间执行给定任务,此方法中并不会直接运行给定的任务 public void execute(Runnable command) { if (command == null)// 命令为null,抛出异常 throw new NullPointerException(); /* * 进行下面三步: * 1. 如果运行的线程小于corePoolSize,则尝试使用用户定义的Runnalbe对象创建一个新的线程 * 调用addWorker函数会原子性的检查runState和workCount,通过返回false来防止在不应该添加线程时添加了线程 * * 2. 如果一个任务能够成功入队列,在添加一个线城时仍需要进行双重检查(因为在前一次检查后可能该线程死亡了), * 或者当进入到此方法时,线程池已经shutdown了,所以需要再次检查状态,若有必要,当停止时还需要回滚入队列操作, * 或者当线程池没有线程时需要创建一个新线程 * * 3. 如果无法入队列,那么需要增加一个新线程, * 如果此操作失败,那么就意味着线程池已经shutdown或者已经饱和了,所以拒绝任务 */ int c = ctl.get();// 获取线程池控制状态 if (workerCountOf(c) < corePoolSize) {//运行的线程小于corePoolSize if (addWorker(command, true))//尝试使用用户定义的Runnalbe对象创建一个新的线程 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false))//如果无法入队列,那么需要增加一个新线程 reject(command);//操作失败,意味着线程池已经shutdown或者已经饱和,所以拒绝任务 } //按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。 //首先会检查是否具有shutdown的权限,然后设置线程池的控制状态为SHUTDOWN,之后中断空闲的worker,最后尝试终止线程池 public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 检查shutdown权限 checkShutdownAccess(); // 设置线程池控制状态为SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲worker interruptIdleWorkers(); // 调用shutdown钩子函数 onShutdown(); } finally { mainLock.unlock(); } // 尝试终止 tryTerminate(); } //尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。 //会终止所有的worker,而并非只是空闲的worker public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } //是否已关闭 public boolean isShutdown() { return !isRunning(ctl.get()); } public boolean isTerminating() { int c = ctl.get(); return !isRunning(c) && runStateLessThan(c, TERMINATED); } public boolean isTerminated() { return runStateAtLeast(ctl.get(), TERMINATED); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (; ; ) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } } protected void finalize() { SecurityManager sm = System.getSecurityManager(); if (sm == null || acc == null) { shutdown(); } else { PrivilegedAction pa = () -> { shutdown(); return null; }; AccessController.doPrivileged(pa, acc); } } public void setThreadFactory(ThreadFactory threadFactory) { if (threadFactory == null) throw new NullPointerException(); this.threadFactory = threadFactory; } public ThreadFactory getThreadFactory() { return threadFactory; } public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { if (handler == null) throw new NullPointerException(); this.handler = handler; } public RejectedExecutionHandler getRejectedExecutionHandler() { return handler; } public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) throw new IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers(); else if (delta > 0) { int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } } } public int getCorePoolSize() { return corePoolSize; } public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; } public boolean allowsCoreThreadTimeOut() { return allowCoreThreadTimeOut; } public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); if (value != allowCoreThreadTimeOut) { allowCoreThreadTimeOut = value; if (value) interruptIdleWorkers(); } } public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); this.maximumPoolSize = maximumPoolSize; if (workerCountOf(ctl.get()) > maximumPoolSize) interruptIdleWorkers(); } public int getMaximumPoolSize() { return maximumPoolSize; } public void setKeepAliveTime(long time, TimeUnit unit) { if (time < 0) throw new IllegalArgumentException(); if (time == 0 && allowsCoreThreadTimeOut()) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); long keepAliveTime = unit.toNanos(time); long delta = keepAliveTime - this.keepAliveTime; this.keepAliveTime = keepAliveTime; if (delta < 0) interruptIdleWorkers(); } public long getKeepAliveTime(TimeUnit unit) { return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); } public BlockingQueue getQueue() { return workQueue; } public boolean remove(Runnable task) { boolean removed = workQueue.remove(task); tryTerminate(); return removed; } public void purge() { final BlockingQueue q = workQueue; try { Iterator it = q.iterator(); while (it.hasNext()) { Runnable r = it.next(); if (r instanceof Future && ((Future ) r).isCancelled()) it.remove(); } } catch (ConcurrentModificationException fallThrough) { for (Object r : q.toArray()) if (r instanceof Future && ((Future ) r).isCancelled()) q.remove(r); } tryTerminate(); } public int getPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size(); } finally { mainLock.unlock(); } } public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int n = 0; for (Worker w : workers) if (w.isLocked()) ++n; return n; } finally { mainLock.unlock(); } } public int getLargestPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { return largestPoolSize; } finally { mainLock.unlock(); } } public long getTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) { n += w.completedTasks; if (w.isLocked()) ++n; } return n + workQueue.size(); } finally { mainLock.unlock(); } } public long getCompletedTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) n += w.completedTasks; return n; } finally { mainLock.unlock(); } } public String toString() { long ncompleted; int nworkers, nactive; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { ncompleted = completedTaskCount; nactive = 0; nworkers = workers.size(); for (Worker w : workers) { ncompleted += w.completedTasks; if (w.isLocked()) ++nactive; } } finally { mainLock.unlock(); } int c = ctl.get(); String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" : (runStateAtLeast(c, TERMINATED) ? "Terminated" : "Shutting down")); return super.toString() + "[" + rs + ", pool size = " + nworkers + ", active threads = " + nactive + ", queued tasks = " + workQueue.size() + ", completed tasks = " + ncompleted + "]"; } void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private static boolean isRunning(int c) { return c < SHUTDOWN; } private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } private void decrementWorkerCount() { do { } while (!compareAndDecrementWorkerCount(ctl.get())); } private void advanceRunState(int targetState) { for (; ; ) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } //尝试终止线程池:如果线程池的状态为SHUTDOWN并且线程池和阻塞队列都为空或者状态为STOP并且线程池为空,则将线程池控制状态转化为TERMINATED;否则,将中断一个空闲的worker final void tryTerminate() { for (;;) { // 无限循环,确保操作成功 // 获取线程池控制状态 int c = ctl.get(); if (isRunning(c) || // 线程池的运行状态为RUNNING runStateAtLeast(c, TIDYING) || // 线程池的运行状态最小要大于TIDYING (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 线程池的运行状态为SHUTDOWN并且workQueue队列不为null // 不能终止,直接返回 return; if (workerCountOf(c) != 0) { // 线程池正在运行的worker数量不为0 // 仅仅中断一个空闲的worker interruptIdleWorkers(ONLY_ONE); return; } // 获取线程池的锁 final ReentrantLock mainLock = this.mainLock; // 获取锁 mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 比较并设置线程池控制状态为TIDYING try { // 终止,钩子函数 terminated(); } finally { // 设置线程池控制状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); // 释放在termination条件上等待的所有线程 termination.signalAll(); } return; } } finally { // 释放锁 mainLock.unlock(); } } } private void checkShutdownAccess() { SecurityManager security = System.getSecurityManager(); if (security != null) { security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) security.checkAccess(w.thread); } finally { mainLock.unlock(); } } } private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } //中断正在等待任务的空闲worker private void interruptIdleWorkers(boolean onlyOne) { // 线程池的锁 final ReentrantLock mainLock = this.mainLock; // 获取锁 mainLock.lock(); try { for (Worker w : workers) { // 遍历workers队列 // worker对应的线程 Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { // 线程未被中断并且成功获得锁 try { // 中断线程 t.interrupt(); } catch (SecurityException ignore) { } finally { // 释放锁 w.unlock(); } } if (onlyOne) // 若只中断一个,则跳出循环 break; } } finally { // 释放锁 mainLock.unlock(); } } private void interruptIdleWorkers() { interruptIdleWorkers(false); } private static final boolean ONLY_ONE = true; final void reject(Runnable command) { handler.rejectedExecution(command, this); } void onShutdown() { } final boolean isRunningOrShutdown(boolean shutdownOK) { int rs = runStateOf(ctl.get()); return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); } private List drainQueue() { BlockingQueue q = workQueue; ArrayList taskList = new ArrayList (); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; } //可能会完成如下几件任务: //1.原子性的增加workerCount。 //2.将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中。 //3.启动worker对应的线程,并启动该线程,运行worker的run方法。 //4.回滚worker的创建动作,即将worker从workers集合中删除,并原子性的减少workerCount。 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // 外层无限循环 // 获取线程池控制状态 int c = ctl.get(); // 获取状态 int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && // 状态大于等于SHUTDOWN,初始的ctl为RUNNING,小于SHUTDOWN ! (rs == SHUTDOWN && // 状态为SHUTDOWN firstTask == null && // 第一个任务为null ! workQueue.isEmpty())) // worker队列不为空 // 返回 return false; for (;;) { // worker数量 int wc = workerCountOf(c); if (wc >= CAPACITY || // worker数量大于等于最大容量 wc >= (core ? corePoolSize : maximumPoolSize)) // worker数量大于等于核心线程池大小或者最大线程池大小 return false; if (compareAndIncrementWorkerCount(c)) // 比较并增加worker的数量 // 跳出外层循环 break retry; // 获取线程池控制状态 c = ctl.get(); if (runStateOf(c) != rs) // 此次的状态与上次获取的状态不相同 // 跳过剩余部分,继续循环 continue retry; } } // worker开始标识 boolean workerStarted = false; // worker被添加标识 boolean workerAdded = false; Worker w = null; try { // 初始化worker w = new Worker(firstTask); // 获取worker对应的线程 final Thread t = w.thread; if (t != null) { // 线程不为null // 线程池锁 final ReentrantLock mainLock = this.mainLock; // 获取锁 mainLock.lock(); try { // 线程池的运行状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || // 小于SHUTDOWN (rs == SHUTDOWN && firstTask == null)) { // 等于SHUTDOWN并且firstTask为null if (t.isAlive()) // 线程刚添加进来,还未启动就存活 // 抛出线程状态异常 throw new IllegalThreadStateException(); // 将worker添加到worker集合 workers.add(w); // 获取worker集合的大小 int s = workers.size(); if (s > largestPoolSize) // 队列大小大于largestPoolSize // 重新设置largestPoolSize largestPoolSize = s; // 设置worker已被添加标识 workerAdded = true; } } finally { // 释放锁 mainLock.unlock(); } if (workerAdded) { // worker被添加 // 开始执行worker的run方法 t.start(); // 设置worker已开始标识 workerStarted = true; } } } finally { if (! workerStarted) // worker没有开始 // 添加worker失败 addWorkerFailed(w); } return workerStarted; } private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } } //在worker退出时调用到的钩子函数。 //worker退出的主要因素: //1.阻塞队列已经为空,即没有任务可以运行了。 //2.调用了shutDown或shutDownNow函数 //根据是否中断了空闲线程来确定是否减少workerCount的值,并且将worker从workers集合中移除并且会尝试终止线程池 private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 如果被中断,则需要减少workCount decrementWorkerCount(); // 获取可重入锁 final ReentrantLock mainLock = this.mainLock; // 获取锁 mainLock.lock(); try { // 将worker完成的任务添加到总的完成任务中 completedTaskCount += w.completedTasks; // 从workers集合中移除该worker workers.remove(w); } finally { // 释放锁 mainLock.unlock(); } // 尝试终止 tryTerminate(); // 获取线程池控制状态 int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 小于STOP的运行状态 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) // 允许核心超时并且workQueue阻塞队列不为空 min = 1; if (workerCountOf(c) >= min) // workerCount大于等于min // 直接返回 return; } // 添加worker addWorker(null, false); } } //用于从workerQueue阻塞队列中获取Runnable对象,由于是阻塞队列,所以支持有限时间等待(poll)和无限时间等待(take)。 //在该函数中还会响应shutDown和shutDownNow函数的操作,若检测到线程池处于SHUTDOWN或STOP状态,则会返回null,而不再返回阻塞队列中的Runnalbe对象。 private Runnable getTask() { boolean timedOut = false; for (;;) { // 无限循环,确保操作成功 // 获取线程池控制状态 int c = ctl.get(); // 运行的状态 int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 大于等于SHUTDOWN(表示调用了shutDown)并且(大于等于STOP(调用了shutDownNow)或者worker阻塞队列为空) // 减少worker的数量 decrementWorkerCount(); // 返回null,不执行任务 return null; } // 获取worker数量 int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 是否允许coreThread超时或者workerCount大于核心大小 if ((wc > maximumPoolSize || (timed && timedOut)) // worker数量大于maximumPoolSize && (wc > 1 || workQueue.isEmpty())) { // workerCount大于1或者worker阻塞队列为空(在阻塞队列不为空时,需要保证至少有一个wc) if (compareAndDecrementWorkerCount(c)) // 比较并减少workerCount // 返回null,不执行任务,该worker会退出 return null; // 跳过剩余部分,继续循环 continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 等待指定时间 workQueue.take(); // 一直等待,直到有元素 if (r != null) return r; // 等待指定时间后,没有获取元素,则超时 timedOut = true; } catch (InterruptedException retry) { // 抛出了被中断异常,重试,没有超时 timedOut = false; } } } //此函数中会实际执行给定任务(即调用用户重写的run方法),并且当给定任务完成后,会继续从阻塞队列中取任务,直到阻塞队列为空(即任务全部完成) //在执行给定任务时,会调用钩子函数,利用钩子函数可以完成用户自定义的一些逻辑. final void runWorker(Worker w) { // 获取当前线程 Thread wt = Thread.currentThread(); // 获取w的firstTask Runnable task = w.firstTask; // 设置w的firstTask为null w.firstTask = null; // 释放锁(设置state为0,允许中断) w.unlock(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 任务不为null或者阻塞队列还存在任务 // 获取锁 w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || // 线程池的运行状态至少应该高于STOP (Thread.interrupted() && // 线程被中断 runStateAtLeast(ctl.get(), STOP))) && // 再次检查,线程池的运行状态至少应该高于STOP !wt.isInterrupted()) // wt线程(当前线程)没有被中断 wt.interrupt(); // 中断wt线程(当前线程) try { // 在执行之前调用钩子函数 beforeExecute(wt, task); Throwable thrown = null; try { // 运行给定的任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行完后调用钩子函数 afterExecute(task, thrown); } } finally { task = null; // 增加给worker完成的任务数量 w.completedTasks++; // 释放锁 w.unlock(); } } completedAbruptly = false; } finally { // 处理完成后,调用钩子函数 processWorkerExit(w, completedAbruptly); } } protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }}
线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。
在ThreadPoolExecutor的内部,主要由BlockingQueue和AbstractQueuedSynchronizer对其提供支持,BlockingQueue接口有多种数据结构的实现,如LinkedBlockingQueue、ArrayBlockingQueue等
ThreadPoolExecutor继承自AbstractExecutorService,AbstractExecuetorService提供了ExecutorService执行方法的默认实现。
类的内部类
ThreadPoolExecutor的核心内部类为Worker,其对资源进行了复用,减少创建线程的开销,还有若干个策略类。内部类的类图如下:
说明:
可以看到Worker继承了AQS抽象类并且实现了Runnable接口,是ThreadPoolExecutor的核心内部类。
对于AbortPolicy,用于被拒绝任务的处理程序,它将抛出 RejectedExecutionException、
CallerRunsPolicy,用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务、
DiscardPolicy,用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务、
DiscardOldestPolicy,用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute;如果执行程序已关闭,则会丢弃该任务。这些都是拒绝任务提交时的所采用的不同策略。
Worker类
1. 类的继承关系
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {}
说明:Worker继承了AQS抽象类,其重写了AQS的一些方法,并且其也可作为一个Runnable对象,从而可以创建线程Thread。
2. 类的属性
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ // 版本号 private static final long serialVersionUID = 6138294804551838833L; // worker 所对应的线程 final Thread thread; // worker所对应的第一个任务 Runnable firstTask; // 已完成任务数量 volatile long completedTasks;}
说明:Worker属性中比较重要的属性如下,Thread类型的thread属性,用来封装worker(因为worker为Runnable对象),表示一个线程;Runnable类型的firstTask,其表示该worker所包含的Runnable对象,即用户自定义的Runnable对象,完成用户自定义的逻辑的Runnable对象;volatile修饰的long类型的completedTasks,表示已完成的任务数量。
3. 类的构造函数
Worker(Runnable firstTask) { // 设置状态为-1 setState(-1); // 初始化第一个任务 this.firstTask = firstTask; // 根据当前worker,初始化线程 this.thread = getThreadFactory().newThread(this);}
说明:用于构造一个worker对象,并设置AQS的state为-1,同时初始化了对应的域。
4. 核心函数分析
// 重写了Runnable的run方法public void run() { runWorker(this);}// 是否被独占,0代表未被独占,1代表被独占protected boolean isHeldExclusively() { return getState() != 0;}// 尝试获取protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { // 比较并设置状态成功 // 设置独占线程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false;}// 尝试释放protected boolean tryRelease(int unused) { // 设置独占线程为null setExclusiveOwnerThread(null); // 设置状态为0 setState(0); return true;}// 获取锁public void lock(){ acquire(1);}// 尝试获取锁public boolean tryLock(){ return tryAcquire(1);}// 释放锁public void unlock(){ release(1);}// 是否被独占public boolean isLocked(){ return isHeldExclusively();}void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // AQS状态大于等于0并且worker对应的线程不为null并且该线程没有被中断 try { // 中断线程 t.interrupt(); } catch (SecurityException ignore) { } }}
说明:Worker的函数主要是重写了AQS的相应函数和重写了Runnable的run函数,重写的函数比较简单,具体的可以参见AQS的分析。
拒绝策略
//拒绝策略继承接口RejectedExecutionHandler,实现了rejectedExecution方法 public static class CallerRunsPolicy implements RejectedExecutionHandler { //构造器 public CallerRunsPolicy() { } //执行调用者线程中的任务r;如果执行程序已关闭,则会丢弃该任务 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {//执行程序未关闭 r.run();//执行任务r } } } //拒绝策略继承接口RejectedExecutionHandler,实现了rejectedExecution方法 public static class AbortPolicy implements RejectedExecutionHandler { //构造器 public AbortPolicy() { } //用于被拒绝任务的处理程序,它总是抛出 RejectedExecutionException public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } //拒绝策略继承接口RejectedExecutionHandler,实现了rejectedExecution方法 public static class DiscardPolicy implements RejectedExecutionHandler { //构造器 public DiscardPolicy() { } //用于被拒绝任务的处理程序,不执行任何操作:将丢弃被拒绝的任务 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } //拒绝策略继承接口RejectedExecutionHandler,实现了rejectedExecution方法 public static class DiscardOldestPolicy implements RejectedExecutionHandler { //构造器 public DiscardOldestPolicy() { } //用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试execute;如果执行程序已关闭,则会丢弃该任务。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {//执行程序未关闭 e.getQueue().poll();//放弃最旧的未处理请求 e.execute(r);//重试r } } }
ThreadPoolExecutor.AbortPolicy、ThreadPoolExecutor.CallerRunsPolicy、ThreadPoolExecutor.DiscardOldestPolicy、ThreadPoolExecutor.DiscardPolicy 均实现了RejectedExecutionHandler 接口:
package java.util.concurrent;public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor);}
rejectedExecution(Runnable r, ThreadPoolExecutor executor):当execute方法不能接受某个任务时,可以由 ThreadPoolExecutor 调用的方法。
因为超出其界限而没有更多可用的线程或队列槽时,或者关闭 Executor 时就可能发生这种情况。
在没有其他替代方法的情况下,该方法可能抛出未经检查的 RejectedExecutionException,而该异常将传播到 execute 的调用者。
AbortPolicy的rejectedExecution实现:用于被拒绝任务的处理程序,它总是抛出 RejectedExecutionException;
CallerRunsPolicy的rejectedExecution实现:执行调用者线程中的任务r;如果执行程序已关闭,则会丢弃该任务;
DiscardOldestPolicy的rejectedExecution实现:用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试execute;如果执行程序已关闭,则会丢弃该任务;
DiscardPolicy 的rejectedExecution实现:用于被拒绝任务的处理程序,不执行任何操作:将丢弃被拒绝的任务。
类 ThreadPoolExecutor
已实现的接口:
,
已知子类:
一个 ,使用池线程中的一个实现执行每个提交的任务,这个实现通常使用 工厂方法配置。
线程池可以解决两个不同问题:
- 由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。
- 每个ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。
一般情况,都使用较为方便的 工厂方法 (无界线程池,可以进行自动线程回收)、(固定大小线程池)和 (单个后台线程),它们均为大多数使用场景预定义了设置。
如果需要手动配置和调整此类时,则使用以下指导:
- 核心和最大池大小
ThreadPoolExecutor 将根据 corePoolSize(参见 )和 maximumPoolSize(参见 )设置的边界自动调整池大小。
当新任务在方法 中提交时:
- 如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。
- 如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。
- 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。
- 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。
在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 和 进行动态更改。
- 按需构造
默认情况下,即使核心线程最初只是在新任务到达时才创建和启动的,也可以使用方法 或 对其进行动态重写。如果构造带有非空队列的池,则可能希望预先启动线程。
- 创建新线程
使用 创建新线程。如果没有另外说明,则在同一个 中一律使用 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。
- 保持活动时间
如果池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止(参见 )。这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活动,则可以创建新的线程。也可以使用方法 动态地更改此参数。使用 Long.MAX_VALUE 的值在关闭前有效地从以前的终止状态禁用空闲线程。默认情况下,保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。但是只要 keepAliveTime 值非 0, 方法也可将此超时策略应用于核心线程。
- 排队
所有 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
- 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
- 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
- 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
排队有三种通用策略:
- 直接提交。工作队列的默认选项是 ,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
- 无界队列。使用无界队列(例如,不具有预定义容量的 )将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
- 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 )有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
- 被拒绝的任务
当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法 中提交的新任务将被拒绝。execute 方法将调用其 的 方法。
下面提供了四种预定义的处理程序策略:
- 在默认的 中,处理程序遭到拒绝将抛出运行时 。
- 在 中,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
- 在 中,不能执行的任务将被删除。
- 在 中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
定义和使用其他种类的 类也是可能的,但这样做需要非常小心,尤其是当策略仅用于特定容量或排队策略时。
- 钩子 (hook) 方法
此类提供 protected 可重写的 和 方法,这两种方法分别在执行每个任务之前和之后调用。它们可用于操纵执行环境;
例如,重新初始化 ThreadLocal、搜集统计信息或添加日志条目。此外,还可以重写方法 来执行 Executor 完全终止后需要完成的所有特殊处理。
如果钩子 (hook) 或回调方法抛出异常,则内部辅助线程将依次失败并突然终止。
- 队列维护
允许出于监控和调试目的而访问工作队列。强烈反对出于其他任何目的而使用此方法。
和 这两种方法可用于在取消大量已排队任务时帮助进行存储回收。
- 终止
不再引用的池没有剩余线程会自动 shutdown。如果希望确保回收取消引用的池(即使用户忘记调用 ),则必须安排未使用的线程最终终止:设置适当保持活动时间 。
扩展示例
此类的大多数扩展可以重写一个或多个受保护的钩子 (hook) 方法。例如,下面是一个添加了简单的暂停/恢复功能的子类:
class PausableThreadPoolExecutor extends ThreadPoolExecutor { private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unpaused = pauseLock.newCondition(); public PausableThreadPoolExecutor(...) { super(...); } protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch(InterruptedException ie) { t.interrupt(); } finally { pauseLock.unlock(); } } public void pause() { pauseLock.lock(); try { isPaused = true; } finally { pauseLock.unlock(); } } public void resume() { pauseLock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { pauseLock.unlock(); } }}
嵌套类摘要
static class | 用于被拒绝任务的处理程序,它将抛出 RejectedExecutionException. |
static class | 用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。 |
static class | 用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute;如果执行程序已关闭,则会丢弃该任务。 |
static class | 用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。 |
构造方法摘要
(int corePoolSize, int maximumPoolSize, long keepAliveTime, unit, <> workQueue) 用给定的初始参数和默认的线程工厂及被拒绝的执行处理程序创建新的 ThreadPoolExecutor。 |
(int corePoolSize, int maximumPoolSize, long keepAliveTime, unit, <> workQueue, handler) 用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。 |
(int corePoolSize, int maximumPoolSize, long keepAliveTime, unit, <> workQueue, threadFactory) 用给定的初始参数和默认被拒绝的执行处理程序创建新的 ThreadPoolExecutor。 |
(int corePoolSize, int maximumPoolSize, long keepAliveTime, unit, <> workQueue, threadFactory, handler) 用给定的初始参数创建新的 ThreadPoolExecutor。 |
方法摘要
protected void | ( r, t) 基于完成执行给定 Runnable 所调用的方法。 |
void | allowCoreThreadTimeOut(boolean value) 如果在保持活动时间内没有任务到达,新任务到达时正在替换(如果需要),则设置控制核心线程是超时还是终止的策略。 |
boolean | allowsCoreThreadTimeOut() 如果此池允许核心线程超时和终止,如果在 keepAlive 时间内没有任务到达,新任务到达时正在替换(如果需要),则返回 true。 |
boolean | (long timeout, unit) 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。 |
protected void | ( t, r) 在执行给定线程中的给定 Runnable 之前调用的方法。 |
void | execute( command) 在将来某个时间执行给定任务。 |
protected void | finalize() 当不再引用此执行程序时,调用 shutdown。 |
int | getActiveCount() 返回主动执行任务的近似线程数。 |
long | getCompletedTaskCount() 返回已完成执行的近似任务总数。 |
int | () 返回核心线程数。 |
long | ( unit) 返回线程保持活动的时间,该时间就是超过核心池大小的线程可以在终止前保持空闲的时间值。 |
int | getLargestPoolSize() 返回曾经同时位于池中的最大线程数。 |
int | () 返回允许的最大线程数。 |
int | getPoolSize() 返回池中的当前线程数。 |
<> | getQueue() 返回此执行程序使用的任务队列。 |
| () 返回用于未执行任务的当前处理程序。 |
long | getTaskCount() 返回曾计划执行的近似任务总数。 |
| () 返回用于创建新线程的线程工厂。 |
boolean | isShutdown() 如果此执行程序已关闭,则返回 true。 |
boolean | isTerminated() 如果关闭后所有任务都已完成,则返回 true。 |
boolean | isTerminating() 如果此执行程序处于在 shutdown 或 shutdownNow 之后正在终止但尚未完全终止的过程中,则返回 true。 |
int | prestartAllCoreThreads() 启动所有核心线程,使其处于等待工作的空闲状态。 |
boolean | prestartCoreThread() 启动核心线程,使其处于等待工作的空闲状态。 |
void | () 尝试从工作队列移除所有已取消的 任务。 |
boolean | remove( task) 从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。 |
void | setCorePoolSize(int corePoolSize) 设置核心线程数。 |
void | (long time, unit) 设置线程在终止前可以保持空闲的时间限制。 |
void | setMaximumPoolSize(int maximumPoolSize) 设置允许的最大线程数。 |
void | setRejectedExecutionHandler( handler) 设置用于未执行任务的新处理程序。 |
void | setThreadFactory( threadFactory) 设置用于创建新线程的线程工厂。 |
void | shutdown() 按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。 |
<> | shutdownNow() 尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。 |
protected void | terminated() 当 Executor 已经终止时调用的方法。 |
从类 java.util.concurrent. 继承的方法
, , , , , , , ,
从类 java.lang. 继承的方法
, , , , , , , , ,
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, unit, <> workQueue)
用给定的初始参数和默认的线程工厂及被拒绝的执行处理程序创建新的 ThreadPoolExecutor。使用 工厂方法之一比使用此通用构造方法方便得多。
参数:
corePoolSize
- 池中所保存的线程数,包括空闲线程。
maximumPoolSize
- 池中允许的最大线程数。
keepAliveTime
- 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit
- keepAliveTime 参数的时间单位。
workQueue
- 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
抛出:
- 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,或者 corePoolSize 大于 maximumPoolSize。
- 如果 workQueue 为 null
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, unit, <> workQueue, threadFactory)
用给定的初始参数和默认被拒绝的执行处理程序创建新的 ThreadPoolExecutor。
参数:
corePoolSize
- 池中所保存的线程数,包括空闲线程。
maximumPoolSize
- 池中允许的最大线程数。
keepAliveTime
- 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit
- keepAliveTime 参数的时间单位。
workQueue
- 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
threadFactory
- 执行程序创建新线程时使用的工厂。
抛出:
- 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,或者 corePoolSize 大于 maximumPoolSize。
- 如果 workQueue 或 threadFactory 为 null。
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, unit, <> workQueue, handler)
用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
参数:
corePoolSize
- 池中所保存的线程数,包括空闲线程。
maximumPoolSize
- 池中允许的最大线程数。
keepAliveTime
- 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit
- keepAliveTime 参数的时间单位。
workQueue
- 执行前用于保持任务的队列。此队列仅由保持 execute 方法提交的 Runnable 任务。
handler
- 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
抛出:
- 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,或者 corePoolSize 大于 maximumPoolSize。
- 如果 workQueue 或 handler 为 null。
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, unit, <> workQueue, threadFactory, handler)
用给定的初始参数创建新的 ThreadPoolExecutor。
参数:
corePoolSize
- 池中所保存的线程数,包括空闲线程。
maximumPoolSize
- 池中允许的最大线程数。
keepAliveTime
- 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit
- keepAliveTime 参数的时间单位。
workQueue
- 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
threadFactory
- 执行程序创建新线程时使用的工厂。
handler
- 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
抛出:
- 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,或者 corePoolSize 大于 maximumPoolSize。
- 如果 workQueue、 threadFactory 或 handler 为 null。
execute
public void execute( command)
在将来某个时间执行给定任务。可以在新线程中或者在现有池线程中执行该任务。 如果无法将任务提交执行,或者因为此执行程序已关闭,或者因为已达到其容量,则该任务由当前 RejectedExecutionHandler 处理。
参数:
command
- 要执行的任务。
抛出:
- 如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出 RejectedExecutionException
- 如果命令为 null
shutdown
public void shutdown()
按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。如果已经关闭,则调用没有其他作用。
抛出:
- 如果安全管理器存在并且关闭此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有 ("modifyThread")),或者安全管理器的 checkAccess 方法拒绝访问。
shutdownNow
public <> shutdownNow()
尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。在从此方法返回的任务队列中排空(移除)这些任务。
并不保证能够停止正在处理的活动执行任务,但是会尽力尝试。 此实现通过 取消任务,所以无法响应中断的任何任务可能永远无法终止。
返回:
从未开始执行的任务的列表。
抛出:
- 如果安全管理器存在并且关闭此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有 ("modifyThread")),或者安全管理器的 checkAccess 方法拒绝访问。
isShutdown
public boolean isShutdown()
从接口 复制的描述
如果此执行程序已关闭,则返回 true。
返回:
如果此执行程序已关闭,则返回 true
isTerminating
public boolean isTerminating()
如果此执行程序处于在 shutdown 或 shutdownNow 之后正在终止但尚未完全终止的过程中,则返回 true。此方法可能对调试很有用。关闭之后很长一段时间才报告返回的 true,这可能表示提交的任务已经被忽略或取消中断,导致此执行程序无法正确终止。
返回:
如果正在终止但尚未完成,则返回 true
isTerminated
public boolean isTerminated()
从接口 复制的描述
如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。
返回:
如果关闭后所有任务都已完成,则返回 true
awaitTermination
public boolean awaitTermination(long timeout, unit) throws
从接口 复制的描述
请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
参数:
timeout
- 最长等待时间
unit
- timeout 参数的时间单位
返回:
如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false
抛出:
- 如果等待时发生中断
finalize
protected void finalize()
当不再引用此执行程序时,调用 shutdown。
覆盖:
类 中的
setThreadFactory
public void setThreadFactory( threadFactory)
设置用于创建新线程的线程工厂。
参数:
threadFactory
- 新线程工厂
抛出:
- 如果 threadFactory 为 null
另请参见:
getThreadFactory()
getThreadFactory
public getThreadFactory()
返回用于创建新线程的线程工厂。
返回:
当前线程工厂
另请参见:
setRejectedExecutionHandler
public void setRejectedExecutionHandler( handler)
设置用于未执行任务的新处理程序。
参数:
handler
- 新处理程序
抛出:
- 如果处理程序为 null
另请参见:
getRejectedExecutionHandler()
getRejectedExecutionHandler
public getRejectedExecutionHandler()
返回用于未执行任务的当前处理程序。
返回:
当前处理程序
另请参见:
setCorePoolSize
public void setCorePoolSize(int corePoolSize)
设置核心线程数。此操作将重写构造方法中设置的任何值。如果新值小于当前值,则多余的现有线程将在下一次空闲时终止。如果较大,则在需要时启动新线程来执行这些排队的任务。
参数:
corePoolSize
- 新核心大小
抛出:
- 如果 corePoolSize 小于 0
另请参见:
getCorePoolSize()
getCorePoolSize
public int getCorePoolSize()
返回核心线程数。
返回:
核心线程数
另请参见:
prestartCoreThread
public boolean prestartCoreThread()
启动核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,此操作才重写默认的启动核心线程策略。如果已启动所有核心线程,此方法将返回 false。
返回:
如果启动了线程,则返回 true
prestartAllCoreThreads
public int prestartAllCoreThreads()
启动所有核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,此操作才重写默认的启动核心线程策略。
返回:
已启动的线程数
allowsCoreThreadTimeOut
public boolean allowsCoreThreadTimeOut()
如果此池允许核心线程超时和终止,如果在 keepAlive 时间内没有任务到达,新任务到达时正在替换(如果需要),则返回 true。当返回 true 时,适用于非核心线程的相同的保持活动策略也同样适用于核心线程。当返回 false(默认值)时,由于没有传入任务,核心线程不会终止。
返回:
如果允许核心线程超时,则返回 true;否则返回 false
allowCoreThreadTimeOut
public void allowCoreThreadTimeOut(boolean value)
如果在保持活动时间内没有任务到达,新任务到达时正在替换(如果需要),则设置控制核心线程是超时还是终止的策略。当为 false(默认值)时,由于没有传入任务,核心线程将永远不会中止。当为 true 时,适用于非核心线程的相同的保持活动策略也同样适用于核心线程。为了避免连续线程替换,保持活动时间在设置为 true 时必须大于 0。通常应该在主动使用该池前调用此方法。
参数:
value
- 如果应该超时,则为 true;否则为 false
抛出:
- 如果 value 为 true 并且当前保持活动时间不大于 0。
setMaximumPoolSize
public void setMaximumPoolSize(int maximumPoolSize)
设置允许的最大线程数。此操作将重写构造方法中设置的任何值。如果新值小于当前值,则多余的现有线程将在下一次空闲时终止。
参数:
maximumPoolSize
- 新的最大值
抛出:
- 如果新的最大值小于等于 0,或者小于
另请参见:
getMaximumPoolSize()
getMaximumPoolSize
public int getMaximumPoolSize()
返回允许的最大线程数。
返回:
允许的最大线程数
另请参见:
setKeepAliveTime
public void setKeepAliveTime(long time, unit)
设置线程在终止前可以保持空闲的时间限制。如果池中的当前线程数多于核心线程数,在不处理任务的情况下等待这一时间段之后,多余的线程将被终止。此操作将重写构造方法中设置的任何值。
参数:
time
- 等待的时间。时间值 0 将导致执行任务后多余的线程立即终止。
unit
- 时间参数的时间单位
抛出:
- 如果时间小于 0,或者时间为 0 和 allowsCoreThreadTimeOut
另请参见:
getKeepAliveTime(java.util.concurrent.TimeUnit)
getKeepAliveTime
public long getKeepAliveTime( unit)
返回线程保持活动的时间,该时间就是超过核心池大小的线程可以在终止前保持空闲的时间值。
参数:
unit
- 所需的结果时间单位
返回:
时间限制
另请参见:
getQueue
public <> getQueue()
返回此执行程序使用的任务队列。对任务队列的访问主要用于调试和监控。此队列可能正处于活动使用状态中。获取任务队列不妨碍已加入队列的任务的执行。
返回:
任务队列
remove
public boolean remove( task)
从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。
此方法可用作取消方案的一部分。它可能无法移除在放置到内部队列之前已经转换为其他形式的任务。例如,使用 submit 输入的任务可能被转换为维护 Future 状态的形式。但是,在此情况下,purge()
方法可用于移除那些已被取消的 Future。
参数:
task
- 要移除的任务
返回:
如果已经移除任务,则返回 true
purge
public void purge()
尝试从工作队列移除所有已取消的 任务。此方法可用作存储回收操作,它对功能没有任何影响。取消的任务不会再次执行,但是它们可能在工作队列中累积,直到 worker 线程主动将其移除。调用此方法将试图立即移除它们。但是,如果出现其他线程的干预,那么此方法移除任务将失败。
getPoolSize
public int getPoolSize()
返回池中的当前线程数。
返回:
线程数。
getActiveCount
public int getActiveCount()
返回主动执行任务的近似线程数。
返回:
线程数。
getLargestPoolSize
public int getLargestPoolSize()
返回曾经同时位于池中的最大线程数。
返回:
线程数。
getTaskCount
public long getTaskCount()
返回曾计划执行的近似任务总数。因为在计算期间任务和线程的状态可能动态改变,所以返回值只是一个近似值。
返回:
任务数
getCompletedTaskCount
public long getCompletedTaskCount()
返回已完成执行的近似任务总数。因为在计算期间任务和线程的状态可能动态改变,所以返回值只是一个近似值,但是该值在整个连续调用过程中不会减少。
返回:
任务数。
beforeExecute
protected void beforeExecute( t, r)
在执行给定线程中的给定 Runnable 之前调用的方法。此方法由将执行任务 r 的线程 t 调用,并且可用于重新初始化 ThreadLocals 或者执行日志记录。
此实现不执行任何操作,但可在子类中定制。注:为了正确嵌套多个重写操作,此方法结束时,子类通常应该调用 super.beforeExecute。
参数:
t
- 将运行任务 r 的线程。
r
- 将执行的任务。
afterExecute
protected void afterExecute( r, t)
基于完成执行给定 Runnable 所调用的方法。此方法由执行任务的线程调用。如果非 null,则 Throwable 是导致执行突然终止的未捕获 RuntimeException 或 Error。
注:当操作显示地或者通过 submit 之类的方法包含在任务内时(如 ),这些任务对象捕获和维护计算异常,因此它们不会导致突然终止,内部异常不会 传递给此方法。
此实现不执行任何操作,但可在子类中定制。注:为了正确嵌套多个重写操作,此方法开始时,子类通常应该调用 super.afterExecute。
参数:
r
- 已经完成的 runnable 线程。
t
- 导致终止的异常;如果执行正常结束,则为 null。
terminated
protected void terminated()
当 Executor 已经终止时调用的方法。默认实现不执行任何操作。注:为了正确嵌套多个重写操作,子类通常应该在此方法中调用 super.afterExecute。
实现原理
先来看ThreadPoolExecutor的execute方法,这个方法能体现出一个Task被加入到线程池之后都发生了什么:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* 如果运行中的worker线程数少于设定的常驻线程数,增加worker线程,把task分配给新建的worker线程 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如果任务可以被加入到任务队列中,即等待的任务数还在允许的范围内, // 再次检查线程池是否被关闭,如果关闭的话,则移除任务并拒绝该任务 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果任务数超过了现有worker线程的承受范围,尝试新建worker线程 // 如果无法添加新的worker线程,则会拒绝该任务 else if (!addWorker(command, false)) reject(command); }
在执行任务时,需要经常检查线程池的状态,那么接下来说说线程池是如何进行状态控制的。上面的代码有个成员变量叫做ctl,它用于标记线程池状态和worker线程的数量,是一个AutomaticInteger对象。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl是一个32位的整数,最高的3位表示状态:
- 111为running,
- 000为shutdown,
- 001为stop,
- 010为tidying,
- 011为ternimated。
因此状态值就是这三位加上29个0,因此running的值是个负整数(最高位为1),其他状态都是正整数,后面判断状态会比较值的大小时会用到这点。
剩下的29位表示worker线程的数量(因此最大允许的线程数就是2的29方减1)。
这几个状态的意义:
running表示正常运行状态
shutdown状态意味着发出了一个shutdown信号,类似于你点击了windows的关机按钮
stop表示shutdown信号收到,等于windows响应了这个信号,发出正在关机的信息
tidying发生在stop之后做出的响应,表示这个时候在清理一些资源,
ternimated发生在tidying完成之后,表示关闭完成。
接着来看添加一个worker线程时都发生了什么:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 返回false的情况: // 1. rs>shutdown,即shutdown和running以外的状态 // 2. shutdown的状态 // 1)firstTask不为null,即有task分配 // 2)没有task,但是workQueue(等待任务队列)为空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 1. 如果没有设定线程数的限制,worker线程数不能大于最大值(2的29次方-1) // 2. 如果是固定尺寸的线程池,不能大于固定尺寸 // 3. 如果是可扩展的线程池,不能大于规定的线程数的上限 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 用CAS操作增加线程数量,如果失败,重新循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; loop } } // 新建worker线程 Worker w = new Worker(firstTask); Thread t = w.thread; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); // 检查以下任一状态是否出现: // 1. 创建线程失败 // 2. rs>shutdown,即shutdown和running以外的状态 // 3. rs==shutdown,有任务分配 if (t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null))) { decrementWorkerCount(); tryTerminate(); return false; } workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } finally { mainLock.unlock(); } t.start(); // 这里考虑一种极少出现的情况,如果worker线程调用start没有完成时, // 线程池进入Stop状态,这个时候会调用Thread#interrupt中断每个 // worker线程,但是 interrupt对没有start的线程不一定起作用,这样 // 就会漏掉了对这个thread的interrupt,因此在worker线程start之后 // 检查以下,如果stop了,而这个线程却没有被interrupt,补上这个漏掉 // 的interrupt。 if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) t.interrupt(); return true; }
ThreadPoolExecutor有一个成员类叫Worker,所起到的作用就是线程池worker线程的作用。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
这里AbstractQueuedSynchronizer的作用是使Worker具有锁的功能,在执行任务时,会把Worker锁住,这个时候就无法中断Worker。Worker空闲时候是线程池可以通过获取锁,改变Worker的某些状态,在此期间因为锁被占用,Worker就是不会执行任务的。
Worker工作的逻辑在ThreadPoolExecutor#runWorker方法中:
public void run() { runWorker(this);}
runWorker():
final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; boolean completedAbruptly = true; try { // 执行分配的任务或者从BlockingQueue中等待获取任务 while (task != null || (task = getTask()) != null) { w.lock(); clearInterruptsForTaskRun(); try { // 执行任务之前的工作 beforeExecute(w.thread, task); Throwable thrown = null; // 执行任务,如果发生异常,该Worker就不会再继续执行任务 try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 任务执行完的工作 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // Worker不再执行任务的处理,completedAbruptly为false // 表示正常结束,否则表示执行任务出错。 processWorkerExit(w, completedAbruptly); }}
来看看processWorkerExit,重点看看执行任务发生异常时该如何处理:
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 发生异常,首先要更新Worker数量 if (completedAbruptly) decrementWorkerCount(); // 移除这个Worker final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } // 尝试停止线程池,正常运行的线程池调用该方法不会有任何动作 tryTerminate(); int c = ctl.get(); // 如果线程池没有被关闭的话, if (runStateLessThan(c, STOP)) { // Worker不是异常退出,检查worker线程数是不是小于最小值 // 这个最小值分为几种情况: // 1. allowCoreThreadTimeOut(JDK6新加)表示是否允许线程池在超 // 过一定时间没有收到任务后退出,这种情况下,最小值为0,因为如果如 // 果一直没有任何任务,worker线程数是0 // 2. 最小值为corePoolSize,因为corePoolSize可能为0,因此这种情况 // 下,如果有任务的话必然会有Worker,因此最小值为1 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } // 如果Worker线程数小于最小值,新建一个Worker线程 addWorker(null, false); } }
接下来说说线程池如何进行状态控制,即线程池的开启和关闭。
先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
可以看到,尽管设定了corePoolSize,也就是Worker线程的数量,但是线程池开启的时候,默认是没有创建这些Worker线程的,但是ThreadPoolExecutor提供了prestartAllCoreThreads方法来开启所有的预设的Worker线程,以及prestartCoreThread尝试开启一个预设的Worker线程。
这里重点说说handler,也就是RejectedExecutionHandler,拒绝任务的处理类,ThreadPoolExecutor提供四种策略:
1. CallerRunsPolicy 该策略会在ThreadPoolExecutor没有关闭的情况,依旧运行任务
2. AbortPolicy 该策略会抛出一个RejectedExecutionException
3. DiscardPolicy 该策略直接忽略该任务,不会有任何动作
4. DiscardOldestPolicy 该策略会在ThreadPoolExecutor没有关闭的情况,丢弃下一个将要执行的任务,把该任务加入到执行队列。
接下来说说关闭,ThreadPoolExecutor提供了shutdown和shutdownNow两种方式,从字面上就能看出区别,后者会尝试结束正在运行的任务。
先来看shutdown:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // ScheduledThreadPoolExecutor的回调方法 } finally { mainLock.unlock(); } tryTerminate(); }
再看shutdownNow:
public ListshutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
两个方法的代码非常相似,区别在于:
1. shutdownNow的状态设置为STOP,shutdown的状态是SHUTDOWN
2. shutdownNow会中断所有线程,也就是所有任务,而shutdown仅仅中断空闲线程,不会影响正在执行的任务。
3. shutdownNow会导出未执行的任务。
两个方法都用到的checkShutdownAccess方法主要是检查方法调用者是否有权限中断Worker线程。
advanceRunState方法用于设定线程的状态,如果状态值大于等于该状态值则会返回。
关于 interruptIdleWorkers 和 interruptWorkers :Worker线程具备锁的功能,因此可以通过tryLock来判断Worker线程是否处于空闲状态,这是两个方法的区别所在。