Java 线程池源码解析

1. 线程池的作用

  • 管理线程,减少创建线程和销毁线程的资源消耗
  • 提高了响应速度
  • 复用线程

2. 线程池的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
Future future = service.submit(new Runnable() {
@Override
public void run() {
System.out.println("Current thread name is : " + Thread.currentThread().getName());
}
});
}
service.shutdown();

// 控制台输出:
Current thread name is : Thread-0
Current thread name is : Thread-1
Current thread name is : Thread-2
Current thread name is : Thread-3
Current thread name is : Thread-4

3. 线程池的创建过程

3.1 构造函数

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

函数中参数的意义:

  • corePoolSize:线程池的核心线程数量
  • maximumPoolSize:线程池最大线程数量
  • keepAliveTime:线程池中非核心线程存活时间
  • unit:存活时间的单位
  • workQueue:存放任务的阻塞队列
  • threadFactory:设置创建线程的工厂,可以给创建的线程设置有意义的名字,方便排出问题
  • handler:线程池的饱和策略事件,有四种类型
    • AbortPolicy:默认直接抛出一个异常
    • DiscardPolicy:直接丢弃任务
    • DiscardOldestPolicy:丢弃队列里最老的任务
    • CallerRunsPolicy:交给线程池所在的线程处理

3.2 常用的线程池

  • FixedThreadPool(固定线程数量)
  • ScheduledThreadPool(可定时周期执行)
  • CachedThreadPool(可缓存)
  • SingleThreadExecutor(只有一个线程)

    3.2.1 FixedThreadPool

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • 线程池特点
    只有核心线程,阻塞队列为 LinkedBlockingQueue,此队列中任务过多的话,会抛出 OOM
  • 使用场景
    适合 CPU 密集型的任务,确保 CPU 充分使用

3.2.2 ScheduledThreadPool

1
2
3
4
5
6
7
8
9
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
  • 线程池特点
    自定义核心线程数量,最大线程数量为 Inter.MAX_VALUE,非核心线程存活时间为 0,阻塞队列是 DelayedWorkQueue
    scheduleAtFixedRate():按照某种速率周期执行
    scheduleWithFixedDelay():在某个延迟后执行

  • 使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
    service.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName() + " : " + System.currentTimeMillis());
    }
    }, 1, 3, TimeUnit.SECONDS);
    // 输出
    pool-1-thread-1 : 1568106876257
    pool-1-thread-1 : 1568106879258
    pool-1-thread-2 : 1568106882261
    pool-1-thread-1 : 1568106885263
    pool-1-thread-3 : 1568106888267
  • 使用场景
    周期性执行定时任务

3.2.3 CachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • 特点:
    无核心线程,最多有 Integer.MAX_VALUE 个非核心线程(极端情况下会耗尽系统资源),每个线程存活 60s,队列使用 SynchronousQueue
  • 使用场景
    并发执行大量周期短的小任务

3.2.4 SingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
  • 特点
    只有一个核心线程,阻塞队列是 LinkedBlockingQueue
  • 使用场景
    适合串行的执行任务。

4. 线程池的执行流程

先从 submit 开始

1
2
3
4
5
6
7
8
9
10
11
12
13
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

传到 submit 方法中的 Runnable 对象被封装成一个 RunnableFuture 对象(实现了 Runnable 接口)。
然后执行 execute 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 1. 如果少于 corePoolSize 的线程在执行,去启动一个新的线程执行任务
* addWorker 会自动检测 runState 和 workerCount
* 所以通过返回 false 组织错误参数被设置
*
* 2. 如果一个任务能成功加入队列,需要执行第二次检查,因为存在的当前线程从上次检查后死去了
*
* 3. 如果无法插入队列,尝试增加一个新的线程,若干失败了,就停止或者拒绝这个任务
*/
int c = ctl.get();
// workCountOf(),获取当前线程的数量,如果当前工作的线程数量少于 corePoolSize,创建新的核心线程
if (workerCountOf(c) < corePoolSize) {
// addWorkder 的第二个参数为 true,创建核心线程,非 true,创建普通线程
if (addWorker(command, true))
return;
// 创建线程失败的话,重新获取当前线程数量
c = ctl.get();
}
// 如果当前的线程在执行,并且这个线程在 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
// 再获取 ctl 的值
int recheck = ctl.get();
// 如果线程没有在执行,移除上次添加的线程
if (! isRunning(recheck) && remove(command))
// reject 中执行 handler.rejectedExecution(command, this); 饱和策略事件
reject(command);
// 如果当前线程池中线程的数量为 0,执行 addWorker 添加线程(此时创建的不是核心线程)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 非核心线程也到了上限,拒绝任务并执行饱和策略
else if (!addWorker(command, false))
reject(command);
}

// ...
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
// ctl 是对线程池运行状态和线程池中有效线程数量进行控制的一个字段,
// 包含两部分:runState 和 workerCount

最后看 addWorker 如何创建一个线程并执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 获取 ctl
int c = ctl.get();
// 获取 ctl 中保存的运行状态
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建 Worker 对象 w,
w = new Worker(firstTask);
// Worker 会使用 threadFactory 创建线程 t
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

在 addWorker 方法里创建好线程之后,最后调用线程的 start 方法启动线程
addWorker 中创建的 Worker,保存了传入的任务,并创建了线程

1
2
3
4
5
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

5. 线程池的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks(接受新的任务,并且处理队列中的任务)
* SHUTDOWN: Don't accept new tasks, but process queued tasks(不接受新任务,但是处理队列中的任务)

* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks(不接受新任务,不执行队列中的任务,打断正在执行的任务)

* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method(状态变成 TIDYING 的线程会执行 terminated)

* TERMINATED: terminated() has completed(terminated() 完成后进入这个状态)

*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*/
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;