标签:常量 return 自己的 同步器 shel rri max 数据 并发包
Lock接口
锁是用来控制多个线程访问共享资源的方式,锁能够防止多个线程同时访问共享资源。在Lock接口出现之前,Java是靠synchronized关键字实现锁功能的。在Java SE5后,并发包中新增了Lock接口用来实现锁功能,Lock在使用时需要显式地获取和释放锁。虽然缺少隐式获取锁和释放锁的便捷性,却拥有了锁获取与释放的可操作性,可中断的获取锁以及超时获取锁等多种synchrnized不具备的同步特性。
特性 | 描述 |
尝试非阻塞地获取锁 | 当前线程尝试获取锁,若这一时刻锁没有被其他线程获取到,则成功获取并持有锁 |
能被中断地获取锁 | 与synchrinozed不同,获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁被释放 |
超时获取锁 | 在指定的截止时间之前获取锁,若截止时间到了仍无法获得锁,则返回 |
Lock API
方法名称 | 描述 |
void lock() | 获取锁,调用该方法当前线程将会获取锁,当锁获得后,从该方法返回 |
void lockInterruptibly() throws InterruptedException | 可中断地获取锁,和lock()方法不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程 |
boolean tryLock() | 尝试非阻塞的获取锁,调用该方法后立刻返回,若能够获取则返回true,否则返回false |
boolean tryLock(long time, TimeUnit unit) throws InterruptedException |
超时的获取锁,当前线程在以下3种情况下返回: 当前线程在超时时间内获得了锁 当前线程在超时时间内被中断 超时时间结束,返回false |
void unlock() | 释放锁 |
Condition newCondition() | 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁才能调用该组件的wait()方法,而调用后,当前线程将释放 |
队列同步器
AbstractQueueSynchronizer是用来构建锁或其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
同步器的主要使用方式是继承。子类通过继承同步器并实现它的抽象方法来管理同步状态,通过getState(),setState(int newState)和compareAndSetState(int expect, int update)进行同步状态的更改。子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态。
同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。锁是面向使用者的,他定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理,线程的排队,等待与唤醒等底层操作。
同步器的设计时基于模板方法模式的,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法。而这些模板方法将会调用使用者重写的方法。
重写同步器的指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态:
getState():获取当前同步状态
setState(int newState):设置当前同步状态
compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能保证状态设置的原子性
同步器可重写的方法
方法名称 | 描述 |
protected boolean tryAcquire(int arg) | 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
protected int tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败 |
protected boolean tryReleaseShared(int arg) | 共享式释放同步锁 |
protected boolean isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 |
同步器提供的模板方法
方法名称 | 描述 |
void acquire | 独占式获取同步状态,若当前线程获取同步状态成功,则由该方法返回,否则将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法 |
void acquireInterruptibly(int arg) | 与acquire(int arg)相同,但是该方法响应中断。当前线程为获取到同步状态而进入同步队列中,若当前线程被中断,则该方法会抛出InterruptedException并返回 |
boolean tryAcquireNanos(int arg, long nanos) | 在acquireInterruptibly(int arg)基础上增加了超时限制,若当前线程在超时时间内没有获取到同步状态,那么会返回false,若获取到了则返回true |
void acquireShared(int arg) | 共享式地获取同步状态,若当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态 |
void acquireSharedInterrupibly(int arg) | 与acquireShared(int arg)一样 |
boolean tryAcquireSharedNanos(int args, long nanos) | 与acquireSharedInterruptibly(int arg)基础上增加了超时限制 |
boolean release(int arg) | 独占式地释放同步状态,该方法会在释放同步状态后,将同步队列中第一个节点包含的线程唤醒 |
boolean releaseShared(int arg) | 共享式的释放同步状态 |
Collection<Thread> getQueuedThreads() | 获取等待在同步队列上的线程集合 |
同步器提供的模板方法基本上可以分为3类:独占式获取与释放同步状态,共享式获取与释放同步状态和查询同步队列中的等待线程情况。
独占式同步状态获取与释放
独占所就是在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取了锁的线程释放了锁,后继的线程才能获取锁。
public class Mutex implements Lock{ private final Sync sync = new Sync(); private static class Sync extends AbstractQueuedSynchronizer{ protected boolean isHeldExclusively() { return getState() == 1; } public boolean tryAcquire(int acquires){ if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int release){ if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } Condition newCondition(){ return new ConditionObject(); } } @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } }
独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。Mutex中定义了一个静态内部类,该内部类集成了同步器并实现了独占式获取和释放同步状态。在tryAcquire(int acquires)方法中,若经过CAS设置成功(同步状态设置为1),则代表获取了同步状态,而在tryRelease(int releases)方法中只是将同步状态重置为0.
同步队列
同步器依赖内部的同步队列来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构成一个节点并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
同步队列中的节点用来保存获取同步状态失败的线程引用,等待状态以及前驱和后继节点。节点的属性类型与名称以及描述如下
属性类型和名称 | 描述 |
int waitStatus |
等待状态 包含如下状态 CANCELLED 值为1,由于在同步队列中的等待的线程等待超时或被中断,需要从同步队列中取消等待,节点进入该状态将不会变化。 SIGNAL 值为-1,后继节点的线程处于等待状态,而当前节点的线程若释放了同步状态或被取消,将会通知后继节点,使后继节点的线程得以运行 CONIDITION 值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中 PROPAGATE 值为-3,表示下一次共享式同步状态获取将会无条件地传播下去 INITIAL 值为0,初始状态 |
Node prev | 前驱节点,当节点加入同步队列时被设置 |
Node next | 后继节点 |
Node nextWaiter | 等待队列中的后继节点。若当前节点是共享的,那么这个字段是一个SHARED常量,也就是说节点类型和等待队列中的后继节点共用同一个字段 |
Thread thread | 获取同步状态的线程 |
节点是构成同步队列的基础,同步器拥有首节点和尾节点,没有成功获取同步状态的线程将会成为节点加入该队列的尾部。同步器提供了一基于CAS的设置尾节点的方法:compareAndSetTail(Node expect, Node update),他需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。首节点获取同步状态成功后,在释放同步状态时将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功地获取到同步状态,因为设置头节点的方法并不需要使用CAS来保证,只需要将首节点设置为原首节点的后继节点并断开原首节点的next引用即可。
独占式同步状态获取与释放
通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感。即由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除。
public final void acquire(int arg){ if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ selfInterrupt(); } }
先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,若同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部。最后调用acquireQueued(Node node, int arg)方法,使得该节点以“死循环”的方式获取同步状态。若获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
private Node addWaiter(Node mode){ Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if(pred != null){ node.prev = pred; if(compareAndSetTail(pred, node)){ pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node){ for(;;){ Node t = tail; if(t == null){ if(compareAndSetHead(new Node())){ tail = headl } }else{ node.prev = t; if(compareAndSetTail(t, node)){ t.next = node; return t; } } } }
通过compareAndSetTail(Node expect, Node update)方法来确保节点能够被线程安全添加。在enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则当前线程不断地尝试设置。
节点进入同步队列之后,就进入了一个自旋的过程,每个节点都在自省的观察。当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中。
final boolean acquireQueued(final Node node, int arg){ boolean failed = true; try{ boolean interrupted = false; for(;;){ final Node p = node.predecessor(); if(p == head && tryAcquire(arg)){ setHead(node); p.next = null; failed = false; return interrupted; } if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){ interrupted = true; } }finally{ if(failed){ cancelAcquire(node); } } } }
在acquireQueued(final Node node, int arg)方法中,当前线程在“死循环”中尝试获取同步状态。在非首节点线程前驱节点出队或被中断而从等待状态返回后,节点检查自己的前驱是否是头节点,若是则尝试获取同步状态。因此节点和节点之间在循环检查的过程中基本不互相通信,而是简单地判断自己的前驱是否为头节点,这样节点的释放符合FIFO。
独占式同步状态获取流程如下图
通过release(int arg)方法可以释放同步状态,该方法在释放同步状态后,会唤醒其后继节点。
public final boolean release(){ if(tryRelease(arg)){ Node h = head; if(h != null && h.waitStatus != 0){ unparkSuccessor(h); } return true; } return false; }
unparkSuccessor(Node node)方法使用LockSupport来唤醒处于等待状态的线程。
在获取同步状态时,同步维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移除队列的条件是前驱节点为头节点成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。
共享式同步状态获取与释放
共享式获取与独占式获取最主要的区别在于同一时刻是否有多个线程同时获取到同步状态。
通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态。
public final void acquireShared(int arg){ if(tryAcquireShared(arg) < 0){ doAcquireShared(arg); } } private void doAcquireShared(int arg){ final Node node = addWaiter(Node.SHARED); boolean failed = true; try{ boolean interrupted = false; for(;;){ final Node p = node.predecessor(); if(p == head){ int r = tryAcquireShared(arg); if(r >= 0){ setHeadAndPropagate(node, r); // difference from shared acquire p.next = null; if(interrupted){ selfInterrupt(); } failed = false; return ; } } if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally{ if(failed) cancelAcquire(node); } }
在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为int类型。当返回值大于等于0时,表示能够获取到同步状态。在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared(int arg)方法返回值大于等于0。
通过releaseShared(int arg)可以释放同步状态
public final boolean releaseShared(int arg){ if(tryReleaseShared(arg)){ doReleaseShared(); return true; } return false; }
该方法在释放同步状态之后,将会唤醒后续等待状态的节点。对于能够支持多个线程同时访问的并发组件,它和独占式主要区别在于tryReleaseShared(int arg)方法必须确保同步状态线程安全释放。一般是通过循环CAS来保证的。
独占式超时获取同步状态
通过调用同步器的doAcquireNanos(int arg, long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,若获取到同步状态则返回true,否则返回false。同步器提供了acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,若当前线程被中断,会立刻返回并抛出InterruptedException。doAcquireNanos(int arg, long nanosTimeout)犯法在支持响应中断地基础上,增加了超时获取的特性。
private boolean doAcquireNanos(int arg, long nanosTimeout) throw InterruptedException{ long lastTime = System.nanoTime(); final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try{ for(;;){ final Node p = node.predecessor(); if(p == head && tryAcquire(arg)){ setHead(node); p.next = null; failed = false; return true; } if(nanosTimeout <= 0){ return false; } if(shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold){ LockSupport.parkNanos(this, nanosTimeout); } long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; if(Thread.interrupted()){ throw new InterruptedException(); } } }finally{ if(failed) cancelAcquire(node); } }
当节点的前驱节点为头节点时尝试获取同步状态,若获取成功则从该方法返回,这个过程和独占式同步获取的过程类似。但若当前线程获取同步状态失败,则判断是否超时,若没有超时,重新计算超时间隔nanosTimeout,然后使当前线程等待nanosTimeout纳秒。若nanosTimeout小于等于spinForTimeoutThreshold()时,将会使该线程进行超时等待,而是进入快速地自旋过程。因为非常短的超时等待无法做到十分准确,因此同步器会进入无条件地快速自旋。
独占式超时获取同步的流程图如下:
独占式超时获取同步状态doAcquireNanos(int arg, long nanosTimeout)和独占式获取同步状态acquire(int args)在流程上非常相似,主要区别在于acquire(int args)在未获取到同步状态时,将会使当前线程一直处于等待状态,而doAcquireNanos(int arg, long nanosTimeout)会使当前线程等待nanosTimeout纳秒,若当前线程在nanosTimeout那秒内没有获取到同步状态,将会从等待逻辑中自动返回。
自定义同步组件——TwinsLock
该工具在同一时刻只允许至多两个线程同时访问(共享式),超过两个线程的访问将被阻塞(同步资源数为2)。
public class TwinsLock implements Lock{ private final Sync sync = new Sync(2); public static final class Sync extends AbstractQueuedSynchronizer{ public Sync(int count) { if(count <= 0){ throw new IllegalArgumentException("count must large than one."); } setState(count); } public int tryAcquireShared(int reduceCount){ for(;;){ int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } public boolean tryRelease(int returnCount){ for(;;){ int current = getState(); int newCount = current + returnCount; if(compareAndSetState(current, newCount)){ return true; } } } } @Override public void lock() { sync.acquireShared(1); } @Override public void unlock() { sync.releaseShared(1); } }
重入锁(ReentrantLock)
重入锁就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁还支持获取锁时的公平和非公平性选择。在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之则是不公平的。公平锁的获取就是等待时间最长的线程最优先获取锁,即锁获取是顺序的。
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁阻塞,该特性需解决以下两个问题:
1) 线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程。若是,则再次成功获取
2) 锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当技计数等于0时表示锁已经成功释放
非公平锁的获取
final boolean nonfairTryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); if(c == 0){ if(compareAndSetState(0, acquires)){ setExclusiveOwnerThread(current); return true; } }else if(current == getExclusiveOwnerThread()){ int nextc = c + acquires; if(nextc < 0){ throw new Error("Maximum lock count exceeded"); } setState(nextc); return true; } return false; }
protected final boolean tryRelease(int releases){ int c = getState() - releases; if(Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if(c == 0){ free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
公平锁的获取
protected final boolean tryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); if(c == 0){ if(!hasQueuedPredecessors() && compareAndSetState(0, acquires)){ setExclusiveOwnerThread(current); return true; } }else if(current == getExclusiveOwnerThread()){ int nextc = c+ acquires; if(nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
在测试中公平性锁与非公平性锁相比,总耗时是其94.3倍,总切换次数是其133倍。公平性锁按照FIFO的代价是进行大量的线程切换。费公平性锁虽然可能会造成线程“饥饿”,但通过减少线程切换,保证了更大的吞吐量。
读写锁
读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁使得并发性比一般的排它锁有了很大提升,
ReentrantReadWriteLock
ReentrantReadWriteLock的特性
特性 | 说明 |
公平性选择 | 支持非公平和公平的锁获取方式,吞吐量还是非公平优于公平 |
重进入 | 该锁支持重进入,以读写线程为例:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取了写锁之后能够再次获取写锁,同事也可以获取读锁 |
锁降级 | 遵循获取写锁,获取读锁再释放写锁的次序,写锁能够降级为读锁 |
public class Cache{ static Map<String, Object> map = new HashMap<String, Object>(); static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock r = rwl.readLock(); static Lock w = rwl.writeLock(); public static final Object get(String key){ r.lock(); try{ return map.get(key); }finally{ r.unlock(); } } public static final Object put(String key, Object value){ w.lock(); try{ return map.put(key, value); }finally{ w.unlock(); } } public static final void clear(){ w.lock(); try{ map.clear(); }finally{ w.unlock(); } } }
ReentrantReadWriteLock内部方法
方法名称 | 描述 |
int getReadLockCount() | 返回单钱读锁被获取的次数。该次数不等于获取读锁的线程数。 |
int getReadHoldCount() | 返回当前线程获取读锁的次数。使用ThreadLocal保存当前线程获取的次数 |
boolean isWriteLocked() | 判断写锁是否被获取 |
int getWriteHoldCount() | 返回当前写锁被获取的次数 |
读写锁依赖自定义同步器来实现同步功能,而读写状态就是同步器的同步状态。读写锁的自定义同步器需要在同步状态(一个int)上维护多个读线程和一个写线程的状态。因此需要“按位切割使用”这个变量。读写锁将变量切分成两部分,高16位表示读,低16位表示写。通过位运算来确定读写各自的状态。假设当前同步状态值为S,写状态等于S&0x0000FFFF,读状态等于S>>>16(无符号补0右移16位)。当写状态增加1时,等于S+1,当度状态增加1时,等于S+(1<<16),即S+0x00010000。因此S不等于0时,当写状态等于0,读状态大于0,此时读锁已被获取。
写锁的获取与释放
写锁是一个支持重进入的排它锁。若当前线程已经获取了写锁,则增加写状态。若当前线程在获取写锁时,读锁已被获取或该线程不是已经获取写锁的线程,则当前线程进入等待状态。
protected final boolean tryAcquire(int acquires){ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if(c != 0){ if( w ==0 || current != getExclusiveOwnweThread()) return false; if(w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maxium lock count exceeded"); setState(c + acquires); return true; } if(writerShouldBlock() || !compareAndSetState(c, c+acquires)) return false; setExclusiveOwnerThread(current); return true; }
该方法除了重入条件外还增加了一个读锁是否存在的判断。若存在读锁,则写锁不能被获取。原因在于读写锁要确保写锁的操作对读锁可见,若允许读锁再已被获取的情况下对写锁的获取,那么其他正在运行的其他线程就无法感知当前线程的操作。因此只有等待其他度线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。
写锁的每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。
读锁的获取与释放
读锁是一个支持重进入的共享锁,它能够被多个线程同时获取。若当前线程已经成功获取了读锁,则增加读状态。若当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。读状态是所有线程获取读锁次数的总和,而每个线程各自获取读锁的次数只能保存在ThreadLocal中,由自身线程维护。
protected final int tryAcquireShared(int unused){ for(;;){ int c = getState(); int nextc = c + (1 << 16); if(nextc < c){ throw new Error("Maximum lock count exceeded"); } if(exclusiveCount(c) != 0 && owner != Thread.currentThread()) return -1; if(compareAndSetState(c, nextc)) return 1; } }
读锁的每次释放均减少读状态,减少的值是1<<16
锁降级
锁降级是指写锁降级成读锁。若当前线程拥有写锁,然后将其释放,最后在获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住写锁,再获取到读锁,随后释放写锁的过程。
public void processData(){ readLock.lock(); if(!update){ readLock.unlock(); writeLock.lock(); try{ if(!update){ update = true; } readLock.lock(); }finally{ writeLock.unlock(); } } try{ // deal with data }finally{ readLock.unlock(); } }
若当前线程不获取读锁而是直接释放写锁,假设此时另一个线程获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新,若当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,知道当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。
ReentrantReadWriteLock不支持锁升级。若读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。
LockSupport
LockSupport定义了一组公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能。LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及unpark(Thread thread)方法来唤醒一个被阻塞的线程。
方法名称 | 描述 |
void park() | 阻塞当前线程,若调用unpark(Thread thread)方法或当前线程被中断,才能从park()方法返回 |
void parkNanos(long nanos) | 阻塞当前线程,最长不超过nanos纳秒。返回条件在park()的基础上增加了超时返回 |
void parkUntil(long deadline) | 阻塞当前线程,知道deadline时间 |
void unpark(Thread thread) | 唤醒处于阻塞状态的线程thread |
Java 6,LockSupport增加了park(Object blocker),parkNanos(Object blocker, long nanos)和parkUntil(Object blocker, long deadline)3个方法,用于实现阻塞当前线程的功能,其中blocker是用来标识当前线程在等待的对象,该对象主要用于问题排查和系统监控。
Condition接口
任意一个Java对象,都拥有一组监视器方法,主要包括wait(),wait(long timeout),notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。
对比项 | Object Monitor Methods | Condition |
前置条件 | 获取对象的锁 |
调用Lock.lock()获取锁 调用Lock.newCondition()获取Condition对象 |
调用方式 | 直接调用 | 直接调用 |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态,在等待状态中不响应中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
Condition
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象创建出来的。
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void conditionWait() throws InterruptedException{ lock.lock(); try{ condition.await(); }finally{ lock.unlock(); } } public void conditionSignal() throws InterruptedException{ lock.lock(); try{ condition.signal(); }finally{ lock.unlock(); } }
Condition部分方法及描述
方法名称 | 描述 |
void await() throws InterruptedException |
当前线程进入等待状态直到被通知或中断,当前线程将进入运行状态且从await()方法返回的情况,包括: 其他线程调用该Condition的signal()或signalAll()方法,而当前线程被选中唤醒 其他线程中断当前线程 若当前等待线程从await()方法返回,那么表明该线程已经获取了Condition对象锁对应的锁 |
void awaitUninterruptibly() | 当前线程进入等待状态直到被通知,从方法名称上可以看出该方法对中断不敏感 |
long awaitNanos(long nanosTimeout) throws InterruptedException | 当前线程进入等待状态直到被通知,返回值表示剩余的时间,若在nanosTimeout纳秒之前被唤醒,那么返回值就是(nanosTimeout - 实际耗时)。若返回值是0或负数,则可以认定已经超时 |
boolean awaitUntil(Date deadline) throws InterruptedException | 当前线程进入等待状态直到被通知,中断或到某个时间。若没有到指定时间就被通知,方法返回true,否则表示到了指定时间,方法返回false |
void signal() | 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁 |
void signalAll() | 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得与Condition相关联的锁 |
有界队列:当队列为空时,队列的获取操作将阻塞获取线程,直到队列中有新增元素。当队列已满时,队列的插入操作将会阻塞插入线程,知道队列队列出现“空位”。
public class BoundQueue<T> { private Object[] items; private int addIndex, removeIndex, count; private Lock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public BoundQueue(int size) { items = new Object[size]; } public void add(T t) throws InterruptedException{ lock.lock(); try { while (count == items.length) { notFull.await(); } items[addIndex] = t; if (++addIndex == items.length) { addIndex = 0; } ++count; notEmpty.signal(); } finally { lock.unlock(); } } @SuppressWarnings("unchecked") public T remove() throws InterruptedException{ lock.lock(); try { while(count == 0) notEmpty.await(); Object x = items[removeIndex]; if(++removeIndex == items.length) removeIndex = 0; --count; notFull.signal(); return (T) x; } finally { lock.unlock(); } } }
先获取锁确保数组修改的可见性和排他性。当数组数量等于数组长度时,表示数组已满,则调用notFull.await(),当前线程随之释放锁并进入等待状态。若数组数量不等于数组长度,表示数组未满,则添加元素到数组中,同时通知等待在notEmpty上的线程,数组中已经有新元素可以获取。
ConditionObject是同步器AbstractQueuedSynchronizer的内部类。每个Condition对象都包含一个队列,该队列是Condition对象实现等待/通知功能的关键。
等待队列
等待队列是一个FIFO的队列,在队列中每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,若一个线程调用了Condition.await()方法。那么该线程将会释放锁,构造成节点加入等待队列并进入等待状态。节点的定义复用了同步器中的节点的定义。即同步队列和等待队列中节点的类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。
一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。
等待
调动Condition的await()方法会使线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
public final void await() throws InterruptedException{ if(Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int saveState = fullyRelease(node); int interruptMode = 0; while(!isOnSyncQueue(node)){ LockSupport.park(this); if((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if(acquireQueued(node, saveState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if(node.nextWaiter != null) unlinkCancelledWaiters(); if(interruptMode != 0) reportInterruptAfterWait(interruptMode); }
通知
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点,在唤醒节点之前,会将节点移到同步队列中。
public final void signal(){ if(!isHeldExclusively()) //检查当前线程是否获取了锁 throw new IllegalMonitorStateException(); Node first = firstWaiter; if(first != null) doSignal(first); }
标签:常量 return 自己的 同步器 shel rri max 数据 并发包
原文地址:http://www.cnblogs.com/forerver-elf/p/7603336.html