JUC-Semaphore

Semaphore

基本使用

synchronized 可以起到锁的作用,但某个时间段内,只能有一个线程允许执行。我们可以把Semaphore看作一个包含多个许可(permit)的集合,例如一个代表5个许可的Semaphore、6个许可的Semaphore等等(为便于表达,后文用字母P表示许可)。Semaphore上的acuqire操作申请P,而release操作则产生P,Semaphore可用于追踪可用资源的个数。

Semaphore(信号量)用来限制能同时访问共享资源的线程上限,非重入锁

构造方法:

  • public Semaphore(int permits):permits 表示许可线程的数量(state)
  • public Semaphore(int permits, boolean fair):fair 表示公平性,如果设为 true,下次执行的线程会是等待最久的线程

常用API:

  • public void acquire():表示获取许可
  • public void release():表示释放许可,acquire() 和 release() 方法之间的代码为同步代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
// 1.创建Semaphore对象
Semaphore semaphore = new Semaphore(3);

// 2. 10个线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
// 3. 获取许可
semaphore.acquire();
sout(Thread.currentThread().getName() + " running...");
Thread.sleep(1000);
sout(Thread.currentThread().getName() + " end...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 4. 释放许可
semaphore.release();
}
}).start();
}
}

单机限流用法

Semaphore也可以来实现一个单机限流工具(针对单台机器的线程而言)——即限制同时访问某资源的线程数。实现思路:让1个线程以固定的速度生产P,而让多个线程消费P,这样,消费者线程就能以低于某个上限的速度消费资源,不会导致系统超负荷。

特殊用法

我们可以创建一个只有1个P的Semaphore,即二元信号量。它的功能与锁类似,但是没有所有权的概念。然后,我们可以在一个线程中进行加锁(acquire),而在另一个线程中执行解锁动作(release),并且负责解锁的线程不需要事先获得这个锁。与之相反,ReentrantLock的加锁和解锁动作都必须在同一个线程中完成。

Semaphore的实现原理

首先,Semaphore内部并没有真正保存P,而是只保存了P的个数。其次,Semaphore直接复用了AQS框架的共享模式锁,其acquire和release操作直接调用共享模式的AQS加锁和AQS解锁,没有增加其他逻辑,只不过在加锁和解锁的过程中,是把P的个数存入AQS原子整数。

具体的讲,Semaphore的acquire操作(acquireUninterruptibly操作及tryAcquire操作都与acquire类似)尝试取走1个P,而如果P的个数等于0无法取出就阻塞等待。

acquire

acquire的全部实现就是直接调用AQS类的acquireSharedInterruptibly方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// acquire() -> sync.acquireSharedInterruptibly(1),可中断
public final void acquireSharedInterruptibly(int arg) {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取通行证,获取成功返回 >= 0的值
if (tryAcquireShared(arg) < 0)
// 获取许可证失败,进入阻塞
doAcquireSharedInterruptibly(arg);
}

// tryAcquireShared() -> nonfairTryAcquireShared()
// 非公平,公平锁会在循环内 hasQueuedPredecessors()方法判断阻塞队列是否有临头节点(第二个节点)
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取 state ,state 这里【表示通行证】
int available = getState();
// 计算当前线程获取通行证完成之后,通行证还剩余数量
int remaining = available - acquires;
// 如果许可已经用完, 返回负数, 表示获取失败,
if (remaining < 0 ||
// 许可证足够分配的,如果 cas 重试成功, 返回正数, 表示获取成功
compareAndSetState(available, remaining))
return remaining;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireSharedInterruptibly(int arg) {
// 将调用 Semaphore.aquire 方法的线程,包装成 node 加入到 AQS 的阻塞队列中
final Node node = addWaiter(Node.SHARED);
// 获取标记
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 前驱节点是头节点可以再次获取许可
if (p == head) {
// 再次尝试获取许可,【返回剩余的许可证数量】
int r = tryAcquireShared(arg);
if (r >= 0) {
// 成功后本线程出队(AQS), 所在 Node设置为 head
// r 表示【可用资源数】, 为 0 则不会继续传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 被打断后进入该逻辑
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void setHeadAndPropagate(Node node, int propagate) {    
Node h = head;
// 设置自己为 head 节点
setHead(node);
// propagate 表示有【共享资源】(例如共享读锁或信号量)
// head waitStatus == Node.SIGNAL 或 Node.PROPAGATE,doReleaseShared 函数中设置的
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果是最后一个节点或者是等待共享读锁的节点,做一次唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}

release

release操作也非常简单,直接调用AQS的releaseShared方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// release() -> releaseShared()
public final boolean releaseShared(int arg) {
// 尝试释放锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取当前锁资源的可用许可证数量
int current = getState();
int next = current + releases;
// 索引越界判断
if (next < current)
throw new Error("Maximum permit count exceeded");
// 释放锁
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
// PROPAGATE 详解
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
}

  • 接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,并且 unpark 接下来的共享状态的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

面试

问题1.semaphore初始化有10个令牌,11个线程同时各调用1次acquire方法,会发生什么?

答案:拿不到令牌的线程阻塞,不会继续往下运行。

问题2.semaphore初始化有10个令牌,一个线程重复调用11次acquire方法,会发生什么?

答案:线程阻塞,不会继续往下运行。可能你会考虑类似于锁的重入的问题,很好,但是,令牌没有重入的概念。你只要调用一次acquire方法,就需要有一个令牌才能继续运行。

问题3.semaphore初始化有1个令牌,1个线程调用一次acquire方法,然后调用两次release方法,之后另外一个线程调用acquire(2)方法,此线程能够获取到足够的令牌并继续运行吗?

答案:能,原因是release方法会添加令牌,并不会以初始化的大小为准。

问题4.semaphore初始化有2个令牌,一个线程调用1次release方法,然后一次性获取3个令牌,会获取到吗?

答案:能,原因是release会添加令牌,并不会以初始化的大小为准。Semaphore中release方法的调用并没有限制要在acquire后调用。

文章作者: GeYu
文章链接: https://nuistgy.github.io/2023/05/17/JUC-Semaphore/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Yu's Blog