JUC-Semaphore

Semaphore

基本使用

synchronized 可以起到锁的作用,但某个时间段内,只能有一个线程允许执行。我们可以把Semaphore看作一个包含多个许可(permit)的集合,例如一个代表5个许可的Semaphore、6个许可的Semaphore等等(为便于表达,后文用字母P表示许可)。Semaphore上的acuqire操作申请P,而release操作则产生P,Semaphore可用于追踪可用资源的个数。

Semaphore(信号量)用来限制能同时访问共享资源的线程上限,非重入锁

构造方法:

  • public Semaphore(int permits):permits 表示许可线程的数量(state)
  • public Semaphore(int permits, boolean fair):fair 表示公平性,如果设为 true,下次执行的线程会是等待最久的线程

常用API:

  • public void acquire():表示获取许可
  • public void release():表示释放许可,acquire() 和 release() 方法之间的代码为同步代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
// 1.创建Semaphore对象
Semaphore semaphore = new Semaphore(3);

// 2. 10个线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
// 3. 获取许可
semaphore.acquire();
sout(Thread.currentThread().getName() + " running...");
Thread.sleep(1000);
sout(Thread.currentThread().getName() + " end...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 4. 释放许可
semaphore.release();
}
}).start();
}
}

单机限流用法

Semaphore也可以来实现一个单机限流工具(针对单台机器的线程而言)——即限制同时访问某资源的线程数。实现思路:让1个线程以固定的速度生产P,而让多个线程消费P,这样,消费者线程就能以低于某个上限的速度消费资源,不会导致系统超负荷。

特殊用法

我们可以创建一个只有1个P的Semaphore,即二元信号量。它的功能与锁类似,但是没有所有权的概念。然后,我们可以在一个线程中进行加锁(acquire),而在另一个线程中执行解锁动作(release),并且负责解锁的线程不需要事先获得这个锁。与之相反,ReentrantLock的加锁和解锁动作都必须在同一个线程中完成。

Semaphore的实现原理

首先,Semaphore内部并没有真正保存P,而是只保存了P的个数。其次,Semaphore直接复用了AQS框架的共享模式锁,其acquire和release操作直接调用共享模式的AQS加锁和AQS解锁,没有增加其他逻辑,只不过在加锁和解锁的过程中,是把P的个数存入AQS原子整数。

具体的讲,Semaphore的acquire操作(acquireUninterruptibly操作及tryAcquire操作都与acquire类似)尝试取走1个P,而如果P的个数等于0无法取出就阻塞等待。

acquire

acquire的全部实现就是直接调用AQS类的acquireSharedInterruptibly方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// acquire() -> sync.acquireSharedInterruptibly(1),可中断
public final void acquireSharedInterruptibly(int arg) {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取通行证,获取成功返回 >= 0的值
if (tryAcquireShared(arg) < 0)
// 获取许可证失败,进入阻塞
doAcquireSharedInterruptibly(arg);
}

