标签:分析 下标 相等 redo default seq size 为什么 efault
集合(Collection)是编程中常用的数据结构,而并发也是服务器端编程常用的技术之一,并发总是离不开集合这类高级数据结构的支持。比如两个线程需要同时访问一个中间临界区(Queue),比如常会用缓存作为外部文件的副本(HashMap)。而Map这种以键值对为元素的数据结构也是集合中最常用到的。Map家族中的三大类:HashMap、HashTable、ConcurrentHashMap。前者非线程安全的,后两者是线程安全的,而HashTable的实现原理与HashMap很相似,只是在公开的方法上(例如get)加入了synchronized关键字,保证同步。实现简单方便的劣势就是带来了性能的下降,因此目前HashTable已被淘汰,被同时满足线程安全而且性能更高(读写更快)的ConcurrentHashMap代替。
Java7和Java8的HashMap差别还是比较大的,源码角度来看也越来越复杂,本文将依次从Java7的HashMap以及Java8的HashMap,Java7的ConcurrentHashMap和Java8的ConcurrentHashMap的顺序来逐步分析源码,深入理解他们的原理。
因为HashMap不支持并发操作,所以它的代码也比较简单。先从下面这张图来了解HashMap的结构,由数组+链表而组成。
大方向上,HashMap 里面是一个数组,然后数组中每个元素是一个单向链表。
上图中,每个绿色的实体是嵌套类 Entry 的实例,Entry 包含四个属性:key, value, hash 值和用于单向链表的 next。
size的大小始终为2的n次方,
loadFactor:负载因子,默认为 0.75。
threshold:扩容的阈值,等于 capacity * loadFactor
我们知道在Java中最常用的两种结构是数组和模拟指针(引用),几乎所有的数据结构都可以利用这两种来组合实现。数组的存储方式在内存的地址是连续的,大小固定,一旦分配不能被其他引用占用。它的特点是查询快,时间复杂度是O(1),插入和删除的操作比较慢,时间复杂度是O(n),链表的存储方式是非连续的,大小不固定,特点与数组相反,插入和删除快,查询速度慢。HashMap可以说是一种折中的方案吧:外层是数组,每个数组下面连着链表。
put的过程比较简单,跟着代码看一遍就能看懂
public V put(K key, V value) { // 当插入第一个元素的时候,需要先初始化数组大小 if (table == EMPTY_TABLE) { inflateTable(threshold); } // 如果 key 为 null,感兴趣的可以往里看,最终会将这个 entry 放到 table[0] 中 if (key == null) return putForNullKey(value); // 1. 求 key 的 hash 值 int hash = hash(key); // 2. 找到对应的数组下标 int i = indexFor(hash, table.length); // 3. 遍历一下对应下标处的链表,看是否有重复的 key 已经存在, // 如果有,直接覆盖,put 方法返回旧值就结束了 for (Entry<K,V> e = table[i]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } modCount++; // 4. 不存在重复的 key,将此 entry 添加到链表中,细节后面说 addEntry(hash, key, value, i); return null; }
在第一个元素插入 HashMap 的时候做一次数组的初始化,就是先确定初始的数组大小,并计算数组扩容的阈值。
private void inflateTable(int toSize) { // 保证数组大小一定是 2 的 n 次方。 // 比如这样初始化:new HashMap(20),那么处理成初始数组大小是 32 int capacity = roundUpToPowerOf2(toSize); // 计算扩容阈值:capacity * loadFactor threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1); // 算是初始化数组吧 table = new Entry[capacity]; initHashSeedAsNeeded(capacity); //ignore }
这里有一个将数组大小保持为 2 的 n 次方的做法,Java7 和 Java8 的 HashMap 和 ConcurrentHashMap 都有相应的要求,只不过实现的代码稍微有些不同,后面再看到的时候就知道了。
这个比较简单,使用 key 的 hash 值对数组长度进行取模就可以了。
static int indexFor(int hash, int length) { // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2"; return hash & (length-1); }
这个方法很简单,简单说就是取 hash 值的低 n 位。如在数组长度为 32 的时候,其实取的就是 key 的 hash 值的低 5 位,作为它在数组中的下标位置。
找到数组下标后,会先进行 key 判重,如果没有重复,说明发生了哈希碰撞就准备将新值放入到链表的表头(头插法)。
void addEntry(int hash, K key, V value, int bucketIndex) { // 如果当前 HashMap 大小已经达到了阈值,并且新值要插入的数组位置已经有元素了,那么要扩容 if ((size >= threshold) && (null != table[bucketIndex])) { // 扩容,后面会介绍一下 resize(2 * table.length); // 扩容以后,重新计算 hash 值 hash = (null != key) ? hash(key) : 0; // 重新计算扩容后的新的下标 bucketIndex = indexFor(hash, table.length); } // 往下看 createEntry(hash, key, value, bucketIndex); } // 这个很简单,其实就是将新值放到链表的表头,然后 size++ void createEntry(int hash, K key, V value, int bucketIndex) { Entry<K,V> e = table[bucketIndex]; table[bucketIndex] = new Entry<>(hash, key, value, e); size++; }
这个方法的主要逻辑就是先判断是否需要扩容,需要的话先扩容,然后再将这个新的数据插入到扩容后的数组的相应位置处的链表的表头。
前面我们看到,在插入新值的时候,如果当前的 size 已经达到了阈值,并且要插入的数组位置上已经有元素,那么就会触发扩容,扩容后,数组大小为原来的 2 倍。
void resize(int newCapacity) { Entry[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity == MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return; } // 新的数组 Entry[] newTable = new Entry[newCapacity]; // 将原来数组中的值迁移到新的更大的数组中 transfer(newTable, initHashSeedAsNeeded(newCapacity)); table = newTable; threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
我们接下来看一下迁移数组用到的transfer函数,这也是导致HashMap线程不安全的主要原因,再接下来会详细讲到
void transfer(Entry[] newTable, boolean rehash) { int newCapacity = newTable.length; for (Entry<K,V> e : table) { while(null != e) { Entry<K,V> next = e.next; if (rehash) { e.hash = null == e.key ? 0 : hash(e.key); } int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next; } } }
大概做了一下几个事情:
相对于 put 过程,get 过程是非常简单的。
public V get(Object key) { // 之前说过,key 为 null 的话,会被放到 table[0],所以只要遍历下 table[0] 处的链表就可以了 if (key == null) return getForNullKey(); // Entry<K,V> entry = getEntry(key); return null == entry ? null : entry.getValue();
getEntry(key):
final Entry<K,V> getEntry(Object key) { if (size == 0) { return null; } int hash = (key == null) ? 0 : hash(key); // 确定数组下标,然后从头开始遍历链表,直到找到为止 for (Entry<K,V> e = table[indexFor(hash, table.length)]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } return null; }
Java8 对 HashMap 进行了一些修改,最大的不同就是利用了红黑树,所以其由 数组+链表+红黑树 组成。
根据 Java7 HashMap 的介绍,我们知道,查找的时候,根据 hash 值我们能够快速定位到数组的具体下标,但是之后的话,需要顺着链表一个个比较下去才能找到我们需要的,时间复杂度取决于链表的长度,为 O(n)。
为了降低这部分的开销,在 Java8 中,当链表中的元素超过了 8 个以后,会将链表转换为红黑树,在这些位置进行查找的时候可以降低时间复杂度为 O(logN)。
Java7 中使用 Entry 来代表每个 HashMap 中的数据节点,Java8 中使用 Node,基本没有区别,都是 key,value,hash 和 next 这四个属性,不过,Node 只能用于链表的情况,红黑树的情况需要使用 TreeNode。
我们根据数组元素中,第一个节点数据类型是 Node 还是 TreeNode 来判断该位置下是链表还是红黑树的。
public V put(K key, V value) { return putVal(hash(key), key, value, false, true); } // 第三个参数 onlyIfAbsent 如果是 true,那么只有在不存在该 key 时才会进行 put 操作 // 第四个参数 evict 我们这里不关心 final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { Node<K,V>[] tab; Node<K,V> p; int n, i; // 第一次 put 值的时候,会触发下面的 resize(),类似 java7 的第一次 put 也要初始化数组长度 // 第一次 resize 和后续的扩容有些不一样,因为这次是数组从 null 初始化到默认的 16 或自定义的初始容量 if ((tab = table) == null || (n = tab.length) == 0) n = (tab = resize()).length; // 找到具体的数组下标,如果此位置没有值,那么直接初始化一下 Node 并放置在这个位置就可以了 if ((p = tab[i = (n - 1) & hash]) == null) tab[i] = newNode(hash, key, value, null); else {// 数组该位置有数据 Node<K,V> e; K k; // 首先,判断该位置的第一个数据和我们要插入的数据,key 是不是"相等",如果是,取出这个节点 if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) e = p; // 如果该节点是代表红黑树的节点,调用红黑树的插值方法,本文不展开说红黑树 else if (p instanceof TreeNode) e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); else { // 到这里,说明数组该位置上是一个链表 for (int binCount = 0; ; ++binCount) { // 插入到链表的最后面(Java7 是插入到链表的最前面) if ((e = p.next) == null) { p.next = newNode(hash, key, value, null); // TREEIFY_THRESHOLD 为 8,所以,如果新插入的值是链表中的第 9 个 // 会触发下面的 treeifyBin,也就是将链表转换为红黑树 if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st treeifyBin(tab, hash); break; } // 如果在该链表中找到了"相等"的 key(== 或 equals) if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) // 此时 break,那么 e 为链表中[与要插入的新值的 key "相等"]的 node break; p = e; } } // e!=null 说明存在旧值的key与要插入的key"相等" // 对于我们分析的put操作,下面这个 if 其实就是进行 "值覆盖",然后返回旧值 if (e != null) { V oldValue = e.value; if (!onlyIfAbsent || oldValue == null) e.value = value; afterNodeAccess(e); return oldValue; } } ++modCount; // 如果 HashMap 由于新插入这个值导致 size 已经超过了阈值,需要进行扩容 if (++size > threshold) resize(); afterNodeInsertion(evict); return null; }
和 Java7 稍微有点不一样的地方就是,Java7 是先扩容后插入新值的,Java8 先插值再扩容,不过这个不重要。
resize() 方法用于初始化数组或数组扩容,每次扩容后,容量为原来的 2 倍,并进行数据迁移。
1 final Node<K,V>[] resize() { 2 Node<K,V>[] oldTab = table; 3 int oldCap = (oldTab == null) ? 0 : oldTab.length; 4 int oldThr = threshold; 5 int newCap, newThr = 0; 6 if (oldCap > 0) { // 对应数组扩容 7 if (oldCap >= MAXIMUM_CAPACITY) { 8 threshold = Integer.MAX_VALUE; 9 return oldTab; 10 } 11 // 将数组大小扩大一倍 12 else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && 13 oldCap >= DEFAULT_INITIAL_CAPACITY) 14 // 将阈值扩大一倍 15 newThr = oldThr << 1; // double threshold 16 } 17 else if (oldThr > 0) // 对应使用 new HashMap(int initialCapacity) 初始化后,第一次 put 的时候 18 newCap = oldThr; 19 else {// 对应使用 new HashMap() 初始化后,第一次 put 的时候 20 newCap = DEFAULT_INITIAL_CAPACITY; 21 newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY); 22 } 23 24 if (newThr == 0) { 25 float ft = (float)newCap * loadFactor; 26 newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ? 27 (int)ft : Integer.MAX_VALUE); 28 } 29 threshold = newThr; 30 31 // 用新的数组大小初始化新的数组 32 Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; 33 table = newTab; // 如果是初始化数组,到这里就结束了,返回 newTab 即可 34 35 if (oldTab != null) { 36 // 开始遍历原数组,进行数据迁移。 37 for (int j = 0; j < oldCap; ++j) { 38 Node<K,V> e; 39 if ((e = oldTab[j]) != null) { 40 oldTab[j] = null; 41 // 如果该数组位置上只有单个元素,那就简单了,简单迁移这个元素就可以了 42 if (e.next == null) 43 newTab[e.hash & (newCap - 1)] = e; 44 // 如果是红黑树,具体我们就不展开了 45 else if (e instanceof TreeNode) 46 ((TreeNode<K,V>)e).split(this, newTab, j, oldCap); 47 else { 48 // 这块是处理链表的情况, 49 // 需要将此链表拆成两个链表,放到新的数组中,并且保留原来的先后顺序 50 // loHead、loTail 对应一条链表,hiHead、hiTail 对应另一条链表,代码还是比较简单的 51 Node<K,V> loHead = null, loTail = null; 52 Node<K,V> hiHead = null, hiTail = null; 53 Node<K,V> next; 54 do { 55 next = e.next; 56 if ((e.hash & oldCap) == 0) { 57 if (loTail == null) 58 loHead = e; 59 else 60 loTail.next = e; 61 loTail = e; 62 } 63 else { 64 if (hiTail == null) 65 hiHead = e; 66 else 67 hiTail.next = e; 68 hiTail = e; 69 } 70 } while ((e = next) != null); 71 if (loTail != null) { 72 loTail.next = null; 73 // 第一条链表 74 newTab[j] = loHead; 75 } 76 if (hiTail != null) { 77 hiTail.next = null; 78 // 第二条链表的新的位置是 j + oldCap,这个很好理解 79 newTab[j + oldCap] = hiHead; 80 } 81 } 82 } 83 } 84 } 85 return newTab; 86 }
相对于 put 来说,get 比较简单了。
public V get(Object key) { Node<K,V> e; return (e = getNode(hash(key), key)) == null ? null : e.value; } final Node<K,V> getNode(int hash, Object key) { Node<K,V>[] tab; Node<K,V> first, e; int n; K k; if ((tab = table) != null && (n = tab.length) > 0 && (first = tab[(n - 1) & hash]) != null) { // 判断第一个节点是不是就是需要的 if (first.hash == hash && // always check first node ((k = first.key) == key || (key != null && key.equals(k)))) return first; if ((e = first.next) != null) { // 判断是否是红黑树 if (first instanceof TreeNode) return ((TreeNode<K,V>)first).getTreeNode(hash, key); // 链表遍历 do { if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } while ((e = e.next) != null); } } return null; }
HashMap众所周知是线程不安全的,所有方法没采用同步处理,无法做到线程同步(线程安全)也是可以想象得到的。HashMap的线程不安全性主要体现在以下三个方面:
在多线程访问的情况下:
1.在hashmap做put操作的时候会调用下面方法:
// 新增Entry。将“key-value”插入指定位置,bucketIndex是位置索引。 void addEntry(int hash, K key, V value, int bucketIndex) { // 保存“bucketIndex”位置的值到“e”中 Entry<K,V> e = table[bucketIndex]; // 设置“bucketIndex”位置的元素为“新Entry”, // 设置“e”为“新Entry的下一个节点” table[bucketIndex] = new Entry<K,V>(hash, key, value, e); // 若HashMap的实际大小 不小于 “阈值”,则调整HashMap的大小 if (size++ >= threshold) resize(2 * table.length); }
在hashmap做put操作的时候会调用到以上的方法。现在假如A线程和B线程同时对同一个数组位置调用addEntry,两个线程会同时得到现在的头结点,然后A写入新的头结点之后,B也写入新的头结点,那B的写入操作就会覆盖A的写入操作造成A的写入操作丢失
2.删除键值对的代码
final Entry<K,V> removeEntryForKey(Object key) { // 获取哈希值。若key为null,则哈希值为0;否则调用hash()进行计算 int hash = (key == null) ? 0 : hash(key.hashCode()); int i = indexFor(hash, table.length); Entry<K,V> prev = table[i]; Entry<K,V> e = prev; // 删除链表中“键为key”的元素 // 本质是“删除单向链表中的节点” while (e != null) { Entry<K,V> next = e.next; Object k; if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) { modCount++; size--; if (prev == e) table[i] = next; else prev.next = next; e.recordRemoval(this); return e; } prev = e; e = next; }
当多个线程同时操作同一个数组位置的时候,也都会先取得现在状态下该位置存储的头结点,然后各自去进行计算操作,之后再把结果写会到该数组位置去,其实写回的时候可能其他的线程已经就把这个位置给修改过了,就会覆盖其他线程的修改。
3.addEntry中当加入新的键值对后键值对总数量超过门限值的时候会调用一个resize操作,代码如下:
这个操作会新生成一个新的容量的数组,然后对原数组的所有键值对重新进行计算和写入新的数组,之后指向新生成的数组。
当多个线程同时检测到总数量超过门限值的时候就会同时调用resize操作,各自生成新的数组并rehash后赋给该map底层的数组table,结果最终只有最后一个线程生成的新数组被赋给table变量,其他线程的均会丢失。而且某些线程已经完成赋值而其他线程刚开始的时候,就会用已经被赋值的table作为原始数组,这样也会有问题,多个线程同时进行resize操作也有可能使链表产生环,导致死锁,详细图文详解见http://www.importnew.com/22011.html
ConcurrentHashMap 和 HashMap 思路是差不多的,但是因为它支持并发操作,所以要复杂一些。
整个 ConcurrentHashMap 由一个个 Segment 组成,Segment 代表”部分“或”一段“的意思,所以很多地方都会将其描述为分段锁。注意,行文中,我很多地方用了“槽”来代表一个 segment。
简单理解就是,ConcurrentHashMap 是一个 Segment 数组,Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。
concurrencyLevel:并行级别、并发数、Segment 数,怎么翻译不重要,理解它。默认是 16,也就是说 ConcurrentHashMap 有 16 个 Segments,所以理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。
再具体到每个 Segment 内部,其实每个 Segment 很像之前介绍的 HashMap,不过它要保证线程安全,所以处理起来要麻烦些。
initialCapacity:初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。
loadFactor:负载因子,之前我们说了,Segment 数组不可以扩容,所以这个负载因子是给每个 Segment 内部使用的。
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; // 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 我们这里先不要那么烧脑,用默认值,concurrencyLevel 为 16,sshift 为 4 // 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值 this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // initialCapacity 是设置整个 map 初始的大小, // 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小 // 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个 int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; // 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上, // 插入一个元素不至于扩容,插入第二个的时候才会扩容 int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // 创建 Segment 数组, // 并创建数组的第一个元素 segment[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; // 往数组写入 segment[0] UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
初始化完成,我们得到了一个 Segment 数组。
我们就当是用 new ConcurrentHashMap() 无参构造函数进行初始化的,那么初始化完成后:
我们先看 put 的主流程,对于其中的一些关键细节操作,后面会进行详细介绍。
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); // 1. 计算 key 的 hash 值 int hash = hash(key); // 2. 根据 hash 值找到 Segment 数组中的位置 j // hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位, // 然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的最后 4 位,也就是槽的数组下标 int j = (hash >>> segmentShift) & segmentMask; // 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null, // ensureSegment(j) 对 segment[j] 进行初始化 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); // 3. 插入新值到 槽 s 中 return s.put(key, hash, value, false); }
第一层皮很简单,根据 hash 值很快就能找到相应的 Segment,之后就是 Segment 内部的 put 操作了。
Segment 内部是由 数组+链表 组成的。
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 在往该 segment 写入前,需要先获取该 segment 的独占锁 // 先看主流程,后面还会具体介绍这部分内容 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { // 这个是 segment 内部的数组 HashEntry<K,V>[] tab = table; // 再利用 hash 值,求应该放置的数组下标 int index = (tab.length - 1) & hash; // first 是数组该位置处的链表的表头 HashEntry<K,V> first = entryAt(tab, index); // 下面这串 for 循环虽然很长,不过也很好理解,想想该位置没有任何元素和已经存在一个链表这两种情况 for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { // 覆盖旧值 e.value = value; ++modCount; } break; } // 继续顺着链表走 e = e.next; } else { // node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。 // 如果不为 null,那就直接将它设置为链表表头;如果是null,初始化并设置为链表表头。 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 如果超过了该 segment 的阈值,这个 segment 需要扩容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); // 扩容后面也会具体分析 else // 没有达到阈值,将 node 放到数组 tab 的 index 位置, // 其实就是将新的节点设置成原链表的表头 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 解锁 unlock(); } return oldValue; }
整体流程还是比较简单的,由于有独占锁的保护,所以 segment 内部的操作并不复杂。至于这里面的并发问题,我们稍后再进行介绍。
到这里 put 操作就结束了,接下来,我们说一说其中几步关键的操作。
前面我们看到,在往某个 segment 中 put 的时候,首先会调用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是说先进行一次 tryLock() 快速获取该 segment 的独占锁,如果失败,那么进入到 scanAndLockForPut 这个方法来获取锁。
下面我们来具体分析这个方法中是怎么控制加锁的。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node // 循环获取锁 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node // 进到这里说明数组该位置的链表是空的,没有任何元素 // 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置 node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else // 顺着链表往下走 e = e.next; } // 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁 // lock() 是阻塞方法,直到获取锁后返回 else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && // 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头 // 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法 (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; }
这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另一个就是重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。
这个方法就是看似复杂,但是其实就是做了一件事,那就是获取该 segment 的独占锁,如果需要的话顺便实例化了一下 node。
ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽来说,在插入第一个值的时候进行初始化。
这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就可以。
private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 这里看到为什么之前要初始化 segment[0] 了, // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k] // 为什么要用“当前”,因为 segment[0] 可能早就扩容过了 Segment<K,V> proto = ss[0]; int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); // 初始化 segment[k] 内部的数组 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 再次检查一遍该槽是否被其他线程初始化了。 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
重复一下,segment 数组不能扩容,扩容是 segment 数组某个位置内部的数组 HashEntry\[] 进行扩容,扩容后,容量为原来的 2 倍。
首先,我们要回顾一下触发扩容的地方,put 的时候,如果判断该值的插入会导致该 segment 的元素个数超过阈值,那么先进行扩容,再插值,读者这个时候可以回去 put 方法看一眼。
该方法不需要考虑并发,因为到这里的时候,是持有该 segment 的独占锁的。
// 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。 private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; // 2 倍 int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); // 创建新数组 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; // 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’ int sizeMask = newCapacity - 1; // 遍历原数组,老套路,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置 for (int i = 0; i < oldCapacity ; i++) { // e 是链表的第一个元素 HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; // 计算应该放置在新数组中的位置, // 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19 int idx = e.hash & sizeMask; if (next == null) // 该位置处只有一个元素,那比较好办 newTable[idx] = e; else { // Reuse consecutive sequence at same slot // e 是链表表头 HashEntry<K,V> lastRun = e; // idx 是当前链表的头结点 e 的新位置 int lastIdx = idx; // 下面这个 for 循环会找到一个 lastRun 节点,这个节点之后的所有元素是将要放到一起的 for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } // 将 lastRun 及其之后的所有节点组成的这个链表放到 lastIdx 这个位置 newTable[lastIdx] = lastRun; // 下面的操作是处理 lastRun 之前的节点, // 这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } // 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
这里的扩容比之前的 HashMap 要复杂一些,代码难懂一点。上面有两个挨着的 for 循环,第一个 for 有什么用呢?
仔细一看发现,如果没有第一个 for 循环,也是可以工作的,但是,这个 for 循环下来,如果 lastRun 的后面还有比较多的节点,那么这次就是值得的。因为我们只需要克隆 lastRun 前面的节点,后面的一串节点跟着 lastRun 走就是了,不需要做任何操作。
我觉得 Doug Lea 的这个想法也是挺有意思的,不过比较坏的情况就是每次 lastRun 都是链表的最后一个元素或者很靠后的元素,那么这次遍历就有点浪费了。不过 Doug Lea 也说了,根据统计,如果使用默认的阈值,大约只有 1/6 的节点需要克隆。
相对于 put 来说,get由于不会受到并发的影响,所以不涉及到同步操作,简单了很多。
public V get(Object key) { Node<K,V> e; return (e = getNode(hash(key), key)) == null ? null : e.value; } final Node<K,V> getNode(int hash, Object key) { Node<K,V>[] tab; Node<K,V> first, e; int n; K k; if ((tab = table) != null && (n = tab.length) > 0 && (first = tab[(n - 1) & hash]) != null) { // 判断第一个节点是不是就是需要的 if (first.hash == hash && // always check first node ((k = first.key) == key || (key != null && key.equals(k)))) return first; if ((e = first.next) != null) { // 判断是否是红黑树 if (first instanceof TreeNode) return ((TreeNode<K,V>)first).getTreeNode(hash, key); // 链表遍历 do { if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } while ((e = e.next) != null); } } return null; }
在Java6和Java7中ConcurrentHashMap使用锁分段技术提高并发访问效率。首先将数据分成一段一段地存储,然后给每一段数据配一个锁,当一个线程占用锁访问其中一段数据时,其他段的数据也能被其他线程访问。然而在Java8中的实现已经抛弃了Segment分段锁机制,利用CAS+Synchronized来保证并发更新的安全,底层依然采用数组+链表+红黑树的存储结构。
Java 8 CouncurrentHashMap的结构和Java 8 HashMap很像,不过要保证线程安全,源码上要复杂很多。
Java 8的ConcurrentHashMap取消segments字段,直接采用transient volatile HashEntry<K,V> table
保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。
Node节点的value和next都用volatile修饰,保证并发性。
// 这构造函数里,什么都不干 public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
这个初始化方法有点意思,通过提供初始容量,计算了 sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),然后向上取最近的 2 的 n 次方】。如 initialCapacity 为 10,那么得到 sizeCtl 为 16,如果 initialCapacity 为 11,得到 sizeCtl 为 32。
主要由乐观锁CAS和悲观锁synchronized来代替segment保证并发性。
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 得到 hash 值 int hash = spread(key.hashCode()); // 用于记录相应链表的长度 int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 如果数组"空",进行数组初始化 if (tab == null || (n = tab.length) == 0) // 初始化数组,后面会详细介绍 tab = initTable(); // 找该 hash 值对应的数组下标,得到第一个节点 f else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果数组该位置为空, // 用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了,可以拉到最后面了 // 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // hash 居然可以等于 MOVED,这个需要到后面才能看明白,不过从名字上也能猜到,肯定是因为在扩容 else if ((fh = f.hash) == MOVED) // 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了 tab = helpTransfer(tab, f); else { // 到这里就是说,f 是该位置的头结点,而且不为空 V oldVal = null; // 获取数组该位置的头结点的监视器锁 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { // 头结点的 hash 值大于 0,说明是链表 // 用于累加,记录链表的长度 binCount = 1; // 遍历链表 for (Node<K,V> e = f;; ++binCount) { K ek; // 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 到了链表的最末端,将这个新值放到链表的最后面 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 红黑树 Node<K,V> p; binCount = 2; // 调用红黑树的插值方法插入新节点 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // binCount != 0 说明上面在做链表操作 if (binCount != 0) { // 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8 if (binCount >= TREEIFY_THRESHOLD) // 这个方法和 HashMap 中稍微有一点点不同,那就是它不是一定会进行红黑树转换, // 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树 // 具体源码我们就不看了,扩容部分后面说 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // addCount(1L, binCount); return null; }
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 判断头结点是否就是我们需要的节点 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 如果头结点的 hash 小于 0,说明 正在扩容,或者该位置是红黑树 else if (eh < 0) // 参考 ForwardingNode.find(int h, Object k) 和 TreeBin.find(int h, Object k) return (p = e.find(h, key)) != null ? p.val : null; // 遍历链表 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
http://www.importnew.com/28263.html
https://www.cnblogs.com/lchzls/p/6714689.html
https://blog.csdn.net/fjse51/article/details/55260493
标签:分析 下标 相等 redo default seq size 为什么 efault
原文地址:https://www.cnblogs.com/kukri/p/9392906.html