标签:并且 stat jdk源码 find nsf 位置 字段 else 共享
一、ConcurrentHashMap简介
并发编程大师Doug Lea开发的并发容器之一。ConcurrentHashMap是线程安全且高效的HashMap,在HashMap的基础上增加了线程安全,当然结构方面也有所改变。
为什么要使用ConcurrentHashMap?
1、多线程环境下,HashMap会处于不安全状态。例如put操作可能会引起程序死循环,Cpu占有率达百分百,原因是多线程会导致HashMap的Entry链表形成环形数据结构,如此一来,他的next结点将永不为空,就会产生死循环获取Entry。 既然不能使用HashMap,在多线程环境下就给并发容器有了登场机会。
2、HashTable效率低下。HashTable相信很多人都知道,相比HashMap就增加了线程安全机制,在过去HashTable就会被使用,但由于它的实现是利用synchronized来保证线程安全的,在线程竞争激烈的情况下,效率就非常低下了。原因 在于当有线程访问HashTable的同步方法时,其他线程也来访问就会进行阻塞或者轮询状态。因此效率低下,如今开发中已经鲜有人使用HashTable了。
3、ConcurrentHashMap采用锁分段技术可有效提高并发访问率。这个将马上展开说明。
二、ConcurrentHashMap结构与实现
JDK1.7实现
走进jdk源码,类似HashMap,先看看它的类结构:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable{}
同样是继承了ConcurrentMap的骨架实现类AbstractMap,并且实现了ConcurrentMap。
通过它的类图来分析结构,ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成,Segment是一种可重入锁,在ConcurrentHashMap中扮演锁的角色,HashEntry则用来存储键值对数据。一个ConcurrentHashMap中维护一个
Segment数组,Segment结构是一种数组和链表结构,每个Segment又包含一个HashEntry数组,每个HashEntry是一个链表结构的元素。因此当对元素进行修改的时候,只需要获得对应的Segment锁即可,不需要锁住整个容器,从而大大提高了访问并发效率。
初始化segments数组的源代码:
if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++shift; ssize <<= 1; } segmentShift = 32 - sshift; segmentMask = ssize - 1; this.segments = Segment.newArray(ssize);
由源代码可知,segment的长度是通过concurrencyLevel计算得出的。最大值为65536。
而HashEntry的初始化长度则由以下源代码实现:
if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1;
定位Segment方法:
通过对元素的hashCode进行一次再散列。
private static int hash(int h) { h += (h<<15)^0xffffcd7d; h ^= (h>>>10); h += (h<<3); h ^= (h>>>6); h += (h<<2) + (h << 14); return h ^ (h >>> 16); }
final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; }
get、put、size操作
1、get操作
public V get(Object key) { int hash = hash(key.hashCode()) ; return segmentFor(hash).get(key, hash); }
get操作先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素。整个get过程,不需要加锁,除非读到的值是空才会加锁重读。
为什么不需要加锁呢?在HashTable中get方法是需要加锁的,原因就在于ConcurrentHashMap中把共享变量定义为volatile类型,如此一来即可在线程之间保持可见性,确保读到最新的值。
2、put操作
put操作首先定位到Segment,然后在里面进行插入操作,在此之前需要进行是否扩容判断。扩容会创建容量为原来两倍的数组,然后把元素重新散列后插入到新的数组,为了高效
ConcurrentHashMap只会对某一个Segment进行扩容。
3、size操作
统计整个容器的元素大小。Segment里面的count变量是volatie变量,似乎只要把全部count相加即可得到结果。但实际上,有可能发送在累加完毕前count发生了变化,因此统计结果就会不准确。
针对这种问题,ConcurrentHashMap采取了先尝试两次不锁Segment的方式来统计,如果两次统计结果相同则准确,否则则对Segment加锁进行最安全的统计。
接下来看JDK1.8的ConcurrentHashMap实现:
1.8中放弃了Segment
臃肿的设计,取而代之的是采用Node
+ CAS
+ Synchronized
来保证并发安全进行实现
改进主要有两点:
一:取消了Segment字段,直接采用HashEntry的table来存储数据。采用table的元素作为锁。
二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构,这一点上与1.8HashMap更加相似。
这里就不在画图展示结构。直接进入1.8的源码分析。
初始化:
构造函数中不再初始化table,初始化table延迟到第一次put操作。如此一来又会存在并发问题,因为即便是第一次put操作也存在并发可能性,下面看看是怎么解决这个延迟初始化操作同时避免并发问题:
首先介绍sizeCtl字段的含义
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
这样便可延迟初始化数组了。
put操作、get操作、size操作
1、put操作
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); //第一次put,即table还没初始化,详情看上面的代码 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null;
//加锁,针对单个结点加锁,进一步减少并发冲突,比segment更先进 synchronized (f) { if (tabAt(tab, i) == f) { //数量未达到使用红黑树 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { //如果找到key 直接替代旧value oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { //如果是红黑树结点,则操作红黑树插入元素 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //如果数量大于8 转化为红黑树结构 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
Hash算法 static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;}
定位索引位置 int index = (n - 1) & hash
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
3.size操作
1.8中使用一个volatile
类型的变量baseCount
记录元素的个数,当插入新数据或则删除数据时,会通过addCount()
方法更新baseCount
,实现如下:
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); }
1、初始化时counterCells
为空,在并发量很高时,如果存在两个线程同时执行CAS
修改baseCount
值,则失败的线程会继续执行方法体中的逻辑,使用CounterCell
记录元素个数的变化;
2、如果CounterCell
数组counterCells
为空,调用fullAddCount()
方法进行初始化,并插入对应的记录数,通过CAS
设置cellsBusy字段,只有设置成功的线程才能初始化CounterCell
数组,实现如下:
else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; }
3、如果通过CAS
设置cellsBusy字段失败的话,则继续尝试通过CAS
修改baseCount
字段,如果修改baseCount
字段成功的话,就退出循环,否则继续循环插入CounterCell
对象;
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break;
所以在1.8中的size
实现比1.7简单多,因为元素个数保存baseCount
中,部分元素的变化个数保存在CounterCell
数组中,实现如下:
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
通过累加baseCount
和CounterCell
数组中的数量,即可得到元素的总个数;
参考资料:文章 http://www.jianshu.com/p/e694f1e868ec
书籍 《JAVA并发编程的艺术》
ConcurrentHashMap源码分析(JDK1.7和1.8对比)
标签:并且 stat jdk源码 find nsf 位置 字段 else 共享
原文地址:http://www.cnblogs.com/sunlightcty/p/7511114.html