AQS抽象队列同步器!
AQS抽象队列同步器!
月伴飞鱼
AQS
定义了一套多线程访问共享资源的同步模板。
- 解决了实现同步器时涉及的大量细节问题,能够极大地减少实现工作。
三部分组成:
state
同步状态、Node
组成的CLH
队列、ConditionObject
条件变量。
- 包含
Node
组成的条件单向队列。
整体流程
使用 AQS 写一个线程协作工具类
1、新建一个线程协作工具类,在内部写一个
Sync
类。
- 该
Sync
类继承AbstractQueuedSynchronizer
,即 AQS。2、在
Sync
类里,根据是否是独占,来重写对应的方法。
- 如果是独占,则重写
tryAcquire
和tryRelease
等方法。- 如果是非独占,则重写
tryAcquireShared
和tryReleaseShared
等方法。3、在自己的线程协作工具类中,实现获取/释放的相关方法,并在里面调用
AQS
对应的方法。
- 如果是独占则调用
acquire
或release
等方法。- 非独占则调用
acquireShared
或releaseShared
或acquireSharedInterruptibly
等方法。
同步状态
在
AQS
中维护了一个同步状态变量state
,获取、释放资源是否成功都是由state
决定的。
比如
state>0
代表可获取资源,否则无法获取。
state
的具体语义由实现者去定义。
ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch
定义的state
语义都不一样:
ReentrantLock
的state
用来表示是否有锁资源。
ReentrantReadWriteLock
的state
高16
位代表读锁状态,低16
位代表写锁状态。
Semaphore
的state
用来表示可用信号的个数。
CountDownLatch
的state
用来表示计数器的值。
CLH队列
CLH
是AQS
内部维护的FIFO
(先进先出)双端双向队列(方便尾部节点插入)。
- 基于链表数据结构。
当一个线程竞争资源失败,就会将等待资源的线程封装成一个
Node
节点。通过
CAS
原子操作插入队列尾部,最终不同的Node
节点连接组成了一个CLH
队列:
- 先进先出保证了公平性。
- 非阻塞的队列,通过自旋锁和
CAS
保证节点插入和移除的原子性,实现无锁快速插入。- 采用了自旋锁思想,所以
CLH
也是一种基于链表的可扩展、高性能、公平的自旋锁。
Node内部类
Node
是AQS
的内部类:
- 每个等待资源的线程都会封装成
Node
节点组成CLH
队列、等待队列。线程获取资源失败,封装成
Node
节点从CLH
队列尾部入队并阻塞线程。
- 某线程释放资源时会把
CLH
队列首部Node
节点关联的线程唤醒,再次获取资源。
在
AQS
中提供addWaiter
函数完成Node
节点的创建与入队。
1 | private Node addWaiter(Node mode) { |
添加节点的时候,如果从
CLH
队列已经存在,通过CAS
快速将当前节点添加到队列尾部。如果添加失败或队列不存在,则指向
enq
函数自旋入队。
- 通过自旋
CAS
尝试往队列尾部插入节点,直到成功。- 自旋过程如果发现
CLH
队列不存在时会初始化CLH
队列。
1 | private Node enq(final Node node) { |
条件变量
Object
的wait、notify
函数是配合Synchronized
锁实现线程间同步协作的功能。
AQS
的ConditionObject
条件变量,则通过ConditionObject
的await
和signal
两类函数完成。
- 不同于
Synchronized
锁,一个AQS
可以对应多个条件变量,而Synchronized
只有一个。
ConditionObject
内部维护着一个单向条件队列,不同于CLH
队列。条件队列只入队执行
await
的线程节点,并且加入条件队列的节点。
- 不能在
CLH
队列, 条件队列出队的节点,会入队到CLH
队列。当某个线程执行了
ConditionObject
的await
函数,阻塞当前线程。线程会被封装成
Node
节点添加到条件队列的末端,其他线程执行ConditionObject
的signal
函数。
- 会将条件队列头部线程节点转移到
CLH
队列参与竞争资源。
AQS
采用了模板方法设计模式,提供了两类模板:
- 一类是独占式模板,另一类是共享形模式。
独占式获取资源
acquire
模板函数,模板流程:
- 线程获取共享资源,如果获取资源成功,线程直接返回。
- 否则进入
CLH
队列,直到获取资源成功为止,且整个过程忽略中断的影响。执行
tryAcquire
函数,tryAcquire
是由子类实现,代表获取资源是否成功。
- 如果资源获取失败,执行下面的逻辑。
执行
addWaiter
函数,根据当前线程创建出独占式节点,并入队CLH
队列。执行
acquireQueued
函数,自旋阻塞等待获取资源。如果
acquireQueued
函数中获取资源成功。
- 根据线程是否被中断状态,来决定执行线程中断逻辑。
1 | final boolean acquireQueued(final Node node, int arg) { |
独占式释放资源
AQS
中提供了release
模板函数来释放资源,模板流程就是线程释放资源成功。
- 唤醒
CLH
队列的第二个线程节点(首节点的下个节点)。
1 | public final boolean release(int arg) { |
共享式获取资源
acquireShared
模板流程就是线程获取共享资源,如果获取到资源,线程直接返回。
- 否则进入
CLH
队列,直到获取到资源为止,且整个过程忽略中断的影响。
1 | public final void acquireShared(int arg) { |
共享式释放资源
AQS
中提供了releaseShared
模板函数来释放资源,模板流程就是线程释放资源成功。
- 唤醒CHL队列的第二个线程节点(首节点的下个节点)。
1 | public final boolean releaseShared(int arg) { |