本文介绍java.util.concurrent.atomic
包下的原子操作类,使用的JDK是JDK 8。
一、前导知识
为理解本文,需要的前导知识列表如下:
- volatile变量的内存语义,参见《synchronized-volatile-final关键词》
- Unsafe类中的一些方法,参见《Unsafe类》,具体有:
- 3个CAS操作方法(
compareAndSwapInt
,compareAndSwapLong
和compareAndSwapObject
) - 9个
put*Volatile
方法和9个get*Volatile
方法 putOrderedInt/putOrderedLong/putOrderedObject
方法
- 3个CAS操作方法(
- 原子操作与锁,参见《原子操作与锁》
- final关键词内存可见语义,参见《synchronized-volatile-final关键词》
二、java.util.concurrent.atomic
包下的原子操作类
可分成4类:
- 基本类型或者引用类型原子修改操作类
- 数组元素原子修改操作类
- 对象字段原子修改操作类
- 原子计数/累加操作类
2.1、基本类型或者引用类型原子修改操作类
一共有6个类,可分为两类:
- 第一类:AtomicInteger,AtomicLong,AtomicReference,AtomicBoolean。我们知道Java数据类型可分为“基本类型(boolean,byte,char,short,int,float,long,double)”和“引用类型”,在Unsafe类下有3个CAS操作方法(
compareAndSwapInt
,compareAndSwapLong
和compareAndSwapObject
),直接对应3个原子修改操作类,另外通过一定设计(以int型的“1|0”分别代表boolean型的“true|false”)获得对应于“boolean”基本类型的原子修改操作类AtomicBoolean
- 第二类:AtomicStampedReference和AtomicMarkableReference。这两个类被设计用来解决CAS操作的ABA问题
2.1.1、第一类
AtomicInteger,AtomicLong,AtomicReference,AtomicBoolean这4个类的实现大同小异,接下来以AtomicInteger类为例进行说明。
AtomicInteger有两个成员变量:
- 实例成员变量value,核心操作变量
- 类成员变量valueOffset,它保存了AtomicInteger类实例对象的内存起始地址到其内value变量内存起始地址的偏移值,之所以需要这个偏移值是因为所涉及到的几个Unsafe类下方法是以这样的方式去操作value变量的内存
在类中比较重要的两个方法是addAndGet
和lazySet
。
1、addAndGet(int delta)
AtomicInteger类中的addAndGet(int delta)
方法。
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
Unsafe类中的getAndAddInt(Object var1, long var2, int var4)
方法。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
结合getIntVolatile方法具有volatile变量内存语义
和compareAndSwapInt方法具有voltile变量内存语义
的前导知识,可知正确实现“增加且返回操作”的原子化。
以上是JDK 8版本的实现,接下来对比JDK 6版本的实现。
JDK 6中,AtomicInteger类中的addAndGet(int delta)
方法。
public final int addAndGet(int delta) {
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return next;
}
}
JDK 6中,AtomicInteger类中的get()
方法。
public final int get() {
return value;
}
两者最大的不同之处在于:具有volatile变量读语义限制地读取value变量,虽然最终等价,但是使用的方式不同——JDK 8使用getIntVolatile
方法,JDK 6直接返回value变量,而该变量本就由volatile修饰。
2、lazySet(int newValue)
基于Unsafe类下的putOrderedInt
方法实现。根据Unsafe类下putOrderedInt/putOrderedLong/putOrderedObject方法的含义
前导知识,我们知道Unsafe.putOrderedInt(Object var1, long var2, int var4)
介于“普通变量写语义”和“volatile变量写语义”之间,虽然不具有volatile变量写语义
,但是在有些场景中能够获得很大的性能提升,比如“大量写之后,再有一个读,写和读之间的可见性确保由其他机制提供,例如‘synchronized内存语义’,‘volatile内存语义’等”。
2.1.2、第二类
我们知道CAS原子操作存在ABA问题,解决ABA问题的常见方案是增加一个版本号,在java.util.concurrent.atomic
包下有两个类用于解决这个问题:
- AtomicMarkableReference,引入一个boolean类型的标记位,作为简化形式的版本号,也即只有“true”和“false”两个版本号,它只能解决偶数ABA问题,而不能解决奇数ABA问题,比如“A-B-C-A,假定标记位的变化为‘true-false-true-false’,此时(A,false)并非预期的(A,true),故修改尝试会失败;A-B-A,假定标记位的变化为‘true-false-true’,此时(A,true)为预期的(A,true),故修改尝试会成功”
- AtomicStampedReference,引入一个int类型的版本号,正统彻底的解决方案,既能解决偶数ABA问题,也能解决奇数ABA问题
接下来就主要介绍AtomicStampedReference类。
首先介绍封装引用变量reference
和版本号变量stamp
的内部类Pair,它们都由final修饰且都在构造方法中赋值,显而易见满足了final关键词内存可见语义规则
的条件。
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
然后介绍compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)
这个核心方法:
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) {
AtomicStampedReference.Pair<V> current = pair;
return expectedReference == current.reference && expectedStamp == current.stamp
&& ((newReference == current.reference && newStamp == current.stamp)
|| casPair(current, AtomicStampedReference.Pair.of(newReference, newStamp)));
}
上述方法的语义是:只有当“旧值”和“旧版本号”都匹配时,才进行“新值”和“新版本号”的更新,其中(newReference == current.reference && newStamp == current.stamp)
逻辑表示在“旧值”和“新值”,“旧版本号”和“新版本号”都相同的情形下,无需做实际的更新操作,从而避免无必要的CAS原子操作。
2.2、数组元素原子修改操作类
包括3个类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray。这3个类的实现大同小异,接下来以AtomicIntegerArray类为例进行说明。
AtomicIntegerArray有3个成员变量:
- 实例成员变量int[] array,核心操作变量。由final修饰,在构造方法中赋值,显而易见满足了
final关键词内存可见语义规则
的条件 - 类成员变量base,这里保存了array数组引用对象“内存起始地址”与“数组对象内元素内存起始地址”的偏移量,即数组引用对象对象头的字节数。它的计算方法是
Unsafe.arrayBaseOffset(int[].class)
。需要注意的是,是否开启指针压缩会影响该值 - 类成员变量shift,它的值为2,
2^2=4
,4是int型所占据的字节大小。它的计算方法是scale=Unsafe.arrayIndexScale(int[].class)
和shift=31 - Integer.numberOfLeadingZeros(scale)
:前者确定int型所占据的字节大小,后者计算以2为底的log(4)
对数值
通过阅读源码可以发现,对数组中某个元素进行原子修改操作的“路径”是:
- 先定位该数组元素的内存地址
- 再调用Unsafe类中的操作方法,比如
compareAndSwapInt
,putIntVolatile
等。本步实现与“基本类型或者引用类型原子修改操作类”的实现并无二致;同时佐证了Unsafe类下CAS方法,put*Volatile
和get*Volatile
方法的volatile变量内存语义并不要求变量是由volatile修饰的,因为这里的数组元素并非由volatile修饰的
另外有一点值得说明的是:在AtomicReferenceArray类中有一个独有的readObject()
方法(与“AtomicIntegerArray”和“AtomicLongArray”类相比),它是为了确保在反序列化时传入的数组类型的确是Object[]类型,而非其他虽然不会报错的数组类型,比如“String[]”,这是作为一个安全补丁而引入的[1];而在AtomicIntegerArray/AtomicLongArray类中,如果传入的数组类型不是int[]/long[]则会立即报错。
2.3、对象字段原子修改操作类
包括3个类:AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater。这3个类的实现大同小异,接下来以AtomicIntegerFieldUpdater类为例进行说明。
通过阅读源码可以发现,对对象中某个字段进行原子修改操作的“路径”是:
- 先定位对象中该字段的内存地址
- 再调用Unsafe类中的操作方法,比如
compareAndSwapInt
,putIntVolatile
等
这里有两点需要说明:
- 在这个使用场景中,字段的操作入口不只限于AtomicIntegerFieldUpdater类,故当混杂其他操作入口时,不能保证所涉及字段操作的有序性、可见性和原子性。比如“对于一个字段A,一处使用AtomicIntegerFieldUpdater类实例进行修改,另外一处直接进行修改”
- 在AtomicIntegerFieldUpdater类的JavaDoc中要求所操作字段是由volatile修饰的,而且构造方法也直接作了相应的限定,但其实使用Unsafe类下CAS方法,
put*Volatile
和get*Volatile
方法获得volatile变量内存语义并不要求变量是由volatile修饰的
2.4、原子计数/累加操作类
包括5个类:Striped64,LongAdder,LongAccumulator,DoubleAdder和DoubleAccumulator。
Striped64是父类,LongAdder/DoubleAdder分别是(LongAdder,LongAccumulator)/(DoubleAdder,DoubleAccumulator)系列的代表类,故接下来介绍“Striped64”,“LongAdder”和“DoubleAdder”这3个类。
2.4.1、Striped64
基本原理是:通过两处进行计数/累加,一处是使用base变量进行基本计数/累加,另外一处是通过cells数组进行多线程分段计数/累加。
核心方法是longAccumulate
和doubleAccumulate
,两者类似,以longAccumulate
为例进行说明。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as;
Cell a;
int n;
long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs;
int m, j;
if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
} else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
} else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
} else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
上述方法的主体流程是十分清晰的,就是“通过两处进行计数/累加”,但由于需要考虑各种并发情形,且为无锁编程实现方案(使用CAS自旋锁),因此相对来说理解起来较为费劲,牢记“多线程环境存在各种并发可能,关键操作通过CAS方法进行”这点可帮助理解。
接下来介绍方法中几个关键变量的含义:
cellsBusy
变量用作锁用途,cellsBusy=0
时表示未锁住态,cellsBusy=1
时表示锁住态,使用CAS方法操作该变量wasUncontended
变量,用于一个小性能优化,当其值为false时,表示在调用该方法前已经失败执行a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))
的等价语句,故可不执行后续最临近一次的a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))
语句- 当
collide
变量的值为true的时候,表示允许扩展cells
数组
2.4.2、LongAdder
Long类型的计数类,核心方法在父类Striped64中,本身主要提供获取计数结果/重置计数的方法,需要注意的是,它不是线程安全的计数数据结构,原因参见如下源码。
public long sumThenReset() {
Cell[] as = cells;
Cell a;
long sum = base;
// 此时的计数数据丢失
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
// 此时的计数数据丢失
a.value = 0L;
}
}
}
return sum;
}
2.4.3、DoubleAdder
Double类型的计数类,跟LongAddr类似。一个实现要点是:将Double类型存储成Long类型,可操作的基础是两者都是8个字节大小。