畅读源码系列之J.U.C(二):聊聊CAS

【引言】CAS=Compare and Swap,通俗的解释就是比较并交换,Doug lea大神在J.U.C中大量的操作都用到了这种技术,所以这一讲作为并发系列的第一讲,我们先来看看CAS到底是个什么东东。


先睹为快

  这里我们先不深究内部的逻辑,只是先有个概念,比如下面的getAndSet方法里面,有一个compareAndSet的调用,而compareAndSet最终实现里面是调用了unsafe.compareAndSwapInt,这里的名字是不是看起来很眼熟?Compare and Swap,也就是我们今天的主题CAS了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Atomically sets to the given value and returns the previous value.
*
* @param newValue the new value
* @return the previous value
*/
public final boolean getAndSet(boolean newValue) {
boolean prev;
do {
prev = get();
} while (!compareAndSet(prev, newValue));
return prev;
}

/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(boolean expect, boolean update) {
int e = expect ? 1 : 0;
int u = update ? 1 : 0;
return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}

CAS是什么

  正如开篇我们解释的那样,CAS的中文解释就是比较并交换,它通常涉及到以下步骤(我们假设内存中的原数据V,旧的预期值A,需要修改的新值B):

  • 比较 A 与 V 是否相等。(比较)
  • 如果比较相等,将 B 写入 V。(交换)
  • 返回操作是否成功。

  当多个线程并发操作时,通过这个机制就可以保证只有一个线程可以操作成功,而且这个操作也不会导致其他线程的阻塞(因为比较失败的话直接就返回一个失败结果给调用者了),所以实际上CAS和数据库的乐观锁是类似的(通过版本这种类似的机制来保证通过最小的锁粒度也可以支持并发的线程安全性)。

CAS的地位

  下图是concurrent包的实现示意图,很明显的可以发现CAS是处在最底层的,和volitale是同一级别的(volitale是控制可见性、重排序),所以CAS的定位是实现整个java并发包的基石。

n++的问题

  有下面这么一段代码,很简单的,无非就是把某个值通过add方法做一个累加操作

1
2
3
4
5
6
7
8
9
10
package com.demo;

public class AddDemo {

public volatile int n;

public void add() {
n++;
}
}

  通过字节码查看工具,得出本类的字节码如下;我们重点关注一下add方法内部的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// class version 52.0 (52)
// access flags 0x21
public class com/demo/AddDemo {

// compiled from: AddDemo.java

// access flags 0x41
public volatile I n

// access flags 0x1
public <init>()V
L0
LINENUMBER 3 L0
ALOAD 0
INVOKESPECIAL java/lang/Object.<init> ()V
RETURN
L1
LOCALVARIABLE this Lcom/demo/AddDemo; L0 L1 0
MAXSTACK = 1
MAXLOCALS = 1

// access flags 0x1
public add()V
L0
LINENUMBER 8 L0
ALOAD 0
DUP
GETFIELD com/demo/AddDemo.n : I // 【Lin.C】:n++会被拆分成3个操作,这是第一步GETFIELD
ICONST_1 // 【Lin.C】:多句嘴,这里应该是取n+1的那个1
IADD // 【Lin.C】:第二步IADD实现+1操作
PUTFIELD com/demo/AddDemo.n : I // 【Lin.C】:第三步PUTFIELD把累加后的值写回n
L1
LINENUMBER 9 L1
RETURN
L2
LOCALVARIABLE this Lcom/demo/AddDemo; L0 L2 0
MAXSTACK = 3
MAXLOCALS = 1
}

  通过volatile修饰的变量可以保证线程间的可见性、也可以控制指令重排序,但并不能保证这3个操作指令的原子性,所以在多线程情况下,是无法保证线程安全的,这就是我们所说的问题了。那这个问题怎么解决呢?

synchronized

  上面n++这个问题出现的根源,是因为n++本身不是原子操作,当然很多其他不是原子操作的方法在多线程运行时,也都会存在一样的问题;我们也都知道synchronized关键字是可以保证操作的原子性的(比如用它来锁住一个方法),这种方法固然是没问题的,是能用的,但是好用么?没有其他选择的情况下,可能也觉得是好用的;但当你接触到了别的方案,还会觉得好用么?所以知识的积累是个不断求索的过程(路漫漫其修远兮,吾将上下而求索)。

1
2
3
4
5
6
7
8
9
10
package com.demo;

public class AddDemo {

public volatile int n;

public synchronized void add() {
n++;
}
}

AtomicXXX的出现

  听说J.U.C里有一批Atomic打头的类是可以高效支持原子操作的,那么我么来看看如何?既然前面用到了int,那么理所当然的我们就先来研究一下AtomicInteger,看上去它们是有千丝万缕的联系的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* ......
* @since 1.5
* @author Doug Lea
*/
public class AtomicInteger extends Number implements java.io.Serializable {

/**
* 【Lin.C】:Unsafe是CAS的核心类
* - 实际上Java是方法无法直接访问底层系统的,通常需要通过本地(native)方法来访问
* - 而Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据
*/
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();

// 【Lin.C】:表示变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的(像不像kafka?)。
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

// 【Lin.C】:用volatile修饰,保证了线程间的内存可见性并可防止指令重排序。
private volatile int value;
......
}

AtomicInteger如何累加

  以下是AtomicInteger里累加的实现源码,其实很简单,主要的操作还是通过Unsafe来实现的,那么在多线程操作的时候,这里是怎么保证操作的原子性的呢?我们来看看下面这一段儿:

  • 假设有两个线程(A和B),都在执行getAndAdd操作(跑在不同CPU上):
  • 刚开始假设AtomicInteger里面的value原始值为5
  • 这时,主内存有一个value变量值为5;CPU-A的缓存、CPU-B的缓存上分别有一份这个value的副本
  • 这时线程A开始处理,取到这个value(=5),然后在路边打了个盹
  • 此时线程B也来插一脚,也取到这个value(=5)
  • 然后线程B先调用getAndAdd了,并且操作成功了,实际这时value已经=6了
  • 这时线程A慢悠悠的醒过来了,发现天都黑了,于是快马加鞭也发起了getAndAdd调用
  • 结果就在提交这个操作的时候,compareAndSwapInt发现不对啊,你手里的值(5)和我现在的值(6)不一样啊
  • 于是提交被拒绝,线程A就灰溜溜的回去重新获取然后再提交一遍了(所以说关键时刻真的不能打盹啊)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    /**
    * Atomically adds the given value to the current value.
    *
    * @param delta the value to add
    * @return the previous value
    */
    public final int getAndAdd(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta);
    }

    // 【Lin.C】:来自Unsafe的累加方法
    public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
    var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
    }

  其实上面那场戏已经严重说明了CAS的基本过程,那么我们看看Unsafe自己到底是怎么来做的(这里仅供参考,不同的平台其实实现是不一样的)。

1
2
3
4
5
6
7
8
9
10
11
UNSAFE_ENTRY(jboolean,
Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);

// 【Lin.C】:先通过offset拿到变量value的内存地址addr
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);

// 【Lin.C】:通过cmpxchg实现比较替换(x是即将更新到addr的新值,e是addr位置的现有值),返回结果是e则表示替换成功了
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

CAS是完美的么

  Ofcourse not.这个世界上就没有什么东西会是完美无缺的,包括你我在内。其实细想一下,CAS存在一个非常明显的问题,即ABA问题。
  问题:如果变量V初次读取的时候是A,并且在赋值的时候检查仍然是A,能说明它的值没有被其他线程修改过了吗?很显然是不能的(比如在这段期间曾经被改成B,然后又改回A)。
  解决方案:针对这种场景,CAS操作是无法分辨的,它会误认为V还是原来那个V,这个问题就是传说中的ABA问题;所以为了解决这个问题,J.U.C中提供了一个带有标记的原子引用类AtomicStampedReference,它可以通过控制变量的版本来保证CAS的正确性(和AtomicInteger在同一个包里面)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

public class AtomicStampedReference<V> {

private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}

private volatile Pair<V> pair;
......

/**
* 【Lin.C】:从return语句可以明显的发现,有两个比较(reference、stamp),也就是值和版本的比较
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
}

  另外,虽然CAS相较于大力度的锁来说,能避免阻塞,但是它自身是会引发性能问题的,因为我们使用CAS时大部分时间使用的是 while true 方式对数据来做修改,直到成功为止,优势就是响应极快,但一旦线程数不停增加时,性能就会下降很明显,因为每个线程都需要执行占用CPU。

总结

  其实CAS的思想,在编程领域使用的很广泛,比如我们前面提到的数据库锁的思想,涉及到多线程,涉及到锁和阻塞的场景,CAS的思想想必都是值得借鉴的,作为并发concurrent包的基石,更好的掌握它可能从更大的层面对我们的设计架构都有着隐性的影响。

------2019 Lin.C ------