在分析 Java 并发包 java.util.concurrent 源码的时候,少不了需要了解 AbstractQueuedSynchronizer(以下简写AQS)这个抽象类,因为它是 Java 并发包的基础工具类,是实现 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等类的基础。Google 一下 AbstractQueuedSynchronizer,我们可以找到很多关于 AQS 的介绍,但是很多都没有介绍清楚,因为大部分文章没有把其中的一些关键的细节说清楚。本文将从 ReentrantLock 的公平锁源码出发,分析下 AbstractQueuedSynchronizer 这个类是怎么工作的,希望能给大家提供一些简单的帮助。
申明以下几点:
1、本文有点长,但还是挺简单,主要面向读者对象为并发编程的初学者,或者想要阅读 Java 并发包源码的开发者。对于新手来说,可能需要花好几个小时才能完全看懂,但是这时间肯定是值得的。 2、源码环境 JDK1.7(1.8没啥变化),看到不懂或有疑惑的部分,最好能自己打开源码看看。Doug Lea 大神的代码写得真心不错。 3、本文不分析共享模式,这样可以给读者减少很多负担,第三篇文章 对共享模式进行了分析。而且也不分析 condition 部分,所以应该说很容易就可以看懂了。 4、本文大量使用我们平时用得最多的 ReentrantLock 的概念,本质上来说是不正确的,读者应该清楚,AQS 不仅仅用来实现可重入锁,只是希望读者可以用锁来联想 AQS 的使用场景,降低阅读压力。 5、ReentrantLock 的公平锁和非公平锁只有一点点区别,第二篇文章 做了介绍。 6、评论区有读者反馈本文直接用代码说不友好,应该多配点流程图,这篇文章确实有这个问题。但是作为过来人,我想告诉大家,对于 AQS 来说,形式真的不重要,重要的是把细节说清楚。
AQS 结构 先来看看 AQS 有哪些属性,搞清楚这些基本就知道 AQS 是什么套路了,毕竟可以猜嘛!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private transient volatile Node head;private transient volatile Node tail;private volatile int state;private transient Thread exclusiveOwnerThread;
怎么样,看样子应该是很简单的吧,毕竟也就四个属性啊。AbstractQueuedSynchronizer 的等待队列示意如下所示,注意了,之后分析过程中所说的 queue,也就是阻塞队列 不包含 head,不包含 head,不包含 head 。
等待队列中每个线程被包装成一个 Node 实例,数据结构是链表,一起看看源码吧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; }
Node 的数据结构其实也挺简单的,就是 thread + waitStatus + pre + next 四个属性而已,大家先要有这个概念在心里。
上面的是基础知识,后面会多次用到,心里要时刻记着它们,心里想着这个结构图就可以了。下面,我们开始说 ReentrantLock 的公平锁。再次强调,我说的阻塞队列不包含 head 节点。
首先,我们先看下 ReentrantLock 的使用方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class OrderService { private static ReentrantLock reentrantLock = new ReentrantLock(true ); public void createOrder () { reentrantLock.lock(); try { } finally { reentrantLock.unlock(); } } }
ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。
1 2 abstract static class Sync extends AbstractQueuedSynchronizer {}
Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁),我们看 FairSync 部分。
1 2 3 public ReentrantLock (boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
线程抢锁 很多人肯定开始嫌弃上面废话太多了,下面跟着代码走,我就不废话了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L ; final void lock () { acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); } }
说到这里,也就明白了,多看几遍 final boolean acquireQueued(final Node node, int arg)
这个方法吧。自己推演下各个分支怎么走,哪种情况下会发生什么,走到哪里。
解锁操作 最后,就是还需要介绍下唤醒的动作了。我们知道,正常情况下,如果线程没获取到锁,线程会被 LockSupport.park(this);
挂起停止,等待被唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:
1 2 3 4 5 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
好了,后面就不分析源码了,剩下的还有问题自己去仔细看看代码吧。
总结 总结一下吧。
在并发环境下,加锁和解锁需要以下三个部件的协调:
1、 锁状态。我们要知道锁是不是被别的线程占有了,这个就是 state 的作用,它为 0 的时候代表没有线程占有锁,可以去争抢这个锁,用 CAS 将 state 设为 1,如果 CAS 成功,说明抢到了锁,这样其他线程就抢不到了,如果锁重入的话,state进行 +1 就可以,解锁就是减 1,直到 state 又变为 0,代表释放锁,所以 lock() 和 unlock() 必须要配对啊。然后唤醒等待队列中的第一个线程,让其来占有锁。 2、 线程的阻塞和解除阻塞。AQS 中采用了 LockSupport.park(thread) 来挂起线程,用 unpark 来唤醒线程。 3、 阻塞队列。因为争抢锁的线程可能很多,但是只能有一个线程拿到锁,其他的线程都必须等待,这个时候就需要一个 queue 来管理这些线程,AQS 用的是一个 FIFO 的队列,就是一个链表,每个 node 都持有后继节点的引用。AQS 采用了 CLH 锁的变体来实现,感兴趣的读者可以参考这篇文章关于CLH的介绍 ,写得简单明了。
示例图解析 下面属于回顾环节,用简单的示例来说一遍,如果上面的有些东西没看懂,这里还有一次帮助你理解的机会。
首先,第一个线程调用 reentrantLock.lock(),翻到最前面可以发现,tryAcquire(1) 直接就返回 true 了,结束。只是设置了 state=1,连 head 都没有初始化,更谈不上什么阻塞队列了。要是线程 1 调用 unlock() 了,才有线程 2 来,那世界就太太太平了,完全没有交集嘛,那我还要 AQS 干嘛。
如果线程 1 没有调用 unlock() 之前,线程 2 调用了 lock(), 想想会发生什么?
线程 2 会初始化 head【new Node()】,同时线程 2 也会插入到阻塞队列并挂起 (注意看这里是一个 for 循环,而且设置 head 和 tail 的部分是不 return 的,只有入队成功才会跳出循环)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
首先,是线程 2 初始化 head 节点,此时 head==tail, waitStatus==0
然后线程 2 入队:
同时我们也要看此时节点的 waitStatus,我们知道 head 节点是线程 2 初始化的,此时的 waitStatus 没有设置, java 默认会设置为 0,但是到 shouldParkAfterFailedAcquire 这个方法的时候,线程 2 会把前驱节点,也就是 head 的waitStatus设置为 -1。
那线程 2 节点此时的 waitStatus 是多少呢,由于没有设置,所以是 0;
如果线程 3 此时再进来,直接插到线程 2 的后面就可以了,此时线程 3 的 waitStatus 是 0,到 shouldParkAfterFailedAcquire 方法的时候把前驱节点线程 2 的 waitStatus 设置为 -1。
这里可以简单说下 waitStatus 中 SIGNAL(-1) 状态的意思,Doug Lea 注释的是:代表后继节点需要被唤醒。也就是说这个 waitStatus 其实代表的不是自己的状态,而是后继节点的状态,我们知道,每个 node 在入队的时候,都会把前驱节点的状态改为 SIGNAL,然后阻塞,等待被前驱唤醒。这里涉及的是两个问题:有线程取消了排队、唤醒操作。其实本质是一样的,读者也可以顺着 “waitStatus代表后继节点的状态” 这种思路去看一遍源码。
(全文完)