线程同步组件CountDownLatch与CyclicBarrier

前文

闲来无事,随便看看,看到了两个同步组件,正想前段时间看了ReentrantLock之后了解了一下AQS就没怎么看关于并发这块的源码了,今天下午没事,来看看,做个记录

开始

感觉自己看源码越来越快了,因为知道源码里面一些常用的套路,知道他是做什么的,只用看看大致的流程,就能明白他要做什么,挺厉害的,加油哦。

正文

CountDownLatch是JUC包里面类似于计数器的组件,一般用来等待其他线程执行完毕之后,在去执行其他操作,或者执行主线程

例子

CountDownLatch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args)throws InterruptedException {

CountDownLatch countDownLatch = new CountDownLatch(6);
ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int i = 0; i < 6; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 开始等待!!!");
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("起飞咯!!!");
executorService.shutdown();
}

输出

1
2
3
4
5
6
7
pool-1-thread-1 开始等待!!!
pool-1-thread-3 开始等待!!!
pool-1-thread-2 开始等待!!!
pool-1-thread-4 开始等待!!!
pool-1-thread-5 开始等待!!!
pool-1-thread-6 开始等待!!!
起飞咯!!!

这个例子是很正常的例子,保证线程数和CountDownLatch的数是一致,他会等待等同于计数器数量的线程准备好之后,在去执行其他线程,这里的其他线程是主线程,这个效果不明显,我现在将CountDownLatch初始化的参数改为7

1
CountDownLatch countDownLatch = new CountDownLatch(7);

这表示会等待7个线程然后在执行主线程,但是在for循环里面我们只初始化了6条线程,所以主线程会一直等待,因为我们没有设置超时时间并且线程数没有达到计数器里面的7个

1
2
3
4
5
6
pool-1-thread-1 开始等待!!!
pool-1-thread-2 开始等待!!!
pool-1-thread-3 开始等待!!!
pool-1-thread-4 开始等待!!!
pool-1-thread-5 开始等待!!!
pool-1-thread-6 开始等待!!!

在举一个例子,我们将7改成1

1
CountDownLatch countDownLatch = new CountDownLatch(1);
1
2
3
4
5
6
7
pool-1-thread-2 开始等待!!!
pool-1-thread-1 开始等待!!!
起飞咯!!!
pool-1-thread-3 开始等待!!!
pool-1-thread-4 开始等待!!!
pool-1-thread-6 开始等待!!!
pool-1-thread-5 开始等待!!!

此时计数器只用执行一遍countDown()就会唤醒其他所有线程包括主线程,所以例子里面的起飞咯在线程还没有等待完毕就已经执行完了

CyclicBarrier
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private CyclicBarrier cyclicBarrier;
public CyclicBarrierDemo(CyclicBarrier cyclicBarrier){
this.cyclicBarrier=cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"准备!");
try {
cyclicBarrier.await();
System.out.println("线程"+Thread.currentThread().getName()+"结束!");


} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
CyclicBarrier cyclicBarrier=new CyclicBarrier(11);
Executor executor=Executors.newFixedThreadPool(10);
for(int i=0;i<10;i++){
executor.execute(new CyclicBarrierDemo(cyclicBarrier));
}
}

这个结果和CountDownLatch差不多,也是等待一个时机,然后一起执行后续操作,但是还有一些细微差别,在后面在来分析

源码

CountDownLatch

源码挺简单的,前面分析过ReentrantLock和部分AQS的源码,在来看这个就很容易

这个源码我主要说三个方法,也是两个比较核心的方法

1
2
public void countDown() //计数器-1
public void await() //线程等待

这个唯一的构造函数必须执行一个参数,就是计数器的数值,里面也是使用的Sync同步组件

在构造函数里面new了Sync类,将计数器的值传入,然后设置state的值为count,看到这里应该就明白一大半了,之前ReentrantLock里面的可重入锁也是这样,如果相同线程进入了同一个锁,会将state的值不断的+1 然后解锁就是不断的-1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
//new Sync
this.sync = new Sync(count);
}


Sync(int count) {
setState(count);
}
countDown()

先来看看countDown方法,这个方法是将计数器的值-1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void countDown() {
sync.releaseShared(1);
}

//尝试将计数器的值+1 如果达到了初始化设置的count值,就激活所有线程
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//如果tryReleaseShared返回true,表示达到了count的值,就会激活所有线程
//上述例子中将count的值设置小于线程数,所以激活所有线程包括主线程,主线程就执行了
doReleaseShared();
return true;
}
return false;
}
await()

这个方法是将当前线程等待,如果计数器未达到设定值,将一直等待,上述例子也有,当计数器设置为7时,只new 6条线程,所以主线程会一直等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
  public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//判断线程是否中断
