并发编程 Guarded Suspension模式

本文将介绍并发编程中的 Guarded Suspension 模式。

一、什么是 Guarded Suspension?

Guarded Suspension,即保护性暂停。

它适用于这样一种场景:线程 A 调用线程 B 执行某项异步操作,又需要在线程 A 中返回操作结果。此时,应该使用 Guarded Suspension 模式,让线程 A 等待直至异步操作完成。

二、Guarded Suspension

Guarded Suspension 模式的结构图如图所示:

  • GuardedObject 表示保护者,其内部包含一个被保护对象
  • get() 方法用于获取受保护对象,当条件不满足时,将会阻塞循环等待
  • onChange() 方法用于改变被保护对象状态,并在条件得到满足时通知该条件的等待队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class GuardedObject<T>{
//受保护的对象
T obj;

final Lock lock = new ReentrantLock();

final Condition done = lock.newCondition();

final int timeout=1;

/**
* 获取受保护对象
*/
T get(Predicate<T> p) {
lock.lock();
try {
while (!p.test(obj)) {
done.await(timeout, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
return obj;
}

/**
* 事件通知方法
*/
void onChanged(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}

三、扩展的 Guarded Suspension

上述 Guarded Suspension 存在一个问题,子线程有可能无法访问到 GuardedObject,从而导致子线程无法改变受保护对象状态。

对 Guarded Suspension 进行了扩展,将所有的 GuardedObject 保存到静态变量中,当需要改变受保护对象状态时,可以通过调用静态方法 fireEvent() 传入对应的 key 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class GuardedObject<T> {
//受保护的对象
T obj;

final Lock lock = new ReentrantLock();

final Condition done = lock.newCondition();

final int timeout = 2;

/**
* 保存所有未完成条件的GuardedObject
*/
final static Map<Object, GuardedObject> gos = new ConcurrentHashMap<>();

/**
* 创建GuardedObject的静态方法
*/
static <K> GuardedObject create(K key) {
GuardedObject go = new GuardedObject();
gos.put(key, go);
return go;
}

static <K, T> void fireEvent(K key, T obj) {
GuardedObject go = gos.remove(key);
if (go != null) {
go.onChanged(obj);
}
}

/**
* 获取受保护对象
*/
T get(Predicate<T> p) {
lock.lock();
try {
while(!p.test(obj)) {
done.await(timeout, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
return obj;
}

/**
* 事件通知方法
*/
void onChanged(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}

参考

  • Java 并发编程实战