简介
AbstractQueuedSynchronizer一般简称AQS,是位于JUC包中的重要工具类,包括ReentrantLock、CountDownLatch等众多提供阻塞方法的类都是基于AQS进行编写的,理解AQS对于理解JUC包中许多类的实现都有极大的帮助。本文通过阅读AQS源码的方式,学习AQS实现的原理及技巧。下文主要包括以下几方面:
AQS总览
AQS在java.util.concurrent.locks
包中,其UML图为:
可见AQS继承于AbstractOwnableSynchronizer
,并且其实现中包含了Node
及ConditionObject
两个内部类。AbstractOwnableSynchronizer
类功能较为简单,不包括构造函数外只提供了两个方法:setExclusiveOwnerThread
及getExclusiveOwnerThread
,用于设置和获取当前对象所属的线程,需要注意在设置及获取当前对象所属线程时,该类并没有加锁使用时需要调用人员自行保证数据的一致性。
AQS主要功能
AQS类的文档中说明其主要作用为:
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
简单说来就是提供一个实现基于FIFO队列的阻塞锁的框架,在使用中一般推荐继承AQS并实现以下几个方法完成同步逻辑:
- tryAcquire:线程尝试以排他方式获取锁;
- tryRelease:线程调用该方法释放排他锁;
- tryAcquireShared:线程尝试以共享方式获取锁;
- tryReleaseShared:线程调用该方法释放共享锁;
- isHeldExclusively:锁是否被排他方式占用
以上几个方法在AQS中均为进行实现,直接调用会抛出UnsupportedOperationException
异常,需要子类根据自己的需求实现对应方法。
文字描述较为抽象,通过学习AQS文档中自带的代码深入学习
使用示例-排它锁
以下代码引用自JDK8中的AQS类注释
1 | class Mutex implements Lock, java.io.Serializable { |
该类实现了一个互斥锁,实现了Lock
接口,在实现过程中主要通过调用内部类Sync
的对象sync
完成相关工作。重点观察Sync
,Sync
继承了了AbstractQueuedSynchronizer
,并实现了上文提到的5个方法中的3个,即isHeldExclusively
、tryAcquire
及tryRelease
三个方法。
加锁过程
当前线程调用lock
方法请求锁时,实际上是调用了AQS的acquire
函数,该函数代码为:
1 | public final void acquire(int arg) { |
而该函数流程为:
tryAcquire
函数实现
其中tryAcquire
函数为示例类Mutex
实现, 根据以上代码该函数实现时调用compareAndSetState
函数。其功能为使用CAS操作原子的设置对象中一个名为state
的int
型变量,传入两个int
类型参数,分别为预期值及设置后的值,流程如下:
compareAndSetState
是AQS关键函数之一,查看compareAndSetState
源码:1
2
3
4protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
从代码可以发现compareAndSetState
最终调用了sun.misc.Unsafe
类的compareAndSwapInt
方法,该函数可以原子性的设置属性,保证线程安全,该方法前两个参数分别为对象地址及偏移量,对象地址执行需要修改的对象,偏移量指定需要修改的整数类型数据在该对象中的内存地址,该函数实现是在Native方法中实现,查看该方法在OpenJdk9中的实现为(代码在:hotspot/src/share/vm/primsunsafe.cpp中):
1 | UNSAFE_ENTRY(jint, Unsafe_CompareAndExchangeInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) { |
其中主要为调用Atomic::cmpxchg
方法,该方法在不同平台实现略有不同,以下为mac平台实现:
1 | inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value, cmpxchg_memory_order order) { |
以下为windows实现:
1 | inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value, cmpxchg_memory_order order) { |
可见其中主要使用了c++代码中嵌入汇编代码的方式实现,并且使用宏LOCK_IF_MP
判断是否为多核处理器,在是多核处理时会增加lock操作。到当前代码依然可以使用汇编的视角进行语句分析,本文在此就略去,不过有一点小细节可见在windows实现时使用了dword关键字,可见Java整形数据在内存中的确使用了32位进行存储。
返回对于acquire
函数的研究,调用了tryAcquire
后,具有两种情况:
- 获取锁成功(将值从0设置为1):
tryAcquire
函数继续调用setExclusiveOwnerThread
函数将锁的所有者线程设置为当前线程,并返回true
,注意此处有之前调用compareAndSetState
函数,表明只可能有设置值成功的线程可调用setExclusiveOwnerThread
函数,故可保证线程安全。 - 获取锁失败(期望设置的值不为0或设置时失败):
tryAcquire
函数直接返回false。
在tryAcquire
函数直接返回true后acquire
函数不继续判断直接返回,则最上层的lock
函数执行完毕,线程完成锁的申请,继续执行后续代码。tryAcquire
函数直接返回false后,继续执行acquireQueued
函数。
acquireQueued
函数
tryAcquire
函数核心为尝试将state
充0设置为1,在两种情况下可能返回失败:
state
已经不为0:已有其他线程获取锁;state
为0:但同时多个线程同时调用compareAndSwapInt
,当前线程竞争失败。
无论以上何种失败,在tryAcquire
函数无法获取锁的情况下,继续进行acquireQueued
函数函数的调用。在调用acquireQueued
之前,函数会调用addWaiter
将当前线程封装为节点对象,由于调用acquire
函数获取的是排它锁,故调用addWaiter
函数时传入创建节点类型为Node.EXCLUSIVE
,addWaiter
代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
完成如下功能:
- 将当前线程封装到
Node
对象中; - 获取当前AQS对象中等待队列的尾节点;
- 如果当前尾节点不为空:
a. 将新建节点的前置节点指向当前尾节点;
b. 尝试使用compareAndSetTail
函数原子性的将当前AQS
对象的尾节点指向新建节点,若指向成功,将原尾节点的后续节点指向新建节点,返回新建节点对象,由于compareAndSetTail
函数保证有且只有一个线程能够设置成功,则后续pred.next = node
语句能在逻辑上保证线程安全;
c. 若设置失败,继续调用enq(node)
函数进行自旋. - 如当前尾节点不存在或希望更新尾节点时失败,继续调用
enq(node)
函数。
enq(node)
函数主要使用自旋循环CAS方式更新当前队列的尾节点(在首次初始化时会将新建一不携带任何逻辑信息的头结点),此处不再详细展开。
完成addWaiter
代码的调用后,保证当前封装当前线程的等待节点已经插入等待队列队尾,此时开始真正调用acquireQueued
函数,其代码为:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
根据代码分析:
- 取出刚加入队列的节点的前置节点,若前置节点为头结点则再次尝试获取锁,这么做的原因是在刚才获取锁失败到当前时刻,排在现在节点之前的等待节点可能被其他线程移除;
- 如获取锁成功,将当前节点设置为头结点,在设置时,需要将当前节点指向的线程及前置节点的属性都置为null,以帮助GC有机会快速收回不可达对象,之后进入
finally
块,最后返回false
; - 如果获取锁失败开始调用
shouldParkAfterFailedAcquire
函数
调用shouldParkAfterFailedAcquire
函数需要传入参数包括当前节点及当前节点的前置结点,其逻辑如下:shouldParkAfterFailedAcquire
函数在前置节点状态不为Node.SIGNAL
的状态下主要完成两件事:移除队列中所有状态值大于0的节点(如被取消的节点);将前置节点状态设置为Node.SIGNAL
。
继续acquireQueued
函数的分析,在调用shouldParkAfterFailedAcquire
函数后,若shouldParkAfterFailedAcquire
返回false,则继续刚才的循环,知道获取锁或shouldParkAfterFailedAcquire
函数返回true。由于前一次的函数调用将前置节点状态置为了Node.SIGNAL
,再次调用shouldParkAfterFailedAcquire
大概率情况下会返回true。
当shouldParkAfterFailedAcquire
函数返回true后,acquireQueued
函数继续调用parkAndCheckInterrupt
函数。1
2
3
4private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
该函数使用到了Jdk(从1.6开始)中提供的工具类方法LockSupport.park(this)
,代码为:1
2
3
4
5
6public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
该函数作用设置当前线程的阻塞对象,并且调用UNSAFE.park(false, 0L)
函数将当前线程设置为阻塞状态,让出CPU。UNSAFE.park
函数在JVM的native方法中实现,查看JVM源码在Linux平台该函数最终主要使用了pthread_cond_wait
及pthread_cond_signal
两个函数进行线程的阻塞和唤醒,其中具体实现本文不再赘述。此处需要注意理论上pthread_cond_signal
函数的调用仅仅会唤醒一个线程,不会出现惊群现象(即同时唤醒多个线程),但是在某些平台可能无法完全保证,所以LockSupport.park
函数说明了三种停止阻塞并继续执行的情况:
- Some other thread invokes unpark with the current thread as the target;
- Some other thread interrupts the current thread;
- The call spuriously (that is, for no reason) returns.
其中第三种情况就可能是由惊群现象所引起。但由于本文中acquireQueued
函数使用死循环的方式进行了判断,即使线程被意外唤醒,也可以再次判断其锁的状态,在无法获取锁的情况下会再次阻塞,不会出现逻辑问题。
在执行了UNSAFE.park
函数后,当前线程进入阻塞状态,不再占用CPU资源(此处的线程调度可参考操作系统相关实现)。当前当前线程被唤醒后从LockSupport.park
函数返回,并且通过Thread.interrupted()
返回当前线程中断状态。之后继续在循环中继续判断是获取锁还是继续阻塞。这里需要注意由于acquireQueued
并没有处理中断,其只会在正常获取锁后返回当前线程是否是由于中断被唤醒的,所以使用acquireQueued
获取锁的线程不会因为中断而停止锁的获取。
最后如果在获取锁的过程出现异常,则会调用cancelAcquire(node)
函数,cancelAcquire
函数执行以下逻辑:
- 将当前节点指向的线程置为空;
- 设置指向前置节点的引用,跳过所有已经被取消的前驱节点;
- 将当前节点的状态置为已取消状态;
- 如果当前节点是尾节点,原子性的设置当前节点的前置节点为尾节点,并且将前置节点的后驱节点置为空;
- 如果当前节点不是尾节点,则判断当前节点的前置节点是不是头结点且前置节点状态为
Node.SIGNAL
则将前置节点的后置节点置为当前节点的后置置节点,否则调用unparkSuccessor
函数唤醒后置节点。
锁获取总结
到此为止通过分析acquire
函数的实现,完成了Mutex
类lock
函数的实现的分析。在获取锁的过程中主要使用AQS
相关功能,通过维护等待队列的方式完成线程的阻塞及唤醒。Mutex类是AQS注释中的示例类,不能应用在实际生产环境中,如以上加锁过程没有考虑重入问题,生产环境使用可能出现递归调用死锁问题
解锁过程
在线程执行完成临界区代码(或出现异常退出临界区时)需要调用unlock
函数进行解锁以让出排他锁。通过对于示例代码的分析,解锁函数实际使用sync.release
函数进行,该函数代码为:1
2
3
4
5
6
7
8
9public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
为了方便观察,将tryRelease
函数也展示出来:
1 | protected boolean tryRelease(int releases) { |
解锁函数较为简单,当前的Mutex
类假设使用锁的代码都是按照先加锁再解锁的顺序进行执行,所以在此情况下能保证同一时刻有且只有一个线程能够执行unlock
函数,故解锁过程非常简单且不需要同步:
- 获取锁的状态,如果锁的状态为0,则表示目前没有加锁,抛出异常;
- 将当前锁记录的拥有锁的线程置为空;
- 将锁的状态置为0;
- 获取当前等待队列头结点,如头结点不为空且其等待状态不为0则唤醒头结点的后置节点
到此则释放锁的过程结束。Mutex类是AQS注释中的示例类,不能应用在实际生产环境中,如以上解锁过程就没有判断当前解锁的线程是不是已经得到锁的线程,这在生产环境中可能出现未知的问题
排它锁总结
本节分析了使用AQS进行实现互斥锁的方法,并深入AQS代码分析其实现原理。重点分析其加锁及解锁过程,其他锁方法类似,本处不在赘述。针对其他重要方法、共享锁及Condition
类的分析后续结合JCU包中关联类进行。