码迷,mamicode.com
首页 > 其他好文 > 详细

ConcurrentHashMap源码分析(JDK1.7和1.8对比)

时间:2017-09-12 20:48:16      阅读:165      评论:0      收藏:0      [点我收藏+]

标签:并且   stat   jdk源码   find   nsf   位置   字段   else   共享   

一、ConcurrentHashMap简介   

  并发编程大师Doug Lea开发的并发容器之一。ConcurrentHashMap是线程安全且高效的HashMap,在HashMap的基础上增加了线程安全,当然结构方面也有所改变。

  为什么要使用ConcurrentHashMap?   

  1、多线程环境下,HashMap会处于不安全状态。例如put操作可能会引起程序死循环,Cpu占有率达百分百,原因是多线程会导致HashMap的Entry链表形成环形数据结构,如此一来,他的next结点将永不为空,就会产生死循环获取Entry。   既然不能使用HashMap,在多线程环境下就给并发容器有了登场机会。   

  2、HashTable效率低下。HashTable相信很多人都知道,相比HashMap就增加了线程安全机制,在过去HashTable就会被使用,但由于它的实现是利用synchronized来保证线程安全的,在线程竞争激烈的情况下,效率就非常低下了。原因   在于当有线程访问HashTable的同步方法时,其他线程也来访问就会进行阻塞或者轮询状态。因此效率低下,如今开发中已经鲜有人使用HashTable了。   

  3、ConcurrentHashMap采用锁分段技术可有效提高并发访问率。这个将马上展开说明。

 

二、ConcurrentHashMap结构与实现

 

JDK1.7实现  

走进jdk源码,类似HashMap,先看看它的类结构:

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable{}

同样是继承了ConcurrentMap的骨架实现类AbstractMap,并且实现了ConcurrentMap。

技术分享

通过它的类图来分析结构,ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成,Segment是一种可重入锁,在ConcurrentHashMap中扮演锁的角色,HashEntry则用来存储键值对数据。一个ConcurrentHashMap中维护一个

Segment数组,Segment结构是一种数组和链表结构,每个Segment又包含一个HashEntry数组,每个HashEntry是一个链表结构的元素。因此当对元素进行修改的时候,只需要获得对应的Segment锁即可,不需要锁住整个容器,从而大大提高了访问并发效率。

 技术分享

初始化segments数组的源代码:

if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
    ++shift;
    ssize <<= 1;      
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);

由源代码可知,segment的长度是通过concurrencyLevel计算得出的。最大值为65536。

而HashEntry的初始化长度则由以下源代码实现:

if (c * ssize < initialCapacity)
    ++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
    cap <<= 1;

定位Segment方法:

  通过对元素的hashCode进行一次再散列。

private static int hash(int h) {
    h += (h<<15)^0xffffcd7d;
    h ^= (h>>>10);
    h += (h<<3);
    h ^= (h>>>6);
    h += (h<<2) + (h << 14);
    return h ^ (h >>> 16);    
}
final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];    
}

 

 

get、put、size操作

1、get操作

public V get(Object key) {
   int hash = hash(key.hashCode()) ;
   return segmentFor(hash).get(key, hash);   
}

get操作先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素。整个get过程,不需要加锁,除非读到的值是空才会加锁重读。

为什么不需要加锁呢?在HashTable中get方法是需要加锁的,原因就在于ConcurrentHashMap中把共享变量定义为volatile类型,如此一来即可在线程之间保持可见性,确保读到最新的值。

2、put操作

put操作首先定位到Segment,然后在里面进行插入操作,在此之前需要进行是否扩容判断。扩容会创建容量为原来两倍的数组,然后把元素重新散列后插入到新的数组,为了高效

ConcurrentHashMap只会对某一个Segment进行扩容。

3、size操作

统计整个容器的元素大小。Segment里面的count变量是volatie变量,似乎只要把全部count相加即可得到结果。但实际上,有可能发送在累加完毕前count发生了变化,因此统计结果就会不准确。

针对这种问题,ConcurrentHashMap采取了先尝试两次不锁Segment的方式来统计,如果两次统计结果相同则准确,否则则对Segment加锁进行最安全的统计。

 

 

 

接下来看JDK1.8的ConcurrentHashMap实现:

1.8中放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现

改进主要有两点:

一:取消了Segment字段,直接采用HashEntry的table来存储数据。采用table的元素作为锁。

二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构,这一点上与1.8HashMap更加相似。

这里就不在画图展示结构。直接进入1.8的源码分析。

 

