0%

AbstractQueuedSynchronizer源码学习

前言:什么是AQS?之前我一直不懂,网上看了一些文章也是似懂非懂,所以今天我从源码学习,将这个类源码全部啃下来。

简述

AbstractQueuedSynchronizer(AQS)是一个队列同步器。是JUC锁框架中最重要的类,AQS包含了以下内容:CLH队列,Condition队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取这些特性的实现,另外JUC中还有AbstractQueuedLongSynchronizer类,这个与AQS相同只是把成员变量state从int变为long

AQS是一个抽象类,但是没有抽象方法,继承类需要根据要实现的锁特点重写不同的方法,如ReentrantLock独占锁要重写tryAcquire、tryRelease等方法,Semaphore共享锁要重写tryAcquireShared、tryReleaseShared等方法

内部类

image-20200104001101323

内部类详情见AQS Node类 与Condition源码学习这两篇文章

方法

image-20200104001110373

image-20200104001120317

image-20200104001128976

属性

image-20200104001137110

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// CLH队列头结点
private transient volatile Node head;

// CLH队列尾结点
private transient volatile Node tail;

// 同步状态,这是一个抽象概念,在Semaphore中state是目前资源的数量,在ReentrantLock中是加锁的次数
private volatile int state;

// 用于doAcquireNanos等方法,当nanosTimeout小于spinForTimeoutThreshold 则不会执行park,继续CAS自旋(因为nanosTimeout时间已经很小了,执行park反而会浪费更多时间)
static final long spinForTimeoutThreshold = 1000L;

// Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等。线程的唤醒、挂起与CAS等操作底层均又unsafe实现
private static final Unsafe unsafe = Unsafe.getUnsafe();

// 下面5个是属性偏移量,用于unsafe对内存的操作,使用静态代码块初始化
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

// 静态代码块初始化
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));

} catch (Exception ex) { throw new Error(ex); }
}

CLH队列,CLH是(Craig, Landin, and Hagersten)三位创造者的缩写,CLH队列是CLH同步锁的一种变形。其主要从两方面进行了改造:节点的结构与节点等待机制。

在结构上,AQS类引入了头结点和尾节点,他们分别指向队列的头和尾,尝试获取锁、入队列、释放锁等实现都与头尾节点相关

在等待机制上将自旋机制改为阻塞机制,当前线程将首先检测是否为头结点且尝试获取锁,如果当前节点为头结点并成功获取锁则直接返回,当前线程不进入阻塞,否则将当前线程阻塞,直到被唤醒。

CLH队列本质是一个以链表实现的双向队列,以Node类作为节点,有哨兵结点(head结点作为哨兵,不存放线程),CLH队列是公平的,只有head.next结点能获取锁

说明:如果每次访问冲突概率小于 20%,推荐使用乐观锁,否则使用悲观锁。乐观锁的重试次数不得小于3次。

image-20200104001036764

除了CLH队列,AQS中还要一个Condition队列,Condition队列是用于支持await()、signal()方法,实现堵塞、唤醒,被await()的线程会释放锁并加入Condition队列直到被signal()唤醒加入CLH队列,然后重新尝试获取锁,两者关系如下图:

image-20200104001044679

acquire

获取资源,失败则将线程封装为Node结点并放入CLH队列,独占锁

1
2
3
4
5
6
public final void acquire(int arg) {
// 尝试获取资源,注意tryAcquire需要被子类重写,获取资源失败,调用addWaiter(Node.EXCLUSIVE), arg)方法,将结点设置为独占方式并放入CLH队列,调用acquireQueued CAS自旋获取锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

acquireInterruptibly

能响应中断的acquire方法,独占锁

1
2
3
4
5
6
7
8
9
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 响应中断
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取资源,注意tryAcquire需要被子类重写,获取资源失败,则调用doAcquireInterruptibly将线程放入CLH队列并CAS自旋获取锁
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

acquireQueued

结点进入CLH队列后,就会调用acquireQueued方法来CAS自旋获取锁,注意不会一直自旋而且只有头结点才能获取资源(去除哨兵结点的头结点,所以CLH队列是公平的),获取锁失败就会调用shouldParkAfterFailedAcquire、parkAndCheckInterrupt方法将线程堵塞,一般第一次失败将结点状态从0设置为-1(Signal),连续失败两次就会park,当被唤醒(其他线程释放锁),结点状态会恢复为0,重新再次开始CAS自旋获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 设置中断标识为false
boolean interrupted = false;
for (;;) {
// 获取前驱结点
final Node p = node.predecessor();
// 只有前驱结点为head的结点才能尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取锁成功,清除结点
setHead(node);
p.next = null; // help GC
failed = false;
// 不响应中断
return interrupted;
}
// 获取锁失败,shouldParkAfterFailedAcquire修改结点状态并返回是否要将结点堵塞
// shouldParkAfterFailedAcquire返回true,则执行parkAndCheckInterrupt方法将线程堵塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 设置中断标识为true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

acquireShared

获取资源,共享锁

1
2
3
4
5
public final void acquireShared(int arg) {
// 尝试获取资源,注意tryAcquireShared需要被子类重写,获取资源失败,调用doAcquireShared方法
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

acquireSharedInterruptibly

能响应中断的acquireShared方法,共享锁

1
2
3
4
5
6
7
8
9
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 响应中断
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取资源,注意tryAcquireShared需要被子类重写,获取资源失败,调用doAcquireShared方法
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

addWaiter

将当前线程封装为结点并加入CLH队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addWaiter(Node mode) {
// 以当前线程、传入的方式创建结点
Node node = new Node(Thread.currentThread(), mode);
// 如果尾结点不为空,CAS将结点加入队列
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// CAS加入CLH队列失败或者tail为空,调用enq()将结点加入队列
enq(node);
return node;
}

apparentlyFirstQueuedIsExclusive

判断head.next是否正在等待独占锁,仅用于ReentrantReadWriteLock(可重入读写锁)

1
2
3
4
5
6
7
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

头结点不为空、头结点的下一个结点不为空并且不是共享方式(独占方式,写锁)、线程不为空,则返回true

cancelAcquire

cancelAcquire用于acquireQueued、doAcquireInterruptibly、doAcquireNanos、doAcquireShared、doAcquireSharedInterruptibly、doAcquireSharedNanos这六个方法的finally语句,当获取锁失败并出现异常时清除结点(获取锁成功,上面6个方法会自己清除结点)

我对这个方法的理解:结束Acquire方法并清除结点

清除结点分为两部分:1、清除结点数据,2、结点从CLH队列删除(出队)

删除结点需要考虑四种情况:结点是tail、结点是head、结点是head.next、结点是中间节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void cancelAcquire(Node node) {
// 如果结点为null
if (node == null)
return;
// 结点不再关联到任何线程
node.thread = null;
// 跳过被cancel的前继结点,找到一个有效的前继节点pred
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

Node predNext = pred.next;
// 将结点状态设为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果结点是tail,更新tail为pred,并使pred.next指向null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果结点既不是tail,又不是head的后继节点
// 则将结点的前继节点的waitStatus置为SIGNAL
// 并使结点的前继节点指向结点的后继节点
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果node是head的后继节点,则直接唤醒node的后继节点
unparkSuccessor(node);
}
// 去除结点(如果结点是head这种情况是最简单的,只需要执行这通用的一步即可)
node.next = node; // help GC
}
}

compareAndSet方法系列

5个CAS方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// CAS设置head结点
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

// CAS设置next结点
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}

// CAS设置状态,可以由子方法调用
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

// CAS设置tail结点
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

// CAS设置结点状态
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}

doAcquireInterruptibly

由acquireInterruptibly方法调用,类似acquireQueued,不同在于能响应中断

