深入AQS

简介

AbstractQueuedSynchronizer一般简称AQS,是位于JUC包中的重要工具类,包括ReentrantLock、CountDownLatch等众多提供阻塞方法的类都是基于AQS进行编写的,理解AQS对于理解JUC包中许多类的实现都有极大的帮助。本文通过阅读AQS源码的方式,学习AQS实现的原理及技巧。下文主要包括以下几方面:

  1. AQS总览;
  2. AQS主要功能;
  3. 使用示例-排它锁

AQS总览

AQS在java.util.concurrent.locks包中,其UML图为:

可见AQS继承于AbstractOwnableSynchronizer,并且其实现中包含了NodeConditionObject两个内部类。AbstractOwnableSynchronizer类功能较为简单,不包括构造函数外只提供了两个方法:setExclusiveOwnerThreadgetExclusiveOwnerThread,用于设置和获取当前对象所属的线程,需要注意在设置及获取当前对象所属线程时,该类并没有加锁使用时需要调用人员自行保证数据的一致性。

AQS主要功能

AQS类的文档中说明其主要作用为:

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.

简单说来就是提供一个实现基于FIFO队列的阻塞锁的框架,在使用中一般推荐继承AQS并实现以下几个方法完成同步逻辑:

  • tryAcquire:线程尝试以排他方式获取锁;
  • tryRelease:线程调用该方法释放排他锁;
  • tryAcquireShared:线程尝试以共享方式获取锁;
  • tryReleaseShared:线程调用该方法释放共享锁;
  • isHeldExclusively:锁是否被排他方式占用

以上几个方法在AQS中均为进行实现,直接调用会抛出UnsupportedOperationException异常,需要子类根据自己的需求实现对应方法。
文字描述较为抽象,通过学习AQS文档中自带的代码深入学习

使用示例-排它锁

以下代码引用自JDK8中的AQS类注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class Mutex implements Lock, java.io.Serializable {

// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}

// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// Provides a Condition
Condition newCondition() { return new ConditionObject(); }

// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();

public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}

该类实现了一个互斥锁,实现了Lock接口,在实现过程中主要通过调用内部类Sync的对象sync完成相关工作。重点观察SyncSync继承了了AbstractQueuedSynchronizer,并实现了上文提到的5个方法中的3个,即isHeldExclusivelytryAcquiretryRelease三个方法。

加锁过程

当前线程调用lock方法请求锁时,实际上是调用了AQS的acquire函数,该函数代码为:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

而该函数流程为:

tryAcquire函数实现

其中tryAcquire函数为示例类Mutex实现, 根据以上代码该函数实现时调用compareAndSetState函数。其功能为使用CAS操作原子的设置对象中一个名为stateint型变量,传入两个int类型参数,分别为预期值及设置后的值,流程如下:

compareAndSetState是AQS关键函数之一,查看compareAndSetState源码:

1
2
3
4
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

从代码可以发现compareAndSetState最终调用了sun.misc.Unsafe类的compareAndSwapInt方法,该函数可以原子性的设置属性,保证线程安全,该方法前两个参数分别为对象地址及偏移量,对象地址执行需要修改的对象,偏移量指定需要修改的整数类型数据在该对象中的内存地址,该函数实现是在Native方法中实现,查看该方法在OpenJdk9中的实现为(代码在:hotspot/src/share/vm/primsunsafe.cpp中):

1
2
3
4
5
6
UNSAFE_ENTRY(jint, Unsafe_CompareAndExchangeInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) {
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);

return (jint)(Atomic::cmpxchg(x, addr, e));
} UNSAFE_END

其中主要为调用Atomic::cmpxchg方法,该方法在不同平台实现略有不同,以下为mac平台实现:

1
2
3
4
5
6
7
8
inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value, cmpxchg_memory_order order) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}

以下为windows实现:

1
2
3
4
5
6
7
8
9
10
11
inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value, cmpxchg_memory_order order) {
// alternative for InterlockedCompareExchange
int mp = os::is_MP();
__asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}

