码迷,mamicode.com
首页 > 编程语言 > 详细

第5章 Java中的锁

时间:2019-02-05 18:07:25      阅读:198      评论:0      收藏:0      [点我收藏+]

标签:chain   方式   角度   back   不能   erp   返回   inter   bar   

5.1 Lock接口

  并发编程安全性需要使用到锁,synchronized是一种隐式的获得与释放锁的关键字,除此之外还有Lock接口及其实现类,该接口及实现类提供了显示获取和释放锁的方式。

  除了上述编程时的区别外,在使用时Lock与synchronized的区别主要有以下三点:

  • 非阻塞的获取锁。一个线程在没有获得到锁的时候不是阻塞被挂起
  • 获得锁的线程可以被中断。一个线程在获得锁之后可以响应中断,此时该线程会释放所获得的锁。
  • 超时获取锁。线程在获取锁的时候可以指定超时时间

技术图片

 

  Lock接口的API如上所示,可以看出和lock相关的方法有很多分别对应不同的上锁的方式。

5.2 队列同步器

  AQS,是并发包下用来构建其他类的一个基础类。一般一个锁的类都是实现了Lock接口并定义了一个内部类,该内部类继承了AQS。

5.2.1 队列同步器的接口与实例

  同步器里提供了很多的方法,从程序员的角度可以分为三个类别:1、特殊的原子方法 2、可重写的方法 3、模板方法

  特殊的原子方法有三个getState setState compareAndSetState 。这三个方法是用来修改或者获取AQS里的state状态的。state是一个Int类型的变量用来表示同步状态。

  技术图片

  可重写的方法分为两大类,分别针对独占是和非独占的锁。

技术图片

  模板方法就比较多了。比如我要实现了一个独占的锁,我就要使用模板方法去重写tryAcquire和tryRelease。模板方法里的方法也可以分为三类:1、独占式的获取和释放锁的状态 2、共享式的获取和释放锁的状态 3、查看同步队列里的的等待线程的状态

  

5.2.2 队列同步器的实现分析

1、同步队列

  AQS名字里就有个Queue,AQS在内部维护了一个双端链表,每一个尝试获取锁失败的线程都会和一些额外的信息一起构成一个Node然后加入内部的Queue里,同时当前线程会被阻塞。当同步状态释放的时候会把Queue首部节点指向的线程释放并让其尝试再次获取同步状态。

  一个双端队列里的Node至少有两个结构:指向前面节点的pre和后面节点的next。除此之外还有一个指向当前节点所代表线程的一个引用thread。因为无论是互斥锁还是非互斥锁,还是处于不同状态的锁都是用这个队列来维护的,所以一个节点要包含线程的不同等待状态,因而Node里有一个字段waitStatus用来记录当前线程的等待状态。还要维护一个等待队列,用nextWaiter来表示。

  技术图片

 

  

  AQS持有队列的头结点和尾节点的引用。每当一个新的线程获取锁失败的时候就会被加入这个队列的尾部,当然这个过程要求是线程安全的,这里并没有对整个加入节点的过程使用synchronized加锁,而是使用一个CAS的设置方法。

  设置首节点即创建整个队列的工作是由成功获取锁的线程来完成的,因为这个线程只能有一个,所以这个过程不需要保证线程安全。

