[TOC]
AQS
AQS的两种功能
从使用上来说,
AQS(AbstractQueueSynchronizer 队列同步器)
的功能可以分为两种:独占和共享。ReentrantReadWriteLock就是通过两个内部类来分别实现了这两种功能,提供读锁和写锁的功能独占锁
ReentrantLock lock = new ReentrantLock(); public void function(){ lock.lock(); try { // do something... } finally { lock.unlock(); } }
共享锁 共享就是同时可以n个线程一起访问,当n==1时,共享就变成了独占
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public void function(){ //读锁(共享锁) lock.readLock().lock(); try { // do something... } finally { lock.readLock().unlock(); } }
- 读锁与读锁可以共享
- 读锁与写锁不可以共享(排他)
- 写锁与写锁不可以共享(排他)
AQS 内部实现
设计模式
标准模板方法模式
//独占模式接口 public final void acquire(int arg); public final void acquireInterruptibly(int arg) throws InterruptedException; public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException; public final boolean release(int arg); //共享模式接口 public final void acquireShared(int arg); public final void acquireSharedInterruptibly(int arg) throws InterruptedException; public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException; public final boolean releaseShared(int arg);
重写同步器指定方法时,需要使用同步器提供的3个方法来访问或者修改同步状态
- getState():获取当前同步状态
- setState(int newState): 设置当前同步状态
- compareAndSetState(int expect,int update):使用cas设置当前状态,保证状态的原子性
主要数据结构
node
AQS内部维护着一个双向链表实现的FIFO的队列(只有申请资源发现资源不够的线程会加入到这个队列,比如lock.lock()失败),该队列就是用来实现线程的并发访问控制。队列中的元素是一个Node类型的节点(Node是AQS的内部类),Node的主要属性如下:
//队列的头节点,头节点不存储Thread,仅仅保存next节点的引用 private transient volatile Node tail; private transient volatile Node tail; /** *state变量对于不同的子类实现有不同的意义 **/ private volatile int state; static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /** * CLH队列的某一个节点的状态(自旋,挂起,唤醒)都由自己的前驱结点的状态决定,这也是 * waitStatus的意义所在。入队和出队时挂起线程、唤醒线程都借助的是LockSupport这个 * 辅助类,park和unpark * *表示节点状态: * 1. CANCELLED:值为1,表示当前节点被取消 * 2. SIGNAL:值为-1,表示当前节点的的后继节点将要或者已经被阻塞,在当前节点释放的时候需要unpark后继节点(唤醒); * 3. CONDITION:值为-2,表示当前节点在等待condition,即在condition队列中 * 4. PROPAGATE:值为-3,表示releaseShared需要被传播给后续节点(仅在共享模式下使用); * 5. 0:无状态,表示当前节点在队列中等待获取锁 * * **/ volatile int waitStatus; //前继节点 volatile Node prev; //后继节点 volatile Node next; //被阻塞的线程 volatile Thread thread; //存储condition队列中的后继节点 Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
队列
- AQS内部有两个队列,一个是syn队列,一个是condition队列。任何线程要么获取了资源,要么就在syn队列,要么就在condition队列。syn队列里是去争取某个资源的线程,condition队列里都是等待condition.signal的线程
源码分析
获取资源的入口是acquire方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
当tryAcquire(arg)失败后,后面的acquireQueued(入队操作)才会执行
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//自旋等待直至挂起 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) ///因为前驱结点是SIGNAL,所以后续节点可以放心挂起 return true; if (ws > 0) {//ws>0代表前驱结点已被取消,不断向前移动跳过这类节点 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {//代表前驱结点状态为0或者为PROPAGATE态 //我们设置前驱结点的状态为SIGNAL,下一次访问的时候再挂起。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }