书成

再这样堕落下去就给我去死啊你这混蛋!!!

0%

线程池详解

ThreadPoolExecutor

在高并发场景中使用线程池有两个好处,其一,使用线程池降低了线程创建、销毁的开销;其二,可以限制进程使用的线程资源数量。

ThreadPoolExecutor 构造参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 创建一个新的线程池实例.
*
* @param corePoolSize 是线程池中核心线程数量
* @param maximumPoolSize 是线程池中的最大线程数量
* @param keepAliveTime 是空闲线程的最大生存时间
* @param unit 是时间单位
* @param workQueue 是任务队列,只保存通过 execute 方法提交的 Runnable 任务
* @param threadFactory 创建线程的线程工厂
* @param handler 提交任务过多时的拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

线程池调度流程

拒绝策略

通过实现 RejectedExecutionHandler 接口可以自定义线程池的拒绝策略,在 JDK 中内置了4个拒绝策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
// 如果线程池未关闭则直接运行该任务,可能回导致任务提交线程性能降低
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
// 直接抛出异常,默认策略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
// 什么也不做,相当于丢弃当前提交的任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
// 如果线程池未关闭则丢弃任务队列中最久未执行的任务,即队列头的任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

线程池内部属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 线程池中使用一个 int 型变量保存线程池的状态以及线程有效数量
// 其中高三位表示状态,后面的29位表示线程的有效数量,因此线程的
// 有效数量为(2^29)-1,目前足够使用
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// 在高位存储线程池状态
private static final int RUNNING = -1 << COUNT_BITS; // 即111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 即000
private static final int STOP = 1 << COUNT_BITS; // 即001
private static final int TIDYING = 2 << COUNT_BITS; // 即010
private static final int TERMINATED = 3 << COUNT_BITS; // 即011

// 解析 ctl
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

// 线程池全局锁
private final ReentrantLock mainLock = new ReentrantLock();
// 保存线程池中所有的 Worker ,Worker 是对线程的封装
private final HashSet<Worker> workers = new HashSet<>();
// 线程池结束等待条件
private final Condition termination = mainLock.newCondition();
// 跟踪线程池中最大大小
private int largestPoolSize;
// 线程池中已完成的任务
private long completedTaskCount;
// 如果为 True ,即使核心线程空闲也会回收!!!!
private volatile boolean allowCoreThreadTimeOut;
// 以下为构造器参数属性
private final BlockingQueue<Runnable> workQueue;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile int corePoolSize;
private volatile int maximumPoolSize;

线程池中的 Worker

线程池中的线程都被封装为 Worker :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* 这个类不会序列化,提供UID是为了抑制编译器警告
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Worker 持有的线程,如果工厂创建失败可能会是 null */
final Thread thread;
/** 初始化的任务,可能为空 */
Runnable firstTask;
/** 每个 Worker 保存已经执行完的任务数量 */
volatile long completedTasks;

// 用一个任务来创建 Worker
Worker(Runnable firstTask) {
setState(-1); // 通过它本身的锁结构保证直到运行才可以中断 Worker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** 委托外部运行该 Worker */
public void run() {
runWorker(this);
}

// 锁相关方法
protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

从源码中可以看到,Worker 本身继承了 AQS,它本身就是一把锁,在下面的runWorker函数中可以看到,当它被锁住时,说明它不是空闲线程,同时它也实现了 Runnable 接口,持有一个线程,并且保存了已经执行完的任务数量。线程池的 runWorker 方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 获得要执行的第一个任务
w.firstTask = null;
w.unlock(); // Worker 初始化时同步状态为 -1 ,这里修改为0才可以被中断
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池已经停止就中断线程
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 钩子函数
try {
task.run(); // 执行任务
afterExecute(task, null); // 钩子函数
} catch (Throwable ex) {
afterExecute(task, ex); // 钩子函数
throw ex;
}
} finally {
task = null;
w.completedTasks++; // 完成任务数加一
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 清理Worker
}
}

execute 提交任务

线程池的 execute 函数如下,其流程与之前的的线程池调度流程一致:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 核心线程未满
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 尝试直接新建线程执行
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 添加队列成功后 double-check 线程池是否关闭或者是否需要新建线程
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 尝试使用非核心线程执行
reject(command);
}

线程池的 addWorker 函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// 检查线程池状态
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;

for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false; // 核心线程或最大线程数量已满
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();

if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // 线程如果已启动抛出异常
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 更踪线程池最大线程数量
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 添加成功就执行,会执行到 runWorker 函数
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 回滚
}
return workerStarted;
}

/**
* 回滚添加Worker操作.
* - 从 workers 中移除 Worker
* - 减小 workerCount
* - 检查线程池是否关闭
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}

关闭线程池

关闭线程池有两个函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检查权限
advanceRunState(SHUTDOWN); // 修改线程池状态
// 中断空闲线程 (通过判断 Worker 的锁状态)
interruptIdleWorkers();
onShutdown(); // 钩子函数
} finally {
mainLock.unlock();
}
tryTerminate(); // 等待结束线程池
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers(); // 中断所有线程
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks; // 将任务队列中未执行的任务返回
}

final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 检查是否符合终止线程池条件,不符合就返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); // 钩子函数
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll(); // 唤醒等待线程池关闭的线程
}
return;
}
} finally {
mainLock.unlock();
}
}
}

submit 提交任务

submit 用来提交有返回值的任务,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 构建 RunnableFuture
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

从源码中可以看出,submit 是将任务封装为一个 RunnableFuture,然后调用 execute 方法执行,返回封装的任务。因此关键点在于 RunnableFuture,它实现了 Runnable 接口与 Future 接口,在线程池中使用的是实现该接口的 FutureTask 类,它支持取消任务,查看任务的状态等功能,该类的属性定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 定义 FutureTask 的状态
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

/** 持有的 Callable 对象 */
private Callable<V> callable;
/** 执行结果,可以通过 get() 方法获取*/
private Object outcome; // non-volatile, protected by state reads/writes
/** 持有的线程 */
private volatile Thread runner;
/** 线程等待队列 */
private volatile WaitNode waiters;

该类的 run 方法定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); // 执行任务
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 设置异常
}
if (ran)
set(result); // 设置返回值
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
// 通过 CAS 设置异常或者返回值
protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = v;
STATE.setRelease(this, NORMAL); // final state
finishCompletion(); // 唤醒队列中的其它线程并移除当前线程
}
}
protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = t; // 将异常赋给 outcome
STATE.setRelease(this, EXCEPTIONAL);
finishCompletion(); // 唤醒队列中的其它线程并移除当前线程
}
}

该类的 cancel 方法可以取消任务的执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt(); // 如果传入 true,线程已启动就中断
} finally {
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

该类的 get 方法会阻塞直到任务完成或出现异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L); // 等待任务完成
return report(s);
}

public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// 超时等待任务完成
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED) // 任务已经被取消
throw new CancellationException();
throw new ExecutionException((Throwable)x); // 抛出任务异常
}