标签:
线程安全的HashMap版本。
1)基本思想:将整个大的hash table进一步细分成小的hash table,即Segment;
2)读不用加锁;写操作只在所在的Segment上加锁,而不是锁住整个HashMap;相比之下,Hashtable的所有方法都竞争Hashtable对象上的同一把锁,导致并发效率低;
3)采用懒构造segment(除了segments[0]),以减少初始化内存。Unsafe类实现了AtomicReferenceArrays的功能,但减少了间接引用的程度。对Segment.table元素、HashEntry.next属性采用更轻量级的“lazySet”写(用Unsafe.putOrderedObject实现,确保用在lock中,其后有unlock释放),以保证table更新的顺序一致性。之前的ConcurrentHashMap类过度依赖final关键字,虽然能避免一些volatile读,但是初始化时需要较大的内存。
4)并发级别由concurrencyLevel参数确定,太大会浪费内存空间和遍历时间,太小则加强竞争;当只有一个线程写,其他线程都是读,则设置为1;
5)迭代器反映的是其创建时或创建之后某个时刻ConcurrentHashMap的状态,不会抛出ConcurrentModificationException;同一个迭代器在同一时刻只应由一个线程使用;
6)尽量避免rehash。
Segment<K,V>数组,每一个Segment又是特殊的hash table,包括HashEntry<K,V>数组,HashEntry为并发版的键值对:
// The segment array; declared final.
final Segment<K,V>[] segments;

/**
 * A segment is a specialized hash table that also serves as its own lock
 * (it extends ReentrantLock). It holds an array of HashEntry buckets.
 */
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    // Maximum number of tryLock attempts made during a prescan before
    // blocking on the segment lock. A bounded number of retries is used:
    // 64 when more than one processor is available, otherwise 1.
    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    // The per-segment table. Its elements are read and written through
    // entryAt/setEntryAt.
    transient volatile HashEntry<K,V>[] table;

    // Number of elements in this segment. Accessed under the lock or by
    // other volatile-style means.
    transient int count;

    // Total number of writes to this segment. Even if it overflows, it is
    // still adequate for the stability checks in isEmpty() and size().
    // Accessed under the lock or by other volatile-style means.
    transient int modCount;

    // Load threshold of this segment's table; the table is rehashed once
    // the element count exceeds it.
    transient int threshold;

    // Load factor; the same for every segment. Kept as a per-segment
    // field to avoid a link to the outer object.
    final float loadFactor;

    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
}

/**
 * Key-value node used in the bucket lists. hash and key are final;
 * value and next are volatile.
 */
