AQS
本文粗略的过一下juc下面的锁,其aqs的数据结构与一些基本原理。
AQS全称AbstractQueuedSynchronizer,其实这个名字就道出了他的一切。
- 是一个抽象类,Abstract
- 其数据结构是一个队列,Queued
- 通过改变一个state的值来达到,Synchronizer
数据结构
AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。
没有获取到的资源的线程,即state值达不到他想要的,他就会把他生成一个node(将线程存入其中)随后加入队列中,并挂起此线程
源码解析主要流程
看源码时需注意他的头结点是没有实际数据的节点在算法中经常使用称之为哨兵节点,不然容易陷进去
链表中的哨兵节点的好处:头结点不会为空,减少边界的判断,从而防止对特殊条件的判断,使代码更为简便优雅
没有哨兵节点的正常流程
- 新增元素的时候先判断判断头结点是否为空,头结点为空则新建头结点
- 头结点不为空则取笑一个元素直至next为空
如果使用的哨兵节点则第一步可以省略,既可以使代码对边界的处理更加清晰,也能减少每次新增是的判断头结点
资源获取acquire
aqs中的源代码如下,acquire()获取资源,state符合要求返回true(也就是tryAcquire),否则将线程加入队列中,并将其挂起。tryAcquire由子类实现,aqs主要负责获取资源时,不符合条件后的操作(加入等待队列,挂起线程)与释放的操作
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
资源释放release
当释放锁是需要释放对应的资源,且唤醒在排队的线程源码如下
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease是一个抽象方法,由子类实现,他来决定干些什么,unparkSuccessor唤醒下一个可以获得资源的线程源码如下
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
如上代码所示将下一个节点唤醒
对应aqs实现类
FairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
如上代码所示,lock时判断state是否为0或者是不是本身就是他在占有,不为0则释放
总结
AQS提供的是一种数据结构与通用方法,让子类随自己需求进行线程锁定。
如果翻看过我之前编写future源码解析,你会发现他们的共同之处,都是通过一个state来进行控制各个线程的状态,且将没有达到条件的线程挂起,并加入到阻塞队列,等待条件满足后再一一唤醒