可见其中主要使用了c++代码中嵌入汇编代码的方式实现,并且使用宏LOCK_IF_MP判断是否为多核处理器,在是多核处理时会增加lock操作。到当前代码依然可以使用汇编的视角进行语句分析,本文在此就略去,不过有一点小细节可见在windows实现时使用了dword关键字,可见Java整形数据在内存中的确使用了32位进行存储。
返回对于acquire函数的研究,调用了tryAcquire后,具有两种情况:

  • 获取锁成功(将值从0设置为1):tryAcquire函数继续调用setExclusiveOwnerThread函数将锁的所有者线程设置为当前线程,并返回true注意此处有之前调用compareAndSetState函数,表明只可能有设置值成功的线程可调用setExclusiveOwnerThread函数,故可保证线程安全。
  • 获取锁失败(期望设置的值不为0或设置时失败):tryAcquire函数直接返回false。

tryAcquire函数直接返回true后acquire函数不继续判断直接返回,则最上层的lock函数执行完毕,线程完成锁的申请,继续执行后续代码。tryAcquire函数直接返回false后,继续执行acquireQueued函数。

acquireQueued函数

tryAcquire函数核心为尝试将state充0设置为1,在两种情况下可能返回失败:

  1. state已经不为0:已有其他线程获取锁;
  2. state为0:但同时多个线程同时调用compareAndSwapInt,当前线程竞争失败。

无论以上何种失败,在tryAcquire函数无法获取锁的情况下,继续进行acquireQueued函数函数的调用。在调用acquireQueued之前,函数会调用addWaiter将当前线程封装为节点对象,由于调用acquire函数获取的是排它锁,故调用addWaiter函数时传入创建节点类型为Node.EXCLUSIVEaddWaiter代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

完成如下功能:

  1. 将当前线程封装到Node对象中;
  2. 获取当前AQS对象中等待队列的尾节点;
  3. 如果当前尾节点不为空:
    a. 将新建节点的前置节点指向当前尾节点;
    b. 尝试使用compareAndSetTail函数原子性的将当前AQS对象的尾节点指向新建节点,若指向成功,将原尾节点的后续节点指向新建节点,返回新建节点对象,由于compareAndSetTail函数保证有且只有一个线程能够设置成功,则后续pred.next = node语句能在逻辑上保证线程安全;
    c. 若设置失败,继续调用enq(node)函数进行自旋.
  4. 如当前尾节点不存在或希望更新尾节点时失败,继续调用enq(node)函数。

enq(node)函数主要使用自旋循环CAS方式更新当前队列的尾节点(在首次初始化时会将新建一不携带任何逻辑信息的头结点),此处不再详细展开。
完成addWaiter代码的调用后,保证当前封装当前线程的等待节点已经插入等待队列队尾,此时开始真正调用acquireQueued函数,其代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

根据代码分析:

  1. 取出刚加入队列的节点的前置节点,若前置节点为头结点则再次尝试获取锁,这么做的原因是在刚才获取锁失败到当前时刻,排在现在节点之前的等待节点可能被其他线程移除;
  2. 如获取锁成功,将当前节点设置为头结点,在设置时,需要将当前节点指向的线程及前置节点的属性都置为null,以帮助GC有机会快速收回不可达对象,之后进入finally块,最后返回false
  3. 如果获取锁失败开始调用shouldParkAfterFailedAcquire函数

调用shouldParkAfterFailedAcquire函数需要传入参数包括当前节点及当前节点的前置结点,其逻辑如下:


shouldParkAfterFailedAcquire函数在前置节点状态不为Node.SIGNAL的状态下主要完成两件事:移除队列中所有状态值大于0的节点(如被取消的节点);将前置节点状态设置为Node.SIGNAL
继续acquireQueued函数的分析,在调用shouldParkAfterFailedAcquire函数后,若shouldParkAfterFailedAcquire返回false,则继续刚才的循环,知道获取锁或shouldParkAfterFailedAcquire函数返回true。由于前一次的函数调用将前置节点状态置为了Node.SIGNAL,再次调用shouldParkAfterFailedAcquire大概率情况下会返回true。
shouldParkAfterFailedAcquire函数返回true后,acquireQueued函数继续调用parkAndCheckInterrupt函数。

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

该函数使用到了Jdk(从1.6开始)中提供的工具类方法LockSupport.park(this),代码为:

1
2
3
4
5
6
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}