2、独占式同步状态的获取与释放

  所谓独占式同步状态只只能有一个线程获得锁,是通过acquire方法来实现的。

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

 

   看到这里的时候我对AQS三种类型的方法有了更深的理解。这个方法里面的tryAcquire在AQS里是一个protected的方法,需要写一个子类继承并重写,但是后面的acquireQueued等方法是AQS重新写好的。回顾上面的AQS三种分类,我们要重写的方法都一些AQS模板方法里需要使用的但是AQS却没有写好的方法,换言之AQS里的模板方法是一些半成品方法。

  回到这个独占式同步状态的获取,总的逻辑是如果If里面的条件满足当前的线程会被interrupt。if里面的条件有两个,其一是获取锁失败,其二是一个复杂的条件。

  根据短路原则,当执行&&后面的逻辑的时候当前线程获取锁的过程已经失败了,因此首先会执行后面的addWaiter方法,并传入一个exclusive参数,根据这个参数可以想到这是一个独占的、排他的性质。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

   进入addWaiter方法,首先新建了一个Node,即AQS里维护的那个双端Queue的节点,这个节点的mode是exclusive的。后面的操作我们也可以想象到应该是把这个节点放到队列里面去,但是这里做了一个优化处理,从那一行注释也可以看出来:先尝试使用快速的方式enq,如果不行的话才用全过程的enq。这里并没直接把新建的Node放到tail后面,而是使用一个CAS操作。这是一个非阻塞式的线程安全操作,如果这个CAS操作返回true的话说明我们新建的Node已经被设置成tail了,如果失败了话就进入full enq操作。虽然我很想知道CAS操作背后究竟发生了什么,但是点到最后发现这是一个native的方法。

  下面是enq方法,即full enq。除了死循环外,和上面相比差不多,只不过多了一个初始化的过程,即当tail为Null也就是整个队列第一次插入Node,相似的是每一次插入新的节点都是CAS操作。当tail不为Null的时候插入到尾部的方法和之前的一样。不过我这里有个疑问,if t== null那段逻辑里怎么没有return????

  综上所述,无论是快速添加还是full添加,通过使用CAS操作保证了整个队列Node添加的线程安全性。

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node‘s predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

 

  上面的过程都是addWaiter方法的过程,该方法最终返回一个node类型的变量作为accquireQueued的入参。

  

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

   这里最主要的也是一个死循环,里面的if逻辑里有一个与判断,首先判断当前节点的前面的节点是否为头结点,如果是的话才有资格去获取锁,获取成功就对队列进行一些处理然后返回,否则就继续执行死循环,从这个死循环可以看到这个锁的自旋过程,在没有获得锁的情况下线程在一个死循环里等待请求获得锁。除此之外一个线程获得了锁之后,该线程对应的Node就被设置成了头节点。

  整个过程的如下。

  技术图片

  在一个线程成功获得锁并执行完逻辑后要释放锁给别的线程去获取。

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

   这里的tryRelease也是一个需要重写的方法。在上面的acquireQueued方法里当一个线程获得锁之后就会被设置成head节点,所以这里释放锁的操作是针对head节点。这里有一点我不明白,应该是判断Queue里是否有别的在等待的线程,如果有就去调用unparkSuccessor去唤醒,这里是判断h!=null。但从这个方法的名字可能是在unparkSuccessor里包含了判断后续是否有节点的逻辑。

