AQS底层原理
AQS底层原理
详细版本:https://blog.csdn.net/m0_73866527/article/details/142518162?spm=1001.2014.3001.5501
AQS架构
AQS核心思想
- AQS使用一个Volatile的int类型的成员变量State来表示同步状态。
- 通过CAS完成对State值的修改来获得锁。
- 未获得锁的线程放在内置的FIFO队列来完成锁获取的排队工作。
AQS数据结构
private transient Thread exclusiveOwnerThread; // 标识拿到锁的是哪个线程private transient volatile Node head; // 标识头节点private transient volatile Node tail; // 标识尾节点private volatile int state; // 同步状态,为0时,说明可以抢锁
Volatile修饰的int类型的state变量:
- State字段可以用来实现多线程的独占模式和共享模式。
- 如果要实现独占模式则可以初始化state字段为0。线程想要获取资源时,判断state是否为0,如果为0,则表示线程可以获取资源。然后修改state为1。如果state不为0,则表示不能获得资源。
- 如果要实现共享模式则可以初始化state为n。如果state不为0,那么线程都可以获取这把锁。
- 也就是说,我们可以通过规定state从而自定义了加锁逻辑。
Node类型的数据结构:
-
线程如果没获取到锁,就会被包装成一个Node类型的结点放到FIFO队列中。
/** waitStatus值,表示线程已被取消(等待超时或者被中断)*/ static final int CANCELLED = 1; /** waitStatus值,表示后继线程需要被唤醒(unpaking)*/ static final int SIGNAL = -1; /**waitStatus值,表示结点线程等待在condition上,当被signal后,会从等待队列转移到同步到队列中 */ static final int CONDITION = -2; /** waitStatus值,表示下一次共享式同步状态会被无条件地传播下去*/ static final int PROPAGATE = -3;volatile int waitStatus; // Node里,记录状态用的。初始为0volatile Thread thread; // Node里,标识哪个线程volatile Node prev; // 前驱节点(这个Node的上一个是谁)volatile Node next; // 后继节点(这个Node的个一个是谁)
-
waitStatus变量的枚举值:
枚举 含义 0 当一个Node被初始化的时候的默认值 CANCELLED 为1,表示线程获取锁的请求已经取消了 CONDITION 为-2,表示节点在等待队列中,节点线程等待唤醒 PROPAGATE 为-3,当前线程处在SHARED情况下,该字段才会使用 SIGNAL 为-1,表示线程已经准备好了,就等资源释放了
AQS源码分析
我们拿ReentrantLock加锁举例从而进入AQS源码:
整体加锁流程是这样的:
- 通过ReentrantLock的加锁方法Lock进行加锁操作。
- 会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。
- AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。
- tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。
- 获取失败后的后续逻辑就是执行addWaiter方法将线程加入队列中,该方法返回的是一个包含该线程的Node。
- Node会作为参数,进入到acquireQueued方法中。acquireQueued方法可以对排队中的线程进行“获锁”操作。
- acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。
- 进入到队列里的线程有两种状态,一种是不断循环获取锁,一种是被挂起等待被前继结点唤醒从而再获取锁。因此这样避免不断死循环浪费cpu资源。
- **进入shouldParkAfterFailedAcquire方法中,**判断前置节点的状态来决定是否要将当前线程挂起。如果前继结点的状态为SIGNAL,则将当前线程阻塞,而如果前继结点状态是CANCELLED则移除该结点直到前继结点不是CANCELLED状态为止。然后接着进入for循环。
- 如果判断出这个线程应被挂起,则进入parkAndCheckInterrupt方法。这个方法是专门用来挂起线程的。
- 这个被挂起的线程最终会等待它的前继结点释放锁后将其唤醒,唤醒之后继续进入for循环,直到获取锁成功。
- 释放锁逻辑是进入unlock方法,本质上都会执行AQS的Acquire方法。AQS的Acquire方法会执行tryRelease方法,而tryRelease方法也就是释放锁逻辑是自定义同步器实现的。释放锁完毕后,会唤醒后继结点。
以下是源码的补充:
-
acquireQueued
// java.util.concurrent.locks.AbstractQueuedSynchronizerfinal boolean acquireQueued(final Node node, int arg) {// 标记是否成功拿到资源boolean failed = true;try {// 标记等待过程中是否中断过boolean interrupted = false;// 开始自旋,要么获取锁,要么中断// 状态 1):不断循环获取锁for (;;) {// 获取当前节点的前驱节点final Node p = node.predecessor();// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)if (p == head && tryAcquire(arg)) {// 获取锁成功,头指针移动到当前nodesetHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析// 状态 2):挂起,等待前继结点的唤醒if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }
-
shouldParkAfterFailedAcquire
// 靠前驱节点判断当前线程是否应该被阻塞 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 获取头结点的节点状态int ws = pred.waitStatus;// 说明头结点处于唤醒状态if (ws == Node.SIGNAL)return true; // 通过枚举值我们知道waitStatus>0是取消状态if (ws > 0) {do {// 循环向前查找取消节点,把取消节点从队列中剔除node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 设置前任节点等待状态为SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false; }
-
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted(); }
-
释放锁
public void unlock() {sync.release(1); }
// java.util.concurrent.locks.AbstractQueuedSynchronizerpublic final boolean release(int arg) {// 上边自定义的tryRelease如果返回true,说明该锁没有被任何线程持有if (tryRelease(arg)) {// 获取头结点Node h = head;// 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false; }
提出疑问:
线程结点的状态为CANCELLED是怎么来的?
回答:
我们注意看acquireQueued方法,finally中的cancelAcquire方法就是设置线程为CANCELLED状态的方法。
// java.util.concurrent.locks.AbstractQueuedSynchronizerfinal boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {...for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {...failed = false;...}...} finally {if (failed)cancelAcquire(node);}
}
参考
https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html#:~:text=AQS%E6%98%AF%E4%B8%80%E7%A7%8D%E6%8F%90%E4%BE%9B%E4%BA%86%E5%8E%9F