0%

Condition源码学习

前言:JUC包,Condition源码学习

Condition简介

Condition是一个多线程协调通信的接口,有await、signal等方法

image-20191230124444197

ConditionObject简介

ConditionObject实现了Condition接口,是一个由Node节点组成的单向链表

Condition(条件队列),这个一个根据按照先进先出的顺序的公平队列,线程被await操作挂起后就会被放入条件队列,这个队列中的节点都被挂起,等待signal进入CLH队列再次获取锁

image-20191230124459019

ConditionObject类结构

image-20191230125458328

属性
1
2
3
4
5
6
7
8
// condition队列头结点
private transient Node firstWaiter;
// condition队列尾结点
private transient Node lastWaiter;
// 标识:发生中断但不抛出异常,
private static final int REINTERRUPT = 1;
// 标识:发生中断并且后续要抛出异常
private static final int THROW_IE = -1;
addConditionWaiter

addConditionWaiter方法将当前线程加入条件队列,并返回新增结点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 添加结点至队尾
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾结点不是CONDITION状态,执行清除操作
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除条件队列中结点状态不为CONDITION 的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 以当前线程为参数新建结点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 初始化队列或者添加结点至队尾
if (t == null)
// 条件队列没有哨兵结点
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
await

简单描述await的作用就是使线程等待直到signal等方法唤醒线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final void await() throws InterruptedException {
// 及时响应中断
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程构造成条件节点加入condition条件队列尾部,并返回新增结点
Node node = addConditionWaiter();
// 释放当前线程锁(资源)
int savedState = fullyRelease(node);
// 中断标识
int interruptMode = 0;
// 线程等待,isOnSyncQueue方法用于判断是否在CLH队列中,当在CLH队列中,说明该线程从condition条件队列移除并唤醒,返回true,结束循环
while (!isOnSyncQueue(node)) {
// 将当前线程堵塞
LockSupport.park(this);
// 检测是否发生中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 使用acquireQueued方法CAS获取锁(资源)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除结点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 根据interruptMode进行中断处理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await(long time, TimeUnit unit)

功能同await,不同就是加了时间限时,TimeUnit则是时间单位

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
// 根据时间单位转换为毫微秒(纳秒)
long nanosTimeout = unit.toNanos(time);
// 及时响应中断
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程构造成条件节点加入condition条件队列尾部,并返回新增结点
Node node = addConditionWaiter();
// 释放当前线程锁(资源)
int savedState = fullyRelease(node);
// 结束时间
final long deadline = System.nanoTime() + nanosTimeout;
// 超时
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 超时结束
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
// 剩余时间大于等于spinForTimeoutThreshold阈值,则将当前线程堵塞,如果剩余时间小于阈值就没必要堵塞,继续循环就好(反正即将结束,执行park方法反而浪费性能)
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 检测是否发生中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 计算剩余时间
nanosTimeout = deadline - System.nanoTime();
}
// 使用acquireQueued方法CAS获取锁(资源)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除结点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 根据interruptMode进行中断处理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
// 返回限定时间内是否被唤醒
return !timedout;
}
awaitNanos(long nanosTimeout)

同await(long time, TimeUnit unit),区别是默认时间单位为毫微秒(纳秒),1秒=1000豪秒 1毫秒=1000微秒 1微秒=1000毫微秒,1秒=10^9毫微秒(纳秒)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
// 及时响应中断
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程构造成条件节点加入condition条件队列尾部,并返回新增结点
Node node = addConditionWaiter();
// 释放当前线程锁(资源)
int savedState = fullyRelease(node);
// 结束时间
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 超时结束
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// 剩余时间大于等于spinForTimeoutThreshold阈值,则将当前线程堵塞,如果剩余时间小于阈值就没必要堵塞,继续循环就好(反正即将结束,执行park方法反而浪费性能)
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 检测是否发生中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 计算剩余时间
nanosTimeout = deadline - System.nanoTime();
}
// 使用acquireQueued方法CAS获取锁(资源)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除结点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 根据interruptMode进行中断处理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
// 返回唤醒后的剩余等待时间
return deadline - System.nanoTime();
}
awaitUninterruptibly