// tryAcquireShared() -> nonfairTryAcquireShared()
// 非公平,公平锁会在循环内 hasQueuedPredecessors()方法判断阻塞队列是否有临头节点(第二个节点)
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取 state ,state 这里【表示通行证】
int available = getState();
// 计算当前线程获取通行证完成之后,通行证还剩余数量
int remaining = available - acquires;
// 如果许可已经用完, 返回负数, 表示获取失败,
if (remaining < 0 ||
// 许可证足够分配的,如果 cas 重试成功, 返回正数, 表示获取成功
compareAndSetState(available, remaining))
return remaining;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireSharedInterruptibly(int arg) {
// 将调用 Semaphore.aquire 方法的线程,包装成 node 加入到 AQS 的阻塞队列中
final Node node = addWaiter(Node.SHARED);
// 获取标记
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 前驱节点是头节点可以再次获取许可
if (p == head) {
// 再次尝试获取许可,【返回剩余的许可证数量】
int r = tryAcquireShared(arg);
if (r >= 0) {
// 成功后本线程出队(AQS), 所在 Node设置为 head
// r 表示【可用资源数】, 为 0 则不会继续传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 被打断后进入该逻辑
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void setHeadAndPropagate(Node node, int propagate) {    
Node h = head;
// 设置自己为 head 节点
setHead(node);
// propagate 表示有【共享资源】(例如共享读锁或信号量)
// head waitStatus == Node.SIGNAL 或 Node.PROPAGATE,doReleaseShared 函数中设置的
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果是最后一个节点或者是等待共享读锁的节点,做一次唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}

release

release操作也非常简单,直接调用AQS的releaseShared方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// release() -> releaseShared()
public final boolean releaseShared(int arg) {
// 尝试释放锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取当前锁资源的可用许可证数量
int current = getState();
int next = current + releases;
// 索引越界判断
if (next < current)
throw new Error("Maximum permit count exceeded");
// 释放锁
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
// PROPAGATE 详解
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
}

  • 接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,并且 unpark 接下来的共享状态的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

面试

问题1.semaphore初始化有10个令牌,11个线程同时各调用1次acquire方法,会发生什么?

答案:拿不到令牌的线程阻塞,不会继续往下运行。

问题2.semaphore初始化有10个令牌,一个线程重复调用11次acquire方法,会发生什么?

答案:线程阻塞,不会继续往下运行。可能你会考虑类似于锁的重入的问题,很好,但是,令牌没有重入的概念。你只要调用一次acquire方法,就需要有一个令牌才能继续运行。

问题3.semaphore初始化有1个令牌,1个线程调用一次acquire方法,然后调用两次release方法,之后另外一个线程调用acquire(2)方法,此线程能够获取到足够的令牌并继续运行吗?

答案:能,原因是release方法会添加令牌,并不会以初始化的大小为准。

问题4.semaphore初始化有2个令牌,一个线程调用1次release方法,然后一次性获取3个令牌,会获取到吗?

答案:能,原因是release会添加令牌,并不会以初始化的大小为准。Semaphore中release方法的调用并没有限制要在acquire后调用。


JUC-CountDown

CountDown

基本使用

CountDownLatch:计数器,用来进行线程同步协作,等待所有线程完成,让一些线程阻塞直到另一些线程完成一系列操作才被唤醒

构造器:

  • public CountDownLatch(int count):初始化唤醒需要的 down 几步

常用API:

  • public void await():让当前线程等待,必须 down 完初始化的数字才可以被唤醒,否则进入无限等待
  • public void countDown():计数器进行减 1(down 1)
  • public void await(3, TimeUnit.SECONDS):限时等待

应用:同步等待多个 Rest 远程调用结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// LOL 10人进入游戏倒计时
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
ExecutorService service = Executors.newFixedThreadPool(10);
String[] all = new String[10];
Random random = new Random();

for (int j = 0; j < 10; j++) {
int finalJ = j;//常量
service.submit(() -> {
for (int i = 0; i <= 100; i++) {
Thread.sleep(random.nextInt(100)); //随机休眠
all[finalJ] = i + "%";
System.out.print("\r" + Arrays.toString(all)); // \r代表覆盖
}
latch.countDown();
});
}
latch.await();
System.out.println("\n游戏开始");
service.shutdown();
}
/*
[100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%]
游戏开始

面试

面试官:看你简历上有写熟悉并发编程,CountDownLatch一定用过吧,跟我说说它!

:CountDownLatch是JDK提供的一个同步工具,它可以让一个或多个线程等待,一直等到其他线程中执行完成一组操作。

面试官:CountDownLatch有哪些常用的方法?

:有countDown方法和await方法,CountDownLatch在初始化时,需要指定用给定一个整数作为计数器。当调用countDown方法时,计数器会被减1;当调用await方法时,如果计数器大于0时,线程会被阻塞,一直到计数器被countDown方法减到0时,线程才会继续执行。计数器是无法重置的,当计数器被减到0时,调用await方法都会直接返回。

面试官:调用countDown方法时,线程也会阻塞嘛?

:不会的,调用countDown的线程可以继续执行,不需要等待计数器被减到0,只是调用await方法的线程需要等待。

面试官:CountDownLatch的实现原理是什么?

:CountDownLatch有一个内部类叫做Sync,它继承了AbstractQueuedSynchronizer类,其中维护了一个整数state,并且保证了修改state的可见性和原子性。

创建CountDownLatch实例时,也会创建一个Sync的实例,同时把计数器的值传给Sync实例,具体是这样的:

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

countDown方法中,只调用了Sync实例的releaseShared方法,具体是这样的:

1
2
3
public void countDown() {
sync.releaseShared(1);
}

其中的releaseShared方法,先对计数器进行减1操作,如果减1后的计数器为0,唤醒被await方法阻塞的所有线程,具体是这样的:

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //对计数器进行减一操作
doReleaseShared();//如果计数器为0,唤醒被await方法阻塞的所有线程
return true;
}
return false;
}

其中的tryReleaseShared方法,先获取当前计数器的值,如果计数器为0时,就直接返回;如果不为0时,使用CAS方法对计数器进行减1操作,具体是这样的:

1
2
3
4
5
6
7
8
9
10
protected boolean tryReleaseShared(int releases) {
for (;;) {//死循环,如果CAS操作失败就会不断继续尝试。
int c = getState();//获取当前计数器的值。
if (c == 0)// 计数器为0时,就直接返回。
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))// 使用CAS方法对计数器进行减1操作
return nextc == 0;//如果操作成功,返回计数器是否为0
}
}

await方法中,只调用了Sync实例的acquireSharedInterruptibly方法,具体是这样的:

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

其中acquireSharedInterruptibly方法,判断计数器是否为0,如果不为0则阻塞当前线程,具体是这样的:

1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//判断计数器是否为0
doAcquireSharedInterruptibly(arg);//如果不为0则阻塞当前线程
}

其中tryAcquireShared方法,是AbstractQueuedSynchronizer中的一个模板方法,其具体实现在Sync类中,其主要是判断计数器是否为零,如果为零则返回1,如果不为零则返回-1,具体是这样的:

1
2
3
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

MySQL存储函数

存储函数

  1. 介绍

存储函数是有返回值的存储过程,存储函数的参数只能是IN类型的。具体语法如下:

1
2
3
4
5
6
CREATE FUNCTION 存储函数名称 ([ 参数列表 ])
RETURNS type [characteristic ...]
BEGIN
-- SQL语句
RETURN ...;
END ;

characteristic说明:

  • DETERMINISTIC:相同的输入参数总是产生相同的结果
  • NO SQL :不包含 SQL 语句。
  • READS SQL DATA:包含读取数据的语句,但不包含写入数据的语句。
  1. 案例

计算从1累加到n的值,n为传入的参数值。

{2}
1
2
3
4
5
6
7
8
9
10
11
12
create function fun1(n int)
returns int deterministic
begin
declare total int default 0;
while n>0 do
set total := total + n;
set n := n - 1;
end while;
return total;
end;

select fun1(50);

在mysql8.0版本中binlog默认是开启的,一旦开启了,mysql就要求在定义存储过程时,需要指定characteristic特性,否则就会报如下错误:

image