[TOC]

AQS

AQS的两种功能

  1. 从使用上来说,AQS(AbstractQueueSynchronizer 队列同步器)的功能可以分为两种:独占和共享。ReentrantReadWriteLock就是通过两个内部类来分别实现了这两种功能,提供读锁和写锁的功能

  2. 独占锁

     ReentrantLock lock = new ReentrantLock();
     public void function(){
         lock.lock();
         try {
    
         // do something...
    
         } finally {
             lock.unlock();
         }
    
     }
    
  3. 共享锁 共享就是同时可以n个线程一起访问,当n==1时,共享就变成了独占

     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    
     public void function(){
    
         //读锁(共享锁)
         lock.readLock().lock();
         try {
    
         // do something...
    
         } finally {
             lock.readLock().unlock();
         }
    
     }
    
    • 读锁与读锁可以共享
    • 读锁与写锁不可以共享(排他)
    • 写锁与写锁不可以共享(排他)

AQS 内部实现

设计模式

  1. 标准模板方法模式

     //独占模式接口 
     public final void acquire(int arg);
     public final void acquireInterruptibly(int arg) throws InterruptedException;
     public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException;
     public final boolean release(int arg);
    
     //共享模式接口 
     public final void acquireShared(int arg);
     public final void acquireSharedInterruptibly(int arg) throws InterruptedException;
     public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;
     public final boolean releaseShared(int arg);
    

    重写同步器指定方法时,需要使用同步器提供的3个方法来访问或者修改同步状态

    • getState():获取当前同步状态
    • setState(int newState): 设置当前同步状态
    • compareAndSetState(int expect,int update):使用cas设置当前状态,保证状态的原子性

主要数据结构

node

  1. AQS内部维护着一个双向链表实现的FIFO的队列(只有申请资源发现资源不够的线程会加入到这个队列,比如lock.lock()失败),该队列就是用来实现线程的并发访问控制。队列中的元素是一个Node类型的节点(Node是AQS的内部类),Node的主要属性如下:

     //队列的头节点,头节点不存储Thread,仅仅保存next节点的引用
     private transient volatile Node tail;
     private transient volatile Node tail;
    /**
     *state变量对于不同的子类实现有不同的意义 
     **/
     private volatile int state;
    
     static final class Node {
             static final Node SHARED = new Node();
             static final Node EXCLUSIVE = null;
             static final int CANCELLED =  1;
             static final int SIGNAL    = -1;
             static final int CONDITION = -2;
             static final int PROPAGATE = -3;
    
            /**
             * CLH队列的某一个节点的状态(自旋,挂起,唤醒)都由自己的前驱结点的状态决定,这也是
             * waitStatus的意义所在。入队和出队时挂起线程、唤醒线程都借助的是LockSupport这个
             * 辅助类,park和unpark
             * 
             *表示节点状态:
             * 1. CANCELLED:值为1,表示当前节点被取消
             * 2. SIGNAL:值为-1,表示当前节点的的后继节点将要或者已经被阻塞,在当前节点释放的时候需要unpark后继节点(唤醒);
             * 3. CONDITION:值为-2,表示当前节点在等待condition,即在condition队列中
             * 4. PROPAGATE:值为-3,表示releaseShared需要被传播给后续节点(仅在共享模式下使用);
             * 5. 0:无状态,表示当前节点在队列中等待获取锁
             *
             *
             **/
             volatile int waitStatus;
             //前继节点
             volatile Node prev;
             //后继节点
             volatile Node next;
             //被阻塞的线程
             volatile Thread thread;
             //存储condition队列中的后继节点
             Node nextWaiter;
    
             final boolean isShared() {
                 return nextWaiter == SHARED;
             }
    
             final Node predecessor() throws NullPointerException {
                 Node p = prev;
                 if (p == null)
                     throw new NullPointerException();
                 else
                     return p;
             }
    
             Node() {    // Used to establish initial head or SHARED marker
             }
    
             Node(Thread thread, Node mode) {     // Used by addWaiter
                 this.nextWaiter = mode;
                 this.thread = thread;
             }
    
             Node(Thread thread, int waitStatus) { // Used by Condition
                 this.waitStatus = waitStatus;
                 this.thread = thread;
             }
         }
    

队列

  1. AQS内部有两个队列,一个是syn队列,一个是condition队列。任何线程要么获取了资源,要么就在syn队列,要么就在condition队列。syn队列里是去争取某个资源的线程,condition队列里都是等待condition.signal的线程

源码分析

  1. 获取资源的入口是acquire方法

      public final void acquire(int arg) {
             if (!tryAcquire(arg) &&
                 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                 selfInterrupt();
         }
    

    当tryAcquire(arg)失败后,后面的acquireQueued(入队操作)才会执行

     final boolean acquireQueued(final Node node, int arg) {
             boolean failed = true;
             try {
                 boolean interrupted = false;
                 for (;;) {//自旋等待直至挂起
                     final Node p = node.predecessor();
                     if (p == head && tryAcquire(arg)) {
                         setHead(node);
                         p.next = null; // help GC
                         failed = false;
                         return interrupted;
                     }
                     if (shouldParkAfterFailedAcquire(p, node) &&
                         parkAndCheckInterrupt())
                         interrupted = true;
                 }
             } finally {
                 if (failed)
                     cancelAcquire(node);
             }
         }
    
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
         int ws = pred.waitStatus;
         if (ws == Node.SIGNAL)
            ///因为前驱结点是SIGNAL,所以后续节点可以放心挂起
             return true;
         if (ws > 0) {//ws>0代表前驱结点已被取消,不断向前移动跳过这类节点
             /*
              * Predecessor was cancelled. Skip over predecessors and
              * indicate retry.
              */
             do {
                 node.prev = pred = pred.prev;
             } while (pred.waitStatus > 0);
             pred.next = node;
         } else {//代表前驱结点状态为0或者为PROPAGATE态
               //我们设置前驱结点的状态为SIGNAL,下一次访问的时候再挂起。
             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
         }
         return false;
     }
    

results matching ""

    No results matching ""