并发编程 线程池

本文将介绍并发编程中的线程池。

一、线程池的工作模式

一般的 “XX 池” 都会提供资源的申请和释放方法,但 JUC 的线程池并不会提供这些。

线程池会自行维护线程的创建销毁和任务的排队执行,我们只需要将任务丢给线程池,它便会负责任务的执行。

下面是一个简单的线程池示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class MyThreadPool{

/**
* 任务队列
*/
BlockingQueue<Runnable> workQueue;

/**
* 工作线程List
*/
List<WorkerThread> threads = new ArrayList<>();

/**
* 提交任务
*/
void execute(Runnable command){
workQueue.put(command);
}

/**
* 构造方法
*/
MyThreadPool(int poolSize, BlockingQueue workQueue){
this.workQueue = workQueue;
// 创建工作线程
for (int idx=0; idx < poolSize; idx++) {
WorkerThread work = new WorkerThread();
work.start();
threads.add(work);
}
}

/**
* 工作线程
* <p>负责消费并执行任务
*/
class WorkerThread extends Thread{
public void run() {
while(true){
Runnable task = workQueue.take();
task.run();
}
}
}
}

二、ThreadPoolExecutor

1. 什么是 ThreadPoolExecutor?

ThreadPoolExecutor 是 JUC 提供的最核心的线程池工具类,它是 JUC 对线程池的基础实现。

2. 实现原理

ThreadPoolExecutor 有两个核心的核心组成元素:

  • workQueue 工作队列:
    • private final BlockingQueue<Runnable> workQueue
    • 存储提交到线程池且还未执行的所有任务
  • workers 线程集合:
    • private final HashSet<Worker> workers
    • 存储线程池中的所有线程

当用户向线程池提交了一个任务,线程池会将任务放入 workQueue 中。workers 中的线程会不断从 workQueue 中获取线程并执行。

3. 构造函数

1
2
3
4
5
6
7
8
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePoolSize:保留在线程池中的线程数

    即使所有线程都空闲,也会始终保留 corePoolSize 个线程

  • maximumPoolSize:线程池的最大线程数

  • keepAliveTime:活跃判定时间

    如果一个线程在活跃判定时间内并没有执行任务,则此线程为不活跃

  • unit:活跃判定时间的时间单位

  • workQueue:工作队列

  • threadFactory:线程创建工厂

  • RejectedExecutionHandler:自定义任务的拒绝策略。

    当线程池无法承载更多任务时,线程将会对新来的任务进行拒绝,拒绝策略有以下几种:

    • CallerRunsPolicy:提交任务的线程自己去执行该任务
    • AbortPolicy:默认的拒绝策略,抛出 RejectedExecutionException 异常
    • DiscardPolicy:直接丢弃任务,没有任何异常抛出
    • DiscardOldestPolicy:加入该任务,丢弃最老的任务

三、任务提交与结果获取

1. execute()

可以使用 execute() 方法进行任务的提交,这一方法可以提交任务,但无法获取任务的执行结果。

可以借助 FutureTask 实现结果的获取

2. submit()

可以使用 submit() 方法进行任务的提交,这一方法可以获取任务的执行结果。

  • Future submit(Runnable task)

    提交无返回值任务(Runnable);

    返回代表任务执行成功与否的 Future,Future 可以在执行成功后获取到 null

  • <T> Future<T> submit(Callable<T> task)

    提交有返回值任务(Callable);

    返回代表任务执行结果的 Future,Future 可以在执行成功后获取到任务执行结果

  • <T> Future<T> submit(Runnable task, T result)

    提交无返回值任务(Runnable)和 “结果对象”;

    返回包装了结果对象的 Future,Future 可以在执行成功后获取到 “结果对象”

    通过该方法,可以指定 Future 返回的对象

四、工作队列的选择

首先,工作队列不宜选择无界队列,否则将可能导致 OOM(内存溢出)。

五、任务拒绝问题

在选择有界队列的情况下,如果线程池队列已满且线程均在工作,线程池便需要对新来的任务做拒绝处理,一般情况下,有几种处理方式:

  • 若任务不重要,线程池直接丢弃

  • 若任务重要,线程池拒绝后抛出异常,调用者捕获异常,进行降级处理

    所谓降级处理,就是在服务无法正常工作时进行的补救措施,常见做法是保存操作信息,待空闲后处理。

六、静态线程池工厂类 Executors

1. 什么是 Executors?

Executors 是 JUC 提供的静态线程池工厂类,可以用于快速创建线程池。

2. 不推荐使用 Executors

不建议使用 Executors 的原因是:Executors 提供的很多方法默认使用的工作队列是无界的 LinkedBlockingQueue,在高负载情况下,无界队列容易导致 OOM(内存溢出)。

七、ScheduledThreadPoolExecutor

1. 什么是 ScheduledThreadPoolExecutor?

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor。

ScheduledThreadPoolExecutor

2. 特有方法

  • schedule():用于提交一个延迟执行的任务
  • scheduleAtFixedRate():用于提交一个周期性执行的任务;下一次执行时间 = 上一次执行开始时间 + 周期
  • scheduleWithFixedDelay:用于提交一个周期性执行的任务;下一次执行时间 = 上一次执行结束时间 + 延迟

ScheduledThreadPoolExecutor 额外提供了 schedule() 方法,该方法用于延迟或周期性执行任务。

八、异常处理

1. 外部获取异常

如果任务提交方式是 execute,可以在控制台看到错误。

如果任务提交方式是 submit,可以通过 Future.get() 获取错误。

2. 线程池对异常的处理

某一个线程异常并不会影响其它线程的正常执行, 线程池会将线程移除,并创建一个新的线程放入池中。

3. 推荐做法

  • 法 1:在 Runable 或 Callable 中捕获异常,做相应的处理
  • 法 2:在创建线程池时,传入 ThreadFactory 线程创建工厂,在创建线程时,通过 setUncaughtExceptionHandler() 配置其异常处理方法

参考

  • Java 并发编程实战