
Non-blocking Synchronization Algorithms and the CAS (Compare and Swap) Lock-Free Algorithm

Date: 2018-01-14 14:28:10


The Cost of Locks

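Locks are the simplest way to handle concurrency, and also the most expensive. Acquiring a kernel-mode lock requires the operating system to perform a context switch; locking and unlocking cause many context switches and scheduling delays, and a thread waiting for a lock is suspended until the lock is released. During a context switch, the instructions and data the CPU had cached are invalidated, which costs a great deal of performance. When the operating system arbitrates a lock among threads, it is like two sisters quarrelling over a toy, with the operating system as the parent who decides which of them gets it; this is slow. User-mode locks avoid these problems, but they are effective only when there is no real contention.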

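Before JDK 1.5, Java relied on the synchronized keyword for synchronization. Coordinating access to shared state through a consistent locking protocol ensures that whichever thread holds the lock guarding a set of variables has exclusive access to them. If several threads try to acquire the lock at the same time, some of them are suspended; when a suspended thread is resumed, it has to wait for other threads to finish their time slices before it can be scheduled. Suspending and resuming a thread carries considerable overhead. Locks have other drawbacks too. A thread waiting for a lock cannot do anything else. If a thread holding a lock is delayed, no thread that needs that lock can make progress. And if the blocked thread has high priority while the thread holding the lock has low priority, the result is priority inversion.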

Optimistic and Pessimistic Locking

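An exclusive lock is a pessimistic lock, and synchronized is an exclusive lock. It assumes the worst case and proceeds only once it is guaranteed that other threads cannot interfere, so every other thread that needs the lock is suspended until the holder releases it. The more efficient alternative is optimistic locking: instead of taking a lock, each operation is attempted on the assumption that there is no conflict; if it fails because of a conflict, it is retried until it succeeds.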

The Problem with volatile

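Compared with locks, volatile variables are a lighter-weight synchronization mechanism, because using them involves no context switch or thread scheduling. But volatile variables have limitations: they cannot be used to build atomic compound operations, so volatile alone is not enough when a variable's new value depends on its old value. (Reference: "A Few Words on volatile")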
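The limitation can be made concrete with a small sketch. The class and method names below (VolatileCounterDemo, run) are made up for illustration and do not come from the article. Several threads increment a volatile int and an AtomicInteger side by side; the volatile counter may end up short because ++ is a read, an add, and a write, while the atomic counter always reaches the full total.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class VolatileCounterDemo {
    static volatile int volatileCount;                        // visible to all threads, but ++ is not atomic
    static final AtomicInteger atomicCount = new AtomicInteger();

    // Starts `threads` workers that each increment both counters `perThread` times.
    // Returns {final volatile count, final atomic count}.
    static int[] run(int threads, int perThread) {
        volatileCount = 0;
        atomicCount.set(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    volatileCount++;               // read, add, write: concurrent updates can be lost
                    atomicCount.incrementAndGet(); // a single atomic step
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { volatileCount, atomicCount.get() };
    }

    public static void main(String[] args) {
        int[] result = run(4, 100_000);
        System.out.println("volatile counter: " + result[0] + " (may be below 400000)");
        System.out.println("atomic counter:   " + result[1]);
    }
}
```

How many updates the volatile counter loses varies from run to run and from machine to machine; on some runs it may lose none.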

volatile guarantees only that a variable is visible to all threads; it does not guarantee atomicity. For the reason, see my other article:

Atomic Operations in Java

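An atomic operation completes in a single step and cannot be interrupted. Atomic operations are thread-safe in a multithreaded environment and need no additional synchronization. In Java, the following operations are atomic: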

  • all assignments of primitive types except for long and double
  • all assignments of references
  • all operations of java.util.concurrent.atomic.Atomic* classes
  • all assignments to volatile longs and doubles

So why is assignment to a long not atomic? For example:

long foo = 65465498L;

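In fact, the Java Language Specification allows a JVM to write a non-volatile long in two steps, first one 32-bit half and then the other, so another thread may observe a half-written value. That is not thread-safe. Declaring the field as follows makes the write atomic: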

private volatile long foo;

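This works because the Java Language Specification guarantees that reads and writes of volatile long and double values are always atomic; no synchronized block is involved.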

The CAS Lock-Free Algorithm

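There are several ways to implement a lock-free, non-blocking algorithm; CAS (compare and swap) is a well-known one. CAS is a CPU instruction, available on most processor architectures, including IA-32 and SPARC. Its semantics are: "I think V should hold the value A; if it does, set V to B; otherwise leave it unchanged and tell me what V actually holds." CAS is an optimistic technique. When several threads use CAS to update the same variable at once, only one of them succeeds and the rest fail. A failed thread is not suspended; it is told that it lost the race and may try again. CAS takes three operands: the memory value V, the expected old value A, and the new value B. V is set to B if and only if V equals A; otherwise nothing happens. In C, the semantics of CAS can be written as follows: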

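/* ATOMIC() and END_ATOMIC() are placeholders marking a region that the
   hardware executes as one indivisible step; they are not real C macros. */
int compare_and_swap (int* reg, int oldval, int newval) 
{
  ATOMIC();
  int old_reg_val = *reg;      /* read the current value */
  if (old_reg_val == oldval)   /* unchanged since the caller read it? */
     *reg = newval;            /* then install the new value */
  END_ATOMIC();
  return old_reg_val;          /* the caller compares this with oldval */
}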
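The same semantics can be tried out in Java with AtomicInteger.compareAndSet. One difference from the C function above: compareAndSet returns a boolean saying whether the swap happened, not the old value. The class name CasSemanticsDemo and its demo method are illustrative only.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class CasSemanticsDemo {
    // Performs one successful and one failed CAS and reports what happened.
    static String demo() {
        AtomicInteger v = new AtomicInteger(5);   // V = 5
        boolean first = v.compareAndSet(5, 6);    // A = 5 matches V, so V becomes 6
        boolean second = v.compareAndSet(5, 7);   // A = 5 is stale (V is 6), so V stays 6
        return first + " " + second + " " + v.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true false 6"
    }
}
```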

The Basic Assumption behind CAS (the Optimistic Locking Algorithm)

The CAS pattern can be written in pseudocode as:

do{   
       back up the old data;  
       build the new data from the old data;  
}while(!CAS( memory address, backed-up old data, new data ))  

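[Figure]

(Explanation of the figure: the CPU tries to update a value, but if the value it wants to change is no longer the original value, the operation fails, because some other operation has evidently changed the value first.)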

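That is, when the two values are compared and found equal, the shared data has not been modified, so it is replaced with the new value and execution continues. If they differ, the shared data has been modified, so the work done so far is discarded and the operation is redone. Evidently CAS rests on the assumption that shared data will not be modified, and uses a commit-retry pattern similar to a database's. When synchronization conflicts are rare, this assumption yields a large performance gain.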
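As a minimal sketch of the commit-retry loop, here is a lock-free "raise to maximum" update built on AtomicLong.compareAndSet. The class CasMaxDemo and the method updateMax are hypothetical names chosen for this example.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class; the names are illustrative only.
class CasMaxDemo {
    // Raises `max` to `candidate` if candidate is larger, without taking a lock.
    static void updateMax(AtomicLong max, long candidate) {
        long current;
        do {
            current = max.get();                          // back up the old data
            if (candidate <= current) {
                return;                                   // nothing to commit
            }
        } while (!max.compareAndSet(current, candidate)); // commit, or retry if someone else won
    }

    public static void main(String[] args) {
        AtomicLong max = new AtomicLong(3);
        updateMax(max, 5);
        updateMax(max, 4);
        System.out.println(max.get()); // prints 5
    }
}
```

If another thread changes the value between get() and compareAndSet(), the CAS fails and the loop re-reads the value and decides again, which is the retry half of the pattern.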

The Cost of CAS (the CPU Cache Miss Problem)

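As said above, CAS is a CPU-level instruction, a single atomic step, so it is very fast. CAS also avoids asking the operating system to arbitrate a lock; the work is done inside the CPU without troubling the operating system. But is CAS free? No. There are cache misses. The issue is fairly involved; first we need to look at the CPU's hardware architecture: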

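[Figure: CPU hardware architecture]

When CPU0 performs a CAS on a variable whose cache line is held in CPU7's cache, the following happens: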

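  • CPU0 checks its local cache and does not find the cache line.
  • The request is forwarded to the interconnect of CPU0 and CPU1, which checks CPU1's local cache and does not find the cache line.
  • The request is forwarded to the system interconnect, which checks the other three dies and learns that the cache line is held by the die containing CPU6 and CPU7.
  • The request is forwarded to the interconnect of CPU6 and CPU7, which checks both CPUs' caches and finds the cache line in CPU7's cache.
  • CPU7 sends the cache line to its interconnect and flushes the line from its own cache.
  • The interconnect of CPU6 and CPU7 sends the cache line to the system interconnect.
  • The system interconnect sends the cache line to the interconnect of CPU0 and CPU1.
  • The interconnect of CPU0 and CPU1 sends the cache line to CPU0's cache.
  • CPU0 can now perform the CAS on the variable in its cache.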

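The steps above are the cost of moving a cache line between the caches of different CPUs. A best-case CAS takes about 40 nanoseconds, more than 60 clock cycles. "Best case" here means that the CPU performing the CAS is the one that last operated on the variable, so the cache line is already in its cache. Similarly, a best-case lock operation (a "round trip" pair consisting of acquiring the lock and then releasing it) takes more than 60 nanoseconds, more than 100 clock cycles. "Best case" here means that the data structure representing the lock is already in the cache of the CPU that acquires and releases it. A lock operation costs more than a CAS because the lock's data structure requires two atomic operations. A cache miss takes about 140 nanoseconds, more than 200 clock cycles. A CAS that has to fetch the variable's old value before storing the new one takes about 300 nanoseconds, more than 500 clock cycles. Think about that: in the time of one such CAS, the CPU could execute 500 ordinary instructions. This shows the limits of fine-grained locking.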


JVM Support for CAS: AtomicInteger, AtomicLong.incrementAndGet()

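Before JDK 1.5, CAS could not be performed without writing native code. JDK 1.5 introduced low-level support, exposing CAS on int, long, and object references, and the JVM compiles these operations to the most efficient means the underlying hardware provides. On platforms that support CAS, the runtime compiles them to the corresponding machine instructions; if the processor has no CAS instruction, the JVM falls back to a spin lock. CAS solutions are therefore closely tied to the platform and compiler. (On x86, for example, the corresponding assembly instruction is lock cmpxchg; for a 64-bit exchange, use lock cmpxchg8b. In .NET, the Interlocked.CompareExchange function can be used.)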

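The atomic variable classes in java.util.concurrent.atomic (the AtomicXXX classes) use this low-level JVM support to provide efficient CAS operations on numeric and reference types, and most classes in java.util.concurrent are implemented, directly or indirectly, with these atomic variable classes.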
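CAS on an object reference makes it possible to update several related values in one step by swapping in a new immutable object. The sketch below is one way to do this under that assumption; CasRangeSketch and its methods are hypothetical names, and the code is an illustration rather than anything taken from the JDK.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: keeps the invariant lower <= upper using a single reference CAS.
class CasRangeSketch {
    // Immutable pair, so both bounds are always replaced together.
    static final class Range {
        final int lower;
        final int upper;
        Range(int lower, int upper) { this.lower = lower; this.upper = upper; }
    }

    private final AtomicReference<Range> range =
            new AtomicReference<>(new Range(0, 0));

    int getLower() { return range.get().lower; }
    int getUpper() { return range.get().upper; }

    void setUpper(int newUpper) {
        while (true) {
            Range old = range.get();                  // snapshot of both bounds
            if (newUpper < old.lower) {
                throw new IllegalArgumentException("upper < lower");
            }
            Range updated = new Range(old.lower, newUpper);
            if (range.compareAndSet(old, updated)) {  // succeeds only if nobody replaced the snapshot
                return;
            }
        }
    }

    void setLower(int newLower) {
        while (true) {
            Range old = range.get();
            if (newLower > old.upper) {
                throw new IllegalArgumentException("lower > upper");
            }
            Range updated = new Range(newLower, old.upper);
            if (range.compareAndSet(old, updated)) {
                return;
            }
        }
    }
}
```

With two separate atomic integers the check and the update of each bound could interleave and break the invariant; swapping one reference avoids that.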

The Java 1.6 source of AtomicLong, including incrementAndGet(), is:

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent.atomic;
import sun.misc.Unsafe;

/**
 * A <tt>long</tt> value that may be updated atomically.  See the
 * {@link java.util.concurrent.atomic} package specification for
 * description of the properties of atomic variables. An
 * <tt>AtomicLong</tt> is used in applications such as atomically
 * incremented sequence numbers, and cannot be used as a replacement
 * for a {@link java.lang.Long}. However, this class does extend
 * <tt>Number</tt> to allow uniform access by tools and utilities that
 * deal with numerically-based classes.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;

    // setup to use Unsafe.compareAndSwapLong for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    /**
     * Records whether the underlying JVM supports lockless
     * CompareAndSet for longs. While the unsafe.CompareAndSetLong
     * method works in either case, some constructions should be
     * handled at Java level to avoid locking user-visible locks.
     */
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();

    /**
     * Returns whether underlying JVM supports lockless CompareAndSet
     * for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
     */
    private static native boolean VMSupportsCS8();

    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicLong.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile long value;

    /**
     * Creates a new AtomicLong with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicLong(long initialValue) {
        value = initialValue;
    }

    /**
     * Creates a new AtomicLong with initial value <tt>0</tt>.
     */
    public AtomicLong() {
    }

    /**
     * Gets the current value.
     *
     * @return the current value
     */
    public final long get() {
        return value;
    }

    /**
     * Sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(long newValue) {
        value = newValue;
    }

    /**
     * Eventually sets to the given value.
     *
     * @param newValue the new value
     * @since 1.6
     */
    public final void lazySet(long newValue) {
        unsafe.putOrderedLong(this, valueOffset, newValue);
    }

    /**
     * Atomically sets to the given value and returns the old value.
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final long getAndSet(long newValue) {
        while (true) {
            long current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value <tt>==</tt> the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value <tt>==</tt> the expected value.
     * May fail spuriously and does not provide ordering guarantees,
     * so is only rarely an appropriate alternative to <tt>compareAndSet</tt>.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful.
     */
    public final boolean weakCompareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final long getAndIncrement() {
        while (true) {
            long current = get();
            long next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the previous value
     */
    public final long getAndDecrement() {
        while (true) {
            long current = get();
            long next = current - 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the previous value
     */
    public final long getAndAdd(long delta) {
        while (true) {
            long current = get();
            long next = current + delta;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final long incrementAndGet() {
        for (;;) {
            long current = get();
            long next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the updated value
     */
    public final long decrementAndGet() {
        for (;;) {
            long current = get();
            long next = current - 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public final long addAndGet(long delta) {
        for (;;) {
            long current = get();
            long next = current + delta;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Returns the String representation of the current value.
     * @return the String representation of the current value.
     */
    public String toString() {
        return Long.toString(get());
    }


    public int intValue() {
        return (int)get();
    }

    public long longValue() {
        return (long)get();
    }

    public float floatValue() {
        return (float)get();
    }

    public double doubleValue() {
        return (double)get();
    }

}

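As this shows, AtomicLong.incrementAndGet is implemented with the optimistic technique: it calls the CAS operation in sun.misc.Unsafe, which uses a CPU instruction to increment without a lock. This is why AtomicLong.incrementAndGet can be several times faster than an increment guarded by synchronized. The int version in AtomicInteger follows the same pattern: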

public final int getAndIncrement() {  
        for (;;) {  
            int current = get();  
            int next = current + 1;  
            if (compareAndSet(current, next))  
                return current;  
        }  
}  
   
public final boolean compareAndSet(int expect, int update) {  
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);  
}

Below is the test code, which the author uses to show AtomicLong.incrementAndGet performing several times better than synchronized.


package console;
 
import java.util.concurrent.atomic.AtomicLong;
 
public class main {
 
    /**
     * @param args
     */
    public static void main(String[] args) {
 
        System.out.println("START -- ");
        calc();
        calcSynchro();
        calcAtomic();
         
        testThreadsSync();
        testThreadsAtomic();
         
        testThreadsSync2();
        testThreadsAtomic2();
         
        System.out.println("-- FINISHED ");
    }
 
    private static void calc() {
        stopwatch sw = new stopwatch();
        sw.start();
 
        long val = 0;
        while (val < 10000000L) {
            val++;
        }
        sw.stop();
        long milSecds = sw.getElapsedTime();
 
        System.out.println(" calc() elapsed (ms): " + milSecds);
    }
 
    private static void calcSynchro() {
        stopwatch sw = new stopwatch();
        sw.start();
 
        long val = 0;
 
        while (val < 10000000L) {
            synchronized (main.class) {
                val++;
            }
        }
 
        sw.stop();
        long milSecds = sw.getElapsedTime();
 
        System.out.println(" calcSynchro() elapsed (ms): " + milSecds);
    }
 
    private static void calcAtomic() {
        stopwatch sw = new stopwatch();
        sw.start();
 
        AtomicLong val = new AtomicLong(0);
        while (val.incrementAndGet() < 10000000L) {
 
        }
        sw.stop();
        long milSecds = sw.getElapsedTime();
 
        System.out.println(" calcAtomic() elapsed (ms): " + milSecds);
 
    }
     
 
    private static void testThreadsSync(){
         
        stopwatch sw = new stopwatch();
        sw.start();
         
        Thread t1 = new Thread(new LoopSync());
        t1.start();
         
        // Single-thread case: only t1 runs; wait until it finishes.
        while (t1.isAlive()) {
             
        }
                 
        sw.stop();
        long milSecds = sw.getElapsedTime();
 
        System.out.println(" testThreadsSync() 1 thread elapsed (ms): " + milSecds);
         
    }
     
    private static void testThreadsAtomic(){
     
        stopwatch sw = new stopwatch();
        sw.start();
         
        Thread t1 = new Thread(new LoopAtomic());
        t1.start();
         
        // Single-thread case: only t1 runs; wait until it finishes.
        while (t1.isAlive()) {
             
        }
                 
        sw.stop();
        long milSecds = sw.getElapsedTime();
 
        System.out.println(" testThreadsAtomic() 1 thread elapsed (ms): " + milSecds);
         
    }
     
    private static void testThreadsSync2(){
         
        stopwatch sw = new stopwatch();
        sw.start();
         
        Thread t1 = new Thread(new LoopSync());
        t1.start();
         
        Thread t2 = new Thread(new LoopSync());
        t2.start();
                 
        while (t1.isAlive() || t2.isAlive()) {
             
        }
                 
        sw.stop();
        long milSecds = sw.getElapsedTime();
 
        System.out.println(" testThreadsSync() 2 threads elapsed (ms): " + milSecds);
         
    }
     
    private static void testThreadsAtomic2(){
     
        stopwatch sw = new stopwatch();
        sw.start();
         
        Thread t1 = new Thread(new LoopAtomic());
        t1.start();
         
        Thread t2 = new Thread(new LoopAtomic());
        t2.start();
                 
        while (t1.isAlive() || t2.isAlive()) {
             
        }
                 
        sw.stop();
        long milSecds = sw.getElapsedTime();
 
        System.out.println(" testThreadsAtomic() 2 threads elapsed (ms): " + milSecds);
         
    }
     
    private static class LoopAtomic implements Runnable {
        public void run() {
            AtomicLong val = new AtomicLong(0);
            while (val.incrementAndGet() < 10000000L) {
 
            }
        }
    }
    private static class LoopSync implements Runnable {
        public void run() {
            long val = 0;
 
            while (val < 10000000L) {
                synchronized (main.class) {
                    val++;
                }
            }
        }
    }
}
 
 
// stopwatch.java (a public class must live in its own source file)
public class stopwatch {
 
    private long startTime = 0;
    private long stopTime = 0;
    private boolean running = false;
 
    public void start() {
        this.startTime = System.currentTimeMillis();
        this.running = true;
    }
 
    public void stop() {
        this.stopTime = System.currentTimeMillis();
        this.running = false;
    }
 
    public long getElapsedTime() {
        long elapsed;
        if (running) {
            elapsed = (System.currentTimeMillis() - startTime);
        } else {
            elapsed = (stopTime - startTime);
        }
        return elapsed;
    }
 
    public long getElapsedTimeSecs() {
        long elapsed;
        if (running) {
            elapsed = ((System.currentTimeMillis() - startTime) / 1000);
        } else {
            elapsed = ((stopTime - startTime) / 1000);
        }
        return elapsed;
    }
 
    // sample usage
    // public static void main(String[] args) {
    // stopwatch s = new stopwatch();
    // s.start();
    // //code you want to time goes here
    // s.stop();
    // System.out.println("elapsed time in milliseconds: " +
    // s.getElapsedTime());
    // }
}
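In the test above every worker increments its own counter, so the threads never actually contend. The following is a hedged sketch of a variant in which all threads share one counter, waits with join() instead of spinning on isAlive(), and times with System.nanoTime(). The class SharedCounterBench and its methods are hypothetical names; timings from a simple harness like this are only indicative, since JIT warm-up and scheduling affect them.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical benchmark sketch; not the article's test code.
class SharedCounterBench {
    private static final Object LOCK = new Object();
    private static long lockedCount;                         // guarded by LOCK
    private static final AtomicLong atomicCount = new AtomicLong();

    // Runs `increment` perThread times on each of `threads` threads; returns elapsed milliseconds.
    static long timeMillis(int threads, int perThread, Runnable increment) {
        Thread[] workers = new Thread[threads];
        long start = System.nanoTime();
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    increment.run();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();                                    // block instead of busy-waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Returns {locked total, atomic total, locked ms, atomic ms}.
    static long[] run(int threads, int perThread) {
        lockedCount = 0;
        atomicCount.set(0);
        long lockedMs = timeMillis(threads, perThread, () -> {
            synchronized (LOCK) {
                lockedCount++;
            }
        });
        long atomicMs = timeMillis(threads, perThread, atomicCount::incrementAndGet);
        return new long[] { lockedCount, atomicCount.get(), lockedMs, atomicMs };
    }

    public static void main(String[] args) {
        long[] r = run(2, 10_000_000);
        System.out.println("synchronized: total=" + r[0] + " ms=" + r[2]);
        System.out.println("AtomicLong:   total=" + r[1] + " ms=" + r[3]);
    }
}
```

Both totals should equal threads × perThread; only the elapsed times differ between the two approaches.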

A CAS Example: A Non-blocking Stack

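Below is a CAS example slightly more complex than the non-blocking increment: a non-blocking stack, ConcurrentStack. The push() and pop() operations of ConcurrentStack are structurally similar to those of NonblockingCounter: they do their work speculatively, hoping that the underlying assumption still holds when the time comes to "commit". The push() method observes the current top node, builds a new node to be placed on the stack, and then, if the top node has not changed since the initial observation, installs the new node. If the CAS fails, another thread has modified the stack, and the process starts over.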

 

import java.util.concurrent.atomic.AtomicReference;

public class ConcurrentStack<E> {
    AtomicReference<Node<E>> head = new AtomicReference<Node<E>>();
    public void push(E item) {
        Node<E> newHead = new Node<E>(item);
        Node<E> oldHead;
        do {
            oldHead = head.get();
            newHead.next = oldHead;
        } while (!head.compareAndSet(oldHead, newHead));
    }
    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = head.get();
            if (oldHead == null) 
                return null;
            newHead = oldHead.next;
        } while (!head.compareAndSet(oldHead,newHead));
        return oldHead.item;
    }
    static class Node<E> {
        final E item;
        Node<E> next;
        public Node(E item) { this.item = item; }
    }
}

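Under light to moderate contention, non-blocking algorithms outperform blocking ones, because most of the time the CAS succeeds on the first try, and when contention does occur the cost is a few extra loop iterations rather than thread suspension and context switches. An uncontended CAS is much cheaper than an uncontended lock (this must be true, since an uncontended lock involves a CAS plus additional processing), and a contended CAS involves a shorter delay than a contended lock acquisition.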

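Under high contention (that is, when many threads repeatedly hammer a single memory location), lock-based algorithms start to offer better throughput than non-blocking ones, because a blocked thread stops contending and waits patiently for its turn, avoiding further contention. Contention this high is uncommon, however, since most of the time threads interleave thread-local computation with operations on shared data, giving other threads a chance at the shared data.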
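To see a CAS-based stack behave under contention, here is a small self-contained sketch in which several threads push concurrently and the main thread then drains the stack and counts the elements. It uses a compact stack of the same shape as the one shown earlier; TreiberStackDemo, Stack, and pushThenDrain are names invented for this example.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stress sketch for a CAS-based stack.
class TreiberStackDemo {
    static final class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    static final class Stack<E> {
        private final AtomicReference<Node<E>> head = new AtomicReference<>();

        void push(E item) {
            Node<E> newHead = new Node<>(item);
            Node<E> oldHead;
            do {
                oldHead = head.get();
                newHead.next = oldHead;
            } while (!head.compareAndSet(oldHead, newHead)); // retry if another thread moved head
        }

        E pop() {
            Node<E> oldHead;
            do {
                oldHead = head.get();
                if (oldHead == null) {
                    return null;                             // empty stack
                }
            } while (!head.compareAndSet(oldHead, oldHead.next));
            return oldHead.item;
        }
    }

    // Pushes perThread items from each of `threads` threads, then pops everything.
    // Returns the number of items popped.
    static int pushThenDrain(int threads, int perThread) {
        Stack<Integer> stack = new Stack<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    stack.push(n);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        int popped = 0;
        while (stack.pop() != null) {
            popped++;
        }
        return popped;
    }

    public static void main(String[] args) {
        System.out.println(pushThenDrain(4, 100_000)); // prints 400000
    }
}
```

No push is lost even though no lock is taken: a failed CAS simply sends the thread around the loop again.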

CAS Example 3: A Non-blocking Linked List

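The examples so far (the incrementing counter and the stack) are very simple non-blocking algorithms; once you have grasped the use of CAS in a loop, they are easy to imitate. For more complex data structures, non-blocking algorithms are much more involved, because modifying a linked list, tree, or hash table can mean updating several pointers. CAS supports atomic conditional update of a single pointer, but not of two or more. So to build a non-blocking linked list, tree, or hash table, we need a way to update several pointers with CAS without ever leaving the data structure in an inconsistent state.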

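Inserting an element at the tail of a linked list usually involves updating two pointers: the "tail" pointer, which always points to the last element in the list, and the "next" pointer from the previous last element to the newly inserted one. Because two pointers must be updated, two CASes are needed. Updating two pointers in separate CASes raises two potential problems: what happens if the first CAS succeeds and the second fails? And what happens if another thread tries to access the list between the first and second CAS?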

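For nontrivial data structures, the "trick" to building a non-blocking algorithm is to make sure the data structure is always in a consistent state (even between the moment a thread starts modifying it and the moment it finishes), and to make sure other threads can tell not only whether the first thread has finished its update or is still in the middle of it, but also what would be needed to complete the update if the first thread went AWOL. A thread that finds the data structure in the middle of an update can "help" the updating thread finish, and then proceed with its own operation. When the first thread comes back and tries to finish its own update, it finds that this is no longer necessary and simply returns, because the CAS detects the helping thread's interference (constructive interference, in this case).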

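This "help thy neighbor" requirement is necessary to protect the data structure from the failure of any single thread. If a thread that found the data structure mid-update simply waited for the other thread to finish, it could wait forever if the other thread failed partway through. Even without failures, that approach would perform poorly, because the newly arrived thread would have to yield the processor, incurring a context switch, or wait for its time slice to expire (which is even worse).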

import java.util.concurrent.atomic.AtomicReference;

public class LinkedQueue <E> {
    private static class Node <E> {
        final E item;
        final AtomicReference<Node<E>> next;
        Node(E item, Node<E> next) {
            this.item = item;
            this.next = new AtomicReference<Node<E>>(next);
        }
    }
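    // head and tail start at the same dummy node but must be separate
    // references, so that advancing tail does not also move head.
    private final Node<E> dummy = new Node<E>(null, null);
    private final AtomicReference<Node<E>> head
        = new AtomicReference<Node<E>>(dummy);
    private final AtomicReference<Node<E>> tail
        = new AtomicReference<Node<E>>(dummy);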
    public boolean put(E item) {
        Node<E> newNode = new Node<E>(item, null);
        while (true) {
            Node<E> curTail = tail.get();
            Node<E> residue = curTail.next.get();
            if (curTail == tail.get()) {
                if (residue == null) /* A */ {
                    if (curTail.next.compareAndSet(null, newNode)) /* C */ {
                        tail.compareAndSet(curTail, newNode) /* D */ ;
                        return true;
                    }
                } else {
                    tail.compareAndSet(curTail, residue) /* B */;
                }
            }
        }
    }
}
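The listing above only inserts. As a sketch of how removal could follow the same helping idea, the class below pairs a put() of the same shape with a poll() in the style of the Michael-Scott queue. The poll() method is not part of the article's listing; MsQueueSketch and its members are names assumed for this example, and the code relies on Java's garbage collector to avoid node reuse problems.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a non-blocking FIFO queue with both put and poll.
class MsQueueSketch<E> {
    private static final class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next = new AtomicReference<>(null);
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head;   // always points at a dummy node
    private final AtomicReference<Node<E>> tail;   // last node, or one behind it mid-insert

    MsQueueSketch() {
        Node<E> dummy = new Node<>(null);
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    void put(E item) {
        Node<E> newNode = new Node<>(item);
        while (true) {
            Node<E> curTail = tail.get();
            Node<E> next = curTail.next.get();
            if (curTail != tail.get()) {
                continue;                                   // tail moved; take a fresh snapshot
            }
            if (next != null) {
                tail.compareAndSet(curTail, next);          // help finish someone else's insert
            } else if (curTail.next.compareAndSet(null, newNode)) {
                tail.compareAndSet(curTail, newNode);       // may fail if a helper already did it
                return;
            }
        }
    }

    // Removes and returns the oldest element, or null if the queue is empty.
    E poll() {
        while (true) {
            Node<E> curHead = head.get();
            Node<E> curTail = tail.get();
            Node<E> first = curHead.next.get();
            if (curHead != head.get()) {
                continue;                                   // head moved; take a fresh snapshot
            }
            if (first == null) {
                return null;                                // only the dummy node is left
            }
            if (curHead == curTail) {
                tail.compareAndSet(curTail, first);         // tail is lagging; help it forward
                continue;
            }
            if (head.compareAndSet(curHead, first)) {
                return first.item;                          // `first` becomes the new dummy node
            }
        }
    }
}
```

A short usage example: after put(1), put(2), put(3), three calls to poll() return 1, 2, 3 in that order, and a fourth returns null.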

How Java's ConcurrentHashMap Is Implemented

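ConcurrentHashMap, introduced in Java 5, is thread-safe and cleverly designed. It locks at the granularity of a segment (a group of buckets) rather than the whole map, so put and get never lock the entire map; get in particular normally takes no lock at all and relies on volatile reads of the HashEntry fields. The performance gain is obvious.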
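From the caller's side, the ConcurrentMap interface offers conditional updates that can be used in the same commit-retry style as CAS: putIfAbsent(key, value) and replace(key, oldValue, newValue). The sketch below counts hits per key with them; MapCasSketch, increment, and countConcurrently are hypothetical names for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: a per-key counter built on ConcurrentMap's conditional updates.
class MapCasSketch {
    // Adds one to the count stored under `key`, retrying if another thread got in first.
    static void increment(ConcurrentMap<String, Integer> counts, String key) {
        while (true) {
            Integer old = counts.get(key);
            if (old == null) {
                if (counts.putIfAbsent(key, 1) == null) {
                    return;                                  // we installed the first count
                }
            } else if (counts.replace(key, old, old + 1)) {
                return;                                      // value was still `old`; update committed
            }
            // otherwise another thread changed the entry; loop and try again
        }
    }

    // Increments one key from several threads and returns the final count.
    static int countConcurrently(int threads, int perThread) {
        ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    increment(counts, "hits");
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        Integer total = counts.get("hits");
        return total == null ? 0 : total;
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 100_000)); // prints 400000
    }
}
```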


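The implementation uses lock striping (a separate lock per segment), which is discussed in great detail in this post. Here is an analysis of ConcurrentHashMap in the light of the Java memory model. Below is the JDK 6 source of ConcurrentHashMap: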

 

技术分享图片
   1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24  
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/licenses/publicdomain
  34  */
  35  
  36 package java.util.concurrent;
  37 import java.util.concurrent.locks.*;
  38 import java.util.*;
  39 import java.io.Serializable;
  40 import java.io.IOException;
  41 import java.io.ObjectInputStream;
  42 import java.io.ObjectOutputStream;
  43 import java.io.ObjectStreamField;
  44  
  45 /**
  46  * A hash table supporting full concurrency of retrievals and
  47  * adjustable expected concurrency for updates. This class obeys the
  48  * same functional specification as {@link java.util.Hashtable}, and
  49  * includes versions of methods corresponding to each method of
  50  * <tt>Hashtable</tt>. However, even though all operations are
  51  * thread-safe, retrieval operations do <em>not</em> entail locking,
  52  * and there is <em>not</em> any support for locking the entire table
  53  * in a way that prevents all access.  This class is fully
  54  * interoperable with <tt>Hashtable</tt> in programs that rely on its
  55  * thread safety but not on its synchronization details.
  56  *
  57  * <p> Retrieval operations (including <tt>get</tt>) generally do not
  58  * block, so may overlap with update operations (including
  59  * <tt>put</tt> and <tt>remove</tt>). Retrievals reflect the results
  60  * of the most recently <em>completed</em> update operations holding
  61  * upon their onset.  For aggregate operations such as <tt>putAll</tt>
  62  * and <tt>clear</tt>, concurrent retrievals may reflect insertion or
  63  * removal of only some entries.  Similarly, Iterators and
  64  * Enumerations return elements reflecting the state of the hash table
  65  * at some point at or since the creation of the iterator/enumeration.
  66  * They do <em>not</em> throw {@link ConcurrentModificationException}.
  67  * However, iterators are designed to be used by only one thread at a time.
  68  *
  69  * <p> The allowed concurrency among update operations is guided by
  70  * the optional <tt>concurrencyLevel</tt> constructor argument
  71  * (default <tt>16</tt>), which is used as a hint for internal sizing.  The
  72  * table is internally partitioned to try to permit the indicated
  73  * number of concurrent updates without contention. Because placement
  74  * in hash tables is essentially random, the actual concurrency will
  75  * vary.  Ideally, you should choose a value to accommodate as many
  76  * threads as will ever concurrently modify the table. Using a
  77  * significantly higher value than you need can waste space and time,
  78  * and a significantly lower value can lead to thread contention. But
  79  * overestimates and underestimates within an order of magnitude do
  80  * not usually have much noticeable impact. A value of one is
  81  * appropriate when it is known that only one thread will modify and
  82  * all others will only read. Also, resizing this or any other kind of
  83  * hash table is a relatively slow operation, so, when possible, it is
  84  * a good idea to provide estimates of expected table sizes in
  85  * constructors.
  86  *
  87  * <p>This class and its views and iterators implement all of the
  88  * <em>optional</em> methods of the {@link Map} and {@link Iterator}
  89  * interfaces.
  90  *
  91  * <p> Like {@link Hashtable} but unlike {@link HashMap}, this class
  92  * does <em>not</em> allow <tt>null</tt> to be used as a key or value.
  93  *
  94  * <p>This class is a member of the
  95  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  96  * Java Collections Framework</a>.
  97  *
  98  * @since 1.5
  99  * @author Doug Lea
 100  * @param <K> the type of keys maintained by this map
 101  * @param <V> the type of mapped values
 102  */
 103 public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
 104         implements ConcurrentMap<K, V>, Serializable {
 105     private static final long serialVersionUID = 7249069246763182397L;
 106  
 107     /*
 108      * The basic strategy is to subdivide the table among Segments,
 109      * each of which itself is a concurrently readable hash table.  To
 110      * reduce footprint, all but one segments are constructed only
 111      * when first needed (see ensureSegment). To maintain visibility
 112      * in the presence of lazy construction, accesses to segments as
 113      * well as elements of segment‘s table must use volatile access,
 114      * which is done via Unsafe within methods segmentAt etc
 115      * below. These provide the functionality of AtomicReferenceArrays
 116      * but reduce the levels of indirection. Additionally,
 117      * volatile-writes of table elements and entry "next" fields
 118      * within locked operations use the cheaper "lazySet" forms of
 119      * writes (via putOrderedObject) because these writes are always
 120      * followed by lock releases that maintain sequential consistency
 121      * of table updates.
 122      *
 123      * Historical note: The previous version of this class relied
 124      * heavily on "final" fields, which avoided some volatile reads at
 125      * the expense of a large initial footprint.  Some remnants of
 126      * that design (including forced construction of segment 0) exist
 127      * to ensure serialization compatibility.
 128      */
 129  
 130     /* ---------------- Constants -------------- */
 131  
 132     /**
 133      * The default initial capacity for this table,
 134      * used when not otherwise specified in a constructor.
 135      */
 136     static final int DEFAULT_INITIAL_CAPACITY = 16;
 137  
 138     /**
 139      * The default load factor for this table, used when not
 140      * otherwise specified in a constructor.
 141      */
 142     static final float DEFAULT_LOAD_FACTOR = 0.75f;
 143  
 144     /**
 145      * The default concurrency level for this table, used when not
 146      * otherwise specified in a constructor.
 147      */
 148     static final int DEFAULT_CONCURRENCY_LEVEL = 16;
 149  
 150     /**
 151      * The maximum capacity, used if a higher value is implicitly
 152      * specified by either of the constructors with arguments.  MUST
 153      * be a power of two <= 1<<30 to ensure that entries are indexable
 154      * using ints.
 155      */
 156     static final int MAXIMUM_CAPACITY = 1 << 30;
 157  
 158     /**
 159      * The minimum capacity for per-segment tables.  Must be a power
 160      * of two, at least two to avoid immediate resizing on next use
 161      * after lazy construction.
 162      */
 163     static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
 164  
 165     /**
 166      * The maximum number of segments to allow; used to bound
 167      * constructor arguments. Must be power of two less than 1 << 24.
 168      */
 169     static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
 170  
 171     /**
 172      * Number of unsynchronized retries in size and containsValue
 173      * methods before resorting to locking. This is used to avoid
 174      * unbounded retries if tables undergo continuous modification
 175      * which would make it impossible to obtain an accurate result.
 176      */
 177     static final int RETRIES_BEFORE_LOCK = 2;
 178  
 179     /* ---------------- Fields -------------- */
 180  
 181     /**
 182      * Mask value for indexing into segments. The upper bits of a
 183      * key‘s hash code are used to choose the segment.
 184      */
 185     final int segmentMask;
 186  
 187     /**
 188      * Shift value for indexing within segments.
 189      */
 190     final int segmentShift;
 191  
 192     /**
 193      * The segments, each of which is a specialized hash table.
 194      */
 195     final Segment<K,V>[] segments;
 196  
 197     transient Set<K> keySet;
 198     transient Set<Map.Entry<K,V>> entrySet;
 199     transient Collection<V> values;
 200  
    /**
     * ConcurrentHashMap list entry. Note that this is never exported
     * out as a user-visible Map.Entry.
     */
    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        /**
         * Sets next field with volatile write semantics.  (See above
         * about use of putOrderedObject.)
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

        // Unsafe mechanics
        static final sun.misc.Unsafe UNSAFE;
        static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = HashEntry.class;
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /**
     * Gets the ith element of given table (if nonnull) with volatile
     * read semantics.
     */
    @SuppressWarnings("unchecked")
    static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)i << TSHIFT) + TBASE);
    }

    /**
     * Sets the ith element of given table, with volatile write
     * semantics. (See above about use of putOrderedObject.)
     */
    static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                       HashEntry<K,V> e) {
        UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
    }

    /**
     * Applies a supplemental hash function to a given hashCode, which
     * defends against poor quality hash functions.  This is critical
     * because ConcurrentHashMap uses power-of-two length hash tables,
     * that otherwise encounter collisions for hashCodes that do not
     * differ in lower or upper bits.
     */
    private static int hash(int h) {
        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

    /**
     * Segments are specialized versions of hash tables.  This
     * subclasses from ReentrantLock opportunistically, just to
     * simplify some locking and avoid separate construction.
     */
    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        /*
         * Segments maintain a table of entry lists that are always
         * kept in a consistent state, so can be read (via volatile
         * reads of segments and tables) without locking.  This
         * requires replicating nodes when necessary during table
         * resizing, so the old lists can be traversed by readers
         * still using old version of table.
         *
         * This class defines only mutative methods requiring locking.
         * Except as noted, the methods of this class perform the
         * per-segment versions of ConcurrentHashMap methods.  (Other
         * methods are integrated directly into ConcurrentHashMap
         * methods.) These mutative methods use a form of controlled
         * spinning on contention via methods scanAndLock and
         * scanAndLockForPut. These intersperse tryLocks with
         * traversals to locate nodes.  The main benefit is to absorb
         * cache misses (which are very common for hash tables) while
         * obtaining locks so that traversal is faster once
         * acquired. We do not actually use the found nodes since they
         * must be re-acquired under lock anyway to ensure sequential
         * consistency of updates (and in any case may be undetectably
         * stale), but they will normally be much faster to re-locate.
         * Also, scanAndLockForPut speculatively creates a fresh node
         * to use in put if no node is found.
         */

        private static final long serialVersionUID = 2249069246763182397L;

        /**
         * The maximum number of times to tryLock in a prescan before
         * possibly blocking on acquire in preparation for a locked
         * segment operation. On multiprocessors, using a bounded
         * number of retries maintains cache acquired while locating
         * nodes.
         */
        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

        /**
         * The per-segment table. Elements are accessed via
         * entryAt/setEntryAt providing volatile semantics.
         */
        transient volatile HashEntry<K,V>[] table;

        /**
         * The number of elements. Accessed only either within locks
         * or among other volatile reads that maintain visibility.
         */
        transient int count;

        /**
         * The total number of mutative operations in this segment.
         * Even though this may overflow 32 bits, it provides
         * sufficient accuracy for stability checks in CHM isEmpty()
         * and size() methods.  Accessed only either within locks or
         * among other volatile reads that maintain visibility.
         */
        transient int modCount;

        /**
         * The table is rehashed when its size exceeds this threshold.
         * (The value of this field is always <tt>(int)(capacity *
         * loadFactor)</tt>.)
         */
        transient int threshold;

        /**
         * The load factor for the hash table.  Even though this value
         * is same for all segments, it is replicated to avoid needing
         * links to outer object.
         * @serial
         */
        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && first != null &&
                            tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

        /**
         * Doubles size of table and repacks entries, also adding the
         * given node to new table
         */
        @SuppressWarnings("unchecked")
        private void rehash(HashEntry<K,V> node) {
            /*
             * Reclassify nodes in each list to new table.  Because we
             * are using power-of-two expansion, the elements from
             * each bin must either stay at same index, or move with a
             * power of two offset. We eliminate unnecessary node
             * creation by catching cases where old nodes can be
             * reused because their next fields won't change.
             * Statistically, at the default threshold, only about
             * one-sixth of them need cloning when a table
             * doubles. The nodes they replace will be garbage
             * collectable as soon as they are no longer referenced by
             * any reader thread that may be in the midst of
             * concurrently traversing table. Entry accesses use plain
             * array indexing because they are followed by volatile
             * table write.
             */
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

        /**
         * Scans for a node containing given key while trying to
         * acquire lock, creating and returning one if not found. Upon
         * return, guarantees that lock is held. Unlike in most
         * methods, calls to method equals are not screened: Since
         * traversal speed doesn't matter, we might as well help warm
         * up the associated code and accesses as well.
         *
         * @return a new node if key not found, else null
         */
        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

        /**
         * Scans for a node containing the given key while trying to
         * acquire lock for a remove or replace operation. Upon
         * return, guarantees that lock is held.  Note that we must
         * lock even if the key is not found, to ensure sequential
         * consistency of updates.
         */
        private void scanAndLock(Object key, int hash) {
            // similar to but simpler than scanAndLockForPut
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            int retries = -1;
            while (!tryLock()) {
                HashEntry<K,V> f;
                if (retries < 0) {
                    if (e == null || key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f;
                    retries = -1;
                }
            }
        }

        /**
         * Remove; match on key only if value null, else match both.
         */
        final V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> e = entryAt(tab, index);
                HashEntry<K,V> pred = null;
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) {
                            if (pred == null)
                                setEntryAt(tab, index, next);
                            else
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

        final boolean replace(K key, int hash, V oldValue, V newValue) {
            if (!tryLock())
                scanAndLock(key, hash);
            boolean replaced = false;
            try {
                HashEntry<K,V> e;
                for (e = entryForHash(this, hash); e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        if (oldValue.equals(e.value)) {
                            e.value = newValue;
                            ++modCount;
                            replaced = true;
                        }
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return replaced;
        }

        final V replace(K key, int hash, V value) {
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                HashEntry<K,V> e;
                for (e = entryForHash(this, hash); e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        e.value = value;
                        ++modCount;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

        final void clear() {
            lock();
            try {
                HashEntry<K,V>[] tab = table;
                for (int i = 0; i < tab.length ; i++)
                    setEntryAt(tab, i, null);
                ++modCount;
                count = 0;
            } finally {
                unlock();
            }
        }
    }

    // Accessing segments

    /**
     * Gets the jth element of given segment array (if nonnull) with
     * volatile element access semantics via Unsafe.
     */
    @SuppressWarnings("unchecked")
    static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) {
        long u = (j << SSHIFT) + SBASE;
        return ss == null ? null :
            (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u);
    }

    /**
     * Returns the segment for the given index, creating it and
     * recording in segment table (via CAS) if not already present.
     *
     * @param k the index
     * @return the segment
     */
    @SuppressWarnings("unchecked")
    private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

    // Hash-based segment and entry accesses

    /**
     * Get the segment for the given hash
     */
    @SuppressWarnings("unchecked")
    private Segment<K,V> segmentForHash(int h) {
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
    }

    /**
     * Gets the table entry for the given segment and hash
     */
    @SuppressWarnings("unchecked")
    static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
        HashEntry<K,V>[] tab;
        return (seg == null || (tab = seg.table) == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
    }

    /* ---------------- Public operations -------------- */

    /**
     * Creates a new, empty map with the specified initial
     * capacity, load factor and concurrency level.
     *
     * @param initialCapacity the initial capacity. The implementation
     * performs internal sizing to accommodate this many elements.
     * @param loadFactor  the load factor threshold, used to control resizing.
     * Resizing may be performed when the average number of elements per
     * bin exceeds this threshold.
     * @param concurrencyLevel the estimated number of concurrently
     * updating threads. The implementation performs internal sizing
     * to try to accommodate this many threads.
     * @throws IllegalArgumentException if the initial capacity is
     * negative or the load factor or concurrencyLevel are
     * nonpositive.
     */
    @SuppressWarnings("unchecked")
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

    /**
     * Creates a new, empty map with the specified initial capacity
     * and load factor and with the default concurrencyLevel (16).
     *
     * @param initialCapacity The implementation performs internal
     * sizing to accommodate this many elements.
     * @param loadFactor  the load factor threshold, used to control resizing.
     * Resizing may be performed when the average number of elements per
     * bin exceeds this threshold.
     * @throws IllegalArgumentException if the initial capacity of
     * elements is negative or the load factor is nonpositive
     *
     * @since 1.6
     */
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
    }

    /**
     * Creates a new, empty map with the specified initial capacity,
     * and with default load factor (0.75) and concurrencyLevel (16).
     *
     * @param initialCapacity the initial capacity. The implementation
     * performs internal sizing to accommodate this many elements.
     * @throws IllegalArgumentException if the initial capacity of
     * elements is negative.
     */
    public ConcurrentHashMap(int initialCapacity) {
        this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    /**
     * Creates a new, empty map with a default initial capacity (16),
     * load factor (0.75) and concurrencyLevel (16).
     */
    public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    /**
     * Creates a new map with the same mappings as the given map.
     * The map is created with a capacity of 1.5 times the number
     * of mappings in the given map or 16 (whichever is greater),
     * and a default load factor (0.75) and concurrencyLevel (16).
     *
     * @param m the map
     */
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
                      DEFAULT_INITIAL_CAPACITY),
             DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
        putAll(m);
    }

    /**
     * Returns <tt>true</tt> if this map contains no key-value mappings.
     *
     * @return <tt>true</tt> if this map contains no key-value mappings
     */
    public boolean isEmpty() {
        /*
         * Sum per-segment modCounts to avoid mis-reporting when
         * elements are concurrently added and removed in one segment
         * while checking another, in which case the table was never
         * actually empty at any point. (The sum ensures accuracy up
         * through at least 1<<31 per-segment modifications before
         * recheck.)  Methods size() and containsValue() use similar
         * constructions for stability checks.
         */
        long sum = 0L;
        final Segment<K,V>[] segments = this.segments;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                if (seg.count != 0)
                    return false;
                sum += seg.modCount;
            }
        }
        if (sum != 0L) { // recheck unless no modifications
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    if (seg.count != 0)
                        return false;
                    sum -= seg.modCount;
                }
            }
            if (sum != 0L)
                return false;
        }
        return true;
    }

    /**
     * Returns the number of key-value mappings in this map.  If the
     * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
     * <tt>Integer.MAX_VALUE</tt>.
     *
     * @return the number of key-value mappings in this map
     */
    public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

    /**
     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     * <p>More formally, if this map contains a mapping from a key
     * {@code k} to a value {@code v} such that {@code key.equals(k)},
     * then this method returns {@code v}; otherwise it returns
     * {@code null}.  (There can be at most one such mapping.)
     *
     * @throws NullPointerException if the specified key is null
     */
    public V get(Object key) {
        int hash = hash(key.hashCode());
        for (HashEntry<K,V> e = entryForHash(segmentForHash(hash), hash);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == hash && key.equals(k)))
                return e.value;
        }
        return null;
    }

    /**
     * Tests if the specified object is a key in this table.
     *
     * @param  key   possible key
     * @return <tt>true</tt> if and only if the specified object
     *         is a key in this table, as determined by the
     *         <tt>equals</tt> method; <tt>false</tt> otherwise.
     * @throws NullPointerException if the specified key is null
     */
    public boolean containsKey(Object key) {
        int hash = hash(key.hashCode());
        for (HashEntry<K,V> e = entryForHash(segmentForHash(hash), hash);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == hash && key.equals(k)))
                return true;
        }
        return false;
    }

 947     /**
 948      * Returns <tt>true</tt> if this map maps one or more keys to the
 949      * specified value. Note: This method requires a full internal
 950      * traversal of the hash table, and so is much slower than
 951      * method <tt>containsKey</tt>.
 952      *
 953      * @param value value whose presence in this map is to be tested
 954      * @return <tt>true</tt> if this map maps one or more keys to the
 955      *         specified value
 956      * @throws NullPointerException if the specified value is null
 957      */
 958     public boolean containsValue(Object value) {
 959         // Same idea as size()
 960         if (value == null)
 961             throw new NullPointerException();
 962         final Segment<K,V>[] segments = this.segments;
 963         boolean found = false;
 964         long last = 0;
 965         int retries = -1;
 966         try {
 967             outer: for (;;) {
 968                 if (retries++ == RETRIES_BEFORE_LOCK) {
 969                     for (int j = 0; j < segments.length; ++j)
 970                         ensureSegment(j).lock(); // force creation
 971                 }
 972                 long hashSum = 0L;
 973                 int sum = 0;
 974                 for (int j = 0; j < segments.length; ++j) {
 975                     HashEntry<K,V>[] tab;
 976                     Segment<K,V> seg = segmentAt(segments, j);
 977                     if (seg != null && (tab = seg.table) != null) {
 978                         for (int i = 0 ; i < tab.length; i++) {
 979                             HashEntry<K,V> e;
 980                             for (e = entryAt(tab, i); e != null; e = e.next) {
 981                                 V v = e.value;
 982                                 if (v != null && value.equals(v)) {
 983                                     found = true;
 984                                     break outer;
 985                                 }
 986                             }
 987                         }
 988                         sum += seg.modCount;
 989                     }
 990                 }
 991                 if (retries > 0 && sum == last)
 992                     break;
 993                 last = sum;
 994             }
 995         } finally {
 996             if (retries > RETRIES_BEFORE_LOCK) {
 997                 for (int j = 0; j < segments.length; ++j)
 998                     segmentAt(segments, j).unlock();
 999             }
1000         }
1001         return found;
1002     }
1003  
1004     /**
1005      * Legacy method testing if some key maps into the specified value
1006      * in this table.  This method is identical in functionality to
1007      * {@link #containsValue}, and exists solely to ensure
1008      * full compatibility with class {@link java.util.Hashtable},
1009      * which supported this method prior to introduction of the
1010      * Java Collections framework.
1011  
1012      * @param  value a value to search for
1013      * @return <tt>true</tt> if and only if some key maps to the
1014      *         <tt>value</tt> argument in this table as
1015      *         determined by the <tt>equals</tt> method;
1016      *         <tt>false</tt> otherwise
1017      * @throws NullPointerException if the specified value is null
1018      */
1019     public boolean contains(Object value) {
1020         return containsValue(value);
1021     }
1022  
1023     /**
1024      * Maps the specified key to the specified value in this table.
1025      * Neither the key nor the value can be null.
1026      *
1027      * <p> The value can be retrieved by calling the <tt>get</tt> method
1028      * with a key that is equal to the original key.
1029      *
1030      * @param key key with which the specified value is to be associated
1031      * @param value value to be associated with the specified key
1032      * @return the previous value associated with <tt>key</tt>, or
1033      *         <tt>null</tt> if there was no mapping for <tt>key</tt>
1034      * @throws NullPointerException if the specified key or value is null
1035      */
1036     public V put(K key, V value) {
1037         if (value == null)
1038             throw new NullPointerException();
1039         int hash = hash(key.hashCode());
1040         int j = (hash >>> segmentShift) & segmentMask;
1041         Segment<K,V> s = segmentAt(segments, j);
1042         if (s == null)
1043             s = ensureSegment(j);
1044         return s.put(key, hash, value, false);
1045     }
1046  
1047     /**
1048      * {@inheritDoc}
1049      *
1050      * @return the previous value associated with the specified key,
1051      *         or <tt>null</tt> if there was no mapping for the key
1052      * @throws NullPointerException if the specified key or value is null
1053      */
1054     public V putIfAbsent(K key, V value) {
1055         if (value == null)
1056             throw new NullPointerException();
1057         int hash = hash(key.hashCode());
1058         int j = (hash >>> segmentShift) & segmentMask;
1059         Segment<K,V> s = segmentAt(segments, j);
1060         if (s == null)
1061             s = ensureSegment(j);
1062         return s.put(key, hash, value, true);
1063     }
1064  
1065     /**
1066      * Copies all of the mappings from the specified map to this one.
1067      * These mappings replace any mappings that this map had for any of the
1068      * keys currently in the specified map.
1069      *
1070      * @param m mappings to be stored in this map
1071      */
1072     public void putAll(Map<? extends K, ? extends V> m) {
1073         for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
1074             put(e.getKey(), e.getValue());
1075     }
1076  
1077     /**
1078      * Removes the key (and its corresponding value) from this map.
1079      * This method does nothing if the key is not in the map.
1080      *
1081      * @param  key the key that needs to be removed
1082      * @return the previous value associated with <tt>key</tt>, or
1083      *         <tt>null</tt> if there was no mapping for <tt>key</tt>
1084      * @throws NullPointerException if the specified key is null
1085      */
1086     public V remove(Object key) {
1087         int hash = hash(key.hashCode());
1088         Segment<K,V> s = segmentForHash(hash);
1089         return s == null ? null : s.remove(key, hash, null);
1090     }
1091  
1092     /**
1093      * {@inheritDoc}
1094      *
1095      * @throws NullPointerException if the specified key is null
1096      */
1097     public boolean remove(Object key, Object value) {
1098         int hash = hash(key.hashCode());
1099         Segment<K,V> s;
1100         return value != null && (s = segmentForHash(hash)) != null &&
1101             s.remove(key, hash, value) != null;
1102     }
1103  
1104     /**
1105      * {@inheritDoc}
1106      *
1107      * @throws NullPointerException if any of the arguments are null
1108      */
1109     public boolean replace(K key, V oldValue, V newValue) {
1110         int hash = hash(key.hashCode());
1111         if (oldValue == null || newValue == null)
1112             throw new NullPointerException();
1113         Segment<K,V> s = segmentForHash(hash);
1114         return s != null && s.replace(key, hash, oldValue, newValue);
1115     }
1116  
1117     /**
1118      * {@inheritDoc}
1119      *
1120      * @return the previous value associated with the specified key,
1121      *         or <tt>null</tt> if there was no mapping for the key
1122      * @throws NullPointerException if the specified key or value is null
1123      */
1124     public V replace(K key, V value) {
1125         int hash = hash(key.hashCode());
1126         if (value == null)
1127             throw new NullPointerException();
1128         Segment<K,V> s = segmentForHash(hash);
1129         return s == null ? null : s.replace(key, hash, value);
1130     }
1131  
1132     /**
1133      * Removes all of the mappings from this map.
1134      */
1135     public void clear() {
1136         final Segment<K,V>[] segments = this.segments;
1137         for (int j = 0; j < segments.length; ++j) {
1138             Segment<K,V> s = segmentAt(segments, j);
1139             if (s != null)
1140                 s.clear();
1141         }
1142     }
1143  
1144     /**
1145      * Returns a {@link Set} view of the keys contained in this map.
1146      * The set is backed by the map, so changes to the map are
1147      * reflected in the set, and vice-versa.  The set supports element
1148      * removal, which removes the corresponding mapping from this map,
1149      * via the <tt>Iterator.remove</tt>, <tt>Set.remove</tt>,
1150      * <tt>removeAll</tt>, <tt>retainAll</tt>, and <tt>clear</tt>
1151      * operations.  It does not support the <tt>add</tt> or
1152      * <tt>addAll</tt> operations.
1153      *
1154      * <p>The view‘s <tt>iterator</tt> is a "weakly consistent" iterator
1155      * that will never throw {@link ConcurrentModificationException},
1156      * and guarantees to traverse elements as they existed upon
1157      * construction of the iterator, and may (but is not guaranteed to)
1158      * reflect any modifications subsequent to construction.
1159      */
1160     public Set<K> keySet() {
1161         Set<K> ks = keySet;
1162         return (ks != null) ? ks : (keySet = new KeySet());
1163     }
1164  
1165     /**
1166      * Returns a {@link Collection} view of the values contained in this map.
1167      * The collection is backed by the map, so changes to the map are
1168      * reflected in the collection, and vice-versa.  The collection
1169      * supports element removal, which removes the corresponding
1170      * mapping from this map, via the <tt>Iterator.remove</tt>,
1171      * <tt>Collection.remove</tt>, <tt>removeAll</tt>,
1172      * <tt>retainAll</tt>, and <tt>clear</tt> operations.  It does not
1173      * support the <tt>add</tt> or <tt>addAll</tt> operations.
1174      *
1175      * <p>The view‘s <tt>iterator</tt> is a "weakly consistent" iterator
1176      * that will never throw {@link ConcurrentModificationException},
1177      * and guarantees to traverse elements as they existed upon
1178      * construction of the iterator, and may (but is not guaranteed to)
1179      * reflect any modifications subsequent to construction.
1180      */
1181     public Collection<V> values() {
1182         Collection<V> vs = values;
1183         return (vs != null) ? vs : (values = new Values());
1184     }
1185  
1186     /**
1187      * Returns a {@link Set} view of the mappings contained in this map.
1188      * The set is backed by the map, so changes to the map are
1189      * reflected in the set, and vice-versa.  The set supports element
1190      * removal, which removes the corresponding mapping from the map,
1191      * via the <tt>Iterator.remove</tt>, <tt>Set.remove</tt>,
1192      * <tt>removeAll</tt>, <tt>retainAll</tt>, and <tt>clear</tt>
1193      * operations.  It does not support the <tt>add</tt> or
1194      * <tt>addAll</tt> operations.
1195      *
1196      * <p>The view‘s <tt>iterator</tt> is a "weakly consistent" iterator
1197      * that will never throw {@link ConcurrentModificationException},
1198      * and guarantees to traverse elements as they existed upon
1199      * construction of the iterator, and may (but is not guaranteed to)
1200      * reflect any modifications subsequent to construction.
1201      */
1202     public Set<Map.Entry<K,V>> entrySet() {
1203         Set<Map.Entry<K,V>> es = entrySet;
1204         return (es != null) ? es : (entrySet = new EntrySet());
1205     }
1206  
1207     /**
1208      * Returns an enumeration of the keys in this table.
1209      *
1210      * @return an enumeration of the keys in this table
1211      * @see #keySet()
1212      */
1213     public Enumeration<K> keys() {
1214         return new KeyIterator();
1215     }
1216  
1217     /**
1218      * Returns an enumeration of the values in this table.
1219      *
1220      * @return an enumeration of the values in this table
1221      * @see #values()
1222      */
1223     public Enumeration<V> elements() {
1224         return new ValueIterator();
1225     }
1226  
1227     /* ---------------- Iterator Support -------------- */
1228  
1229     abstract class HashIterator {
1230         int nextSegmentIndex;
1231         int nextTableIndex;
1232         HashEntry<K,V>[] currentTable;
1233         HashEntry<K, V> nextEntry;
1234         HashEntry<K, V> lastReturned;
1235  
1236         HashIterator() {
1237             nextSegmentIndex = segments.length - 1;
1238             nextTableIndex = -1;
1239             advance();
1240         }
1241  
1242         /**
1243          * Set nextEntry to first node of next non-empty table
1244          * (in backwards order, to simplify checks).
1245          */
1246         final void advance() {
1247             for (;;) {
1248                 if (nextTableIndex >= 0) {
1249                     if ((nextEntry = entryAt(currentTable,
1250                                              nextTableIndex--)) != null)
1251                         break;
1252                 }
1253                 else if (nextSegmentIndex >= 0) {
1254                     Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);
1255                     if (seg != null && (currentTable = seg.table) != null)
1256                         nextTableIndex = currentTable.length - 1;
1257                 }
1258                 else
1259                     break;
1260             }
1261         }
1262  
1263         final HashEntry<K,V> nextEntry() {
1264             HashEntry<K,V> e = nextEntry;
1265             if (e == null)
1266                 throw new NoSuchElementException();
1267             lastReturned = e; // cannot assign until after null check
1268             if ((nextEntry = e.next) == null)
1269                 advance();
1270             return e;
1271         }
1272  
1273         public final boolean hasNext() { return nextEntry != null; }
1274         public final boolean hasMoreElements() { return nextEntry != null; }
1275  
1276         public final void remove() {
1277             if (lastReturned == null)
1278                 throw new IllegalStateException();
1279             ConcurrentHashMap.this.remove(lastReturned.key);
1280             lastReturned = null;
1281         }
1282     }
1283  
1284     final class KeyIterator
1285         extends HashIterator
1286         implements Iterator<K>, Enumeration<K>
1287     {
1288         public final K next()        { return super.nextEntry().key; }
1289         public final K nextElement() { return super.nextEntry().key; }
1290     }
1291  
1292     final class ValueIterator
1293         extends HashIterator
1294         implements Iterator<V>, Enumeration<V>
1295     {
1296         public final V next()        { return super.nextEntry().value; }
1297         public final V nextElement() { return super.nextEntry().value; }
1298     }
1299  
1300     /**
1301      * Custom Entry class used by EntryIterator.next(), that relays
1302      * setValue changes to the underlying map.
1303      */
1304     final class WriteThroughEntry
1305         extends AbstractMap.SimpleEntry<K,V>
1306     {
1307         WriteThroughEntry(K k, V v) {
1308             super(k,v);
1309         }
1310  
1311         /**
1312          * Set our entry‘s value and write through to the map. The
1313          * value to return is somewhat arbitrary here. Since a
1314          * WriteThroughEntry does not necessarily track asynchronous
1315          * changes, the most recent "previous" value could be
1316          * different from what we return (or could even have been
1317          * removed in which case the put will re-establish). We do not
1318          * and cannot guarantee more.
1319          */
1320         public V setValue(V value) {
1321             if (value == null) throw new NullPointerException();
1322             V v = super.setValue(value);
1323             ConcurrentHashMap.this.put(getKey(), value);
1324             return v;
1325         }
1326     }
1327  
1328     final class EntryIterator
1329         extends HashIterator
1330         implements Iterator<Entry<K,V>>
1331     {
1332         public Map.Entry<K,V> next() {
1333             HashEntry<K,V> e = super.nextEntry();
1334             return new WriteThroughEntry(e.key, e.value);
1335         }
1336     }
1337  
1338     final class KeySet extends AbstractSet<K> {
1339         public Iterator<K> iterator() {
1340             return new KeyIterator();
1341         }
1342         public int size() {
1343             return ConcurrentHashMap.this.size();
1344         }
1345         public boolean isEmpty() {
1346             return ConcurrentHashMap.this.isEmpty();
1347         }
1348         public boolean contains(Object o) {
1349             return ConcurrentHashMap.this.containsKey(o);
1350         }
1351         public boolean remove(Object o) {
1352             return ConcurrentHashMap.this.remove(o) != null;
1353         }
1354         public void clear() {
1355             ConcurrentHashMap.this.clear();
1356         }
1357     }
1358  
1359     final class Values extends AbstractCollection<V> {
1360         public Iterator<V> iterator() {
1361             return new ValueIterator();
1362         }
1363         public int size() {
1364             return ConcurrentHashMap.this.size();
1365         }
1366         public boolean isEmpty() {
1367             return ConcurrentHashMap.this.isEmpty();
1368         }
1369         public boolean contains(Object o) {
1370             return ConcurrentHashMap.this.containsValue(o);
1371         }
1372         public void clear() {
1373             ConcurrentHashMap.this.clear();
1374         }
1375     }
1376  
1377     final class EntrySet extends AbstractSet<Map.Entry<K,V>> {
1378         public Iterator<Map.Entry<K,V>> iterator() {
1379             return new EntryIterator();
1380         }
1381         public boolean contains(Object o) {
1382             if (!(o instanceof Map.Entry))
1383                 return false;
1384             Map.Entry<?,?> e = (Map.Entry<?,?>)o;
1385             V v = ConcurrentHashMap.this.get(e.getKey());
1386             return v != null && v.equals(e.getValue());
1387         }
1388         public boolean remove(Object o) {
1389             if (!(o instanceof Map.Entry))
1390                 return false;
1391             Map.Entry<?,?> e = (Map.Entry<?,?>)o;
1392             return ConcurrentHashMap.this.remove(e.getKey(), e.getValue());
1393         }
1394         public int size() {
1395             return ConcurrentHashMap.this.size();
1396         }
1397         public boolean isEmpty() {
1398             return ConcurrentHashMap.this.isEmpty();
1399         }
1400         public void clear() {
1401             ConcurrentHashMap.this.clear();
1402         }
1403     }
1404  
1405     /* ---------------- Serialization Support -------------- */
1406  
1407     /**
1408      * Save the state of the <tt>ConcurrentHashMap</tt> instance to a
1409      * stream (i.e., serialize it).
1410      * @param s the stream
1411      * @serialData
1412      * the key (Object) and value (Object)
1413      * for each key-value mapping, followed by a null pair.
1414      * The key-value mappings are emitted in no particular order.
1415      */
1416     private void writeObject(java.io.ObjectOutputStream s) throws IOException  {
1417         // force all segments for serialization compatibility
1418         for (int k = 0; k < segments.length; ++k)
1419             ensureSegment(k);
1420         s.defaultWriteObject();
1421  
1422         final Segment<K,V>[] segments = this.segments;
1423         for (int k = 0; k < segments.length; ++k) {
1424             Segment<K,V> seg = segmentAt(segments, k);
1425             seg.lock();
1426             try {
1427                 HashEntry<K,V>[] tab = seg.table;
1428                 for (int i = 0; i < tab.length; ++i) {
1429                     HashEntry<K,V> e;
1430                     for (e = entryAt(tab, i); e != null; e = e.next) {
1431                         s.writeObject(e.key);
1432                         s.writeObject(e.value);
1433                     }
1434                 }
1435             } finally {
1436                 seg.unlock();
1437             }
1438         }
1439         s.writeObject(null);
1440         s.writeObject(null);
1441     }
1442  
1443     /**
1444      * Reconstitute the <tt>ConcurrentHashMap</tt> instance from a
1445      * stream (i.e., deserialize it).
1446      * @param s the stream
1447      */
1448     @SuppressWarnings("unchecked")
1449     private void readObject(java.io.ObjectInputStream s)
1450         throws IOException, ClassNotFoundException  {
1451         // Don‘t call defaultReadObject()
1452         ObjectInputStream.GetField oisFields = s.readFields();
1453         final Segment<K,V>[] oisSegments = (Segment<K,V>[])oisFields.get("segments", null);
1454  
1455         final int ssize = oisSegments.length;
1456         if (ssize < 1 || ssize > MAX_SEGMENTS
1457             || (ssize & (ssize-1)) != 0 )  // ssize not power of two
1458             throw new java.io.InvalidObjectException("Bad number of segments:"
1459                                                      + ssize);
1460         int sshift = 0, ssizeTmp = ssize;
1461         while (ssizeTmp > 1) {
1462             ++sshift;
1463             ssizeTmp >>>= 1;
1464         }
1465         UNSAFE.putIntVolatile(this, SEGSHIFT_OFFSET, 32 - sshift);
1466         UNSAFE.putIntVolatile(this, SEGMASK_OFFSET, ssize - 1);
1467         UNSAFE.putObjectVolatile(this, SEGMENTS_OFFSET, oisSegments);
1468  
1469         // Re-initialize segments to be minimally sized, and let grow.
1470         int cap = MIN_SEGMENT_TABLE_CAPACITY;
1471         final Segment<K,V>[] segments = this.segments;
1472         for (int k = 0; k < segments.length; ++k) {
1473             Segment<K,V> seg = segments[k];
1474             if (seg != null) {
1475                 seg.threshold = (int)(cap * seg.loadFactor);
1476                 seg.table = (HashEntry<K,V>[]) new HashEntry[cap];
1477             }
1478         }
1479  
1480         // Read the keys and values, and put the mappings in the table
1481         for (;;) {
1482             K key = (K) s.readObject();
1483             V value = (V) s.readObject();
1484             if (key == null)
1485                 break;
1486             put(key, value);
1487         }
1488     }
1489  
1490     // Unsafe mechanics
1491     private static final sun.misc.Unsafe UNSAFE;
1492     private static final long SBASE;
1493     private static final int SSHIFT;
1494     private static final long TBASE;
1495     private static final int TSHIFT;
1496     private static final long SEGSHIFT_OFFSET;
1497     private static final long SEGMASK_OFFSET;
1498     private static final long SEGMENTS_OFFSET;
1499  
1500     static {
1501         int ss, ts;
1502         try {
1503             UNSAFE = sun.misc.Unsafe.getUnsafe();
1504             Class tc = HashEntry[].class;
1505             Class sc = Segment[].class;
1506             TBASE = UNSAFE.arrayBaseOffset(tc);
1507             SBASE = UNSAFE.arrayBaseOffset(sc);
1508             ts = UNSAFE.arrayIndexScale(tc);
1509             ss = UNSAFE.arrayIndexScale(sc);
1510             SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(
1511                 ConcurrentHashMap.class.getDeclaredField("segmentShift"));
1512             SEGMASK_OFFSET = UNSAFE.objectFieldOffset(
1513                 ConcurrentHashMap.class.getDeclaredField("segmentMask"));
1514             SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(
1515                 ConcurrentHashMap.class.getDeclaredField("segments"));
1516         } catch (Exception e) {
1517             throw new Error(e);
1518         }
1519         if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
1520             throw new Error("data type scale not a power of two");
1521         SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
1522         TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
1523     }
1524  
1525 }
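
The putIfAbsent(key, value) and replace(key, oldValue, newValue) methods in the listing give callers a compare-and-swap style primitive at the level of the map: each succeeds only if the mapping is still in the state the caller expected. As a minimal sketch of how an optimistic retry loop might be built on top of them (the class name MapCasDemo and the helper increment are hypothetical, introduced here only for illustration):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class: an optimistic counter update that uses only
// the public ConcurrentMap API, with no explicit lock in caller code.
class MapCasDemo {

    // Atomically adds 1 to the counter stored under key and returns the new value.
    static int increment(ConcurrentMap<String, Integer> map, String key) {
        for (;;) {
            Integer old = map.get(key);
            if (old == null) {
                // No mapping yet: try to install 1. A non-null result means
                // another thread installed a value first, so we retry.
                if (map.putIfAbsent(key, 1) == null)
                    return 1;
            } else {
                int next = old + 1;
                // Succeeds only if the mapped value still equals the one we read.
                if (map.replace(key, old, next))
                    return next;
            }
            // Lost the race: loop, re-read, and try again.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final ConcurrentMap<String, Integer> hits =
            new ConcurrentHashMap<String, Integer>();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    for (int n = 0; n < 1000; n++)
                        increment(hits, "page");
                }
            });
            workers[i].start();
        }
        for (Thread t : workers)
            t.join();
        System.out.println(hits.get("page")); // 4 threads x 1000 increments: prints 4000
    }
}
```

A thread that loses the race is never suspended by this loop; it simply re-reads the value and tries again, which is the same pattern as a CAS retry loop on a single variable.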

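Java's ConcurrentLinkedQueue implementation

ConcurrentLinkedQueue is likewise built on CAS instructions, but its performance is not especially high because each operation involves many CAS attempts. Its source is the JDK class java.util.concurrent.ConcurrentLinkedQueue.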
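
To give a feel for where the CAS operations come from, here is a much simplified sketch of a CAS-based linked queue in the style of the Michael-Scott algorithm. It is not the JDK source: the class name CasQueue and its structure are assumptions made for illustration, and the real ConcurrentLinkedQueue is considerably more refined.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified Michael-Scott style queue. NOT the JDK implementation.
class CasQueue<E> {

    private static final class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next = new AtomicReference<Node<E>>(null);
        Node(E item) { this.item = item; }
    }

    // head always points at a dummy node; the first real element is head.next.
    private final AtomicReference<Node<E>> head;
    private final AtomicReference<Node<E>> tail;

    CasQueue() {
        Node<E> dummy = new Node<E>(null);
        head = new AtomicReference<Node<E>>(dummy);
        tail = new AtomicReference<Node<E>>(dummy);
    }

    void offer(E item) {
        Node<E> node = new Node<E>(item);
        for (;;) {
            Node<E> last = tail.get();
            Node<E> next = last.next.get();
            if (next != null) {
                // tail is lagging behind: help advance it, then retry.
                tail.compareAndSet(last, next);
            } else if (last.next.compareAndSet(null, node)) {
                // Node is linked. Swinging tail may fail harmlessly if
                // another thread has already helped.
                tail.compareAndSet(last, node);
                return;
            }
        }
    }

    E poll() {
        for (;;) {
            Node<E> first = head.get();
            Node<E> last = tail.get();
            Node<E> next = first.next.get();
            if (next == null)
                return null;                     // queue is empty
            if (first == last) {
                tail.compareAndSet(last, next);  // help the lagging tail
                continue;
            }
            if (head.compareAndSet(first, next))
                return next.item;                // next becomes the new dummy
        }
    }

    public static void main(String[] args) {
        CasQueue<String> q = new CasQueue<String>();
        q.offer("a");
        q.offer("b");
        System.out.println(q.poll() + q.poll() + q.poll()); // prints abnull
    }
}
```

Even in this stripped-down form, a single offer needs two successful CAS operations in the uncontended case (link the node, then swing tail), and every failed attempt under contention costs another trip around the loop.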

 

 

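Design approaches under high concurrency: optimized locks or lock-free structures

Server-side programming has three major performance killers: (1) thread-switching overhead caused by a large number of threads, (2) locks, and (3) unnecessary memory copies. Under high concurrency, a single thread can be faster than multiple threads for purely in-memory operations; one way to see this is to compare the CPU sy (system) and ni (nice) percentages of a multithreaded program under load testing. There are two ways to achieve high throughput and thread safety under high concurrency: an optimized locking scheme, or a lock-free data structure. Non-blocking algorithms are far more complex than lock-based ones, however. Developing them is a specialized skill, and proving them correct is extremely hard: correctness depends on the target machine platform and the compiler, and it takes sophisticated techniques and rigorous testing. Difficult as lock-free programming is, it can often deliver higher throughput than lock-based programming, which makes it a promising technique. It also behaves well with respect to thread termination, priority inversion, and signal safety.

  • An example of an optimized lock: Java's ConcurrentHashMap. Its design uses segment-granularity locks (lock striping) so that put and get never lock the whole map. In the version listed above, put locks only the one Segment that owns the key, and get takes no lock at all, so the performance gain is obvious (for a detailed analysis, see the article 《探索 ConcurrentHashMap 高并发性的实现机制》, "Exploring how ConcurrentHashMap achieves high concurrency").
  • Examples of lock-free designs: the use of CAS (the CPU's Compare-And-Swap instruction) and data structures such as LMAX's Disruptor lock-free message queue. Readers who want to learn more about the LMAX Disruptor can head over to slideshare.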
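
The lock-striping idea from the first bullet can be sketched in a few lines. The class below is a hypothetical illustration (StripedMap and stripeFor are names invented here), far simpler than ConcurrentHashMap's Segment design: it guards each of N independent HashMaps with its own ReentrantLock, so operations on keys in different stripes never contend.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of lock striping; not how ConcurrentHashMap is written.
class StripedMap<K, V> {
    private final ReentrantLock[] locks;
    private final Map<K, V>[] stripes;

    @SuppressWarnings("unchecked")
    StripedMap(int stripeCount) {
        locks = new ReentrantLock[stripeCount];
        stripes = (Map<K, V>[]) new Map[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            locks[i] = new ReentrantLock();
            stripes[i] = new HashMap<K, V>();
        }
    }

    private int stripeFor(Object key) {
        // Mask off the sign bit so the index is never negative.
        return (key.hashCode() & 0x7fffffff) % locks.length;
    }

    V put(K key, V value) {
        int i = stripeFor(key);
        locks[i].lock();               // only this stripe is locked
        try {
            return stripes[i].put(key, value);
        } finally {
            locks[i].unlock();
        }
    }

    V get(Object key) {
        int i = stripeFor(key);
        locks[i].lock();               // unlike ConcurrentHashMap.get, this sketch locks on reads
        try {
            return stripes[i].get(key);
        } finally {
            locks[i].unlock();
        }
    }

    public static void main(String[] args) {
        StripedMap<String, Integer> map = new StripedMap<String, Integer>(16);
        map.put("a", 1);
        map.put("b", 2);
        System.out.println(map.get("a") + map.get("b")); // prints 3
    }
}
```

The sketch locks on reads to stay obviously correct with a plain HashMap underneath; avoiding that read lock is exactly what the volatile reads in the ConcurrentHashMap listing are for.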

 

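Lock-free non-blocking algorithms deep in the JVM and the OS

 

Dig into the JVM and the operating system and you will find non-blocking algorithms everywhere. The garbage collector uses them to speed up concurrent and parallel garbage collection; the scheduler uses them to schedule threads and processes efficiently and to implement intrinsic locks. In Mustang (Java 6.0), the lock-based SynchronousQueue algorithm was replaced by a new non-blocking version. Few developers use SynchronousQueue directly, but thread pools built with the Executors.newCachedThreadPool() factory use it as their work queue. Benchmarks comparing cached thread pool performance showed the new non-blocking synchronous queue implementation running at almost three times the speed of the then-current implementation. Further improvements were planned for the release after Mustang (code-named Dolphin).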
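
For readers who have not met SynchronousQueue, the sketch below shows the hand-off behaviour that makes it suitable as a thread pool's work queue: it has no capacity, so put blocks until another thread calls take. The class HandoffDemo and the method handOff are hypothetical names used only for this illustration.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo of a direct hand-off between two threads.
class HandoffDemo {

    // Passes message from a producer thread to the calling thread.
    static String handOff(final String message) {
        final SynchronousQueue<String> queue = new SynchronousQueue<String>();
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    queue.put(message);   // blocks until a consumer takes it
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        try {
            String received = queue.take();  // rendezvous with the producer
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during hand-off", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff("job-1")); // prints job-1
    }
}
```

A cached thread pool relies on the same rendezvous: a submitted task is handed straight to an idle worker if one is waiting, and otherwise the pool starts a new thread.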

 

 


Original article: https://www.cnblogs.com/jack-Star/p/8283230.html
