溯源设备 发表于 2025-2-5 14:41:49

AQS源码解析

掌握AQS

何为AQS

AQS是一个用于构建锁和同步器的基础框架,实际上AQS核心代码主要是实现了线程的阻塞和唤醒机制以及资源状态的基本管理
AQS的核心功能


[*]线程的排队管理:通过 CLH 队列管理线程的阻塞和唤醒。
[*]资源状态的基本管理:通过 state 表示资源状态,通过 compareAndSetState 实现原子操作。
[*]线程的唤醒和调度:通过 unparkSuccessor 唤醒后继线程。
由于AQS目的就是做一个构建锁和同步器的基础框架,对于这类框架以上功能有部分是我们AQS可以直接实现,例如线程的唤醒和阻塞,线程的排队管理也可以我们直接在AQS实现,另一部分,资源状态的基本管理,在AQS是无法直接写死的,所以AQS使用了模板方法,并且抽象出了state,所有对state的操作都由AQS的子类来实现
线程获取不到锁首先为什么要排队?

如果线程在资源不可用的情况下继续占用 CPU,而不是进入等待队列进行管理,其实是浪费了CPU资源,让CPU去执行其他线程的代码,可以提升整个系统的并发量嘛,上面是阻塞的流程,而阻塞归根到底到有资源的时候就需要接着唤醒,所以才以队列这样的数据结构来实现此功能
核心API

例如如下两个,实际上有很多个,不过功能类似,只不过要考虑独占锁与共享锁以及是否支持中断

[*]acquire
[*]release
(随口一提)为什么要支持中断?

由于是底层框架嘛,需要为用户提供更多种选择,所以支持了中断与非中断
(随口一提)中断与不可中断的区别?


[*]不可中断模式 (Uninterruptible):如 acquire 方法,线程在获取资源时忽略中断,必须获取到资源后才能退出。
[*]可中断模式 (Interruptible):如 acquireInterruptibly 方法,线程在获取资源时可以响应中断,提前退出。
设计不可中断模式的原因:


[*]简化逻辑:

[*]在很多场景下,锁的获取过程希望简单明确,即 "要么成功获取,要么阻塞直到获取到资源"。
[*]忽略中断可以避免处理中断逻辑带来的复杂性,减少开发者出错的可能性。

[*]保证任务完整性:

[*]某些关键任务不希望因为中断信号而被打断,特别是在资源状态必须保持一致的情况下。

[*]降低性能开销:

[*]响应中断需要额外的判断逻辑,忽略中断可以简化实现,提高性能。

(随口一提)中断方式如何实现

对于java来讲是简单的,无非就是获取当前线程的状态,代码上来讲就是Thread.interrupted(),获取当前线程是否被中断
如何管理(独占锁)之acquire

AQS通过Node对象实例来进行映射被阻塞的线程,也就是Node类里面有个Thread属性
public final void acquire(int arg) {      if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }tryAcquire

交由子类来实现
添加新阻塞线程的流程addWaiter


[*]生成一个node(构造器参数:currentThread,mode("独占" or "共享"))
[*]如果tail为null,直接enq(node),并且返回node
(tail是AQS实例的变量还有一个head)
[*]非null的话,会使用CAS的方式去修改对应的tail(把tail设置成node),同时原先的prev(原先的tail)的next设置成node,CAS成功直接返回node
[*]CAS失败走退化路径(enq)
但是上面的逻辑仍然存在并发问题,例如说下面两个情况:

[*]如果两个线程同时判断当前tail为null呢?
[*]如果两个线程同时执行获取到tail,只有一个CAS成功呢,那另一个怎么办?
所以完整的过程
核心方法退化路径 (enq)

enq的核心逻辑就是自旋+CAS就和Synchronized在轻量级锁时类似机制,只不过enq是死循环
提出问题:为什么不直接使用enq这么设计有什么好处?

其实是快速路径和完整路径的设计思想
快速路径指的是addWaiter中的这一段
Node pred = tail;      if (pred != null) {            node.prev = pred;            if (compareAndSetTail(pred, node)) {                pred.next = node;                return node;            }      }而完整路径则指代的是enq的逻辑
这么设计的好处在于

