前言:什么是AQS?之前我一直不懂,网上看了一些文章也是似懂非懂,所以今天我从源码学习,将这个类源码全部啃下来。
简述 AbstractQueuedSynchronizer(AQS)是一个队列同步器。是JUC锁框架中最重要的类,AQS包含了以下内容:CLH队列,Condition队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取这些特性的实现,另外JUC中还有AbstractQueuedLongSynchronizer类,这个与AQS相同只是把成员变量state从int变为long
AQS是一个抽象类,但是没有抽象方法,继承类需要根据要实现的锁特点重写不同的方法,如ReentrantLock独占锁要重写tryAcquire、tryRelease等方法,Semaphore共享锁要重写tryAcquireShared、tryReleaseShared等方法
内部类
内部类详情见AQS Node类 与Condition源码学习这两篇文章
方法
属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 private transient volatile Node head;private transient volatile Node tail;private volatile int state;static final long spinForTimeoutThreshold = 1000L ;private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } }
CLH队列,CLH是(Craig, Landin, and Hagersten)三位创造者的缩写,CLH队列是CLH同步锁的一种变形。其主要从两方面进行了改造:节点的结构与节点等待机制。
在结构上,AQS类引入了头结点和尾节点,他们分别指向队列的头和尾,尝试获取锁、入队列、释放锁等实现都与头尾节点相关
在等待机制上将自旋机制改为阻塞机制,当前线程将首先检测是否为头结点且尝试获取锁,如果当前节点为头结点并成功获取锁则直接返回,当前线程不进入阻塞,否则将当前线程阻塞,直到被唤醒。
CLH队列本质是一个以链表实现的双向队列,以Node类作为节点,有哨兵结点(head结点作为哨兵,不存放线程),CLH队列是公平的,只有head.next结点能获取锁
说明:如果每次访问冲突概率小于 20%,推荐使用乐观锁,否则使用悲观锁。乐观锁的重试次数不得小于3次。
除了CLH队列,AQS中还要一个Condition队列,Condition队列是用于支持await()、signal()方法,实现堵塞、唤醒,被await()的线程会释放锁并加入Condition队列直到被signal()唤醒加入CLH队列,然后重新尝试获取锁,两者关系如下图:
acquire 获取资源,失败则将线程封装为Node结点并放入CLH队列,独占锁
1 2 3 4 5 6 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquireInterruptibly 能响应中断的acquire方法,独占锁
1 2 3 4 5 6 7 8 9 public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
acquireQueued 结点进入CLH队列后,就会调用acquireQueued方法来CAS自旋获取锁,注意不会一直自旋而且只有头结点才能获取资源(去除哨兵结点的头结点,所以CLH队列是公平的),获取锁失败就会调用shouldParkAfterFailedAcquire、parkAndCheckInterrupt方法将线程堵塞,一般第一次失败将结点状态从0设置为-1(Signal),连续失败两次就会park,当被唤醒(其他线程释放锁),结点状态会恢复为0,重新再次开始CAS自旋获取锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
acquireShared 获取资源,共享锁
1 2 3 4 5 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); }
acquireSharedInterruptibly 能响应中断的acquireShared方法,共享锁
1 2 3 4 5 6 7 8 9 public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); }
addWaiter 将当前线程封装为结点并加入CLH队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
apparentlyFirstQueuedIsExclusive 判断head.next是否正在等待独占锁,仅用于ReentrantReadWriteLock(可重入读写锁)
1 2 3 4 5 6 7 final boolean apparentlyFirstQueuedIsExclusive () { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null ; }
头结点不为空、头结点的下一个结点不为空并且不是共享方式(独占方式,写锁)、线程不为空,则返回true
cancelAcquire cancelAcquire用于acquireQueued、doAcquireInterruptibly、doAcquireNanos、doAcquireShared、doAcquireSharedInterruptibly、doAcquireSharedNanos这六个方法的finally语句,当获取锁失败并出现异常时清除结点(获取锁成功,上面6个方法会自己清除结点)
我对这个方法的理解:结束Acquire方法并清除结点
清除结点分为两部分:1、清除结点数据,2、结点从CLH队列删除(出队)
删除结点需要考虑四种情况:结点是tail、结点是head、结点是head.next、结点是中间节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } }
compareAndSet方法系列 5个CAS方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private final boolean compareAndSetHead (Node update) { return unsafe.compareAndSwapObject(this , headOffset, null , update); } private static final boolean compareAndSetNext (Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); } private final boolean compareAndSetTail (Node expect, Node update) { return unsafe.compareAndSwapObject(this , tailOffset, expect, update); } private static final boolean compareAndSetWaitStatus (Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); }
doAcquireInterruptibly 由acquireInterruptibly方法调用,类似acquireQueued,不同在于能响应中断
方法功能:将结点加入CLH队列并CAS获取锁(独占锁)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
doAcquireNanos 同acquireInterruptibly,只由tryAcquireNanos方法调用
功能:将结点加入CLH队列并先限时CAS获取锁(独占锁)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private boolean doAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L ) return false ; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return true ; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L ) return false ; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
doAcquireShared 由acquireShared调用,不响应中断
功能:将结点加入CLH队列并先CAS获取资源(共享锁)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
doAcquireSharedInterruptibly 同doAcquireShared,响应中断
功能:将结点加入CLH队列并先CAS获取资源(共享锁)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
doAcquireSharedNanos 同doAcquireSharedInterruptibly,限时
功能:将结点加入CLH队列并先CAS获取资源(共享锁)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 private boolean doAcquireSharedNanos (int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L ) return false ; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return true ; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L ) return false ; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
doReleaseShared 在资源释放后执行,用于设置结点状态并唤醒后继结点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
enq 将结点入队(CLH队列)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
findNodeFromTail findNodeFromTail方法从尾部向前遍历CLH队列,如果检查node是否在队列中
1 2 3 4 5 6 7 8 9 10 private boolean findNodeFromTail (Node node) { Node t = tail; for (;;) { if (t == node) return true ; if (t == null ) return false ; t = t.prev; } }
fullGetFirstQueuedThread 返回CLH队列中第一个线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private Thread fullGetFirstQueuedThread () { Node h, s; Thread st; if (((h = head) != null && (s = h.next) != null && s.prev == head && (st = s.thread) != null ) || ((h = head) != null && (s = h.next) != null && s.prev == head && (st = s.thread) != null )) return st; Node t = tail; Thread firstThread = null ; while (t != null && t != head) { Thread tt = t.thread; if (tt != null ) firstThread = tt; t = t.prev; } return firstThread; }
fullyRelease 释放锁(资源)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 final int fullyRelease (Node node) { boolean failed = true ; try { int savedState = getState(); if (release(savedState)) { failed = false ; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
getExclusiveQueuedThreads ReentrantReadWriteLock的getQueuedWriterThreads调用,返回CLH队列中所有独占方式线程
1 2 3 4 5 6 7 8 9 10 11 12 public final Collection<Thread> getExclusiveQueuedThreads () { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null ; p = p.prev) { if (!p.isShared()) { Thread t = p.thread; if (t != null ) list.add(t); } } return list; }
getFirstQueuedThread 返回CLH队列中第一个线程
1 2 3 4 public final Thread getFirstQueuedThread () { return (head == tail) ? null : fullGetFirstQueuedThread(); }
getQueuedThreads 返回CLH队列中所有线程
1 2 3 4 5 6 7 8 9 public final Collection<Thread> getQueuedThreads () { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null ; p = p.prev) { Thread t = p.thread; if (t != null ) list.add(t); } return list; }
getQueueLength 返回队列长度
1 2 3 4 5 6 7 8 public final int getQueueLength () { int n = 0 ; for (Node p = tail; p != null ; p = p.prev) { if (p.thread != null ) ++n; } return n; }
getSharedQueuedThreads 返回CLH队列中所有共享模式线程
1 2 3 4 5 6 7 8 9 10 11 public final Collection<Thread> getSharedQueuedThreads () { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null ; p = p.prev) { if (p.isShared()) { Thread t = p.thread; if (t != null ) list.add(t); } } return list; }
getState 1 2 3 protected final int getState () { return state; }
getWaitingThreads 返回Condition队列线程
1 2 3 4 5 6 public final Collection<Thread> getWaitingThreads (ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner" ); return condition.getWaitingThreads(); }
getWaitQueueLength 返回Condition队列长度
1 2 3 4 5 6 public final int getWaitQueueLength (ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner" ); return condition.getWaitQueueLength(); }
hasContended 返回队列是否为空
1 2 3 public final boolean hasContended () { return head != null ; }
hasQueuedPredecessors 判断其他线程是否先于当前线程等待获取锁,可以获取就返回false,不可以获取返回true。这个方法用于公平锁,在队列不为空的情况下,CLH队列的头节点优先获取锁,其他结点要入队(排队)
队列为空(队列未初始化或者队列刚初始化时,h==t),当前线程就可以去尝试获取锁
队列不为空,,但是head.next为空,如并发下入队方法enq刚执行了compareAndSetTail(t, node)方法,但还没有执行t.next = node这种情况,当前线程也可以去尝试获取锁
队列不为空,有线程在等待获取锁,此时需要判断当前线程与head.next.thread的关系。
1 2 3 4 5 6 7 public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
hasQueuedThreads 判断队列是否有线程在等待获取锁
1 2 3 public final boolean hasQueuedThreads () { return head != tail; }
hasWaiters 判断传入的Condition对象是不是该AQS对象所创建
1 2 3 4 5 6 public final boolean hasWaiters (ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner" ); return condition.hasWaiters(); }
isHeldExclusively 返回是否是独占方式,需要子类重写
1 2 3 protected boolean isHeldExclusively () { throw new UnsupportedOperationException(); }
isOnSyncQueue isOnSyncQueue用于判断结点是否在CLH队列中
1 2 3 4 5 6 7 final boolean isOnSyncQueue (Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null ) return false ; if (node.next != null ) return true ; return findNodeFromTail(node); }
isQueued 返回传入线程是否在CLH队列中
1 2 3 4 5 6 7 8 9 public final boolean isQueued (Thread thread) { if (thread == null ) throw new NullPointerException(); for (Node p = tail; p != null ; p = p.prev) if (p.thread == thread) return true ; return false ; }
owns 调用Condition方法判断传入的对象是不是本对象所创建
1 2 3 public final boolean owns (ConditionObject condition) { return condition.isOwnedBy(this ); }
parkAndCheckInterrupt 堵塞当前线程并返回中断状态,中断状态会被重置
1 2 3 4 5 6 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
release 释放指定个锁,独占锁
1 2 3 4 5 6 7 8 9 10 11 12 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
releaseShared 释放指定个锁(资源),共享锁
1 2 3 4 5 6 7 8 9 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
selfInterrupt 给线程设置一个中断标志,线程仍会继续运行,但这样更高级别的中断处理程序就会注意到它,并可以适当地处理它。
1 2 3 static void selfInterrupt () { Thread.currentThread().interrupt(); }
setHead 设置头结点
1 2 3 4 5 private void setHead (Node node) { head = node; node.thread = null ; node.prev = null ; }
setHeadAndPropagate 设置头结点及状态
1 2 3 4 5 6 7 8 9 10 11 12 13 private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
setState 设置state
1 2 3 protected final void setState (int newState) { state = newState; }
shouldParkAfterFailedAcquire 返回是否堵塞线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
toString AQS对象输出格式
1 2 3 4 5 6 public String toString () { int s = getState(); String q = hasQueuedThreads() ? "non" : "" ; return super .toString() + "[State = " + s + ", " + q + "empty queue]" ; }
transferAfterCancelledWait 改变结点状态并将结点加入CLH队列(可能会执行失败)。返回值表示了线程是否因为中断而被唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 final boolean transferAfterCancelledWait (Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0 )) { enq(node); return true ; } while (!isOnSyncQueue(node)) Thread.yield(); return false ; }
transferForSignal 将结点加入CLH队列并设置前驱结点状态,由Condition的doSignal、doSignalAll两个方法调用,
1 2 3 4 5 6 7 8 9 10 11 12 13 final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }
tryAcquire 独占锁,需要子类实现
1 2 3 protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); }
tryAcquireNanos 尝试获取锁,限时
1 2 3 4 5 6 7 8 9 public final boolean tryAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
tryAcquireShared 共享锁,需要子类实现
1 2 3 protected int tryAcquireShared (int arg) { throw new UnsupportedOperationException(); }
tryRelease 独占锁,需要子类实现
1 2 3 protected boolean tryRelease (int arg) { throw new UnsupportedOperationException(); }
tryReleaseShared 共享锁,需要子类实现
1 2 3 protected boolean tryReleaseShared (int arg) { throw new UnsupportedOperationException(); }
tryAcquireSharedNanos 尝试获取资源,限时
1 2 3 4 5 6 7 8 9 public final boolean tryAcquireSharedNanos (int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
unparkSuccessor 唤醒后继结点(一般在释放资源后调用)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }