前言:JUC包,Condition源码学习
Condition简介 Condition是一个多线程协调通信的接口,有await、signal等方法
ConditionObject简介 ConditionObject实现了Condition接口,是一个由Node节点组成的单向链表
Condition(条件队列),这个一个根据按照先进先出的顺序的公平队列,线程被await操作挂起后就会被放入条件队列,这个队列中的节点都被挂起,等待signal进入CLH队列再次获取锁
ConditionObject类结构
属性 1 2 3 4 5 6 7 8 private transient Node firstWaiter;private transient Node lastWaiter;private static final int REINTERRUPT = 1 ;private static final int THROW_IE = -1 ;
addConditionWaiter addConditionWaiter方法将当前线程加入条件队列,并返回新增结点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
await 简单描述await的作用就是使线程等待直到signal等方法唤醒线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); }
await(long time, TimeUnit unit) 功能同await,不同就是加了时间限时,TimeUnit则是时间单位
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public final boolean await (long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false ; int interruptMode = 0 ; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L ) { timedout = transferAfterCancelledWait(node); break ; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return !timedout; }
awaitNanos(long nanosTimeout) 同await(long time, TimeUnit unit),区别是默认时间单位为毫微秒(纳秒),1秒=1000豪秒 1毫秒=1000微秒 1微秒=1000毫微秒,1秒=10^9毫微秒(纳秒)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public final long awaitNanos (long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0 ; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L ) { transferAfterCancelledWait(node); break ; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); }
awaitUninterruptibly 不相应中断的await()
1 2 3 4 5 6 7 8 9 10 11 12 public final void awaitUninterruptibly () { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if (Thread.interrupted()) interrupted = true ; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
awaitUntil(Date deadline) awaitUtil方法与awaitNanos方法也十分相似,只不过park操作调用的是LockSupportparkUtil方法,没有spinForTimeoutThreshold阈值的应用。返回值上同await(long time, TimeUnit unit),返回限定时间内是否被唤醒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public final boolean awaitUntil (Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false ; int interruptMode = 0 ; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break ; } LockSupport.parkUntil(this , abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return !timedout; }
checkInterruptWhileWaiting 检测是否中断并返回线程中断状态
1 2 3 4 5 6 7 8 9 10 private static final int REINTERRUPT = 1 ;private static final int THROW_IE = -1 ;private int checkInterruptWhileWaiting (Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0 ; }
doSignal 执行唤醒操作:将结点从condition队列删除,然后调用AQS transferForSignal方法将结点加入CLH队列并设置状态
1 2 3 4 5 6 7 8 9 private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); }
doSignalAll 执行唤醒操作:将Condition队列lastWaiter、firstWaiter设为null,然后遍历Condition队列,然后调用AQS
1 2 3 4 5 6 7 8 9 10 private void doSignalAll (Node first) { lastWaiter = firstWaiter = null ; do { Node next = first.nextWaiter; first.nextWaiter = null ; transferForSignal(first); first = next; } while (first != null ); }
getWaitingThreads 获取condition队列中的所有线程并返回Collection集合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 protected final Collection<Thread> getWaitingThreads () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); for (Node w = firstWaiter; w != null ; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null ) list.add(t); } } return list; }
getWaitQueueLength 获取condition队列大小
1 2 3 4 5 6 7 8 9 10 11 12 protected final int getWaitQueueLength () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0 ; for (Node w = firstWaiter; w != null ; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; }
hasWaiters 判断condition队列是否有结点(结点状态需要为CONDITION)
1 2 3 4 5 6 7 8 9 10 11 protected final boolean hasWaiters () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); for (Node w = firstWaiter; w != null ; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true ; } return false ; }
isOwnedBy 当condition对象是传入参数sync对象所创建的则返回true
1 2 3 final boolean isOwnedBy (AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this ; }
reportInterruptAfterWait 根据interruptMode的值,进行相应中断处理:
1 2 3 4 5 6 7 8 9 10 private void reportInterruptAfterWait (int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
signal 唤醒condition队列头结点线程
1 2 3 4 5 6 7 8 9 10 public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignal(first); }
signalAll 唤醒Condition队列的所有等待线程
1 2 3 4 5 6 7 8 9 10 public final void signalAll () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignalAll(first); }
unlinkCancelledWaiters unlinkCancelledWaiters用于清除条件队列中结点状态不为CONDITION的节点,从而减少垃圾结点,提升性能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private void unlinkCancelledWaiters () { Node t = firstWaiter; Node trail = null ; while (t != null ) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null ; if (trail == null ) firstWaiter = next; else trail.nextWaiter = next; if (next == null ) lastWaiter = trail; } else trail = t; t = next; } }