on
CAS 精谈
关于 CAS 的八股文章已经有很多了,这里不细说。
本文以 AtomicInteger.compareAndSet 举例,直接 show code,为了精简,列出来的代码会删除注释等,如果大家想细看,点开链接就可以了。
java 层
Android 12 及之前的 AtomicInteger 是直接通过 Unsafe 来实现的,而 Android 13 及之后的 AtomicInteger,则是使用 VarHandle 建了一个隔离层,最终还是通过 Unsafe 实现。
Android12 - AtomicInteger.java, Android 13 - AtomicInteger.java
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long VALUE;
static {
try {
VALUE = U.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
private volatile int value;
......
public final int getAndIncrement() {
// Android-changed: Using VarHandle instead of Unsafe
// return U.getAndAddInt(this, VALUE, 1);
return (int)VALUE.getAndAdd(this, 1);
}
......
public final boolean compareAndSet(int expect, int update) {
return U.compareAndSwapInt(this, VALUE, expect, update);
}
......
}
public final class Unsafe {
/** Traditional dalvik name. */
private static final Unsafe THE_ONE = new Unsafe();
private static final Unsafe theUnsafe = THE_ONE;
public static final int INVALID_FIELD_OFFSET = -1;
/**
* This class is only privately instantiable.
*/
private Unsafe() {}
/**
* Gets the unique instance of this class. This is only allowed in
* very limited situations.
*/
public static Unsafe getUnsafe() {
Class<?> caller = Reflection.getCallerClass();
/*
* Only code on the bootclasspath is allowed to get at the
* Unsafe instance.
*/
ClassLoader calling = (caller == null) ? null : caller.getClassLoader();
if ((calling != null) && (calling != Unsafe.class.getClassLoader())) {
throw new SecurityException("Unsafe access denied");
}
return THE_ONE;
}
@FastNative
public native boolean compareAndSwapInt(Object obj, long offset,int expectedValue, int newValue);
@FastNative
public native int getIntVolatile(Object obj, long offset);
@FastNative
public native void putIntVolatile(Object obj, long offset, int newValue);
@FastNative
public native int getInt(Object obj, long offset);
@FastNative
public native void putInt(Object obj, long offset, int newValue);
@FastNative
public native int getInt(long address);
@FastNative
public native void putInt(long address, int x);
@IntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
}
framework 层
namespace art {
static jboolean Unsafe_compareAndSwapInt(JNIEnv* env, jobject, jobject javaObj, jlong offset,
jint expectedValue, jint newValue) {
ScopedFastNativeObjectAccess soa(env);
ObjPtr<mirror::Object> obj = soa.Decode<mirror::Object>(javaObj);
// JNI must use non transactional mode.
bool success = obj->CasField32<false>(MemberOffset(offset),
expectedValue,
newValue,
CASMode::kStrong,
std::memory_order_seq_cst);
return success ? JNI_TRUE : JNI_FALSE;
}
}
object-readbarrier-inl.h - CasField32
template<bool kTransactionActive, bool kCheckTransaction, VerifyObjectFlags kVerifyFlags>
inline bool Object::CasField32(MemberOffset field_offset,
int32_t old_value,
int32_t new_value,
CASMode mode,
std::memory_order memory_order) {
if (kCheckTransaction) {
DCHECK_EQ(kTransactionActive, Runtime::Current()->IsActiveTransaction());
}
if (kTransactionActive) {
Runtime::Current()->RecordWriteField32(this, field_offset, old_value, true);
}
if (kVerifyFlags & kVerifyThis) {
VerifyObject(this);
}
uint8_t* raw_addr = reinterpret_cast<uint8_t*>(this) + field_offset.Int32Value();
AtomicInteger* atomic_addr = reinterpret_cast<AtomicInteger*>(raw_addr);
return atomic_addr->CompareAndSet(old_value, new_value, mode, memory_order);
}
using AtomicInteger = Atomic<int32_t>;
bool CompareAndSet(T expected_value,
T desired_value,
CASMode mode,
std::memory_order memory_order) {
return mode == CASMode::kStrong
? this->compare_exchange_strong(expected_value, desired_value, memory_order)
: this->compare_exchange_weak(expected_value, desired_value, memory_order);
}
template <class _Tp>
_LIBCPP_HIDE_FROM_ABI
bool
atomic_compare_exchange_strong(volatile atomic<_Tp>* __o, typename atomic<_Tp>::value_type* __e, typename atomic<_Tp>::value_type __d) _NOEXCEPT
{
return __o->compare_exchange_strong(*__e, __d);
}
inline int32_t __sync_val_compare_and_swap(volatile int32_t* ptr, int32_t oldval, int32_t newval)
{
int32_t ret = *ptr;
(void)__sync_bool_compare_and_swap(ptr, oldval, newval);
return ret;
}
atomics_arm.c
int __atomic_cmpxchg(int old, int _new, volatile int *ptr)
{
/* We must return 0 on success */
return __sync_val_compare_and_swap(ptr, old, _new) != old;
}
int __atomic_swap(int _new, volatile int *ptr)
{
int prev;
do {
prev = *ptr;
} while (__sync_val_compare_and_swap(ptr, prev, _new) != prev);
return prev;
}
对于 arm64,compare_exchange_strong 会映射到硬件指令集的 ldxr/stxr。
LDXR 属于 “Exclusive Load and Store” 指令集,结合 STXR 指令实现原子性操作。LDXR 负责从内存中的地址加载一个值,同时设置一个排他锁,确保其他处理器或内核不能修改这个内存位置,直到锁被释放。
也就是,CAS 的实现最终其实仍然是通过“锁”来实现的,只不过并不是 jvm 和 C++ 程序中的锁来实现,而是硬件级别的锁来实现。
CAS
看到上述代码,就可以回到八股文的内容了:
在CAS中,有这样三个值:
- V:要更新的变量(var)
- E:预期值(expected)
- N:新值(new)
比较并交换的过程如下:
判断V是否等于E,如果等于,将V的值设置为N;如果不等,说明已经有其它线程更新了V,则当前线程放弃更新,什么都不做。这里的预期值E本质上指的是“旧值”。
ldxr
其实整个逻辑链路还有个最重要的问题,就是“支点”的问题,我们常说,给我一个支点,我能撬起地球。整个 CAS 流程的“支点”就是 CPU 支持的 “ldxr/stxr”,只有 cpu 级别能支持“锁”,才能通过锁一个小的东西,来撬动更大级别的锁,这也是 synchronized 等的核心原理。及:
- “ldxr/stxr” 等 cpu 能实现原子操作的指令才是从 0 - 1 的质变流程
- synchronized、atomic 等都是从 1 - 100 的量变过程
那 ldxr 等是如何实现的?
TODO
关于 ABA 问题
- 从上边的代码可以看出,AtomicInteger 实际并没有处理 ABA 问题。
- Java 中解决 ABA 问题需要使用 AtomicStampedReference 或 AtomicMarkableReference。
- ABA 大多数情况下并不是问题(所以针对只要最终结果正确的情况,放心使用 AtomicInteger 即可)。
其他
- 具体主要看被锁 “单元” 的执行规模与执行频率。而 Atomic 系列多是对于值类型的数据做操作,即保证执行规模是一个很小的量级,这才是 CAS 的适应场景。 亦即预期的收益(线程切换)是大于预期消耗(线程循环)的。而如果执行规模比较大(复杂),上下文比较多的情况,可能就不一定适用 Atomic 系列了。