0%

JDK源码 AQS

AbstractQueueSynchronizer笔记

在开始介绍之前我先来简单理解一下为什么会产生AQS。在我们应用内不可避免的会发生对一些资源的抢占,那么如何处理线程之间对资源的争夺呢?在Java SE 5 之前JDK可以使用synchronized来串行化对资源的操作,synchronized可以隐式的获取和释放锁,但是带来的不便就是不够灵活,可扩展性没有显式获取和释放锁的自主控制性强,另外synchronized是完全互斥的,没法达到诸如共享锁、读写锁的效果。对资源的竞争可以抽象为对变量状态的竞争,这正是AQS实现的基本原理,当然,AQS的实现是复杂的。

AQS(AbstractQueueSynchronizer)俗称同步器,是JDK中用来构建同步组件的基础框架,比如JDK中已经实现的可重入锁(ReentrantLock)就是基于AQS实现的,它的关键方法在实现中使用了unsafe的CAS操作来提高性能。

AQS采用模版方法的设计模式,屏蔽了同步状态管理、线程排队、等待与唤醒等底层操作,实现者只需要实现AQS定义的抽象方法即可快速实现自己的同步需求。

  • 需要实现的模版接口
    以下列出的接口方法,并非需要全部实现,按照自己对同步器的要求选择相应的进行实现。以下方法AQS默认抛出UnsupportedOperationException。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    /******************独占式同步状态 start****************/
    protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
    }
    protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
    }
    /******************独占式同步状态 end*****************/

    /******************共享式同步状态 start****************/
    /**
    *返回值大于等于0表示获取成功
    **/
    protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
    }
    protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
    }
    /******************共享式同步状态 end****************/

    /******************获取同步状态模式 start**************/
    protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
    }
    /******************获取同步状态模式 end**************/
  • 关键属性

属性类型及名称 描述
volatile int state 竞争资源的抽象。代表资源的占用状态。
volatile Node head 同步队列用来保存获取状态失败的线程数据结构,指向队列头部
volatile Node tail 同上,指向队列尾部

结合具体的一个实现(ReentrantLock)来看AQS。ReentrantLock中的公平锁和非公平锁便是AQS独占锁的一种实现,从ReentrantLock默认的非公平锁入手,因为他是ReentrantLock的默认锁。

1
2
3
4
5
6
7
8
9
10
11
12
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}

