CAS 精谈

关于 CAS 的八股文章已经有很多了,这里不细说。

本文以 AtomicInteger.compareAndSet 举例,直接 show code,为了精简,列出来的代码会删除注释等,如果大家想细看,点开链接就可以了。

java 层

Android 12 及之前的 AtomicInteger 是直接通过 Unsafe 来实现的,而 Android 13 及之后的 AtomicInteger,则是使用 VarHandle 建了一个隔离层,最终还是通过 Unsafe 实现。
Android12 - AtomicInteger.java, Android 13 - AtomicInteger.java

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long VALUE;

    static {
        try {
            VALUE = U.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }

    private volatile int value;

    ......

    public final int getAndIncrement() {
        // Android-changed: Using VarHandle instead of Unsafe
        // return U.getAndAddInt(this, VALUE, 1);
        return (int)VALUE.getAndAdd(this, 1);
    }

    ......

    public final boolean compareAndSet(int expect, int update) {
        return U.compareAndSwapInt(this, VALUE, expect, update);
    }

    ......
}

Unsafe.java

public final class Unsafe {
    /** Traditional dalvik name. */
    private static final Unsafe THE_ONE = new Unsafe();

    private static final Unsafe theUnsafe = THE_ONE;
    public static final int INVALID_FIELD_OFFSET   = -1;

    /**
     * This class is only privately instantiable.
     */
    private Unsafe() {}

    /**
     * Gets the unique instance of this class. This is only allowed in
     * very limited situations.
     */
    public static Unsafe getUnsafe() {
        Class<?> caller = Reflection.getCallerClass();
        /*
         * Only code on the bootclasspath is allowed to get at the
         * Unsafe instance.
         */
        ClassLoader calling = (caller == null) ? null : caller.getClassLoader();
        if ((calling != null) && (calling != Unsafe.class.getClassLoader())) {
            throw new SecurityException("Unsafe access denied");
        }

        return THE_ONE;
    }

    
    @FastNative
    public native boolean compareAndSwapInt(Object obj, long offset,int expectedValue, int newValue);

    
    @FastNative
    public native int getIntVolatile(Object obj, long offset);

    @FastNative
    public native void putIntVolatile(Object obj, long offset, int newValue);

    @FastNative
    public native int getInt(Object obj, long offset);

    @FastNative
    public native void putInt(Object obj, long offset, int newValue);

    @FastNative
    public native int getInt(long address);

    @FastNative
    public native void putInt(long address, int x);

    @IntrinsicCandidate
    public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(o, offset);
        } while (!compareAndSwapInt(o, offset, v, v + delta));
        return v;
    }
}

framework 层

sun_misc_Unsafe.cc

namespace art {

static jboolean Unsafe_compareAndSwapInt(JNIEnv* env, jobject, jobject javaObj, jlong offset,
                                         jint expectedValue, jint newValue) {
  ScopedFastNativeObjectAccess soa(env);
  ObjPtr<mirror::Object> obj = soa.Decode<mirror::Object>(javaObj);
  // JNI must use non transactional mode.
  bool success = obj->CasField32<false>(MemberOffset(offset),
                                        expectedValue,
                                        newValue,
                                        CASMode::kStrong,
                                        std::memory_order_seq_cst);
  return success ? JNI_TRUE : JNI_FALSE;
}
}

object-readbarrier-inl.h - CasField32

template<bool kTransactionActive, bool kCheckTransaction, VerifyObjectFlags kVerifyFlags>
inline bool Object::CasField32(MemberOffset field_offset,
                               int32_t old_value,
                               int32_t new_value,
                               CASMode mode,
                               std::memory_order memory_order) {
  if (kCheckTransaction) {
    DCHECK_EQ(kTransactionActive, Runtime::Current()->IsActiveTransaction());
  }
  if (kTransactionActive) {
    Runtime::Current()->RecordWriteField32(this, field_offset, old_value, true);
  }
  if (kVerifyFlags & kVerifyThis) {
    VerifyObject(this);
  }
  uint8_t* raw_addr = reinterpret_cast<uint8_t*>(this) + field_offset.Int32Value();
  AtomicInteger* atomic_addr = reinterpret_cast<AtomicInteger*>(raw_addr);

  return atomic_addr->CompareAndSet(old_value, new_value, mode, memory_order);
}
using AtomicInteger = Atomic<int32_t>;

