并发编程 线程池
本文将介绍并发编程中的线程池。
一、线程池的工作模式
一般的 “XX 池” 都会提供资源的申请和释放方法,但 JUC 的线程池并不会提供这些。
线程池会自行维护线程的创建销毁和任务的排队执行,我们只需要将任务丢给线程池,它便会负责任务的执行。
下面是一个简单的线程池示例:
1 |
|
二、ThreadPoolExecutor
1. 什么是 ThreadPoolExecutor?
ThreadPoolExecutor 是 JUC 提供的最核心的线程池工具类,它是 JUC 对线程池的基础实现。
2. 实现原理
ThreadPoolExecutor 有两个核心的核心组成元素:
- workQueue 工作队列:
private final BlockingQueue<Runnable> workQueue
- 存储提交到线程池且还未执行的所有任务
- workers 线程集合:
private final HashSet<Worker> workers
- 存储线程池中的所有线程
当用户向线程池提交了一个任务,线程池会将任务放入 workQueue 中。workers 中的线程会不断从 workQueue 中获取线程并执行。
3. 构造函数
1 |
|
corePoolSize:保留在线程池中的线程数
即使所有线程都空闲,也会始终保留 corePoolSize 个线程
maximumPoolSize:线程池的最大线程数
keepAliveTime:活跃判定时间
如果一个线程在活跃判定时间内并没有执行任务,则此线程为不活跃
unit:活跃判定时间的时间单位
workQueue:工作队列
threadFactory:线程创建工厂
RejectedExecutionHandler:自定义任务的拒绝策略。
当线程池无法承载更多任务时,线程将会对新来的任务进行拒绝,拒绝策略有以下几种:
- CallerRunsPolicy:提交任务的线程自己去执行该任务
- AbortPolicy:默认的拒绝策略,抛出 RejectedExecutionException 异常
- DiscardPolicy:直接丢弃任务,没有任何异常抛出
- DiscardOldestPolicy:加入该任务,丢弃最老的任务
三、任务提交与结果获取
1. execute()
可以使用 execute()
方法进行任务的提交,这一方法可以提交任务,但无法获取任务的执行结果。
可以借助 FutureTask 实现结果的获取
2. submit()
可以使用 submit()
方法进行任务的提交,这一方法可以获取任务的执行结果。
Future submit(Runnable task)
:提交无返回值任务(Runnable);
返回代表任务执行成功与否的 Future,Future 可以在执行成功后获取到 null
<T> Future<T> submit(Callable<T> task)
:提交有返回值任务(Callable);
返回代表任务执行结果的 Future,Future 可以在执行成功后获取到任务执行结果
<T> Future<T> submit(Runnable task, T result)
:提交无返回值任务(Runnable)和 “结果对象”;
返回包装了结果对象的 Future,Future 可以在执行成功后获取到 “结果对象”
通过该方法,可以指定 Future 返回的对象
四、工作队列的选择
首先,工作队列不宜选择无界队列,否则将可能导致 OOM(内存溢出)。
五、任务拒绝问题
在选择有界队列的情况下,如果线程池队列已满且线程均在工作,线程池便需要对新来的任务做拒绝处理,一般情况下,有几种处理方式:
若任务不重要,线程池直接丢弃
若任务重要,线程池拒绝后抛出异常,调用者捕获异常,进行降级处理
所谓降级处理,就是在服务无法正常工作时进行的补救措施,常见做法是保存操作信息,待空闲后处理。
六、静态线程池工厂类 Executors
1. 什么是 Executors?
Executors 是 JUC 提供的静态线程池工厂类,可以用于快速创建线程池。
2. 不推荐使用 Executors
不建议使用 Executors 的原因是:Executors 提供的很多方法默认使用的工作队列是无界的 LinkedBlockingQueue,在高负载情况下,无界队列容易导致 OOM(内存溢出)。
七、ScheduledThreadPoolExecutor
1. 什么是 ScheduledThreadPoolExecutor?
ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor。
ScheduledThreadPoolExecutor
2. 特有方法
schedule()
:用于提交一个延迟执行的任务scheduleAtFixedRate()
:用于提交一个周期性执行的任务;下一次执行时间 = 上一次执行开始时间 + 周期scheduleWithFixedDelay
:用于提交一个周期性执行的任务;下一次执行时间 = 上一次执行结束时间 + 延迟
ScheduledThreadPoolExecutor 额外提供了 schedule()
方法,该方法用于延迟或周期性执行任务。
八、异常处理
1. 外部获取异常
如果任务提交方式是 execute,可以在控制台看到错误。
如果任务提交方式是 submit,可以通过 Future.get()
获取错误。
2. 线程池对异常的处理
某一个线程异常并不会影响其它线程的正常执行, 线程池会将线程移除,并创建一个新的线程放入池中。
3. 推荐做法
- 法 1:在 Runable 或 Callable 中捕获异常,做相应的处理
- 法 2:在创建线程池时,传入 ThreadFactory 线程创建工厂,在创建线程时,通过
setUncaughtExceptionHandler()
配置其异常处理方法
参考
- Java 并发编程实战