初始化:

  构造函数中不再初始化table,初始化table延迟到第一次put操作。如此一来又会存在并发问题,因为即便是第一次put操作也存在并发可能性,下面看看是怎么解决这个延迟初始化操作同时避免并发问题:

首先介绍sizeCtl字段的含义

sizeCtl :默认为0,用来控制table的初始化和扩容操作。
  • -1 代表table正在初始化
  • -N 表示有N-1个线程正在进行扩容操作
  • 其余情况:
    1、如果table未初始化,表示table需要初始化的大小。
    2、如果table初始化完成,表示table的容量,默认是table大小的0.75倍,居然用这个公式算0.75(n - (n >>> 2))。
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
//如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片
        if ((sc = sizeCtl) < 0) 
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

这样便可延迟初始化数组了。

put操作、get操作、size操作

1、put操作

public V put(K key, V value) {
        return putVal(key, value, false);
    }


    /** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();  //第一次put,即table还没初始化,详情看上面的代码
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
          //加锁,针对单个结点加锁,进一步减少并发冲突,比segment更先进 synchronized (f) {
if (tabAt(tab, i) == f) { //数量未达到使用红黑树 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { //如果找到key 直接替代旧value oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { //如果是红黑树结点,则操作红黑树插入元素 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //如果数量大于8 转化为红黑树结构 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
Hash算法
static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;}
定位索引位置
int index = (n - 1) & hash
  • 获取table中对应索引的元素f。
    Doug Lea采用Unsafe.getObjectVolatile来获取,也许有人质疑,直接table[index]不可以么,为什么要这么复杂?
    在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。
  • 如果f为null,说明table中这个位置第一次插入元素,利用Unsafe.compareAndSwapObject方法插入Node节点。
    • 如果CAS成功,说明Node节点已经插入,随后addCount(1L, binCount)方法会检查当前容量是否需要进行扩容。
    • 如果CAS失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。
  • 如果f的hash值为-1,说明当前f是ForwardingNode节点,意味有其它线程正在扩容,则一起进行扩容操作。
  • 其余情况把新的Node节点按链表或红黑树的方式插入到合适的位置,这个过程采用同步内置锁实现并发
2、get操作
 
get操作就相对简单,过程为:先判断table是否为空,是的话直接返回null;然后通过计算key的hash值,找到对应table,再遍历链表或者红黑树利用equals方法返回对应的value。
 
public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

 

3.size操作

 

1.8中使用一个volatile类型的变量baseCount记录元素的个数,当插入新数据或则删除数据时,会通过addCount()方法更新baseCount,实现如下:

if ((as = counterCells) != null ||
    !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
    CounterCell a; long v; int m;
    boolean uncontended = true;
    if (as == null || (m = as.length - 1) < 0 ||
        (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
        !(uncontended =
          U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
        fullAddCount(x, uncontended);
        return;
    }
    if (check <= 1)
        return;
    s = sumCount();
}

1、初始化时counterCells为空,在并发量很高时,如果存在两个线程同时执行CAS修改baseCount值,则失败的线程会继续执行方法体中的逻辑,使用CounterCell记录元素个数的变化;

2、如果CounterCell数组counterCells为空,调用fullAddCount()方法进行初始化,并插入对应的记录数,通过CAS设置cellsBusy字段,只有设置成功的线程才能初始化CounterCell数组,实现如下:

else if (cellsBusy == 0 && counterCells == as &&
         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
    boolean init = false;
    try {                           // Initialize table
        if (counterCells == as) {
            CounterCell[] rs = new CounterCell[2];
            rs[h & 1] = new CounterCell(x);
            counterCells = rs;
            init = true;
        }
    } finally {
        cellsBusy = 0;
    }
    if (init)
        break;
}

 

3、如果通过CAS设置cellsBusy字段失败的话,则继续尝试通过CAS修改baseCount字段,如果修改baseCount字段成功的话,就退出循环,否则继续循环插入CounterCell对象;

else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
    break;

 

所以在1.8中的size实现比1.7简单多,因为元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,实现如下:

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

 

通过累加baseCountCounterCell数组中的数量,即可得到元素的总个数;



 参考资料:文章  http://www.jianshu.com/p/e694f1e868ec

书籍 《JAVA并发编程的艺术》

ConcurrentHashMap源码分析(JDK1.7和1.8对比)

标签:并且   stat   jdk源码   find   nsf   位置   字段   else   共享   

原文地址:http://www.cnblogs.com/sunlightcty/p/7511114.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!