AQS抽象队列同步器!

AQS定义了一套多线程访问共享资源的同步模板。

  • 解决了实现同步器时涉及的大量细节问题,能够极大地减少实现工作。

三部分组成:

  • state同步状态、Node组成的CLH队列、ConditionObject条件变量。
    • 包含Node组成的条件单向队列。

image-20231018135610476

整体流程

image-20231018192938937

使用 AQS 写一个线程协作工具类

1、新建一个线程协作工具类,在内部写一个 Sync 类。

  • Sync 类继承 AbstractQueuedSynchronizer,即 AQS。

2、在 Sync 类里,根据是否是独占,来重写对应的方法。

  • 如果是独占,则重写 tryAcquiretryRelease 等方法。
  • 如果是非独占,则重写 tryAcquireSharedtryReleaseShared 等方法。

3、在自己的线程协作工具类中,实现获取/释放的相关方法,并在里面调用 AQS 对应的方法。

  • 如果是独占则调用 acquirerelease 等方法。
  • 非独占则调用 acquireSharedreleaseSharedacquireSharedInterruptibly 等方法。

同步状态

AQS中维护了一个同步状态变量state,获取、释放资源是否成功都是由state决定的。

  • 比如state>0代表可获取资源,否则无法获取。

  • state的具体语义由实现者去定义。

ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch定义的state语义都不一样:

  • ReentrantLockstate用来表示是否有锁资源。

  • ReentrantReadWriteLockstate16位代表读锁状态,低16位代表写锁状态。

  • Semaphorestate用来表示可用信号的个数。

  • CountDownLatchstate用来表示计数器的值。

CLH队列

CLHAQS内部维护的FIFO(先进先出)双端双向队列(方便尾部节点插入)。

  • 基于链表数据结构。

当一个线程竞争资源失败,就会将等待资源的线程封装成一个Node节点。

通过CAS原子操作插入队列尾部,最终不同的Node节点连接组成了一个CLH队列:

  • 先进先出保证了公平性。
  • 非阻塞的队列,通过自旋锁和CAS保证节点插入和移除的原子性,实现无锁快速插入。
  • 采用了自旋锁思想,所以CLH也是一种基于链表的可扩展、高性能、公平的自旋锁。

Node内部类

NodeAQS的内部类:

  • 每个等待资源的线程都会封装成Node节点组成CLH队列、等待队列。

线程获取资源失败,封装成Node节点从CLH队列尾部入队并阻塞线程。

  • 某线程释放资源时会把CLH队列首部Node节点关联的线程唤醒,再次获取资源。

image-20231018142402280

AQS中提供addWaiter函数完成Node节点的创建与入队。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Node addWaiter(Node mode) {
//根据当前线程创建节点,等待状态为0
Node node = new Node(Thread.currentThread(), mode);
// 获取尾节点
Node pred = tail;
if (pred != null) {
//如果尾节点不等于null,把当前节点的前驱节点指向尾节点
node.prev = pred;
//通过cas把尾节点指向当前节点
if (compareAndSetTail(pred, node)) {
//之前尾节点的下个节点指向当前节点
pred.next = node;
return node;
}
}
//如果添加失败或队列不存在,执行end函数
enq(node);
return node;
}

添加节点的时候,如果从CLH队列已经存在,通过CAS快速将当前节点添加到队列尾部。

如果添加失败或队列不存在,则指向enq函数自旋入队。

  • 通过自旋CAS尝试往队列尾部插入节点,直到成功。
  • 自旋过程如果发现CLH队列不存在时会初始化CLH队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private Node enq(final Node node) {
for (;;) { //循环
//获取尾节点
Node t = tail;
if (t == null) {
//如果尾节点为空,创建哨兵节点,通过cas把头节点指向哨兵节点
if (compareAndSetHead(new Node()))
//cas成功,尾节点指向哨兵节点
tail = head;
} else {
//当前节点的前驱节点设指向之前尾节点
node.prev = t;
//cas设置把尾节点指向当前节点
if (compareAndSetTail(t, node)) {
//cas成功,之前尾节点的下个节点指向当前节点
t.next = node;
return t;
}
}
}
}

条件变量

Objectwait、notify函数是配合Synchronized锁实现线程间同步协作的功能。

AQSConditionObject条件变量,则通过ConditionObjectawaitsignal两类函数完成。

  • 不同于Synchronized锁,一个AQS可以对应多个条件变量,而Synchronized只有一个。

ConditionObject内部维护着一个单向条件队列,不同于CLH队列。

条件队列只入队执行await的线程节点,并且加入条件队列的节点。

  • 不能在CLH队列, 条件队列出队的节点,会入队到CLH队列。

