0%

Semaphore源码学习

前言:JUC包,Semaphore 源码学习

简述

Semaphore是一种信号量,也是共享锁,可以对特定资源的允许同时访问的操作数量进行控制在进行操作

常用方法

Semaphore(int permits):构造方法,创建具有给定资源数的计数信号量,默认非公平锁

Semaphore(int permits,boolean fair):构造方法,当fair等于true时,则为公平锁

void acquire():获取一个资源,获取失败则堵塞

void acquire(int n):获取n个资源,获取失败则堵塞

void release():释放一个资源

void release(int n):释放n个资源。

int availablePermits():获取当前可用的资源数。

下面两个简单实例

哲学家就餐
详情可在本博客搜哲学家就餐一文

停车问题

10个车位,100辆汽车

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* @author shency
* @description: TODO
* @date: 2019/11/19
*/
public class Park {
public static void main(String[] args) {
// 10个车位
Semaphore semaphore = new Semaphore(10, true);
ExecutorService executorService = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

for (int i = 0; i < 100; i++) {
final int id = i;
executorService.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("当前可用车位:" + semaphore.availablePermits());
semaphore.acquire();
System.out.println("车辆" + (id + 1) + "获得车位");
Thread.sleep(2);
semaphore.release();
System.out.println("车辆" + (id + 1) + "离开");
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
}
}

源码学习

Semaphore结构

image-20191230002635381

如上图,Semaphore有3个内部类,2个成员变量,20个方法,接下来一一分析

构造方法

Semaphore有两个构造方法,初始化Semaphore时必须要指定信号量的资源数量,默认采用的是非公平锁,也可以传入布尔参数true来指定为公平锁

公平:在多个线程争用锁的情况下,公平策略倾向于将访问权授予等待时间最长的线程。也就是说,相当于有一个线程等待队列,先进入等待队列的线程后续会先获得锁,这样按照“先来后到”的原则,对于每一个等待线程都是公平的。

非公平:在多个线程争用锁的情况下,能够最终获得锁的线程是随机的(由底层OS调度)。

1
2
3
4
5
6
7
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Sync类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 抽象内部静态类,继承AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 构造方法,指定AQS的state值
Sync(int permits) {
setState(permits);
}

final int getPermits() {
return getState();
}
// CAS自旋,为什么不获取失败直接返回?因为在还有剩余资源的情况下,一个线程因为竞争导致CAS失败后被放入等待序列尾部,则一定在队列头部有一个线程被唤醒去试图获取资源,这比自旋多了操作等待队列的开销,效率降低
// 非公平模式,共享方式,尝试获取n个资源,成功获取返回剩余资源数量,或者发现没有资源返回负值代表获取失败
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// CAS自旋,尝试释放资源,传入参数不能为负数
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// CAS自旋,减少n个资源,传入参数不能为负数
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// CAS自旋,将资源设为0
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}

NonfairSync

非公平模式,使用NonfairSync类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
// 调用Sync父类构造初始化资源
NonfairSync(int permits) {
super(permits);
}
// 调用Sync 的 nonfairTryAcquireShared方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

FairSync

公平模式,使用FairSync类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
// 调用Sync父类构造初始化资源
FairSync(int permits) {
super(permits);
}
// CAS自旋
// 公平模式,先来先服务,使用AQS类的hasQueuedPredecessors来判断表示有其他线程先于当前线程等待获取锁,如果是,返回-1,获取失败
// 如果当前线程获取锁的优先级最高则返回false,尝试获取n个资源,成功获取返回剩余资源数量,或者发现没有资源返回负值代表获取失败
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
1
2
3
4
5
6
7
8
9

// AQS hasQueuedPredecessors方法
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

AQS的hasQueuedPredecessors方法分析

如果h==t成立,说明队列为空,无前驱结点,返回false。

如果h!=t成立,判断head结点的next是否为null,如果为null,返回true。这个判断是为了避免多线程中,线程第一次进入空队列但还没完成的情况,即AQS的enq方法中,compareAndSetHead(new Node())完成,还没执行tail = head方法,此时tail=null,head=new Node,head.next=null。

如果h!=t成立,head.next != null,则判断head.next是否是当前线程,如果是返回false,否则返回true,为什么用head.next来判断是否是当前线程,如果看过AQS的enq()方法源码就明白了,head是一个哨兵结点,并不存储线程

acquire

Semaphore提供了两种获取资源的方式:响应中断&不响应中断,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
// 获取1个资源,支持Interrupt中断机制
public void acquire() throws InterruptedException {
// 调用AQS的acquireSharedInterruptibly方法
sync.acquireSharedInterruptibly(1);
}

// 获取n个资源,支持Interrupt中断机制
public void acquire(int permits) throws InterruptedException {
// 参数为负,抛出非法参数异常
if (permits < 0) throw new IllegalArgumentException();
// 调用AQS的acquireSharedInterruptibly方法
sync.acquireSharedInterruptibly(permits);
}
1
2
3
4
5
6
7
8
9
// 获取1个资源,不支持Interrupt中断机制
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// 获取n个资源,不支持Interrupt中断机制
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}

支持中断

先看支持Interrupt中断机制的两个acquire方法,acquireSharedInterruptibly方法,首先检测中断,然后调用tryAcquireShared方法试图获取资源,这里注意tryAcquireShared被FairSync和NonfairSync两个类重写了,根据设置的是否公平就会在这里调用不同的子类方法,具体上面已有解释。如果获取资源失败,就会调用doAcquireSharedInterruptibly方法将当前线程放入等待队列并开始自旋检测获取资源

1
2
3
4
5
6
7
8
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 检测中断
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

