ReentrantLock源码浅谈

开始之前

最近一直没有写东西,两个原因 一:前段时间工作太忙,再加上办了一张游泳卡,近期都在去游泳,二:所有的输出都输出到纸上去了

深入理解Java虚拟机这本书买来一直吃灰状态中,最近拜读完,并且做了笔记。发现一个问题,很多博客上面的内容都是来自此书,而且都是断断续续的,建议如果需要系统的了解JVM,还是买来看一看

有时间将笔记上的内容copy到网上,最近无事,准备看看JUC (java.util.concurrent)里面的内容,然后结合Java并发编程的艺术,来看看Java并发是怎么一回事

开始

是什么?

ReentrantLock是一个可重入的互斥锁,又被称为独占锁。ReentrantLock锁在同一时间点只能被一个线程锁持有,而可重入的意思是,ReentrantLock锁可以被单个线程多次获取

ReentrantLock可以分为公平锁非公平锁 ,公平锁:通过一个FIFO等待队列来管理获取该锁的所有线程,保证线程先进先出。非公平锁:在锁是可获取状态时,任何线程都有可能获取到该锁。

方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 创建一个 ReentrantLock ,默认是“非公平锁”。
ReentrantLock()
// 创建策略是fair的 ReentrantLock。fair为true表示是公平锁,fair为false表示是非公平锁。
ReentrantLock(boolean fair)

// 查询当前线程保持此锁的次数。
int getHoldCount()
// 返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null。
protected Thread getOwner()
// 返回一个 collection,它包含可能正等待获取此锁的线程。
protected Collection<Thread> getQueuedThreads()
// 返回正等待获取此锁的线程估计数。
int getQueueLength()
// 返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。
protected Collection<Thread> getWaitingThreads(Condition condition)
// 返回等待与此锁相关的给定条件的线程估计数。
int getWaitQueueLength(Condition condition)
// 查询给定线程是否正在等待获取此锁。
boolean hasQueuedThread(Thread thread)
// 查询是否有些线程正在等待获取此锁。
boolean hasQueuedThreads()
// 查询是否有些线程正在等待与此锁有关的给定条件。
boolean hasWaiters(Condition condition)
// 如果是“公平锁”返回true,否则返回false。
boolean isFair()
// 查询当前线程是否保持此锁。
boolean isHeldByCurrentThread()
// 查询此锁是否由任意线程保持。
boolean isLocked()
// 获取锁。
void lock()
// 如果当前线程未被中断,则获取锁。
void lockInterruptibly()
// 返回用来与此 Lock 实例一起使用的 Condition 实例。
Condition newCondition()
// 仅在调用时锁未被另一个线程保持的情况下,才获取该锁。
boolean tryLock()
// 如果锁在给定等待时间内没有被另一个线程保持,且当前线程未被中断,则获取该锁。
boolean tryLock(long timeout, TimeUnit unit)
// 试图释放此锁。
void unlock()

源码结构

ReentrantLock实现了Lock接口

ReentrantLock的锁实现机制是通过Sync实现的。Sync下有两个子类 FairSync (公平锁)和 NonfairSync (非公平锁)

Sync又是AQS的子类

节点数据结构

Node是CLH队列的节点,代表等待锁的线程队列

每个Node都会对应一个线程,每个Node的prev和next都会对应等待队列的上一个线程和下一个线程

Node通过waitStatus来判断线程的等待状态,具体的状态下文表格中有

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

/**线程已被取消的状态*/
static final int CANCELLED = 1;
/** 正常等待状态,需要被唤醒 */
static final int SIGNAL = -1;
/** 线程处于条件判断状态 */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

/**
* 上述几种状态
*/
volatile int waitStatus;

/**
* 当前节点的上一个节点
*/
volatile Node prev;

/**
* 当前节点的下一个节点
*/
volatile Node next;

/**
* 节点对应的线程
*/
volatile Thread thread;

/**
* 在有条件判断时的串行队列
*/
Node nextWaiter;

/**
* 判断是否共享
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* 返回当前节点的上一个节点
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

// 构造函数,构造一个当前线程的节点,mode表示是独占还是共享
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
//构造函数,thread是当前现场,waitStatus是线程的等待状态
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

源码

结构图

ReentrantLock的锁实现机制是有Sync类具体实现的

1
2
/** Synchronizer providing all implementation mechanics */
private final Sync sync;

从这里可以看出来,在实例化一个ReentrantLock的时候默认是实例化一个非公平锁的,可以选择当前锁是使用公平锁还是非公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 实例化一个非公平锁
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* 根据fair参数选择创建公平锁或非公平锁
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

加锁

1
2
3
4
5
6
7
8
9
10
11
/**
* ReentrantLock里面的lock方法,其实是调用的Sync里面的lock方法
*/
public void lock() {
sync.lock();
}

/**
* 这个是Sync里面的抽象方法,在ReentrantLock里面调用的就是这个
*/
abstract void lock();

这里是父类实现的抽象方法,具体实现在两个子类里面,就是NonfairSync和FairSync里面

非公平锁加锁

非公平锁加锁是每次都会尝试获取锁,如果获取成功了,就直接返回,如果没有成功,就会添加到等待队列

所以非公平锁在第一次没有获取成功时,加入等待队列之后也和公平锁没什么区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//应为这里是非公平锁,首先用CAS抢占锁。如果成功则保存当前线程,不成功就走锁竞争的逻辑
//通过CAS修改状态,如果能将状态值修改为1,则表示获取锁成功
if (compareAndSetState(0, 1))
//讲当前线程存起来,供后续使用
setExclusiveOwnerThread(Thread.currentThread());
else
//如果没有修改成功,表示当前存在竞争,则去走另外的逻辑
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

acquire是AQS里面实现的方法,具体的方法又会在Sync的子类里面去重写,这里挺绕的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
* 这方法可以理解为去尝试获取锁,如果获取不成功,则在AQS队列的尾部添加一个节点
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquire方法就是在Sync子类里面重写的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132

//尝试获取锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
//得到当前线程
final Thread current = Thread.currentThread();
//获取状态
int c = getState();
//如果为0,则表示没有锁,直接获取锁
if (c == 0) {
//CAS修改state的状态,将0改成1
if (compareAndSetState(0, acquires)) {
//设置线程
setExclusiveOwnerThread(current);
//返回true,表示获取锁成功
return true;
}
}
//应为这个是可重入锁,如果在被锁住的情况下,是不是当前线程又重新获取了一次
else if (current == getExclusiveOwnerThread()) {
//如果这个锁重入了,则需要将state的值+1,来表示被同一个线程重入了多少次
//后续,在解锁操作时,需要依次 -1 直到state的值为0,才表示锁释放
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//return false 表示锁没有获取成功
return false;
}