[*]提升性能:大部分情况下,是已有head的那么再去考虑head的if逻辑判断其实是没必要执行的,另一方面,如果我们在多线程的情况下,我们不在这里通过快速路径的方式过滤掉一部分线程,到完整路径的自旋等等操作会导致竞争更加激烈,自然能提升一部分的性能,所以这类设计使得部分场景性能更优
[*]分离简单情况和复杂情况:这个主要是编码思路的学习了,这种设计是典型的 "优化常见路径,退化处理复杂情况" 的思想,是一种平衡性能和线程安全的工程实践。如果工程上遇到较为复杂业务也可以利用这个思想来对场景进行解剖
acquireQueued

final boolean acquireQueued(final Node node, int arg) {      boolean failed = true;      try {            boolean interrupted = false;            for (;;) {                final Node p = node.predecessor();                if (p == head && tryAcquire(arg)) {                  setHead(node);                  p.next = null; // help GC                  failed = false;                  return interrupted;                }                if (shouldParkAfterFailedAcquire(p, node) &&                  parkAndCheckInterrupt())                  interrupted = true;            }      } finally {            if (failed)                cancelAcquire(node);      }    }获取前驱节点并检查是否为队首:
final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {    setHead(node); // 成为新的头节点    p.next = null; // 帮助 GC    failed = false;    return interrupted;}

[*]如果当前节点的前驱是头节点,尝试获取资源。
[*]成功后,将当前节点设置为新的头节点,断开原头节点的引用。
判断是否需要挂起:
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())    interrupted = true;

[*]如果获取失败,检查前驱节点是否要求挂起。
[*]挂起线程,等待被唤醒。
处理中断:

[*]acquireQueued 不响应中断,但会记录中断状态。
[*]如果线程在等待过程中被中断,interrupted 变量会被设置为 true。
失败处理:
if (failed)    cancelAcquire(node);

[*]如果线程未成功获取资源(failed == true),调用 cancelAcquire 移除当前节点。
如何管理(独占锁)之release


[*]if(tryRelease(arg))-->由子类实现来负责修改state值
[*]如果head存在并且head.waitStatus!=0,进行唤醒后续节点unparkSuccessor
waitStatus的意义

waitStatus 的作用: waitStatus 是 AQS 中每个节点的重要状态字段,用来描述节点的状态:

[*]0:节点处于正常状态。
[*]< 0(如 Node.SIGNAL = -1):表示后继节点需要被唤醒。
[*]> 0:节点被取消(如中断、超时等)。
unparkSuccessor


[*]尝试清理当前节点的 waitStatus:
if (ws < 0)    compareAndSetWaitStatus(node, ws, 0);

[*]如果当前节点的 waitStatus 为负数(通常为 Node.SIGNAL),表示需要通知后继节点。
[*]在通知前,将其重置为 0,避免重复通知。

[*]找到需要唤醒的后继节点:


[*]首先尝试使用 node.next。
[*]如果 node.next 为 null 或 waitStatus > 0(取消状态),则从尾部向前查找有效节点。

[*]唤醒线程:


[*]找到有效节点后,通过 LockSupport.unpark(s.thread) 唤醒对应线程。
1. 为什么需要从尾部遍历?

这是为了找到真正需要被唤醒的 有效后继节点。
原因是:

[*]节点可能被取消:

[*]某些节点在等待期间可能因为超时或中断而被取消(waitStatus > 0 表示取消状态)。
[*]如果直接访问 node.next,它可能是一个取消的节点,无法唤醒。

[*]node.next 可能为空:

[*]在某些特殊情况下,队列的连接关系可能未完全建立(例如,前后节点未及时更新)。
[*]为了确保不遗漏有效节点,需要从队列尾部逆向查找。

最终问题


[*]为什么release这边不进行节点的释放,那么最终Node的释放如何控制?
因为对应的逻辑会在acquire进行执行,主要情况是会在acquireQueue中进行,在aquireQueue中如果判断当前node的前驱节点是head,会进行help GC 的操作,而且acquireQueue是一个死循环除非前驱节点是head&&拿到资源了也就是轮到他来执行了,才会去释放掉原本的head,所以我们release只需要去确保能够正确唤醒后续节点即可

[*]acquireQueued 是否会阻塞线程?
会阻塞线程。
虽然代码中线程会自旋尝试获取资源,但并不是一直占用 CPU 进行无休止地尝试。
具体来说:

[*]如果当前线程的前驱节点不是 head,或者资源无法被成功获取,线程会调用 parkAndCheckInterrupt 方法,将线程挂起(阻塞),直到被显式唤醒。
[*]因此,acquireQueued 的实现并非忙等,而是通过 自旋 + 阻塞 的方式高效等待。
页: [1]
查看完整版本: AQS源码解析