3、共享式同步状态获取与释放

  共享式的特点是同一时刻可以有多个线程获得锁。AQS里通过acquireShared方法来获得共享锁,该方法里有个tryAcquireShared方法,根据在互斥锁里的经验带try的是需要我们重写的方法,而这里的acquireShared方法是一个模板方法。tryAcquireShared返回值是一个int类型的变量分别代表尝试获取共享锁的不同结果,为负代表获取失败,为0代表该次获取成功但是不可以继续获取别的共享锁,为正的话其值大小代表还可以获取多少个共享锁。

  /**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

   当返回值小于0的时候进入doAcquireShared方法,即失败的时候进入了doAcquireShared方法,虽然现在没有看该方法的具体的实现,根据上面非共享的锁的经验来看,这个方法应该是让没有获得锁的线程自旋等待获取锁。

 /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

   与上述的类似,在一个循环里尝试获取锁,只有当一个节点的前面的节点是head的时候才可以尝试获取锁,只有在获取锁成功的情况下才可以退出。

  同样的最后也需要释放资源。由于共享模式下会有多个线程获得锁也会有多个线程释放锁,这个过程也要保证是线程安全的。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

 4、 独占式超时获取同步状态

  和独占式获取同步状态整体上是十分相似的,区别之处在于没有获取到锁的时候回等待一个指定的时间,如果没有成功获得锁就会返回false

5、 自定义同步组件 TwinsLock

5.3 重入锁

  锁的重入指的是获得锁的线程再次申请获得锁的时候能够成功获得锁。synchronized关键字是隐式支持锁的重入的,使用synchronized关键字修饰一个递归方法的时候,可以成功执行就可以说明该锁是支持重入的。

1、实现重进入  

  先不看具体的实现,可以猜测出要实现锁的重入应该维护一个计数器,同一个线程多次获得一个锁的时候要对计数器累加,然后释放的时候要减少计数器的值,当计数器为零的时候代表锁可以释放。

  这里分析的代码是ReentrantLock,按照以前的经验该类实现了Lock接口并有一个内部类Sync,该内部类集成了AQS。ReentrantLock还有两个额外的内部类FairSync和NonfairSync都继承于Sync,这两个类分别对应于公平锁和非公平锁。新建一个ReentrantLock的时候默认新建的是NonfairSync。调用ReentrantLock的lock方法的时候调用的是NonfairSync的lock方法。  

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

 

   lock方法里首先尝试一次CAS操作,CAS操作的两个参数为0和1,可以看出这个操作是当一个线程第一次获取重入锁的时候执行的操作,而且从后续的setexclusive这个名字可以看出ReentrantLock是一个非共享锁。

  如果不是线程第一次获取该锁也即线程重入一个锁的时候就会执行axquire方法。这个方法在非共享锁里分析过,这是一个模板方法,里面有一个tryAcquire方法需要我们自己重写。

  protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

   上面就是NonfairSync对该方法的重写,直接调用了nonfairTryAcquire方法。

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

  nonfairTryAcquire不在子类NonfairSync里而在父类Sync里。其实逻辑很简单,但是这里又多了一次当判断当前锁的state是否为0的判断,我猜测是出于线程安全的考虑。核心的逻辑在else if里,和我们之前的猜测是一样的,先判断当前线程是否是持有锁的线程,如果是的话就新增锁的state字段,其实就是AQS里队列的节点的state字段,确切的说是头节点的state字段。

  接下来是锁的释放。Lock接口的unlock方法调用的是Sync的release方法,在非公平锁里调用的是AQS的release方法,这个模板方法需要我们重写的是tryRelease方法。

  

    protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

 

   也没啥,先判断是否是获得锁的线程,如果是的话在相应的减少state的值并作出相应的判断。

2、公平与非公平获取锁的区别

  所谓公平即一个在为线程分配锁的时候是否考虑线程的等待时间。如果只要CAS操作成功一个线程就可以获得锁就是非公平的。

        /**
         * Fair version of tryAcquire.  Don‘t grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

   分析是否公平的时候可以暂时不考虑可重入性。锁的公平性与否主要体现在一个线程第一次获取锁的时候即上述c==0分支里,c==0说明此时锁没有被任何线程获取。当一个线程试图获取锁的时候,首先要通过hasQueuedPredecessors的判断,这个方法是判断Queue里是否有等待该锁的线程,只有在没有的情况下才会执行CAS操作试图获取锁。这里也是公平和非公平的区别,非公平的锁不会考虑是否有线程已经在自旋等待锁而是直接CAS试图获取锁,这样可能的一个结果是某些一直在自旋等待的线程一直无法获取锁,带来的了饥饿。

  非公平锁是默认的实现,根据实验结果显示一个刚刚释放锁的线程再次获取锁的几率是非常大的。所以非公平锁很有可能是一个线程连续获得锁,这样线程切换的开销没有了,效率更高。

5.4 读写锁

  ReentrantReadWriteLock,多个线程可以读,一个线程去写。读写锁是一对锁,一个读锁一个写锁。

  技术图片

 

5.4.1 读写锁的接口与示例

5.4.2 读写锁的实现分析

1、读写状态的设计

  读写锁也是实现Lock接口并维护一个继承AQS的内部类来实现的,所以本质上也要通过state来维护锁的状态。state按位切割使用来维护状态。

  技术图片

  左边高位是读状态,右边低位是写状态,读写,左右,很好记忆。要想获取读状态,就是不看左边的低16位,只要把S>>16即无符号右移16位并补0。要想看写状态,只要把左边的高16位全部不看。当读状态加一的时候S+(1<<16),当写状态加一的时候直接加一就行。

  2、写锁的获取与释放

  根据读写锁名字ReentrantReadWriteLock可以看出无论是读锁还是写锁都是支持重入的,写锁是一个排他锁,只支持一个线程获得锁。

  技术图片

  ReentrantReadWriteLock一如既往的实现了Lock接口并有一个Sync内部类,和可重入锁一样有两个类分别对应公平和非公平内部类,这两个类都是private的,他暴露出来的两个public的类就是我们使用的ReadLock和WriteLock。最终模板方法里还是调用了下面的tryWriteLock方法。

  该方法里有两处return false其余都是return true,false代表着获取写锁的失败。

        /**
         * Performs tryLock for write, enabling barging in both modes.
         * This is identical in effect to tryAcquire except for lack
         * of calls to writerShouldBlock.
         */
        final boolean tryWriteLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) {
                int w = exclusiveCount(c);
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

   第一个条件是w==0,w是从exclusiveCount(c)得到的,c是当前锁的状态。exclusiveCount代码就不贴了,就是一个与运算根据State的低16位的值去判断当前是否有线程持有写锁,如果没有那么w的值为0。结合上面的对state的判断,这个意思就是如果state!=0且没有线程获得了写锁,那么此时一定存在线程获取了读锁,那么此时就不可以获得写锁。总结下来就是不可以在读锁存在的情况下获得写锁。这种设计的考虑是为了保证可见性,确保写的线程写的内容能够被读的线程读到。

  第二个条件是当前线程不是锁持有的线程。这个是保证了写锁的互斥性。

  第三个条件是在当前锁的state=0的条件下执行,即没有线程获得锁,此时尝试CAS操作如果失败就返回false。

  写锁是非共享锁,所以释放起来很简单没有考虑线程安全的问题。由于是可重入锁需要考虑计数器为0的情况修改其状态。和上面的锁的释放没有太大的区别。

  3、读锁的获取与释放

  读锁,首先可以共享,其次也可以重入。经过类似的调用分析,读锁的获取核心方法如下

     protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

   首先是第一个return -1的地方,看到了熟悉的方法exclusiveCount,该方法是用来判断是否有线程获取了写锁,这个判断是意思是如果存在写锁且当前不是线程不是写锁的持有线程,返回失败。之所以这么设计是为了考虑到锁降级的问题,一个持有写锁的线程是可以持有读锁的,但是当一个线程持有写锁,其余非持有写锁的线程不可以持有读锁。这也就是代码里面注释的标记为1的部分:如果写锁被其他的线程持有那么当前线程就可以获得读锁。

  接下来有一个return1的判断,这个判断就是尝试进行一次CAS操作获取写锁,如果成功就返回。

  最后一个return代表上面尝试CAS操作失败的情况下,进入一个循环取试图获取写锁。这个之前的叙述是类似的。我记得这个操作是封装在模板方法里的,即这个尝试CAS如果失败就进入一个循环的CAS,不明白为什么这个操作要转移到这里。

  4、锁降级

  如果根据state的状态判断出此时有线程持有读锁,那么此时任何线程不可以获得写锁。如果根据state状态判断出此时有线程持有写锁,当且仅当持有写锁的试图获得读锁是可以允许的,其余任何没有持有写锁的线程是不可以获得读锁的。这种考虑是为了考虑到可见性。

  AQS框架下所有操作的可见性都是通过state变量来保证的,state变量是volatile的,根据happens-before原则对state写操作happens-before所有对其读操作。根据上面对源码的分析,无论是读锁还是写锁的释放都涉及到对state的写操作,所有试图获取锁的操作无论是读锁还是写锁都有一个读锁的操作,即写操作之前所有的对变量的修改对读操作之后的操作是可见的,就可见根据锁的获取与释放来保证可见性。

  首先在不考虑锁降级的情况下,读写锁可以理解成“互斥”的,即读锁和写锁不能同时分发出去,不允许在有线程获得读or写锁的情况下,别的线程获得写or读锁。这就是为了保证可见性,根据上面的分析可见性是基于锁的释放这个操作来实现的,假设A线程获得读锁,B线程获得写锁,那么就会存在这么一个现象:B在写的同时A在读,由于A读的时候B并没释放锁,所以B的写操作A看不到,这就带来了可见性问题。

  现在回到锁降级上,一个线程在获得写锁的时候可以获得读锁。这里为什么没有可见性问题?因为他是一个线程,线程的读写是针对线程私有内存空间的,你爱读就读,爱写就写。那按照这个理论,我获得写锁,写完之后写锁一释放直接读不就行了?我还要获得读锁吗?答案是需要的。考虑这样一个情况,A线程获得写锁,A线程写了内容,A线程释放写锁,然后A线程开始在没有获得读锁的前提下就读。假设在A线程释放写锁和在A线程在没有获得读锁的情况下就读数据之间另一个B线程写了一些数据,由于A线程在没有读锁的情况下就读数据那么由于volatile state保证的可见性就无法成立,就带来了新的可见性的问题。

  那为什么没有锁升级?A线程在获得读线程的时候也可以获得写线程?因为读锁是共享的,A线程有读锁的时候,有可能BCD线程也有读锁,当A线程又拿到写锁的时候,此时就不满足“互斥”性了。

5.5 LockSupport工具

  阻塞和唤醒线程

5.6 Condition接口

  类似于前面的等待通知机制,前面的等待通知机制是基于synchronized和Object上定义的监视器方法来实现的,Condition接口实现的等待通知机制也要结合Lock接口的实现类。

5.6.1 Condition接口与示例

  Condition接口基于Lock来实现的。

5.6.2 Condition接口实现分析

  ConditionObject是AQS的一个内部类,ConditionObject维护了一个等待队列。

1、等待队列

  等待队列也是一个FIFO队列,队列里的节点和AQS里的同步队列的Node都是一样的,但是一些属性在等待队列里是用不到的。当一个线程调用await方法的时候,会生成一个对应的节点并加入等待队列的末尾,这个加入节点的过程没有使用CAS保证线程安全性,因为在调用await的时候已经获得了锁,所以这是一个线程安全的操作。但是对于非互斥的锁不就存在多个线程可以并发的调用await方法了吗?这不也存在线程不安全的情况吗?

2、等待

  首先能够调用await的方法一定是获取了锁的线程,即一定是在同步队列的首节点,当他调用await的时候就会被放到等待队列里去。这就是底层的实现,从同步队列的首节点去除然后放到等待队列里去,相当于放弃了锁。

     /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

 

3、通知

  通知的时候是把等待最长时间的线程从同步队列里取出来,任务队列的头部的节点是等待时间最长的节点即firstWaiter是等待时间最长的节点。删除的方法是把该节点的nextWaiter置为null,然后把下一个节点放到等待队列的队首。 

        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

 

第5章 Java中的锁

标签:chain   方式   角度   back   不能   erp   返回   inter   bar   

原文地址:https://www.cnblogs.com/AshOfTime/p/10351153.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!