/**
* Creates and enqueues node for current thread and given mode.
* tryAcquire这个方法没有成功获取锁,就会将当前线程封装成一个Node,添加到AQS队列的尾部
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// new 一个Node节点,将当前线程放进去
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果尾节点不等于空,表示队列里面存在其他等待的线程
if (pred != null) {
// 将尾节点指向当前节点的上一个,表示把当前需要排队的节点添加到尾节点的后面
node.prev = pred;
// 用CAS替换掉尾节点,将当前节点放在尾节点的位置上
if (compareAndSetTail(pred, node)) {
//然后将尾节点的next指向当前节点,就形成了一个链式
pred.next = node;
return node;
}
}
enq(node);//如果tail为空,或者cas失败,则用自旋将节点插入到队列尾部
return node;
}

/**
* Inserts node into queue, initializing if necessary. See picture above.
* 将节点插入到队列尾部,必要的时候初始化
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
//自旋
for (;;) {
// 获取尾节点,如果尾节点为null,则表示是第一次添加到队列
Node t = tail;
if (t == null) { // Must initialize 必须初始化
// new 一个节点放到Head节点去
if (compareAndSetHead(new Node()))
//将头结点赋值给尾结点
tail = head;
} else {
// 如果不是第一次添加,则将当前节前添加到最后
node.prev = t;
//将当前节点设置到最后
if (compareAndSetTail(t, node)) {
//将原尾节点的下一个节点指向当前节点,
t.next = node;
// 返回尾节点
return t;
}
}
}
}


/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
* 用于队列中的线程自旋的以独占且不可中断的方式获取同步状态
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取上一个节点
final Node p = node.predecessor();
//只有上一个节点为头节点时,才能参与锁的竞争
if (p == head && tryAcquire(arg)) {
// 如果竞争成功,则设置头结点
// 头结点不需要保存线程信息,和上一个节点的信息,所以设置为null
setHead(node);
// 将原头结点的的next设置为null,相当于孤立原头节点,帮助GC
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断线程的是否应该阻塞,如果是,则安全阻塞,进入到parkAndCheckInterrupt()
// 方法则将线程挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) //如果抛出异常,则取消锁的获取,进行出队操作
cancelAcquire(node);
}
}
非公平锁加锁的过程描述

1、通过compareAndSetState()去尝试更改锁的状态,如果锁状态更改成功,则表示获取锁成功,然后将当前线程保存到AbstractOwnableSynchronizer 当前的同步器

2、如果1 不成功过,就去走锁竞争的逻辑,首先会去尝试获取锁,如果获取成功 (两种可能 上一条线程执行完了,是重入了,state+1)就直接返回了

3、如果2不成功,就会先去构建一个包含当前线程信息的节点,然后插入到队列尾部,然后当前线程的上一个节点是头节点,则去尝试获取锁,如果不是,则挂起当前线程,等待解锁操作的唤醒

做了一个流程图可以参考:

这里面涉及到几个状态的值:

状态 说明
CANCELLED 1 等待超时或者中断,需要从同步队列中取消
SIGNAL -1 后继节点出于等待状态,当前节点释放锁后将会唤醒后继节点
CONDITION -2 节点在等待队列中,节点线程等待在Condition上,其它线程对Condition调用signal()方法后,该节点将会从等待同步队列中移到同步队列中,然后等待获取锁。
PROPAGATE -3 表示下一次共享式同步状态获取将会无条件地传播下去
INITIAL 0 初始状态

加锁的过程就这些了,可以一句话总结一下,就是加锁,如果加锁失败,则放到AQS队列尾部,这个是重入锁,可以被同一个线程加锁多次,只需要在状态上+1 解锁也需将状态减少到0

公平锁的加锁

公平锁和非公平锁在上文说过了,在代码上的区别不是很多,只是在尝试获取锁之前,需要判断当前线程是cLH等待队列的表头时,才获取锁,非公平锁就是只要锁处于空闲状态,不管CLH里面的队列,就直接获取,如果获取失败,就还是相当于一个公平锁放到CLH队列里面等待

公平锁在尝试获取锁的的时候多了一个hasQueuedPredecessors()方法,如果当线程状态为0的时候,表示当前线程是空闲的,就判断当前线程是不是在等待队列的头部,如果是,则获取锁,如果不是,则和非公平锁一样,放到等待队列去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
* 尝试获取锁
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}


//判断当前线程是不是头结点的下一个节点,如果返回true,表示前面有线程在排队,如果返回false,表示没有线程在排队,可是直接获取
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
* 解锁
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}

//解锁
public final boolean release(int arg) {
// 如果状态==0,就返回true,然后去唤醒
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 将头结点的下一个节点激活
unparkSuccessor(h);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 状态==0 将当前执行的线程设置为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//设置状态
setState(c);
return free;
}

// 将头节点的下一个节点激活
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
非公平锁解锁的过程描述

解锁,将state的值-1 如果为0,则去解锁,并且将当前保存的线程信息设置为null,解锁分为两个步骤,第一

1、解锁,将state的值 -1 如果为0,就表示能直接解锁

2、开始解锁,解锁分为两步,第一步是将头节点的状态改成0,然后将头结点的next后置节点激活

流程图:

总结

ReentrantLock是一个可重入锁,他的可重入是标识当前线程能在状态上+1来保证当前线程能进入已经被当前线程加锁的线程,这个源码的重点还是AbstractQueuedSynchronizer里面的这个AQS队列

-------------本文结束感谢您的阅读-------------
0%