atomic.h - CompareAndSet

  bool CompareAndSet(T expected_value,
                     T desired_value,
                     CASMode mode,
                     std::memory_order memory_order) {
    return mode == CASMode::kStrong
        ? this->compare_exchange_strong(expected_value, desired_value, memory_order)
        : this->compare_exchange_weak(expected_value, desired_value, memory_order);
  }
template <class _Tp>
_LIBCPP_HIDE_FROM_ABI
bool
atomic_compare_exchange_strong(volatile atomic<_Tp>* __o, typename atomic<_Tp>::value_type* __e, typename atomic<_Tp>::value_type __d) _NOEXCEPT
{
    return __o->compare_exchange_strong(*__e, __d);
}
inline int32_t __sync_val_compare_and_swap(volatile int32_t* ptr, int32_t oldval, int32_t newval)
{
    int32_t ret = *ptr;
    (void)__sync_bool_compare_and_swap(ptr, oldval, newval);
    return ret;
}

atomics_arm.c

int __atomic_cmpxchg(int old, int _new, volatile int *ptr)
{
    /* We must return 0 on success */
    return __sync_val_compare_and_swap(ptr, old, _new) != old;
}

int __atomic_swap(int _new, volatile int *ptr)
{
    int prev;
    do {
        prev = *ptr;
    } while (__sync_val_compare_and_swap(ptr, prev, _new) != prev);
    return prev;
}

对于 arm64,compare_exchange_strong 会映射到硬件指令集的 ldxr/stxr。
LDXR 属于 “Exclusive Load and Store” 指令集,结合 STXR 指令实现原子性操作。LDXR 负责从内存中的地址加载一个值,同时设置一个排他锁,确保其他处理器或内核不能修改这个内存位置,直到锁被释放。
也就是,CAS 的实现最终其实仍然是通过“锁”来实现的,只不过并不是 jvm 和 C++ 程序中的锁来实现,而是硬件级别的锁来实现。

CAS

看到上述代码,就可以回到八股文的内容了:

在CAS中,有这样三个值:

比较并交换的过程如下:
判断V是否等于E,如果等于,将V的值设置为N;如果不等,说明已经有其它线程更新了V,则当前线程放弃更新,什么都不做。这里的预期值E本质上指的是“旧值”。

ldxr

其实整个逻辑链路还有个最重要的问题,就是“支点”的问题,我们常说,给我一个支点,我能撬起地球。整个 CAS 流程的“支点”就是 CPU 支持的 “ldxr/stxr”,只有 cpu 级别能支持“锁”,才能通过锁一个小的东西,来撬动更大级别的锁,这也是 synchronized 等的核心原理。及:

那 ldxr 等是如何实现的?

TODO

关于 ABA 问题

  1. 从上边的代码可以看出,AtomicInteger 实际并没有处理 ABA 问题。
  2. Java 中解决 ABA 问题需要使用 AtomicStampedReference 或 AtomicMarkableReference。
  3. ABA 大多数情况下并不是问题(所以针对只要最终结果正确的情况,放心使用 AtomicInteger 即可)。

其他

  1. 具体主要看被锁 “单元” 的执行规模与执行频率。而 Atomic 系列多是对于值类型的数据做操作,即保证执行规模是一个很小的量级,这才是 CAS 的适应场景。 亦即预期的收益(线程切换)是大于预期消耗(线程循环)的。而如果执行规模比较大(复杂),上下文比较多的情况,可能就不一定适用 Atomic 系列了。