if (Thread.interrupted())
throw new InterruptedException();
//判断计数器也就是当前锁的状态,如果state不为0,则需要将线程挂起
if (tryAcquireShared(arg) < 0)
//挂起线程
doAcquireSharedInterruptibly(arg);
}

//判断计数器的值
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}


private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//先往AQS里面添加一个节点,这个addWaiter在ReentrantLock里面有分析
//会往AQS的尾部添加一个节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {//自旋
//得到前节点
final Node p = node.predecessor();
//如果前节点是头节点,则开始尝试激活
if (p == head) {
//尝试获取当前状态,如果为0返回1 如果不为0 返回 -1
int r = tryAcquireShared(arg);
if (r >= 0) {//表示当前状态为0,可以激活线程
//
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//这个之前看过
//判断线程是否应该阻塞,如果是,则安全阻塞,进入到parkAndCheckInterrupt将线程挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 如果出现异常,则取消锁的获取,进行出队操作
cancelAcquire(node);
}
}

//设置成头节点,并通过propagate判断是否可以激活
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); //设置头结点
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/

if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//判断是否共享模式
if (s == null || s.isShared())
//进行共享模式的释放
doReleaseShared();
}
}

这个CountDownLatch看了,其实看源码很好理解,但是又深入到AQS里面去了,也有很多地方不懂,刚刚在查资料的过程中,看到了几个新名字,也不算新名字吧,只是对概念的模糊不清在这记录一下

题外

AQS是AbstractQueuedSynchronizer的简称,AQS提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架

CLH是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

AQS中有两种获取和释放资源的模式。

AQS独占模式**只允许同时一个线程获取同一个许可。可以对应为一些锁ReetrantLock

AQS共享模式允许同时多个线程获取同一个许可。可以对应一些同步器类似CountDownLatch,CyclicBarrier

CyclicBarrier
构造函数
1
2
3
4
5
6
7
8
9
10
11
12
//parties可以理解为一个屏障
public CyclicBarrier(int parties) {
this(parties, null);
}

//构造函数会调用这个,其实parties,count和CountDownLatch类似
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
await()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
 public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
//核心方法就是这个,他不需要像CountDownLatch一样,需要调用CountDown
//而是直接去--count,然后
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//将count减1
int index = --count;
//如果等于0,则表示已经达到屏障,就可以开始执行唤醒操作
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
//线程运行到这里,就会被屏障挡住,执行await方法,挂起线程
for (;;) {
try {
//这里是设置超时操作的一些,这次先不管
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//如果抛出异常,则破坏屏障,然后抛出异常
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

//如果换了新的一轮,返回上一轮线程达到count的顺序
if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

//唤醒,然后重新激活,重置count值和屏障
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

这个就简单多了,这两个同步计数器,其实实现的功能差不多,下面再来看看一些细微的区别

区别

CountDownLatch是执行的CountDown()方法将计数器的值-1,这个是可控的,如果数值减到0之后,就开始执行后续操作

CyclicBarrier 是只用让方法await()等待之后,内部会自动去-1,然后达到初始化设置的值,就会激活全部线程

如果难理解的话,我举个例子

CountDownLatch就好比一个跑步比赛,来了一个人,计数器就 -1 ,人来起了,计数器到0了,则就开始跑,此时await阻塞的是主体,都不能动

CyclicBarrier 更像是一个屏障,屏障只能承受10g的力量,来了一个线程就相当于给这个屏障加了1g,然后达到10g的时候,这个屏障就会破了,然后所有线程一起跑起来,然后跑完了,自动修复,等着承受下一个10g,这个await阻塞的是单个线程

总结

快下班了,今天星期五,收货颇丰,研究了两个同步计数器,感觉AQS里面的内容,我看的还是太片面了,没有顾上整体,在今天查资料的过程中,看到了很多东西理解起来还是有点困难,接下来研究到了在继续分析

最后

​ 这段时间看的东西比较杂,这里看一下,那里看一下,看到什么东西,就研究一下,不知道好不好,反正我感觉挺乱的,项目这段时间也不是挺忙,上述的两个计数器在项目里面搜了一下,基本上没怎么用,只能说以后要用了,知道这个东西,能快速的上手。

​ 最近感觉来自于经济方面的压力越来越大,房贷已经还了3个月,基本上就是还了房贷,就没啥钱的地步了,可能近几年是最难熬的几年吧,以后转了公积金,或者一个收入水平提升就会好一点,突然想到我还蛮像乐观锁的,先啥也不管,该吃吃该喝喝,等出问题,在来想办法,哈哈。加油哦,给自己一个目标吧,明年尝试去面试,去看看外面的世界是什么样的,说不定能对自己目前的收入水平得到一个提升

-------------本文结束感谢您的阅读-------------
0%