上面是使用ReentrantLock的基本方式,当我们调用lock()方法时,发生了些什么事呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class NonfairSync extends Sync {
/**
*直接加锁,不理会当前是否有其他线程在排队
*如果失败,则进入自旋状态,在自旋中检查当前线程是否应当挂起
**/
final void lock() {
//compareAndSetState/acquire/setExclusiveOwnerThread父类实现模版方法
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

//模版方法(子类实现,根据要实现锁的类性复写对应模版方法)
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

compareAndSetState由AQS提供,采用CAS的方式对state设值,保证操作的原子性。如果设置成功,则将当前线程作为排他线程;如果抢占式获取失败则进入等待队列,即acquire(1)。我们看下acquire的实现。

1
2
3
4
5
6
7
8
9
10
AbstractQueuedSynchronizer
public final void acquire(int arg) {
/**
*1.尝试再次获取锁,成功->业务代码
*2.失败->将自己加入等待队列尾部,自旋获取锁,获取成功则返回,否则直到满足挂起条件把自己挂起
**/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

前面已经列出tryAcquire是需要子类实现的方法,我们在NonfairSync中看到正是如此,NonfairSync中tryAcquire方法调用了其父类Sync的nonfairTryAcquire。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ReentrantLock$Sync
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取当前资源状态,表格“关键属性”中列出的state值
int c = getState();
//如果为0,表明当前时刻没有线程占有竞争资源
if (c == 0) {
//尝试再次获取
if (compareAndSetState(0, acquires)) {
//获取成功->设置当前线程为占有线程
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前线程为占有线程
//ps:可重入锁的特点
else if (current == getExclusiveOwnerThread()) {
//增加重入次数,最大值为Integer.MAX_VALUE
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

如果获取还是失败,就需要将当前线程加入等待队列的尾部,等待队列是一个FIFO双向队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
AbstractQueuedSynchronizer	
/** FIFO同步队列
* +------+ prev +-----+ +-----+
*head | | <----> | | <----> | | tail
* +------+ next +-----+ +-----+
**/
private Node addWaiter(Node mode) {
//默认为addWaiter(Node.EXCLUSIVE)排他模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果快速入队失败,则进入到完整入队过程
enq(node);
return node;
}
//不断CAS将Node安全追加到队列尾部
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//注意初始化队列头部Node
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

简单说下Node的数据结构

属性 描述
int waitStatus 等待状态。字段的具体描述见下表(因为MD换行无法设置格式)
Node prev 前驱节点
Node next 后继节点
Node nextWaiter Condition等待队列中的后继节点。如果该节点是Condition的,那么该节点表示等待在该Condition的下一个节点;如果不是,那么表示当前队列的默认mode(SHARED/EXCLUSIVE)
Thread thread 当前节点包装的线程
waitStatus状态 描述
CANCELLED 1 由于在同步队列中等待的线程等待超时活着中断,需要从同步队列中取消等待,节点进入该状态将不会变化
SIGNAL -1 后继节点的线程处于等待状态,当当前节点的线程如果释放了同步状态活着取消,将会通知后继节点,使后继接待你的线程得以运行
CONDITION -2 节点在对应的Condition等待队列中,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,准备开始对状态的竞争
PROPAGATE -3 表示下一次共享式同步状态获取将会无条件被传播下去(阅读ReentrantReadWriteLock可以看到运用)
initial 0 Node非Condition下的初始态

我们看到addWaiter方法在队列最后追加了一个初始态的排他Node,完成此步骤后当前线程并没有直接被挂起,这是AQS和synchronized的不用点也是其高效的体现,我们知道线程的挂起和恢复是需要CPU进行上下文切换,这是一个相对耗时的操作,AQS在一个线程满足挂起条件前会不停的尝试获取锁,避免上下文的切换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//死循环获取锁,每次唤醒后重新竞争获取锁,避免假唤醒
for (;;) {
//获取当前节点的前一个节点
final Node p = node.predecessor();
//如果前节点为头节点,再次尝试获取
if (p == head && tryAcquire(arg)) {
//获取成功后,设置当前节点为头节点,注意设置setHead细节,此处不再贴出代码
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//获取失败判断当前线程是否应当挂起,如果满足挂起条件则进行挂起
//挂起parkAndCheckInterrupt比较简单,直接调用LockSupport.park(this);
//被唤醒后会返回当前线程的中断状态,然后在循环中继续竞争锁
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

/**
* 只有当前一个节点的状态为Node.SIGNAL时返回true,即:应当挂起。
**/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前一个节点的等待状态waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
* 前一个节点已经设置为释放锁要通知随后的节点,可以安全挂起。
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 前一个节点已经是取消状态,移除并且找到最近一个非取消状态的节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
* 前节点等待状态此时已定为初始态(0)或者PROPAGATE,我们需要一个
* 通知,所以需要将前节点设置为Node.SIGNAL,但是当前节点还不应该
* 被挂起,被挂起前应当确定当前线程确实不能够获取锁
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

至此,一个竞争失败的线程就被安全挂起,等待其他线程释放锁后把它唤醒,被唤醒后继续未竞事业。
我们再来看下AQS锁释放的过程,还是以ReentrantLock为入口。

1
2
3
4
5
ReentrantLock
public void unlock() {
//ReentrantLock.unlock直接调用非公平锁的release方法,该方法是AQS实现的模版方法。
sync.release(1);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
AbstractQueuedSynchronizer
public final boolean release(int arg) {
//AQS先调用由子类ReentrantLock实现的的tryRelease方法,如果释放成功,则唤醒后续节点
if (tryRelease(arg)) {
Node h = head;
//如果头节点不为空并且等待状态不是初始态(0),则唤醒后续节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
* 如果当前节点等待状态是负数(比如-1,需要换醒后续节点)
* 尝试改变头节点等待状态,改变失败或者已经被其他线程改变也没有关系
* 因为它总会被设置为正常的状态并且被移除出队列
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
* 找出正常的节点并且唤醒
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//如果下个节点不存在或者已被取消,则找出最近的等待状态小于0的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒后续节点
if (s != null)
LockSupport.unpark(s.thread);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ReentrantLock$NonfairSync
protected final boolean tryRelease(int releases) {
//因为当前是可重入锁,state值实际保存了当前线程的重入次数,释放的时候需要依次释放
int c = getState() - releases;
//非锁持有线程释放锁会抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//如果c为0,表明当前线程已经完全释放锁
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

至此,排他锁的一种获取和释放就结束了。实际上AQS中还有Condition锁(称之为锁也有些不恰当),每个Condition自己又维护了一个等待队列,Condition等待队列中的Node满足条件会被转移到AQS维护的队列中来完成锁的竞争。感兴趣的同学可以看ArrayBlockingQueue的实现,ArrayBlockingQueue就是基于AQS的该特性实现的。

从网上摘抄过来一段对AQS的总结作为结束。

首先,AQS并不关心“是什么锁”,对于AQS来说它只是实现了一系列的用于判断“资源”是否可以访问的API,并且封装了在“访问资源”受限时将请求访问的线程的加入队列、挂起、唤醒等操作, AQS只关心“资源不可以访问时,怎么处理?”、“资源是可以被同时访问,还是在同一时间只能被一个线程访问?”、“如果有线程等不及资源了,怎么从AQS的队列中退出?”等一系列围绕资源访问的问题,而至于“资源是否可以被访问?”这个问题则交给AQS的子类去实现。

这是典型的将规则和操作分开的设计思路:规则子类定义,操作逻辑因为具有公用性,放在父类中去封装。

小生不才,以上如有描述有误的地方还望各位不吝赐教 !^_^!