不相应中断的await()

1
2
3
4
5
6
7
8
9
10
11
12
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
awaitUntil(Date deadline)

awaitUtil方法与awaitNanos方法也十分相似,只不过park操作调用的是LockSupportparkUtil方法,没有spinForTimeoutThreshold阈值的应用。返回值上同await(long time, TimeUnit unit),返回限定时间内是否被唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
checkInterruptWhileWaiting

检测是否中断并返回线程中断状态

1
2
3
4
5
6
7
8
9
10
private static final int REINTERRUPT =  1;

private static final int THROW_IE = -1;

private int checkInterruptWhileWaiting(Node node) {
// 线程没有中断返回0,线程中断并需要抛出中断异常返回-1(THROW_IE),中断但不抛出,而是“补上”中断操作返回1(REINTERRUPT)
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
doSignal

执行唤醒操作:将结点从condition队列删除,然后调用AQS transferForSignal方法将结点加入CLH队列并设置状态

1
2
3
4
5
6
7
8
9
private void doSignal(Node first) {
do {
// 将头结点从队列中去除
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
doSignalAll

执行唤醒操作:将Condition队列lastWaiter、firstWaiter设为null,然后遍历Condition队列,然后调用AQS

1
2
3
4
5
6
7
8
9
10
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
// 遍历Condition队列,对每一个结点调用AQS transferForSignal方法将结点加入CLH队列并设置状态
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
getWaitingThreads

获取condition队列中的所有线程并返回Collection集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected final Collection<Thread> getWaitingThreads() {
// 检测当前线程是否有独占锁,没有锁抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 创建以Thread为元素的List集合
ArrayList<Thread> list = new ArrayList<Thread>();
// 遍历链表
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
// 将线程添加至集合
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
getWaitQueueLength

获取condition队列大小

1
2
3
4
5
6
7
8
9
10
11
12
protected final int getWaitQueueLength() {
// 检测当前线程是否有独占锁,没有锁抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 遍历链表并计数
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
hasWaiters

判断condition队列是否有结点(结点状态需要为CONDITION)

1
2
3
4
5
6
7
8
9
10
11
protected final boolean hasWaiters() {
// 检测当前线程是否有独占锁,没有锁抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 遍历链表,找到CONDITION状态的结点返回true
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
isOwnedBy

当condition对象是传入参数sync对象所创建的则返回true

1
2
3
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
reportInterruptAfterWait

根据interruptMode的值,进行相应中断处理:

1
2
3
4
5
6
7
8
9
10
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
// interruptMode为THROW_IE(-1) 抛出异常
if (interruptMode == THROW_IE)
throw new InterruptedException();
// interruptMode为REINTERRUPT(1) 抛出异常
else if (interruptMode == REINTERRUPT)
// AQS 线程中断(只是给线程设置一个中断标志)
selfInterrupt();
}
signal

唤醒condition队列头结点线程

1
2
3
4
5
6
7
8
9
10
public final void signal() {
// 检测当前线程是否有独占锁,没有锁抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取condition条件队列头结点
Node first = firstWaiter;
// 调用doSignal方法进行唤醒操作
if (first != null)
doSignal(first);
}
signalAll

唤醒Condition队列的所有等待线程

1
2
3
4
5
6
7
8
9
10
public final void signalAll() {
// 检测当前线程是否有独占锁,没有锁抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取condition条件队列头结点
Node first = firstWaiter;
// 调用doSignalAll方法进行唤醒操作
if (first != null)
doSignalAll(first);
}
unlinkCancelledWaiters

unlinkCancelledWaiters用于清除条件队列中结点状态不为CONDITION的节点,从而减少垃圾结点,提升性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void unlinkCancelledWaiters() {
// 获取头结点
Node t = firstWaiter;
Node trail = null;
// 遍历链表
while (t != null) {
Node next = t.nextWaiter;
// 如果t结点不为CONDITION则清除
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
// trail == null说明是循环执行的第一次,t为头结点,则清除头结点
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
// next结点为空说明t是尾结点
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
-------------本文结束感谢您的阅读-------------