当某个线程执行了ConditionObjectawait函数,阻塞当前线程。

线程会被封装成Node节点添加到条件队列的末端,其他线程执行ConditionObjectsignal函数。

  • 会将条件队列头部线程节点转移到CLH队列参与竞争资源。

image-20231018142940365

image-20231018192059833

AQS采用了模板方法设计模式,提供了两类模板:

  • 一类是独占式模板,另一类是共享形模式。

独占式获取资源

acquire模板函数,模板流程:

  • 线程获取共享资源,如果获取资源成功,线程直接返回。
  • 否则进入CLH队列,直到获取资源成功为止,且整个过程忽略中断的影响。

执行tryAcquire函数,tryAcquire是由子类实现,代表获取资源是否成功。

  • 如果资源获取失败,执行下面的逻辑。

执行addWaiter函数,根据当前线程创建出独占式节点,并入队CLH队列。

执行acquireQueued函数,自旋阻塞等待获取资源。

如果acquireQueued函数中获取资源成功。

  • 根据线程是否被中断状态,来决定执行线程中断逻辑。

image-20231018192408391

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final boolean acquireQueued(final Node node, int arg) {
//异常状态,默认是
boolean failed = true;
try {
//该线程是否中断过,默认否
boolean interrupted = false;
for (;;) {//自旋
//获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是首节点,获取资源(子类实现)
if (p == head && tryAcquire(arg)) {
//获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
setHead(node);
//原来首节点下个节点指向为null
p.next = null; // help GC
//非异常状态,防止指向finally逻辑
failed = false;
//返回线程中断状态
return interrupted;
}
/**
* 如果前驱节点不是首节点,先执行shouldParkAfterFailedAcquire函数,shouldParkAfterFailedAcquire做了三件事
* 1.如果前驱节点的等待状态是SIGNAL,返回true,执行parkAndCheckInterrupt函数,返回false
* 2.如果前驱节点的等大状态是CANCELLED,把CANCELLED节点全部移出队列(条件节点)
* 3.以上两者都不符合,更新前驱节点的等待状态为SIGNAL,返回false
*/
if (shouldParkAfterFailedAcquire(p, node) &&
//使用LockSupport类的静态方法park挂起当前线程,直到被唤醒,唤醒后检查当前线程是否被中断,返回该线程中断状态并重置中断状态
parkAndCheckInterrupt())
//该线程被中断过
interrupted = true;
}
} finally {
// 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除
if (failed)
cancelAcquire(node);
}
}

独占式释放资源

AQS中提供了release模板函数来释放资源,模板流程就是线程释放资源成功。

  • 唤醒CLH队列的第二个线程节点(首节点的下个节点)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public final boolean release(int arg) {

if (tryRelease(arg)) {//释放资源成功,tryRelease子类实现
//获取头部线程节点
Node h = head;
if (h != null && h.waitStatus != 0) //头部线程节点不为null,并且等待状态不为0
//唤醒CHL队列第二个线程节点
unparkSuccessor(h);
return true;
}
return false;
}


private void unparkSuccessor(Node node) {
//获取节点等待状态
int ws = node.waitStatus;
if (ws < 0)
//cas更新节点状态为0
compareAndSetWaitStatus(node, ws, 0);

//获取下个线程节点
Node s = node.next;
if (s == null || s.waitStatus > 0) { //如果下个节点信息异常,从尾节点循环向前获取到正常的节点为止,正常情况不会执行
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒线程节点
LockSupport.unpark(s.thread);
}
}

共享式获取资源

acquireShared模板流程就是线程获取共享资源,如果获取到资源,线程直接返回。

  • 否则进入CLH队列,直到获取到资源为止,且整个过程忽略中断的影响。
1
2
3
4
5
6
7
8
9
10
public final void acquireShared(int arg) {
/**
* 1.负数表示失败
* 2.0表示成功,但没有剩余可用资源
* 3.正数表示成功且有剩余资源
*/
if (tryAcquireShared(arg) < 0) //获取资源失败,tryAcquireShared子类实现
//自旋阻塞等待获取资源
doAcquireShared(arg);
}

共享式释放资源

AQS中提供了releaseShared模板函数来释放资源,模板流程就是线程释放资源成功。

  • 唤醒CHL队列的第二个线程节点(首节点的下个节点)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//释放资源成功,tryReleaseShared子类实现
//唤醒后继节点
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
for (;;) {
//获取头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;

if (ws == Node.SIGNAL) {//如果头节点等待状态为SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//更新头节点等待状态为0
continue; // loop to recheck cases
//唤醒头节点下个线程节点
unparkSuccessor(h);
}
//如果后继节点暂时不需要被唤醒,更新头节点等待状态为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}