0%

原子操作类

本文介绍java.util.concurrent.atomic包下的原子操作类,使用的JDK是JDK 8。

一、前导知识

为理解本文,需要的前导知识列表如下:

二、java.util.concurrent.atomic包下的原子操作类

可分成4类:

  • 基本类型或者引用类型原子修改操作类
  • 数组元素原子修改操作类
  • 对象字段原子修改操作类
  • 原子计数/累加操作类

2.1、基本类型或者引用类型原子修改操作类

一共有6个类,可分为两类:

  • 第一类:AtomicInteger,AtomicLong,AtomicReference,AtomicBoolean。我们知道Java数据类型可分为“基本类型(boolean,byte,char,short,int,float,long,double)”和“引用类型”,在Unsafe类下有3个CAS操作方法(compareAndSwapIntcompareAndSwapLongcompareAndSwapObject),直接对应3个原子修改操作类,另外通过一定设计(以int型的“1|0”分别代表boolean型的“true|false”)获得对应于“boolean”基本类型的原子修改操作类AtomicBoolean
  • 第二类:AtomicStampedReference和AtomicMarkableReference。这两个类被设计用来解决CAS操作的ABA问题

2.1.1、第一类

AtomicInteger,AtomicLong,AtomicReference,AtomicBoolean这4个类的实现大同小异,接下来以AtomicInteger类为例进行说明。

AtomicInteger有两个成员变量:

  • 实例成员变量value,核心操作变量
  • 类成员变量valueOffset,它保存了AtomicInteger类实例对象的内存起始地址到其内value变量内存起始地址的偏移值,之所以需要这个偏移值是因为所涉及到的几个Unsafe类下方法是以这样的方式去操作value变量的内存

在类中比较重要的两个方法是addAndGetlazySet
1、addAndGet(int delta)
AtomicInteger类中的addAndGet(int delta)方法。

1
2
3
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

Unsafe类中的getAndAddInt(Object var1, long var2, int var4)方法。

1
2
3
4
5
6
7
8
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

结合getIntVolatile方法具有volatile变量内存语义compareAndSwapInt方法具有voltile变量内存语义的前导知识,可知正确实现“增加且返回操作”的原子化。


以上是JDK 8版本的实现,接下来对比JDK 6版本的实现。
JDK 6中,AtomicInteger类中的addAndGet(int delta)方法。

1
2
3
4
5
6
7
8
public final int addAndGet(int delta) {
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return next;
}
}

JDK 6中,AtomicInteger类中的get()方法。

1
2
3
public final int get() {
return value;
}

两者最大的不同之处在于:具有volatile变量读语义限制地读取value变量,虽然最终等价,但是使用的方式不同——JDK 8使用getIntVolatile方法,JDK 6直接返回value变量,而该变量本就由volatile修饰。

2、lazySet(int newValue)
基于Unsafe类下的putOrderedInt方法实现。根据Unsafe类下putOrderedInt/putOrderedLong/putOrderedObject方法的含义前导知识,我们知道Unsafe.putOrderedInt(Object var1, long var2, int var4)介于“普通变量写语义”和“volatile变量写语义”之间,虽然不具有volatile变量写语义,但是在有些场景中能够获得很大的性能提升,比如“大量写之后,再有一个读,写和读之间的可见性确保由其他机制提供,例如‘synchronized内存语义’,‘volatile内存语义’等”。

2.1.2、第二类

我们知道CAS原子操作存在ABA问题,解决ABA问题的常见方案是增加一个版本号,在java.util.concurrent.atomic包下有两个类用于解决这个问题:

  • AtomicMarkableReference,引入一个boolean类型的标记位,作为简化形式的版本号,也即只有“true”和“false”两个版本号,它只能解决偶数ABA问题,而不能解决奇数ABA问题,比如“A-B-C-A,假定标记位的变化为‘true-false-true-false’,此时(A,false)并非预期的(A,true),故修改尝试会失败;A-B-A,假定标记位的变化为‘true-false-true’,此时(A,true)为预期的(A,true),故修改尝试会成功”
  • AtomicStampedReference,引入一个int类型的版本号,正统彻底的解决方案,既能解决偶数ABA问题,也能解决奇数ABA问题

接下来就主要介绍AtomicStampedReference类。

首先介绍封装引用变量reference版本号变量stamp的内部类Pair,它们都由final修饰且都在构造方法中赋值,显而易见满足了final关键词内存可见语义规则的条件。

1
2
3
4
5
6
7
8
9
10
11
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}

然后介绍compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)这个核心方法:

1
2
3
4
5
6
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) {
AtomicStampedReference.Pair<V> current = pair;
return expectedReference == current.reference && expectedStamp == current.stamp
&& ((newReference == current.reference && newStamp == current.stamp)
|| casPair(current, AtomicStampedReference.Pair.of(newReference, newStamp)));
}

上述方法的语义是:只有当“旧值”和“旧版本号”都匹配时,才进行“新值”和“新版本号”的更新,其中(newReference == current.reference && newStamp == current.stamp)逻辑表示在“旧值”和“新值”,“旧版本号”和“新版本号”都相同的情形下,无需做实际的更新操作,从而避免无必要的CAS原子操作。

2.2、数组元素原子修改操作类

