前言:JUC包,Semaphore 源码学习
简述
Semaphore是一种信号量,也是共享锁,可以对特定资源的允许同时访问的操作数量进行控制在进行操作
常用方法
Semaphore(int permits):构造方法,创建具有给定资源数的计数信号量,默认非公平锁
Semaphore(int permits,boolean fair):构造方法,当fair等于true时,则为公平锁
void acquire():获取一个资源,获取失败则堵塞
void acquire(int n):获取n个资源,获取失败则堵塞
void release():释放一个资源
void release(int n):释放n个资源。
int availablePermits():获取当前可用的资源数。
下面两个简单实例
哲学家就餐
详情可在本博客搜哲学家就餐一文
停车问题
10个车位,100辆汽车
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
public class Park { public static void main(String[] args) { Semaphore semaphore = new Semaphore(10, true); ExecutorService executorService = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 100; i++) { final int id = i; executorService.execute(new Runnable() { @Override public void run() { try { System.out.println("当前可用车位:" + semaphore.availablePermits()); semaphore.acquire(); System.out.println("车辆" + (id + 1) + "获得车位"); Thread.sleep(2); semaphore.release(); System.out.println("车辆" + (id + 1) + "离开"); } catch (Exception e) { e.printStackTrace(); } } }); } executorService.shutdown(); } }
|
源码学习
Semaphore结构
如上图,Semaphore有3个内部类,2个成员变量,20个方法,接下来一一分析
构造方法
Semaphore有两个构造方法,初始化Semaphore时必须要指定信号量的资源数量,默认采用的是非公平锁,也可以传入布尔参数true来指定为公平锁
公平:在多个线程争用锁的情况下,公平策略倾向于将访问权授予等待时间最长的线程。也就是说,相当于有一个线程等待队列,先进入等待队列的线程后续会先获得锁,这样按照“先来后到”的原则,对于每一个等待线程都是公平的。
非公平:在多个线程争用锁的情况下,能够最终获得锁的线程是随机的(由底层OS调度)。
1 2 3 4 5 6 7
| public Semaphore(int permits) { sync = new NonfairSync(permits); }
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
|
Sync类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
|
NonfairSync
非公平模式,使用NonfairSync类
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
|
FairSync
公平模式,使用FairSync类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
|
1 2 3 4 5 6 7 8 9
|
public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
|
AQS的hasQueuedPredecessors方法分析
如果h==t成立,说明队列为空,无前驱结点,返回false。
如果h!=t成立,判断head结点的next是否为null,如果为null,返回true。这个判断是为了避免多线程中,线程第一次进入空队列但还没完成的情况,即AQS的enq方法中,compareAndSetHead(new Node())完成,还没执行tail = head方法,此时tail=null,head=new Node,head.next=null。
如果h!=t成立,head.next != null,则判断head.next是否是当前线程,如果是返回false,否则返回true,为什么用head.next来判断是否是当前线程,如果看过AQS的enq()方法源码就明白了,head是一个哨兵结点,并不存储线程
acquire
Semaphore提供了两种获取资源的方式:响应中断&不响应中断,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
|
1 2 3 4 5 6 7 8 9
| public void acquireUninterruptibly() { sync.acquireShared(1); }
public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); }
|
支持中断
先看支持Interrupt中断机制的两个acquire方法,acquireSharedInterruptibly方法,首先检测中断,然后调用tryAcquireShared方法试图获取资源,这里注意tryAcquireShared被FairSync和NonfairSync两个类重写了,根据设置的是否公平就会在这里调用不同的子类方法,具体上面已有解释。如果获取资源失败,就会调用doAcquireSharedInterruptibly方法将当前线程放入等待队列并开始自旋检测获取资源
1 2 3 4 5 6 7 8
| public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
|
doAcquireSharedInterruptibly,调用shouldParkAfterFailedAcquire方法检测是否该去park下,停止自旋。如果可以park就调用parkAndCheckInterrupt堵塞当前线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
|
不支持中断
1 2 3 4 5
| public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
|
doAcquireShared与上面doAcquireSharedInterruptibly方法相比,差别只是在中断的处理上,doAcquireShared不抛出异常,而是用一个局部变量interrupted记录下这个异常,然后下次循环调用selfInterrupt中断程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { 设置中断标志为false boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
|
release
release不分是否公平
1 2 3 4 5 6 7 8 9
| public void release() { sync.releaseShared(1); }
public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
|
调用AQS提供的releaseShared方法
1 2 3 4 5 6 7 8 9 10
| public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
|
AQS的doReleaseShared用于唤醒等待的线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
|
availablePermits
调用sync的getPermits方法获取可用资源数
1 2 3
| public int availablePermits() { return sync.getPermits(); }
|
1 2 3
| final int getPermits() { return getState(); }
|
drainPermits
调用sync的drainPermits方法将资源设为0
1 2 3
| public int drainPermits() { return sync.drainPermits(); }
|
getQueuedThreads
获取等待队列中的线程
1 2 3
| protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); }
|
getQueueLength
获取等待队列中的线程数量
1 2 3
| public final int getQueueLength() { return sync.getQueueLength(); }
|
hasQueuedThreads
等待队列是否为空
1 2 3
| public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
|
isFair()
判断是否公平
1 2 3
| public boolean isFair() { return sync instanceof FairSync; }
|
reducePermits
减少n个资源
1 2 3 4
| protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); }
|
toString
1 2 3
| public String toString() { return super.toString() + "[Permits = " + sync.getPermits() + "]"; }
|
tryAcquire
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; }
public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; }
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
|
没有指定等待时间的tryAcquire调用的是sync的nonfairTryAcquireShared方法,通过CAS自旋尝试获取资源
指定时长tryAcquire调用的是AQS的tryAcquireSharedNanos方法,方法开始先检测中断,然后调用tryAcquireShared方法试图获取资源,如果成功的话直接返回true,不成功则调用doAcquireSharedNanos方法
1 2 3 4 5 6 7 8 9
| public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
|
上面简述过doAcquireShare和doAcquireShareInterrupted两个方法,doAcquireSharedNanos跟这两个方法区别只是多了一个时间限制,该方法作用是当前线程放入等待队列并在设置时间内自旋检测获取资源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
|
总结
Semaphore是一个共享锁、信号量,提供了公平&非公平两种工作模式、是否响应中断的acquire等方法,可用于控制同时访问某个特定资源的线程操作数量,或者同时执行某个指定操作的数量。还可以用来实现某种资源池限制,或者对容器施加边界
参考
Java多线程之JUC包:Semaphore源码学习笔记