并发编程 CompletionService

本文将介绍并发编程中的 CompletionService,它将 “异步任务的执行” 与 “执行结果的消费” 分离。

一、什么是 CompletionService?

CompletionService 是一种接口,它将 “异步任务的执行” 与 “执行结果的消费” 分离。

CompletionService 的实现原理是在内部维护了一个阻塞队列,当任务执行结束就将执行结果放入阻塞队列中。

二、接口方法

  • Future<V> submit(Callable<V> task):提交一个带返回值的任务;该方法将返回一个 Future,可以用它来获取任务执行状态和执行结果
  • Future<V> submit(Callable<V> task):提交一个无返回值的任务和 “结果对象”;该方法将返回一个 Future,可以用它获取任务执行状态和 “结果对象”
  • Future<V> take():阻塞式获取一个执行结果(获取后从队列中删除)
  • Future<V> poll():获取一个执行结果(获取后从队列中删除),如果没有,则返回 null
  • Future<V> poll(long timeout, TimeUnit unit):获取一个执行结果(获取后从队列中删除),如果没有,则等待一段时间,等待后如果还是没有,则返回 null

三、ExecutorCompletionService

ExecutorCompletionService 是 CompletionService 的实现类,它是使用指定的 Executor 执行任务的 CompletionService。

四、ExecutorCompletionService 创建

  • ExecutorCompletionService(Executor executor):创建使用 executor 执行任务,使用无界的 LinkedBlockingQueue 保存结果的 ExecutorCompletionService
  • ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue):创建使用 executor 执行任务,使用 completionQueue 保存任务执行结果的 ExecutorCompletionService

五、示例

1. 需求

从三个电商询价,然后保存在自己的数据库里。

2. 同步执行任务

1
2
3
4
5
6
r1 = getPriceByS1();
save(r1);
r2 = getPriceByS2();
save(r2);
r3 = getPriceByS3();
save(r3);

这种写法是串行的,速度很慢。

3. 异步执行任务,顺序访问结果

1
2
3
4
5
6
7
8
9
10
11
ExecutorService executor = Executors.newFixedThreadPool(3);
Future<Integer> f1 = executor.submit(() -> getPriceByS1());
Future<Integer> f2 = executor.submit(() -> getPriceByS2());
Future<Integer> f3 = executor.submit(() -> getPriceByS3());

r = f1.get();
executor.execute(()-> save(r));
r = f2.get();
executor.execute(()->save(r));
r = f3.get();
executor.execute(() -> save(r));

这种做法存在一个小问题,如果 getPriceByS1() 执行较慢,即使 getPriceByS2()getPriceByS3() 执行得很快,它们也需要等待直至 getPriceByS1() 执行完成,因为对结果的访问是顺序的。

4. 异步执行任务,堵塞队列实现结果的实时访问

1
2
3
4
5
6
7
8
9
10
ExecutorService executor = Executors.newFixedThreadPool(3);
BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
executor.execute(() -> bq.put(f1.get()));
executor.execute(() -> bq.put(f2.get()));
executor.execute(() -> bq.put(f3.get()));

for (int i = 0; i < 3; i++) {
Integer r = bq.take();
executor.execute(() -> save(r));
}

使用阻塞队列保存任务的执行结果,阻塞式读取该队列,从而避免了顺序访问结果时的等待问题。

5. 通过 CompletionService 实现

1
2
3
4
5
6
7
8
9
10
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService cs = new ExecutorCompletionService<>(executor);
cs.submit(() -> getPriceByS1());
cs.submit(() -> getPriceByS2());
cs.submit(() -> getPriceByS3());

for (int i = 0; i < 3; i++) {
Integer r = cs.take().get();
executor.execute(() -> save(r));
}

参考

  • Java 并发编程实战