包括3个类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray。这3个类的实现大同小异,接下来以AtomicIntegerArray类为例进行说明。

AtomicIntegerArray有3个成员变量:

  • 实例成员变量int[] array,核心操作变量。由final修饰,在构造方法中赋值,显而易见满足了final关键词内存可见语义规则的条件
  • 类成员变量base,这里保存了array数组引用对象“内存起始地址”与“数组对象内元素内存起始地址”的偏移量,即数组引用对象对象头的字节数。它的计算方法是Unsafe.arrayBaseOffset(int[].class)。需要注意的是,是否开启指针压缩会影响该值
  • 类成员变量shift,它的值为2,2^2=4,4是int型所占据的字节大小。它的计算方法是scale=Unsafe.arrayIndexScale(int[].class)shift=31 - Integer.numberOfLeadingZeros(scale):前者确定int型所占据的字节大小,后者计算以2为底的log(4)对数值

通过阅读源码可以发现,对数组中某个元素进行原子修改操作的“路径”是:

  1. 先定位该数组元素的内存地址
  2. 再调用Unsafe类中的操作方法,比如compareAndSwapIntputIntVolatile等。本步实现与“基本类型或者引用类型原子修改操作类”的实现并无二致;同时佐证了Unsafe类下CAS方法,put*Volatileget*Volatile方法的volatile变量内存语义并不要求变量是由volatile修饰的,因为这里的数组元素并非由volatile修饰的

另外有一点值得说明的是:在AtomicReferenceArray类中有一个独有的readObject()方法(与“AtomicIntegerArray”和“AtomicLongArray”类相比),它是为了确保在反序列化时传入的数组类型的确是Object[]类型,而非其他虽然不会报错的数组类型,比如“String[]”,这是作为一个安全补丁而引入的[1];而在AtomicIntegerArray/AtomicLongArray类中,如果传入的数组类型不是int[]/long[]则会立即报错。

2.3、对象字段原子修改操作类

包括3个类:AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater。这3个类的实现大同小异,接下来以AtomicIntegerFieldUpdater类为例进行说明。

通过阅读源码可以发现,对对象中某个字段进行原子修改操作的“路径”是:

  1. 先定位对象中该字段的内存地址
  2. 再调用Unsafe类中的操作方法,比如compareAndSwapIntputIntVolatile

这里有两点需要说明:

  • 在这个使用场景中,字段的操作入口不只限于AtomicIntegerFieldUpdater类,故当混杂其他操作入口时,不能保证所涉及字段操作的有序性、可见性和原子性。比如“对于一个字段A,一处使用AtomicIntegerFieldUpdater类实例进行修改,另外一处直接进行修改”
  • 在AtomicIntegerFieldUpdater类的JavaDoc中要求所操作字段是由volatile修饰的,而且构造方法也直接作了相应的限定,但其实使用Unsafe类下CAS方法,put*Volatileget*Volatile方法获得volatile变量内存语义并不要求变量是由volatile修饰的

2.4、原子计数/累加操作类

包括5个类:Striped64,LongAdder,LongAccumulator,DoubleAdder和DoubleAccumulator。
Striped64是父类,LongAdder/DoubleAdder分别是(LongAdder,LongAccumulator)/(DoubleAdder,DoubleAccumulator)系列的代表类,故接下来介绍“Striped64”,“LongAdder”和“DoubleAdder”这3个类。

2.4.1、Striped64

基本原理是:通过两处进行计数/累加,一处是使用base变量进行基本计数/累加,另外一处是通过cells数组进行多线程分段计数/累加。

核心方法是longAccumulatedoubleAccumulate,两者类似,以longAccumulate为例进行说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as;
Cell a;
int n;
long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs;
int m, j;
if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
} else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
} else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
} else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

上述方法的主体流程是十分清晰的,就是“通过两处进行计数/累加”,但由于需要考虑各种并发情形,且为无锁编程实现方案(使用CAS自旋锁),因此相对来说理解起来较为费劲,牢记“多线程环境存在各种并发可能,关键操作通过CAS方法进行”这点可帮助理解。

接下来介绍方法中几个关键变量的含义:

  • cellsBusy变量用作锁用途,cellsBusy=0时表示未锁住态,cellsBusy=1时表示锁住态,使用CAS方法操作该变量
  • wasUncontended变量,用于一个小性能优化,当其值为false时,表示在调用该方法前已经失败执行a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))的等价语句,故可不执行后续最临近一次的a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))语句
  • collide变量的值为true的时候,表示允许扩展cells数组

2.4.2、LongAdder

Long类型的计数类,核心方法在父类Striped64中,本身主要提供获取计数结果/重置计数的方法,需要注意的是,它不是线程安全的计数数据结构,原因参见如下源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public long sumThenReset() {
Cell[] as = cells;
Cell a;
long sum = base;

// 此时的计数数据丢失

base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;

// 此时的计数数据丢失

a.value = 0L;
}
}
}
return sum;
}

2.4.3、DoubleAdder

Double类型的计数类,跟LongAddr类似。一个实现要点是:将Double类型存储成Long类型,可操作的基础是两者都是8个字节大小。


参考文献

[1]https://stackoverflow.com/questions/64622143/why-need-the-readobject-overriding-method-in-atomicreferencearray

您的支持将鼓励我继续分享!