并发编程 synchronized 的应用

本文通过一个转账的案例,说明 synchronized 的用法。

一、说明

假设有一个转账操作,该操作会将钱从某个账户转移至另一个账户。

二、错误的做法

transfer() 方法添加 synchronized 关键字,如下:

1
2
3
4
5
6
7
8
9
10
11
12
class Account {

private int balance;

synchronized void transfer(Account target, int amt){
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}

}

自身对象被锁住,但转账操作涉及两个资源,无法保证其它线程的 transfer() 方法不会操作 target 对象。

三、粗粒度锁

1. 锁住实例的共有部分

让所有的对象都持有一个唯一的 lock 对象,要求转账前获取 lock 对象的锁,相当于锁住了所有的 Account 对象,可以保证转账双方都被 “锁住”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Account {

// 在每个Account实例中都放置一个单例的lock对象
@Resource
private Object lock;

private int balance;

void transfer(Account target, int amt){
synchronized(lock) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}

}

2. 锁住类

由于 Account 实例都共享 Account.class,因此可以将其作为公共部分,锁住它即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Account {

private int balance;

void transfer(Account target, int amt){
synchronized(Account.class) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}

}

四、细粒度锁

“粗粒度锁” 存在一个问题,那便是性能太差,同一时刻有且只能有一个转账操作可以进行。

容易想到的解决方案是 依次加锁:首先获取转出方的锁,然后获取转入方的锁,两个锁都获取后,便可以执行操作,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Account {

private int balance;

void transfer(Account target, int amt){
synchronized(this) {
synchronized(target) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}

}

五、避免死锁的细粒度锁

1. 死锁

上面的 “细粒度锁” 方法存在死锁的可能,假设这样一种情况:

  • 线程 A 调用 transfer() 方法,转出方为 a,转入方为 b,获取了 a 的锁
  • 线程 B 调用 transfer() 方法,转出方为 b,转入方为 a,获取了 b 的锁
  • 线程 A 持有 a 的锁,进入阻塞状态,等待 b 的锁
  • 线程 B 持有 b 的锁,进入阻塞状态,等待 a 的锁

在没有额外干预的前提下,他们将持续僵持下去,发生 死锁

具体请看:

并发编程 死锁

2. 破坏互斥

显然,互斥是锁的本质,无法破坏。

3. 破坏占有等待

(1) 思路

只需要保证每次可以一次性申请到所有资源,就可以避免等待。

(2) 方法 1 - 轮询

编写一个锁记录类,如下:

  • 类中记录所有被加锁的资源
  • 加锁前应该调用其 apply() 方法申请加锁,apply() 方法会遍历已记录的所有锁,
    • 当申请的资源都未被上锁时,允许上锁
    • 当申请的资源之一被上锁时,不允许上锁
  • 加锁后应调用其 free() 方法说明释放锁
  • apply()free() 都用 synchronized 标识,防止有多个线程同时申请锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class LockRecorder {

/**
* 所有被加锁的资源
*/
private List<Object> lockedObjectList = new ArrayList<>();

/**
* 记录加锁
*/
synchronized boolean apply(Object... objects) {
List<Object> objectList = Arrays.asList(objects);
boolean locked = lockedObjectList.stream().anyMatch(objectList::contains);
if (locked) {
return false;
}
lockedObjectList.addAll(objectList);
return true;
}

/**
* 记录释放锁
*/
synchronized void free(Object... objects) {
List<Object> objectList = Arrays.asList(objects);
lockedObjectList.removeAll(objectList);
}

}

每次希望加锁之前,轮询锁记录类,直至允许,一次性进行加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Account {

@Resource
LockRecorder lockRecorder;

private int balance;

void transfer(Account target, int amt){
// 持续申请加锁,直至成功
while(!lockRecorder.apply(this, target));

synchronized(this) {
synchronized(target) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}

}

(3) 方法 2 - 等待通知

与循环等待相比,更好的方式是 “等待通知”:

  • 当线程要求的条件不满足时,进入等待状态
  • 当条件满足时,线程收到通知,唤醒后开始执行

因此,对锁记录类做改造,主要改造的地方是:

  • 当申请的资源之一被上锁时,调用 wait() 方法,等待被唤醒
  • 通过 while ... wait 的方式避免 “虚假唤醒”
  • free() 被调用时,调用 notifyAll() 方法,唤醒所有正在 wait 的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class LockRecorder {

/**
* 所有被加锁的资源
*/
private List<Object> lockedObjectList = new ArrayList<>();

/**
* 记录加锁
*/
synchronized void apply(Object... objects) {
List<Object> objectList = Arrays.asList(objects);
while (lockedObjectList.stream().anyMatch(objectList::contains)) {
wait();
}
lockedObjectList.addAll(objectList);
}

/**
* 记录释放锁
*/
synchronized void free(Object... objects) {
List<Object> objectList = Arrays.asList(objects);
lockedObjectList.removeAll(objectList);
notifyAll();
}

}

4. 破坏无法中断

synchronized 无法做到。

Java.util.concurrent 包下的 Lock 可以解决这一问题。

5. 破坏循环等待

循环等待的原因是:

  • A 需要获取一批锁,B 需要获取一批锁
  • A 占据了 B 希望获取的后续锁,B 也占据了 A 希望获取的后续锁
  • 双方既无法继续,也无法放弃,只能一直僵持

一种解决思路是:对资源进行排序,要求按序申请资源,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Account {

private int id;

private int balance;

void transfer(Account target, int amt){
Account left = this;
Account right = target;
if (this.id > target.id) {
left = target;
right = this;
}

synchronized(left) {
synchronized(right) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}

}

参考

  • Java 并发编程实战