标签:服务 没有初始化 comm 没有 boolean 数组 while 循环 font 遍历
在JDK1.8之前的实现结构是:ReentrantLock+Segment+HashEntry+链表
JDK1.8之后的实现结构是:synchronized+CAS+Node+链表或红黑树(与HashMap一致)
而1.8之前锁的是Segment,1.8锁的是Node数组里的Node,准确来说是头结点。如图虚线所示:
为什么要废弃锁分段机制:
1. 分段造成内存浪费(内存不连续,碎片化)
2. 在添加时竞争同一个锁的概率非常小,分段锁反而会造成更新等操作的长时间等待;并且当某个段很大时,分段锁的性能会下降。
3. 为了提高 GC 的效率
为什么加锁不用ReentrantLock而是用synchronized:
1. 锁的细化,之前ReentrantLock锁住的是整个段,现在synchronized锁住的是单个Node。
2. 因为锁的细化,出现竞争的情况大大减少。
3. 如果竞争同一个Node,只要线程可以在自旋有限次数内拿到锁,Synchronized就不会升级为重量级锁,而等待的线程也就不用被挂起,我们也就少了挂起和唤醒这个上下文切换的过程开销;而ReentrantLock不会自旋,只会挂起,多了个上下文切换的开销。
为什么容量最好为2的幂:
当数组长度为2的n次幂的时候,不同的key算得hash相同的几率较小,那么数据在数组上分布就比较均匀,也就是发生碰撞的几率较小,进而导致链表结构减少,查询的时候不用遍历链表的话查询效率就高了。
为什么get不用加锁:
前面我画的图里,Node的成员变量val是用volatile关键字修饰的,其它线程做出的修改能够马上看见,保证每次读取的都是最新的数据。
put
final V putVal(K key, V value, boolean onlyIfAbsent) { // key和value 不能为null if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; // 遍历Node数组 for (Node<K,V>[] tab = table;;) { // f存储当前位置数组上的Node,n代表数组长度,i代表当前数组下标,fh代表当前Node的hash值 Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // 为空或者长度为0,初始化数组 tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 目标位置的值为null,利用CAS设置value,返回。 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) // 如果hash值等于-1,代表正在扩容,helpTransfer会帮助扩容 tab = helpTransfer(tab, f); else { V oldVal = null; // 加锁进入 synchronized (f) { // 再获取一下当前位置的Node,如果和前面获取的f不一致则发生了变化,跳出同步块 if (tabAt(tab, i) == f) { // fh为正数,代表链表结构 if (fh >= 0) { binCount = 1; // 遍历链表节点 for (Node<K,V> e = f;; ++binCount) { K ek; // 如果hash值一样,并且key也一样,则覆盖旧值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; // 如果已经遍历到了最后(e.next==null),则直接插入到最后 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // fh < 0代表正在扩容或者红黑树结构 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; // 添加到红黑树中 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; // key冲突,则覆盖旧值 if (!onlyIfAbsent) p.val = value; } } } } // binCount为当前位置包含的的Node数量,如果不是0,则判断是否需要扩容 if (binCount != 0) { // Node数量大于等于8,当前位置的数据类型转为树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 如果oldVal不为空,证明存在覆盖的情况,直接返回旧值 if (oldVal != null) return oldVal; break; } } } // 整个Map的Node数量+1,如果需要扩容则进行扩容 addCount(1L, binCount); return null; }
过程和HashMap类似:
0. Node数组没有初始化先去初始化;
1. 根据hash找到数组中的位置,如果此位置为空,则直接利用CAS将新节点放在此处;
2. 如果当前位置不为空,则判断此位置的Node的hash是否等于-1,等于-1代表正在进行扩容操作,调用helpTransfer协助扩容;
3. 此位置Node的hash不等于-1,则对其进行加锁:
4. 如果此位置Node的hash大于等于0,证明这是个链表结构,先看是否存在相同的key,有则覆盖,无则把新结点添加到链表最后;
5. 否则判断当前节点是否是树节点,如果是树节点,则添加到树中,有重复的key同样会覆盖;
6. 退出同步块,判断binCount(Node计数器)如果大于等于8,则把当前位置的链表转变成红黑树;(这里可以看出,binCount主要服务于链表结构,具体位置统计当前链表的大小)
7. 最后把整个Map的Node总数+1,如果需要扩容则扩容。
下面看一下initTable【初始化的过程】
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // table为空并且sizeCtl < 0,有其它线程在初始化,则调用Thread.yield(),让掉自己的CPU执行时间 if ((sc = sizeCtl) < 0) // 不扩容时:sizeCtl=数组长度*扩容因子;扩容和初始化table时:sizeCtl < 0 Thread.yield(); // 放弃初始化的竞争,仅仅自旋 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // sizeCtl > 0,说明没有线程竞争初始化table,利用CAS将sizeCtl设置为-1,代表正在初始化 try { // 再次判断table是否为空 if ((tab = table) == null || tab.length == 0) { // 设置table容量,如果sc大于0则使用sc,否则使用默认的16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 根据容量new一个Node数组 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 新数组替换老数组 table = tab = nt; // sc = 新容量 - (新容量/2^2),无符号右移2位,相当于除以2^2=4 // 以16为例:sc = 16-(16/4)= 16-4 = 12,也就是下一次扩容的阈值 sc = n - (n >>> 2); } } finally { // 最后,更新sizeCtl sizeCtl = sc; } break; } } // 返回新数组 return tab; }
总结:
1. 根据sizeCtl判断,如果小于0,表示正在初始化,则让出当前线程的时间片。
2. 设置sizeCtl为-1,代表正在执行初始化操作;如果sc存储的变量大于0,则新容量=sc,否则等于默认容量16;根据新容量new一个新Node数组,并更新table为新数组;更新sizeCtl为新容量的75%
再来看一下addCount【Node总数+1 & 扩容的过程】
/** * sizeCtl(-1表示table正在初始化,其他线程要让出CPU时间片;-N表示有N-1个线程正在执行扩容操作;大于0表示扩容阈值=容量*负载因子) * @param x 需要加上的数量 * @param check if <0, don‘t check resize, if <= 1 only check if uncontended */ private final void addCount(long x, int check) { // CounterCell:顾名思义,用于计数的格子。说白了就是用来统计table中每一个位置的Node数量。 CounterCell[] as; long b, s; // CounterCell不为null if ((as = counterCells) != null || // 或者利用CAS将baseCount更新为baseCount+1失败,就放弃对baseCount的操作 !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; // 合计Node总数,其中的实现是遍历CounterCell[],累加其中的value s = sumCount(); } // check>=0,需要检查是否需要扩容 if (check >= 0) { // tab:指向table,nt:指向nextTable;n为当前table的容量,sc为当前扩容阈值 Node<K,V>[] tab, nt; int n, sc; // Node总数大于扩容阈值sizeCtl 并且 table不为空 并且 table容量小于最大容量 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // 如果正在扩容 if (sc < 0) { // 如果sizeCtl变化了或者扩容结束了,则跳出循环 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 如果可以帮助扩容,那么将 sc 加 1. 表示多了一个线程在帮助扩容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 如果没有扩容,将 sc 更新为负数,表示当前线程发起扩容操作 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
总结:
1. 使table的长度+1。CounterCell不为null,就使用CounterCell,否则直接利用CAS操纵baseCount。
2. 如果需要扩容,先看是否已经在扩容了,如果是,则加入扩容线程,否则就调用扩容方法开启扩容。
最后看transfer方法,这是扩容过程 的核心
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { // n为当前数组大小,stride存储步长 int n = tab.length, stride; // 根据cpu核数计算出步长,用于分割扩容任务,方便其余线程帮助扩容,最小为16 // 默认每个线程处理16个桶。因此,如果长度是16的时候,扩容的时候只会有一个线程扩容。 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 判断nextTab是否为空,nextTab是暂时存储扩容后的node的数组,第一次进入这个方法的线程才会发现nextTab为空 if (nextTab == null) { // initiating try { // 初始化nextTab,容量是tab的2倍 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; // 当前数组长度赋给transferIndex transferIndex = n; } // nextTab的大小 int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; // finishing为true代表扩容结束 boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 进入一个 while 循环,分配数组中一个桶的区间给线程. 从大到小进行分配。当拿到分配值后,进行 i-- 递减。这个 i 就是数组下标。 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; // 如果扩容结束 if (finishing) { // 清除临时变量 nextTable = null; // 更新table变量 table = nextTab; // 更新sizeCtl,这个等价于新容量*0.75 sizeCtl = (n << 1) - (n >>> 1); return; } // 尝试将 sc -1. 表示这个线程结束帮助扩容了 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 不相等,说明没结束,当前线程结束方法。 return; // 如果相等,扩容结束了,更新 finising 变量 finishing = advance = true; // 再次循环检查一下整张表 i = n; // recheck before commit } } // 如果没有完成任务,且 i 对应的槽位是空,尝试 CAS 插入占位符,让 putVal 方法的线程感知。 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 如果 i 对应的槽位不是空,且有了占位符,那么该线程跳过这个槽位,处理下一个槽位。 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 如果以上都是不是,说明这个槽位有一个实际的值。开始同步处理这个桶。 // 到这里,都还没有对桶内数据进行转移,只是计算了下标和处理区间,然后一些完成状态判断。同时,如果对应下标内没有数据或已经被占位了,就跳过了。 // 下面的处理过程和HashMap基本一样 synchronized (f) { // 再次判断当前节点是否发生了改变 if (tabAt(tab, i) == f) { // ln=lowNode=低位桶,hn=highNode=高位桶 Node<K,V> ln, hn; // 当前是链表结构 if (fh >= 0) { // 当前节点hash和老长度进行与运算 int runBit = fh & n; Node<K,V> lastRun = f; // 从当前节点的后继开始遍历 for (Node<K,V> p = f.next; p != null; p = p.next) { // 对每个节点的hash同长度进行按位与操作 int b = p.hash & n; // 如果节点的 hash 值和首节点的 hash 值按位与结果不同 if (b != runBit) { // 更新 runBit,用于下面判断 lastRun 该赋值给 ln 还是 hn。 runBit = b; // 这个 lastRun 保证后面的节点与自己的按位与值相同,避免后面没有必要的循环 lastRun = p; } } if (runBit == 0) { // 如果最后更新的 runBit 是 0 ,设置低位节点 ln = lastRun; hn = null; } else { // 否则设置高位节点 hn = lastRun; ln = null; } // 从头开始循环,目的是生成两个链表,lastRun 作为停止条件,这样做为了避免不必要的循环(lastRun 后面都是相同的hash按位与结果) for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; // 依然根据是否为0作为区分条件 if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 在新的数组i的位置上设置低位链表 setTabAt(nextTab, i, ln); // 在新的数组i+n的位置上设置高位链表 setTabAt(nextTab, i + n, hn); // 在老数组i的位置的链表设置成占位符 setTabAt(tab, i, fwd); // 继续向后 advance = true; } // 树结构 else if (f instanceof TreeBin) { // 当前位置的头节点,只不过是TreeNode TreeBin<K,V> t = (TreeBin<K,V>)f; // 定义低位树和高位树 TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; // 统计树的大小,为了判断是否需要退化成链表 int lc = 0, hc = 0; // 从根开始遍历 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); // 当前节点的hash和老长度做按位与操作,为0放在低位 if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 低位达到临界值,低位退化 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; // 高位达到临界值,高位退化 hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 在新的数组i的位置上设置低位树 setTabAt(nextTab, i, ln); // 在新的数组i+n的位置上设置高位链表 setTabAt(nextTab, i + n, hn); // 老数组i的位置上设置占位符 setTabAt(tab, i, fwd); // 继续向后 advance = true; } } } } } }
总体来说分为两部分:
1. 扩容前的准备和相关状态的检查
①:初始化用于存储扩容后数据的nextTable
②:分配一个桶给当前线程;判断是否扩容结束,扩容结束更新table和sizeCtl变量;判断当前桶是不是被占用了,被占用则跳过这个桶;
2. 加锁扩容
①:判断节点类型
②:如果是链表,从头结点开始遍历链表,通过当前节点和老长度的按位与操作生成一个runBit,每次遇到与前一个runBit不同的节点,则更新runBit和lastRun(当前与前面runBit不同的节点),直到遍历结束;
③:根据runBit是否为0,把lastRun节点赋给低位链表或者高位链表;
④:再次遍历链表,分割出两部分链表:以lastRun节点为停止遍历条件,根据每个Node的hash和老长度的按位与结果是否为0,把Node划分到低位链表和高位链表中。最后把低位链表和高位链表放到新数组i和i+n的位置上,老数组i的位置上设置占位符。继续处理其它剩余的桶。
⑤:处理树形结构,逻辑和链表一样,只不过多了个判断是否退化成链表的逻辑。
扩容过程我画了个图
最后设置新位置
未完待续
.
集合类源码(七)Map(ConcurrentHashMap, ConcurrentSkipListMap, TreeMap)
标签:服务 没有初始化 comm 没有 boolean 数组 while 循环 font 遍历
原文地址:https://www.cnblogs.com/LUA123/p/11928987.html