static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }

    // Sets next with an ordered (lazySet-style) write via
    // UNSAFE.putOrderedObject rather than a plain volatile store.
    final void setNext(HashEntry<K,V> n) {
        UNSAFE.putOrderedObject(this, nextOffset, n);
    }

    // Unsafe mechanics
    static final sun.misc.Unsafe UNSAFE;
    static final long nextOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = HashEntry.class;
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
segments.length = 16
segmentShift = 28
segmentMask = 15
Segment.table.length = 2
Segment.loadFactor = 0.75
Segment.threshold = 1
// 带初始容量、负载因子、并发级别参数构造 // initialCapacity参数:用来确定Segment容量 // loadFactor参数:Segment的负载因子 // concurrencyLevel参数:用来确定segments长度,即并发级别 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; // 用来确定segment的移位 this.segmentMask = ssize - 1; // 用来确定segment的掩码,即Segment数组长度-1,key hashCode的高位确定segment index if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; // 初始容量/Segment数组长度,即用来确定Segment容量 ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // Segment容量 // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); // 保证序列化与之前版本的兼容 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; } // 带初始容量、负载因子参数构造,默认并发级别为16 public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); } // 带初始容量参数构造,默认负载因子0.75 public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } // 无参构造,默认初始容量16 public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } /** * Creates a new map with the same mappings as the given map. * The map is created with a capacity of 1.5 times the number * of mappings in the given map or 16 (whichever is greater), * and a default load factor (0.75) and concurrencyLevel (16). * * @param m the map */ public ConcurrentHashMap(Map<? extends K, ? 
extends V> m) { this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); putAll(m); }
1)当Segment中键值对数超过负载阈值且table长度小于MAXIMUM_CAPACITY = 1 << 30,则对其table进行容量调整,table容量翻倍;
2)table最大容量为MAXIMUM_CAPACITY = 1 << 30;
3)table容量达到MAXIMUM_CAPACITY后,如果有put请求,则不再扩容,直接将键值对链接到相应的bucket中,不限制键值对的添加;
4)若进行了table的容量调整,需要将旧table关联的键值对重新在新table中确定bucket,再添加进来,也就是所说的hash table rehash,这里可以重用一些旧table的节点,因为next属性是不变的。
// rehash @SuppressWarnings("unchecked") private void rehash(HashEntry<K,V> node) { // table长度翻倍 // 由于table长度为2的幂次方,在从就table到新table的rehash过程中, // 元素的索引要么不变,要么移位2的幂次方; // 一些老的节点因其next属性不变可以重用,在默认的负载阈值下, // 只有1/6的元素需要复制。Entry的读取值采用普通的数组索引,这是 // 因为其后有table的volatile写 HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; // table长度翻倍,长度依然为2的幂次方 threshold = (int)(newCapacity * loadFactor); // 负载阈值 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; // 采用普通的数组索引 if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; else { HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // 在新的table中,重用索引相同的老的节点,其next属性不变 // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
确定segments数组中Segment索引、Segment中table数组bucket索引
基于key的hashCode再哈希产生hash码,用其高位确定Segment索引,用其低位确定bucket索引:
// 对key的hashCode进行再次哈希,对其高位、低位进一步散列 // 由于segments、Segment.table length为均2的幂次方,所以对那些高位、低位相同的hashCode,容易产生hash碰撞; private int hash(Object k) { int h = hashSeed; if ((0 != h) && (k instanceof String)) { return sun.misc.Hashing.stringHash32((String) k); } h ^= k.hashCode(); // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); } // 用hash码的高位在segments中确定index int j = (hash >>> segmentShift) & segmentMask; // 用hash码的低位确定bucket index int index = (tab.length - 1) & hash;
步骤:
1)根据key的hashCode获取hash码;
2)用hash码的高位确定Segment;
3)将put操作委托给确定的Segment进行put;
4)先采用自旋获取该Segment的锁;
5)用hash码的低位确定该Segment中table的bucket;
6)先遍历该bucket中的键值对,确定是否已有key相同(引用相同,或hash码相等且equals相等)的键值对:有则用新value替换旧value并返回旧value;否则用key、value、hash创建HashEntry,将其链接到bucket首位;
7)释放该Segment的锁。
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); // 委托给确定的Segment进行put } // Segment put final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); // 采用自旋,获取锁与遍历bucket热身 V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; // bucketIndex HashEntry<K,V> first = entryAt(tab, index); // table元素 volatile读 for (HashEntry<K,V> e = first;;) { if (e != null) { // 先遍历看是否已有键值对 K k; // key相同或hash码相等且key相等 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); // table元素 volatile写 ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; } // segments元素volatile读,若不存在,则创建再用CAS写入segments @SuppressWarnings("unchecked") private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; // 对segments中元素进行volatile读,若为null则 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 再次检查是否为null Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); 
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))// 用CAS方式写 == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; } // 在尝试获取Segment锁的过程中,遍历hash码所确定的bucket中的节点, // 如果没有,则创建返回,返回时确保获取到当前Segment锁。这里只采用equals // 来比较key,这不重要,主要是为了遍历热身。 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) // 不是真的比较,仅仅简单遍历 retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; } // table元素volatile读 @SuppressWarnings("unchecked") static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) { return (tab == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)i << TSHIFT) + TBASE); } // table元素volatile写 static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i, HashEntry<K,V> e) { UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e); }
步骤:
1)根据key的hashCode获取hash码;
2)用hash码的高位确定Segment;
3)将remove操作委托给确定的Segment进行remove;
4)先采用自旋获取该Segment的锁;
5)用hash码的低位确定该Segment中table的bucket;
6)遍历该bucket中的键值对,确定是否已有key相同(引用相同,或hash码相等且equals相等)的键值对,有则删除;
7)释放该Segment的锁。
public V remove(Object key) { int hash = hash(key); Segment<K,V> s = segmentForHash(hash); return s == null ? null : s.remove(key, hash, null); } final V remove(Object key, int hash, Object value) { if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> e = entryAt(tab, index); HashEntry<K,V> pred = null; while (e != null) { K k; HashEntry<K,V> next = e.next; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) { if (pred == null) setEntryAt(tab, index, next); // 删除的是bucket的第一个节点,volatile写 else pred.setNext(next); // 通过改变next删除节点,next的volatile写 ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { unlock(); } return oldValue; }
步骤:
1)根据key的hashCode获取hash码;
2)用hash码的高位确定Segment;
3)用hash码的低位确定该Segment中table的bucket;
4)遍历该bucket中键值对,确定是否已有相同或hash码相等且key相等的键值对,有则返回关联的value;否则返回null。
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
// 判断ConcurrentHashMap是否为空 // 用segment的modCount来确定调用时间段 public boolean isEmpty() { long sum = 0L; final Segment<K,V>[] segments = this.segments; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; sum += seg.modCount; } } if (sum != 0L) { // recheck unless no modifications for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; sum -= seg.modCount; } } if (sum != 0L) // modCount溢不溢出都没有关系,只要sum为0L就表明没有修改 return false; } return true; } // 获取ConcurrentHashMap的键值对总数 // 用segment的modCount来确定调用时间段 // 先尝试RETRIES_BEFORE_LOCK次数获取size,获取失败则加全锁 public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }
HashIterator为迭代器基类,从后往前对ConcurrentHashMap进行全遍历,性能一般,KeyIterator、ValueIterator、EntryIterator都继承于该类:
abstract class HashIterator { int nextSegmentIndex; int nextTableIndex; HashEntry<K,V>[] currentTable; HashEntry<K, V> nextEntry; HashEntry<K, V> lastReturned; HashIterator() { nextSegmentIndex = segments.length - 1; nextTableIndex = -1; advance(); } /** * Set nextEntry to first node of next non-empty table * (in backwards order, to simplify checks). */ final void advance() { for (;;) { if (nextTableIndex >= 0) { if ((nextEntry = entryAt(currentTable, nextTableIndex--)) != null) // 往前找到不为null的bucket break; } else if (nextSegmentIndex >= 0) { Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);// 往前找到不为null的Segment if (seg != null && (currentTable = seg.table) != null) nextTableIndex = currentTable.length - 1; } else break; } } final HashEntry<K,V> nextEntry() { HashEntry<K,V> e = nextEntry; if (e == null) throw new NoSuchElementException(); lastReturned = e; // cannot assign until after null check if ((nextEntry = e.next) == null) advance(); return e; } public final boolean hasNext() { return nextEntry != null; } public final boolean hasMoreElements() { return nextEntry != null; } public final void remove() { if (lastReturned == null) throw new IllegalStateException(); ConcurrentHashMap.this.remove(lastReturned.key); lastReturned = null; } }
1)segments、Segment.table的长度均为2的幂次方;
2)用hash函数,基于key的hashCode再次哈希,对其高位、低位进一步散列 ;
3)用hash码的高位在segments中确定index,低位确定bucket index;
// 用hash码的高位在segments中确定index int j = (hash >>> segmentShift) & segmentMask; // 用hash码的低位确定bucket index int index = (tab.length - 1) & hash;
其并发体现在两点:
1)Segment之间的并发;
2)Segment内部的读写并发;
1)并发读取segments中的元素;2)将所有操作委托给Segment。
int h = hash(key); // hash码 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;// segment index s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)// 对segment index元素volatile读
1)写线程之间竞争Segment同一把锁,读线程不加锁;
2)利用volatile、final语义保证并发写线程之间、并发读写线程之间的可见性。
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { // table volatile读 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);// table元素volatile读 e != null; e = e.next) { // next volatile读 K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) // key、hash为final return e.value;// value volatile读 } } return null; } final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; // // table volatile读 int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); // table元素volatile读 for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {// key、hash为final oldValue = e.value; if (!onlyIfAbsent) { e.value = value;// value volatile写 ++modCount; } break; } e = e.next;// next volatile读 } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node);// table元素volatile写,弱一致性 ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
在put时,如果获取的segment为null,则进行懒构造
// Plain (non-volatile) read of the segment; a null result is rechecked
// inside ensureSegment, which creates the segment if it is missing.
if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
     (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
    s = ensureSegment(j);
这样可以减少初始化内存空间。
只要在构造完成之前没有暴露实例的this引用,final字段在其所在类实例初始化完成后即对所有并发线程可见,这样可以减少一部分volatile读,但会增加初始化时的内存占用。
JDK容器与并发—Map—ConcurrentHashMap
标签:
原文地址:http://blog.csdn.net/architect0719/article/details/51119194