juc之AQS源码一

两种资源共享方式

  AQS定义了两种资源共享方式:

  1. Exclusive:独占方式,只有一个线程能执行,如ReentrantLock。
  2. Share:共享,多个线程可以同时执行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier。

  对应的,在不同的资源共享模式下,就有了不同的源码。
  值得注意的是,acquire()和acquireShared()两种方法下,线程在等待队列中都是忽略中断的。AQS也支持响应中断的,acquireInterruptibly()/acquireSharedInterruptibly()即是。


独占方式

 独占方式下,获取资源与释放资源的一组为acquire()release()方法。

Acquire独占模式获取资源

acquire方法

  acquire方法时独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响,获取到资源后线程就可以执行其临界区代码了。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

  函数流程如下:

  1. tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待)。
  2. addWaiter()将该线程加入等待队列的尾部,并标记为独占模式。
  3. acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

tryAcquire方法

  此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这个方法在AQS中定义了空方法体,需要各个自定义同步器自己去重写这个方法,如CountDownLatch的Sync。

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

  这里就是模板方法模式的典型应用。
  这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。

addWaiter方法

  用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。这里会将每一个线程封装成一个Node,Node中有一个Tread属性用来保存当前线程。等待队列可以看作是一个双向链表,一个CLH队列,一个虚拟的双向队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

  enq方法将节点加入队尾。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

acquireQueued方法

  通过tryAcquire()addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。
  acquireQueued()用于在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。
  进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源,没有拿到资源
boolean failed = true;
try {
// 标记等待过程中是否被中断过
boolean interrupted = false;
for (;;) {
// 拿到前驱节点
final Node p = node.predecessor();
// 如果前驱是head,即该结点已是第二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
// 拿到资源后,将自己设置为head,所以head指向的节点就是拿到资源的节点
setHead(node);
// 将head的前驱断开连接,就是为了放弃以前的有助于GC
p.next = null; // help GC
// 成功拿到资源
failed = false;
// 返回等待过程中是否被中断,如果被中断了,就是获取失败了
return interrupted;
}
// 如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 表示被中断了
interrupted = true;
}
} finally {
// 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
if (failed)
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire方法

  这个方法主要用于检查状态,这里主要就是前驱节点的状态。
  整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 拿到前驱的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
// 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
// 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

parkAndCheckInterrupt方法

  这个方法用于将前驱节点的状态设置好后,真正进入等待状态。

1
2
3
4
5
6
7
private final boolean parkAndCheckInterrupt() {
// 使线程进入waiting状态
LockSupport.park(this);
// 清除中断状态
//如果被唤醒,查看自己是不是被中断的。
return Thread.interrupted();
}

小结

  acquireQueued方法的流程就是:

  1. 结点进入队尾后,检查状态,找到安全休息点。
  2. 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己。
  3. 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过。如果没拿到,继续流程1。

总结

  整个流程就是:

  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回。
  2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式。
  3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
    1. 这里检测自己是否能拿到资源,拿到就会返回
    2. 这里面会判断是否有被中断过
    3. 如果有被中断,这里面会清除中断过程,不响应中断,外面会将中断补上
    4. 如果没有被中断,这里面会返回false,表示没有中断,外面就不会补
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

acquire流程

Release独占模式释放资源

release方法

  此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
  它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点!

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease方法

  跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。
  正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。
  但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。

1
2
3
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

unparkSuccessor方法

  此方法用于唤醒等待队列中下一个线程。原理就是用unpark()唤醒等待队列中最前边的那个未放弃线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void unparkSuccessor(Node node) {
// 这里,node一边为当前线程所在节点
int ws = node.waitStatus;
if (ws < 0)
//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);

// 找到下一个需要唤醒的节点
Node s = node.next;
// 如果为空或已取消
if (s == null || s.waitStatus > 0) {
s = null;
// 从后向前找
for (Node t = tail; t != null && t != node; t = t.prev)
// 从这里可以看出,<=0 的结点,都是还有效的结点,这里找到最前面的待唤醒的节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒
LockSupport.unpark(s.thread);
}

  这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!!And then, DO what you WANT!

总结

  release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
  如果获取锁的线程在release时异常了,没有unpark队列中的其他结点,这时队列中的其他结点会怎么办?这时队列中的其他节点就没法被唤醒了。 产生异常的可能如下:

  1. 线程突然死掉了?可以通过thread.stop来停止线程的执行,但该函数的执行条件要严苛的多,而且函数注明是非线程安全的,已经标明Deprecated;
  2. 线程被interupt了?线程在运行态是不响应中断的,所以也不会抛出异常;
  3. release代码有bug,抛出异常了?目前来看,Doug Lea的release方法还是比较健壮的,没有看出能引发异常的情形(如果有,恐怕早被用户吐槽了)。除非自己写的tryRelease()有bug,那就没啥说的,自己写的bug只能自己含着泪去承受了。

参考文献 & 鸣谢