juc之AQS源码一
两种资源共享方式
AQS定义了两种资源共享方式:
- Exclusive:独占方式,只有一个线程能执行,如ReentrantLock。
- Share:共享,多个线程可以同时执行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier。
对应的,在不同的资源共享模式下,就有了不同的源码。
值得注意的是,acquire()和acquireShared()两种方法下,线程在等待队列中都是忽略中断的。AQS也支持响应中断的,acquireInterruptibly()/acquireSharedInterruptibly()
即是。
独占方式
独占方式下,获取资源与释放资源的一组为acquire()
和release()
方法。
Acquire独占模式获取资源
acquire方法
acquire方法时独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响,获取到资源后线程就可以执行其临界区代码了。
1 | public final void acquire(int arg) { |
函数流程如下:
- tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待)。
- addWaiter()将该线程加入等待队列的尾部,并标记为独占模式。
- acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
tryAcquire方法
此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这个方法在AQS中定义了空方法体,需要各个自定义同步器自己去重写这个方法,如CountDownLatch的Sync。
1 | protected boolean tryAcquire(int arg) { |
这里就是模板方法模式的典型应用。
这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。
addWaiter方法
用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。这里会将每一个线程封装成一个Node,Node中有一个Tread属性用来保存当前线程。等待队列可以看作是一个双向链表,一个CLH队列,一个虚拟的双向队列。
1 | private Node addWaiter(Node mode) { |
enq方法将节点加入队尾。
1 | private Node enq(final Node node) { |
acquireQueued方法
通过tryAcquire()
和addWaiter()
,该线程获取资源失败,已经被放入等待队列尾部了。
acquireQueued()
用于在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。
进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。
1 | final boolean acquireQueued(final Node node, int arg) { |
shouldParkAfterFailedAcquire方法
这个方法主要用于检查状态,这里主要就是前驱节点的状态。
整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
parkAndCheckInterrupt方法
这个方法用于将前驱节点的状态设置好后,真正进入等待状态。
1 | private final boolean parkAndCheckInterrupt() { |
小结
acquireQueued方法的流程就是:
- 结点进入队尾后,检查状态,找到安全休息点。
- 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己。
- 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过。如果没拿到,继续流程1。
总结
整个流程就是:
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回。
- 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式。
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 这里检测自己是否能拿到资源,拿到就会返回
- 这里面会判断是否有被中断过
- 如果有被中断,这里面会清除中断过程,不响应中断,外面会将中断补上
- 如果没有被中断,这里面会返回false,表示没有中断,外面就不会补
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
Release独占模式释放资源
release方法
此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点!
1 | public final boolean release(int arg) { |
tryRelease方法
跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。
正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。
但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。
1 | protected boolean tryRelease(int arg) { |
unparkSuccessor方法
此方法用于唤醒等待队列中下一个线程。原理就是用unpark()唤醒等待队列中最前边的那个未放弃线程。
1 | private void unparkSuccessor(Node node) { |
这里我们也用s来表示吧。此时,再和acquireQueued()
联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))
的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()
寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()
的调整,s也必然会跑到head的next结点,下一次自旋p==head
就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!!And then, DO what you WANT!
总结
release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
如果获取锁的线程在release时异常了,没有unpark队列中的其他结点,这时队列中的其他结点会怎么办?这时队列中的其他节点就没法被唤醒了。 产生异常的可能如下:
- 线程突然死掉了?可以通过thread.stop来停止线程的执行,但该函数的执行条件要严苛的多,而且函数注明是非线程安全的,已经标明Deprecated;
- 线程被interupt了?线程在运行态是不响应中断的,所以也不会抛出异常;
- release代码有bug,抛出异常了?目前来看,Doug Lea的release方法还是比较健壮的,没有看出能引发异常的情形(如果有,恐怕早被用户吐槽了)。除非自己写的tryRelease()有bug,那就没啥说的,自己写的bug只能自己含着泪去承受了。