doAcquireSharedInterruptibly,调用shouldParkAfterFailedAcquire方法检测是否该去park下,停止自旋。如果可以park就调用parkAndCheckInterrupt堵塞当前线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 调用addWaiter方法将当前线程放入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// CAS自旋
for (;;) {
// 获取前置节点
final Node p = node.predecessor();
// 如果前置节点为head,则说明当前为head.next即这个双向队列的第一个元素
if (p == head) {
// 尝试获取资源
int r = tryAcquireShared(arg);
// 获取资源成功,将当前线程从等待队列中去除
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 检测是否可以park(让这个线程堵塞,避免一直都是这个线程CAS自旋,让等待队列中其他的线程也能抢资源),如果可以调用parkAndCheckInterrupt并返回中断状态,如果中断状态为true,则报中断异常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

不支持中断

1
2
3
4
5
// 先根据是否公平调用相应的被重写的tryAcquireShared方法尝试获取资源,获取失败调用AQS的doAcquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

doAcquireShared与上面doAcquireSharedInterruptibly方法相比,差别只是在中断的处理上,doAcquireShared不抛出异常,而是用一个局部变量interrupted记录下这个异常,然后下次循环调用selfInterrupt中断程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void doAcquireShared(int arg) {
// 调用addWaiter方法将当前线程放入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
设置中断标志为false
boolean interrupted = false;
// CAS自旋
for (;;) {
// 获取前置节点
final Node p = node.predecessor();
// 如果前置节点为head,则说明当前为head.next即这个双向队列的第一个元素
if (p == head) {
// 尝试获取资源
int r = tryAcquireShared(arg);
// 获取资源成功,将当前线程从等待队列中去除
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 如果中断标志为false,则进行中断处理
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 检测是否可以park(让这个线程堵塞,避免一直都是这个线程CAS自旋,让等待队列中其他的线程也能抢资源),如果可以调用parkAndCheckInterrupt并返回中断状态,如果中断状态为true,则将局部变量的中断标志设为false
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

release

release不分是否公平

1
2
3
4
5
6
7
8
9
// 释放资源
public void release() {
sync.releaseShared(1);
}
// 释放N个资源
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}

调用AQS提供的releaseShared方法

1
2
3
4
5
6
7
8
9
10
// 释放共享资源
public final boolean releaseShared(int arg) {
// 调用Sync重写的tryReleaseShared方法,尝试释放资源
if (tryReleaseShared(arg)) {
// 释放资源成功,调用AQS的doReleaseShared方法唤醒等待的线程(因为有资源被释放,所以应该让)
doReleaseShared();
return true;
}
return false;
}

AQS的doReleaseShared用于唤醒等待的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void doReleaseShared() {
for (;;) {
Node h = head;
// head不为空且不为尾结点(队列只有一个结点的情况)
if (h != null && h != tail) {
// 获取线程状态
int ws = h.waitStatus;
// SIGNAL表示堵塞状态,值为-1
if (ws == Node.SIGNAL) {
// 设置头结点为0,设置失败continue
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒等待队列中下一个线程
unparkSuccessor(h);
}
// 如果头结点为0,则设置为PROPAGATE(-3)状态
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头结点变了,就继续循环,头结点没变,就不用重复处理了
if (h == head) // loop if head changed
break;
}
}

availablePermits

调用sync的getPermits方法获取可用资源数

1
2
3
public int availablePermits() {
return sync.getPermits();
}
1
2
3
final int getPermits() {
return getState();
}

drainPermits

调用sync的drainPermits方法将资源设为0

1
2
3
public int drainPermits() {
return sync.drainPermits();
}

getQueuedThreads

获取等待队列中的线程

1
2
3
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}

getQueueLength

获取等待队列中的线程数量

1
2
3
public final int getQueueLength() {
return sync.getQueueLength();
}

hasQueuedThreads

等待队列是否为空

1
2
3
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

isFair()

判断是否公平

1
2
3
public boolean isFair() {
return sync instanceof FairSync;
}

reducePermits

减少n个资源

1
2
3
4
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}

toString

1
2
3
public String toString() {
return super.toString() + "[Permits = " + sync.getPermits() + "]";
}

tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

没有指定等待时间的tryAcquire调用的是sync的nonfairTryAcquireShared方法,通过CAS自旋尝试获取资源

指定时长tryAcquire调用的是AQS的tryAcquireSharedNanos方法,方法开始先检测中断,然后调用tryAcquireShared方法试图获取资源,如果成功的话直接返回true,不成功则调用doAcquireSharedNanos方法

1
2
3
4
5
6
7
8
9
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 检测中断
if (Thread.interrupted())
throw new InterruptedException();
// 先调用tryAcquireShared方法试图获取资源,获取资源失败则调用doAcquireSharedNanos方法
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}

上面简述过doAcquireShare和doAcquireShareInterrupted两个方法,doAcquireSharedNanos跟这两个方法区别只是多了一个时间限制,该方法作用是当前线程放入等待队列并在设置时间内自旋检测获取资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 设置结束时间
final long deadline = System.nanoTime() + nanosTimeout;
// 添加结点至等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
// CAS自旋
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
// 超时就结束自旋
if (nanosTimeout <= 0L)
return false;
// 检测是否该去park下停止自旋,但是如果剩余时间很小了,就没必要park,继续自旋然后就会因超时结束,执行park操作也需要开销
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 响应中断
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

总结

Semaphore是一个共享锁、信号量,提供了公平&非公平两种工作模式、是否响应中断的acquire等方法,可用于控制同时访问某个特定资源的线程操作数量,或者同时执行某个指定操作的数量。还可以用来实现某种资源池限制,或者对容器施加边界

参考

Java多线程之JUC包:Semaphore源码学习笔记

-------------本文结束感谢您的阅读-------------