该函数作用设置当前线程的阻塞对象,并且调用UNSAFE.park(false, 0L)函数将当前线程设置为阻塞状态,让出CPU。UNSAFE.park函数在JVM的native方法中实现,查看JVM源码在Linux平台该函数最终主要使用了pthread_cond_waitpthread_cond_signal两个函数进行线程的阻塞和唤醒,其中具体实现本文不再赘述。此处需要注意理论上pthread_cond_signal函数的调用仅仅会唤醒一个线程,不会出现惊群现象(即同时唤醒多个线程),但是在某些平台可能无法完全保证,所以LockSupport.park函数说明了三种停止阻塞并继续执行的情况:

  1. Some other thread invokes unpark with the current thread as the target;
  2. Some other thread interrupts the current thread;
  3. The call spuriously (that is, for no reason) returns.

其中第三种情况就可能是由惊群现象所引起。但由于本文中acquireQueued函数使用死循环的方式进行了判断,即使线程被意外唤醒,也可以再次判断其锁的状态,在无法获取锁的情况下会再次阻塞,不会出现逻辑问题。
在执行了UNSAFE.park函数后,当前线程进入阻塞状态,不再占用CPU资源(此处的线程调度可参考操作系统相关实现)。当前当前线程被唤醒后从LockSupport.park函数返回,并且通过Thread.interrupted()返回当前线程中断状态。之后继续在循环中继续判断是获取锁还是继续阻塞。这里需要注意由于acquireQueued并没有处理中断,其只会在正常获取锁后返回当前线程是否是由于中断被唤醒的,所以使用acquireQueued获取锁的线程不会因为中断而停止锁的获取。
最后如果在获取锁的过程出现异常,则会调用cancelAcquire(node)函数,cancelAcquire函数执行以下逻辑:

  1. 将当前节点指向的线程置为空;
  2. 设置指向前置节点的引用,跳过所有已经被取消的前驱节点;
  3. 将当前节点的状态置为已取消状态;
  4. 如果当前节点是尾节点,原子性的设置当前节点的前置节点为尾节点,并且将前置节点的后驱节点置为空;
  5. 如果当前节点不是尾节点,则判断当前节点的前置节点是不是头结点且前置节点状态为Node.SIGNAL则将前置节点的后置节点置为当前节点的后置置节点,否则调用unparkSuccessor函数唤醒后置节点。

锁获取总结

到此为止通过分析acquire函数的实现,完成了Mutexlock函数的实现的分析。在获取锁的过程中主要使用AQS相关功能,通过维护等待队列的方式完成线程的阻塞及唤醒。Mutex类是AQS注释中的示例类,不能应用在实际生产环境中,如以上加锁过程没有考虑重入问题,生产环境使用可能出现递归调用死锁问题

解锁过程

在线程执行完成临界区代码(或出现异常退出临界区时)需要调用unlock函数进行解锁以让出排他锁。通过对于示例代码的分析,解锁函数实际使用sync.release函数进行,该函数代码为:

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

为了方便观察,将tryRelease函数也展示出来:

1
2
3
4
5
6
7
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}

解锁函数较为简单,当前的Mutex类假设使用锁的代码都是按照先加锁再解锁的顺序进行执行,所以在此情况下能保证同一时刻有且只有一个线程能够执行unlock函数,故解锁过程非常简单且不需要同步:

  1. 获取锁的状态,如果锁的状态为0,则表示目前没有加锁,抛出异常;
  2. 将当前锁记录的拥有锁的线程置为空;
  3. 将锁的状态置为0;
  4. 获取当前等待队列头结点,如头结点不为空且其等待状态不为0则唤醒头结点的后置节点

到此则释放锁的过程结束。Mutex类是AQS注释中的示例类,不能应用在实际生产环境中,如以上解锁过程就没有判断当前解锁的线程是不是已经得到锁的线程,这在生产环境中可能出现未知的问题

排它锁总结

本节分析了使用AQS进行实现互斥锁的方法,并深入AQS代码分析其实现原理。重点分析其加锁及解锁过程,其他锁方法类似,本处不在赘述。针对其他重要方法、共享锁及Condition类的分析后续结合JCU包中关联类进行。