方法功能:将结点加入CLH队列并CAS获取锁(独占锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
// 调用addWaiter将结点加入CLH队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// CAS 自旋
for (;;) {
// 获取前驱结点
final Node p = node.predecessor();
// 如果前驱结点为head则尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取锁成功则清除结点
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 获取锁失败,shouldParkAfterFailedAcquire修改结点状态并返回是否要将结点堵塞
// shouldParkAfterFailedAcquire返回true,则执行parkAndCheckInterrupt方法将线程堵塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 相应中断
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

doAcquireNanos

同acquireInterruptibly,只由tryAcquireNanos方法调用

功能:将结点加入CLH队列并先限时CAS获取锁(独占锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 超时
if (nanosTimeout <= 0L)
return false;
// 结束时间
final long deadline = System.nanoTime() + nanosTimeout;
// 调用addWaiter将结点加入CLH队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// CAS自旋
for (;;) {
// 获取先驱结点
final Node p = node.predecessor();
// 先驱结点是head则尝试获取资源
if (p == head && tryAcquire(arg)) {
// 删除结点
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 计算剩余时间
nanosTimeout = deadline - System.nanoTime();
// 超时则结束
if (nanosTimeout <= 0L)
return false;
// 获取锁失败,shouldParkAfterFailedAcquire修改结点状态并返回是否要将结点堵塞
// 如果剩余时间大于spinForTimeoutThreshold阈值,则执行堵塞
// shouldParkAfterFailedAcquire返回true,则执行parkNanos方法将线程限时堵塞
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 响应中断
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

doAcquireShared

由acquireShared调用,不响应中断

功能:将结点加入CLH队列并先CAS获取资源(共享锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void doAcquireShared(int arg) {
// 以共享方式将结点加入CLH队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// CAS自旋
for (;;) {
// 获取前驱结点
final Node p = node.predecessor();
// 如果前驱结点是head
if (p == head) {
// 尝试获取指定数目共享资源,返回剩余资源数量
int r = tryAcquireShared(arg);
if (r >= 0) {
// 清除结点(head是哨兵结点,将node设为head就是清除)
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 给线程设置一个中断标志
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 获取锁失败,shouldParkAfterFailedAcquire修改结点状态并返回是否要将结点堵塞
// shouldParkAfterFailedAcquire返回true,则执行parkAndCheckInterrupt方法将线程堵塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

doAcquireSharedInterruptibly

同doAcquireShared,响应中断

功能:将结点加入CLH队列并先CAS获取资源(共享锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 以共享方式将结点加入CLH队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// CAS自旋
for (;;) {
// 获取前驱结点
final Node p = node.predecessor();
// 前驱结点为head结点
if (p == head) {
// 尝试获取指定数目共享资源,返回剩余资源数量
int r = tryAcquireShared(arg);
if (r >= 0) {
// 清除结点(head是哨兵结点,将node设为head就是清除)
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 获取锁失败,shouldParkAfterFailedAcquire修改结点状态并返回是否要将结点堵塞
// shouldParkAfterFailedAcquire返回true,则执行parkAndCheckInterrupt方法将线程堵塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 响应中断,抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

doAcquireSharedNanos

同doAcquireSharedInterruptibly,限时

功能:将结点加入CLH队列并先CAS获取资源(共享锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 截止时间
final long deadline = System.nanoTime() + nanosTimeout;
// 以共享方式将结点加入CLH队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// CAS自旋
for (;;) {
// 获取前驱结点
final Node p = node.predecessor();
// 前驱结点为head结点
if (p == head) {
// 尝试获取指定数目共享资源,返回剩余资源数量
int r = tryAcquireShared(arg);
if (r >= 0) {
// 清除结点(head是哨兵结点,将node设为head就是清除)
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
// 剩余时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// 获取锁失败,shouldParkAfterFailedAcquire修改结点状态并返回是否要将结点堵塞
// 剩余时间如果大于spinForTimeoutThreshold阈值则执行堵塞操作
// shouldParkAfterFailedAcquire返回true,则执行parkAndCheckInterrupt方法将线程堵塞
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 响应中断,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

doReleaseShared

在资源释放后执行,用于设置结点状态并唤醒后继结点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 将结点状态设为0,失败则重新执行
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继结点
unparkSuccessor(h);
}
// 如果ws==0则将结点状态设为PROPAGATE(-3),失败则重新执行,说明一下,如果ws==0,就标识后继结点没有堵塞,不用唤醒
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果头结点没有发生变化,表示设置完成,退出循环
//如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
if (h == head) // loop if head changed
break;
}
}

enq

将结点入队(CLH队列)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node enq(final Node node) {
// CAS自旋
for (;;) {
Node t = tail;
// 初始化队列
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 将结点添加至队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

findNodeFromTail

findNodeFromTail方法从尾部向前遍历CLH队列,如果检查node是否在队列中

1
2
3
4
5
6
7
8
9
10
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}

fullGetFirstQueuedThread

返回CLH队列中第一个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private Thread fullGetFirstQueuedThread() {
Node h, s;
Thread st;
// 满足如下条件,返回head.next.thread,考虑到并发,尝试判断两次
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null) ||
((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null))
return st;
// 不满足上述条件,说明并发程度高,从队列尾部向前遍历找到最前的线程
Node t = tail;
Thread firstThread = null;
while (t != null && t != head) {
Thread tt = t.thread;
if (tt != null)
firstThread = tt;
t = t.prev;
}
return firstThread;
}

fullyRelease

释放锁(资源)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取AQS的state属性
int savedState = getState();
// 调用release释放指定个锁(资源)
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 释放失败,将结点状态设为CANCELLED(1)
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

getExclusiveQueuedThreads

ReentrantReadWriteLock的getQueuedWriterThreads调用,返回CLH队列中所有独占方式线程

1
2
3
4
5
6
7
8
9
10
11
12
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
// 从队列尾部向前遍历,将独占方式的线程添加至集合
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}

getFirstQueuedThread

返回CLH队列中第一个线程

1
2
3
4
public final Thread getFirstQueuedThread() {
// handle only fast path, else relay
return (head == tail) ? null : fullGetFirstQueuedThread();
}

getQueuedThreads

返回CLH队列中所有线程

1
2
3
4
5
6
7
8
9
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}

getQueueLength

返回队列长度

1
2
3
4
5
6
7
8
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}

getSharedQueuedThreads

返回CLH队列中所有共享模式线程

1
2
3
4
5
6
7
8
9
10
11
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}

getState

1
2
3
protected final int getState() {
return state;
}

getWaitingThreads

返回Condition队列线程

1
2
3
4
5
6
public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
// 判断传入的condition对象是不是该对象所创建
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitingThreads();
}

getWaitQueueLength

返回Condition队列长度

1
2
3
4
5
6
public final int getWaitQueueLength(ConditionObject condition) {
// 判断传入的condition对象是不是该对象所创建
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitQueueLength();
}

hasContended

返回队列是否为空

1
2
3
public final boolean hasContended() {
return head != null;
}

hasQueuedPredecessors

判断其他线程是否先于当前线程等待获取锁,可以获取就返回false,不可以获取返回true。这个方法用于公平锁,在队列不为空的情况下,CLH队列的头节点优先获取锁,其他结点要入队(排队)

  1. 队列为空(队列未初始化或者队列刚初始化时,h==t),当前线程就可以去尝试获取锁
  2. 队列不为空,,但是head.next为空,如并发下入队方法enq刚执行了compareAndSetTail(t, node)方法,但还没有执行t.next = node这种情况,当前线程也可以去尝试获取锁
  3. 队列不为空,有线程在等待获取锁,此时需要判断当前线程与head.next.thread的关系。
1
2
3
4
5
6
7
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

hasQueuedThreads

判断队列是否有线程在等待获取锁

1
2
3
public final boolean hasQueuedThreads() {
return head != tail;
}

hasWaiters

判断传入的Condition对象是不是该AQS对象所创建

1
2
3
4
5
6
public final boolean hasWaiters(ConditionObject condition) {
// 判断传入的Condition对象是不是该AQS对象所创建
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.hasWaiters();
}

isHeldExclusively

返回是否是独占方式,需要子类重写

1
2
3
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}

isOnSyncQueue

isOnSyncQueue用于判断结点是否在CLH队列中

1
2
3
4
5
6
7
final boolean isOnSyncQueue(Node node) {    
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null)
return true;
return findNodeFromTail(node);
}

isQueued

返回传入线程是否在CLH队列中

1
2
3
4
5
6
7
8
9
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
// 从尾部向前遍历寻找匹配线程
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
}

owns

调用Condition方法判断传入的对象是不是本对象所创建

1
2
3
public final boolean owns(ConditionObject condition) {
return condition.isOwnedBy(this);
}

parkAndCheckInterrupt

堵塞当前线程并返回中断状态,中断状态会被重置

1
2
3
4
5
6
private final boolean parkAndCheckInterrupt() {
// 堵塞当前线程
LockSupport.park(this);
// 返回并重置中断状态
return Thread.interrupted();
}

release

释放指定个锁,独占锁

1
2
3
4
5
6
7
8
9
10
11
12
public final boolean release(int arg) { 
// tryRelease尝试释放锁,需要子类重写
if (tryRelease(arg)) {
// 唤醒线程去获取锁
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继结点
unparkSuccessor(h);
return true;
}
return false;
}

releaseShared

释放指定个锁(资源),共享锁

1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
// tryReleaseShared尝试释放锁,需要子类重写
if (tryReleaseShared(arg)) {
// 设置结点状态并唤醒结点
doReleaseShared();
return true;
}
return false;
}

selfInterrupt

给线程设置一个中断标志,线程仍会继续运行,但这样更高级别的中断处理程序就会注意到它,并可以适当地处理它。

1
2
3
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

setHead

设置头结点

1
2
3
4
5
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

setHeadAndPropagate

设置头结点及状态

1
2
3
4
5
6
7
8
9
10
11
12
13
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 将node设为head并清除数据
setHead(node);
// propagate是也就是state值代表剩余资源,大于0就可以继续acquire资源
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 唤醒后继结点
doReleaseShared();
}
}

setState

设置state

1
2
3
protected final void setState(int newState) {
state = newState;
}

shouldParkAfterFailedAcquire

返回是否堵塞线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 结点已经是SIGNAL状态就返回true
if (ws == Node.SIGNAL)
return true;
// 结点状态为CANCELLED
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 结点状态为0或-3则设置为SIGNAL状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

toString

AQS对象输出格式

1
2
3
4
5
6
public String toString() {
int s = getState();
String q = hasQueuedThreads() ? "non" : "";
return super.toString() +
"[State = " + s + ", " + q + "empty queue]";
}

transferAfterCancelledWait

改变结点状态并将结点加入CLH队列(可能会执行失败)。返回值表示了线程是否因为中断而被唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
final boolean transferAfterCancelledWait(Node node) {
// CAS将结点状态从CONDITION(-2)设置为0并加入到CLH队列中,可能会执行失败
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
// CAS操作执行失败,说明其他线程执行了signal操作,执行下列逻辑
// 判断结点是否在CLH队列中,如果不在,线程让步
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

transferForSignal

将结点加入CLH队列并设置前驱结点状态,由Condition的doSignal、doSignalAll两个方法调用,

1
2
3
4
5
6
7
8
9
10
11
12
13
final boolean transferForSignal(Node node) {
// CAS将当前结点状态从condition设为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 从尾部加入CLH队列并返回前驱结点
Node p = enq(node);
int ws = p.waitStatus;
// 如果前驱结点状态为CANCELLED或者CAS设置结点状态为SIGNAL(-1)失败,则唤醒该结点线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 注意将结点中的线程unpark
LockSupport.unpark(node.thread);
return true;
}

tryAcquire

独占锁,需要子类实现

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

tryAcquireNanos

尝试获取锁,限时

1
2
3
4
5
6
7
8
9
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 响应异常
if (Thread.interrupted())
throw new InterruptedException();
// 先调用tryAcquire(arg)方法获取锁,获取失败调用doAcquireNanos CAS自旋限时获取
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

tryAcquireShared

共享锁,需要子类实现

1
2
3
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

tryRelease

独占锁,需要子类实现

1
2
3
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

tryReleaseShared

共享锁,需要子类实现

1
2
3
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

tryAcquireSharedNanos

尝试获取资源,限时

1
2
3
4
5
6
7
8
9
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 响应异常
if (Thread.interrupted())
throw new InterruptedException();
// 先调用tryAcquireShared(arg)方法获取锁,获取失败调用doAcquireSharedNanos CAS自旋限时获取
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}

unparkSuccessor

唤醒后继结点(一般在释放资源后调用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void unparkSuccessor(Node node) {
// 将结点状态设为0
int ws = node.waitStatus;
// CAS将结点状态置为0,允许失败。
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 如果s结点为空或者已被取消,从后向前遍历链表,找到最前的、结点状态小于等于0的结点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒结点
if (s != null)
LockSupport.unpark(s.thread);
}
-------------本文结束感谢您的阅读-------------