码迷,mamicode.com
首页 > 编程语言 > 详细

8. 多线程并发扩展(杂七杂八)

时间:2019-12-27 22:08:21      阅读:98      评论:0      收藏:0      [点我收藏+]

标签:超过   hang   collect   can   可重入   基础   扩展   ash   abs   

本章内容:

  1.死锁

  2.多线程并发最佳实践(如何安全并发)

  3.Spring与线程安全

  4.HashMap与ConcurrentHashMap深入分析

 


一、死锁

1.产生必要条件

(1)互斥条件:进程要求对所分配的资源(如打印机)进?排他性控制,即在?段时间内某资源仅为?个进程所占有。此时若有其他进程请求该资源,则请求进程只能等待。?个资源每次只能被?个进程使?。独?桥每次只能通过?个?。

(2)不剥夺条件:进程所获得的资源在未使?完毕之前,不能被其他进程强?夺?,即只能由获得该资源的进程??来释放(只能是主动释放)。?
个进程因请求资源?阻塞时,对已获得的资源保持不放。乙不退出桥?,甲也不退出桥?。
(3)请求和保持条件:进程已经保持了?少?个资源,但?提出了新的资源请求,?该资源已被其他进程占有,此时请求进程被阻塞,但对??已获得的资源保持不放。进程已获得的资源,在未使?完之前,不能强?剥夺。甲不能强制?退出桥?,乙也不能强制甲退出桥。
(4)循环等待条件:存在?种进程资源的循环等待链,链中每?个进程已获得的资源同时被链中?个进程所请求。即存在?个处于等待状态的进程集合{Pl, P2, ..., pn},其中Pi等 待的资源被P(i+1)占有(i=0, 1, ..., n-1),Pn等待的资源被P0占有。若?进程之间形成?种头尾相接的循环等待资源关系。如果乙不退出桥?,甲不能通过,甲不退出桥?,乙不能通过。

2.预防

在有些情况下死锁是可以避免的。下?介绍三种?于避免死锁的技术:

(1)加锁顺序(线程按照?定的顺序加锁,当多个线程需要相同的?些锁,但是按照不同的顺序加锁,死锁就很容易发?。如果能确保所有的线程都是按照相同的顺序获得锁 )
(2)加锁时限(线程尝试获取锁的时候加上?定的时限,超过时限则放弃对该锁的请求,并释放??占有的锁)
(3)死锁检测

总结:避免死锁的?式

1、让程序每次?多只能获得?个锁。当然,在多线程环境下,这种情况通常并不现实。
2、设计时考虑清楚锁的顺序,尽量减少嵌在的加锁交互数量。
3、既然死锁的产?是两个线程?限等待对?持有的锁,那么只要等待时间有个上限不就好了。当然synchronized不具备这个功能,但是我们可以使?Lock类中的tryLock?法去尝试获取锁,这个?法可以指定?个超时时限,在等待超过该时限之后便会返回?个失败信息
 
 
 
二、多线程并发最佳实践
1.使?本地变量: 尽量使?本地变量,?不是创建?个类或实例的变量。
2.使?不可变类:String、Integer等。不可变类可以降低代码中需要的同步数量。
3.最?化锁的作?域范围:S=1/(1-a+a/n)a:并?计算部分所占?例. n:并?处理结点个数. S:加速?当1-a等于0时,没有串?只有并?,最?加速? S=n当a=0时,只有串?没有并?,最?加速? S = 1当n→∞时,极限加速? s→ 1/(1-a)例如,若串?代码占整个代码的25%,则并?处理的总体性能不可能超过4。该公式称为:"阿姆达尔定律"或"安达尔定理"。
4.使?线程池的Executor,?不是直接new Thread 执?: 创建?个线程的代价是昂贵的,如果要创建?个可伸缩的Java应?,那么你需要使?线程池。
5.宁可使?同步也不要使?线程的wait和notify: 从Java1.5以后,增加了许多同步?具,如:CountDownLatch、CyclicBarrier、Semaphore等,应该优先使?这些同步?具。
6.使?BlockingQueue实现?产-消费模式: 阻塞队列不仅可以处理单个?产、单个消费,也可以处理多个?产和消费。
7.使?并发集合?不是加了锁的同步集合: Java提供了下??种并发集合框架:ConcurrentHashMap、CopyOnWriteArrayList、CopyOnWriteArraySet、ConcurrentLinkedQueue 、ConcurrentLinkedDeque等.
8.使?Semaphone创建有界的访问: 为了建?稳定可靠的系统,对于数据库、?件系统和socket等资源必须要做有机的访问,Semaphone可以限制这些资源开销的选择,Semaphone可以以最低的代价阻塞线程等待,可以通过Semaphone来控制同时访问指定资源的线程数。
9.宁可使?同步代码块,也不实?同步的?法: 主要针对synchronized关键字。使?synchronized关键字同步代码块只会锁定?个对象,?不会将整个?法锁定。如果更改共同的变量或类的字段,?先应该选择的是原?型变量,然后使?volatile。如果需要互斥锁,可以考虑使?ReentrantLock。
10.避免使?静态变量: 静态变量在并发执?环境下会制造很多问题,如果必须使?静态变量,那么优先是它成为final变量,如果?来保存集合collection,那么可以考虑使?只读集合,否则定要做特别多的同步处理和并发处理操作。
 
三、Spring与线程安全
  Spring作为?个IOC/DI容器,帮助我们管理了许许多多的“bean”。但其实,Spring并没有保证这些对象的线程安全,需要由开发者??编写解决线程安全问题的代码。
  Spring对每个bean提供了?个scope属性来表示该bean的作?域。它是bean的?命周期。例如,?个scope为singleton的bean,在第?次被注?时,会创建为?个单例对象,该对象会?直被复?到应?结束。
  singleton:默认的scope,每个scope为singleton的bean都会被定义为?个单例对象,该对象的?命周期是与Spring IOC容器?致的(但在第?次被注?时才会创建)。
  prototype:bean被定义为在每次注?时都会创建?个新的对象。
  request:bean被定义为在每个HTTP请求中创建?个单例对象,也就是说在单个请求中都会复?这?个单例对象。
  session:bean被定义为在?个session的?命周期内创建?个单例对象。
  application:bean被定义为在ServletContext的?命周期中复??个单例对象。
  websocket :bean被定义为在websocket的?命周期中复??个单例对象。
   我们交由Spring管理的?多数对象其实都是?些?状态的对象,这种不会因为多线程?导致状态被破坏的对象很适合Spring的默认scope,每个单例的?状态对象都是线程安全的(也可以说只要是?状态的对象,不管单例多例都是线程安全的,不过单例毕竟节省了不断创建对象与GC的开销)。?状态的对象即是?身没有状态的对象,?然也就不会因为多个线程的交替调度?破坏?身状态导致线程安全问题。
  ?状态对象包括我们经常使?的DO、DTO、VO这些只作为数据的实体模型的贫?对象,还有Service、DAO和Controller,这些对象并没有??的状态,它们只是?来执?某些操作的。例如,每个DAO提供的函数都只是对数据库的CRUD,?且每个数据库Connection都作为函数的局部变量(局部变量是在?户栈中的,?且?户栈本身就是线程私有的内存区域,所以不存在线程安全问题),?完即关(或交还给连接池)。
  有?可能会认为,我使?request作?域不就可以避免每个请求之间的安全问题了吗?这是完全错误的,因为Controller默认是单例的,?个HTTP请求是会被多个线程执?的,这就?回到了线程的安全问题。当然,你也可以把Controller的scope改成prototype,实际上Struts2就是这么做的,但有?点要注意,Spring MVC对请求的拦截粒度是基于每个?法的,?Struts2是基于每个类的,所以把Controller设为多例将会频繁的创建与回收对象,严重影响到了性能。
  通过阅读上?其实已经说的很清楚了,Spring根本就没有对bean的多线程安全问题做出任何保证与措施。对于每个bean的线程安全问题,根本原因是每个bean?身的设计。不要在bean中声明任何有状态的实例变量或类变量,如果必须如此,那么就使?ThreadLocal把变量变为线程私有的,如果bean的实例变量或类变量需要在多个线程之间共享,那么就只能使?synchronized、lock、CAS等这些实现线程同步的?法了。
 
四、HashMap、HashTable、ConcurrentHashMap
1.HashMap
  HashMap初始的数据结构如下所示,内部维护一个数组,然后数组上维护一个单链表。
技术图片

 

【内部类Node】 节点保存hash、key、value、next属性,其中next代表下一个节点,这不就是一个链表么。

 1   /**
 2      * Basic hash bin node, used for most entries.  (See below for
 3      * TreeNode subclass, and in LinkedHashMap for its Entry subclass.)
 4      */
 5     static class Node<K,V> implements Map.Entry<K,V> {
 6         final int hash;
 7         final K key;
 8         V value;
 9         Node<K,V> next;
10 
11         Node(int hash, K key, V value, Node<K,V> next) {
12             this.hash = hash;
13             this.key = key;
14             this.value = value;
15             this.next = next;
16         }
17 
18         public final K getKey()        { return key; }
19         public final V getValue()      { return value; }
20         public final String toString() { return key + "=" + value; }
21 
22         public final int hashCode() {
23             return Objects.hashCode(key) ^ Objects.hashCode(value);
24         }
25 
26         public final V setValue(V newValue) {
27             V oldValue = value;
28             value = newValue;
29             return oldValue;
30         }
31 
32         public final boolean equals(Object o) {
33             if (o == this)
34                 return true;
35             if (o instanceof Map.Entry) {
36                 Map.Entry<?,?> e = (Map.Entry<?,?>)o;
37                 if (Objects.equals(key, e.getKey()) &&
38                     Objects.equals(value, e.getValue()))
39                     return true;
40             }
41             return false;
42         }
43     }

 

【put方法】
 1  /**
 2      * Associates the specified value with the specified key in this map.
 3      * If the map previously contained a mapping for the key, the old
 4      * value is replaced.
 5      *
 6      * @param key key with which the specified value is to be associated
 7      * @param value value to be associated with the specified key
 8      * @return the previous value associated with <tt>key</tt>, or
 9      *         <tt>null</tt> if there was no mapping for <tt>key</tt>.
10      *         (A <tt>null</tt> return can also indicate that the map
11      *         previously associated <tt>null</tt> with <tt>key</tt>.)
12      */
13     public V put(K key, V value) {
14         return putVal(hash(key), key, value, false, true);
15     }
16 
17     /**
18      * Implements Map.put and related methods
19      *
20      * @param hash hash for key
21      * @param key the key
22      * @param value the value to put
23      * @param onlyIfAbsent if true, don‘t change existing value
24      * @param evict if false, the table is in creation mode.
25      * @return previous value, or null if none
26      */
27     final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
28                    boolean evict) {
29         Node<K,V>[] tab; Node<K,V> p; int n, i;
30         if ((tab = table) == null || (n = tab.length) == 0)
31             n = (tab = resize()).length;
32         if ((p = tab[i = (n - 1) & hash]) == null)//n是数组的长度,则数组的角标是由(tab.length-1)&hash
33             tab[i] = newNode(hash, key, value, null);//给节点数组赋值
34         else {
35             Node<K,V> e; K k;
36             if (p.hash == hash &&
37                 ((k = p.key) == key || (key != null && key.equals(k))))
38                 e = p;
39             else if (p instanceof TreeNode)
40                 e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
41             else {
42                 for (int binCount = 0; ; ++binCount) {
43                     if ((e = p.next) == null) {
44                         p.next = newNode(hash, key, value, null);
45                         if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
46                             treeifyBin(tab, hash);
47                         break;
48                     }
49                     if (e.hash == hash &&
50                         ((k = e.key) == key || (key != null && key.equals(k))))
51                         break;
52                     p = e;
53                 }
54             }
55             if (e != null) { // existing mapping for key
56                 V oldValue = e.value;
57                 if (!onlyIfAbsent || oldValue == null)
58                     e.value = value;
59                 afterNodeAccess(e);
60                 return oldValue;
61             }
62         }
63         ++modCount;
64         if (++size > threshold)
65             resize();
66         afterNodeInsertion(evict);
67         return null;
68     }

 【hash方法】

1 static final int hash(Object key) {
2         int h;
3         return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
4 }  

 

  通过源码可以知道hash值是通过h=key.hashCode()^(h>>>16),由此就可以知道数组角标值的全部计算过程,先由Map的key的hash值和它自己右移16位的值异或计算,然后再与数组长度减一与运算。例如一个key的hash指为为363771819,数组长度为默认长度16:
技术图片
  数组长度减一,是为了保证最后的角标值在数组长度范围内(默认长度为16,16-1=15,保证最大值为15,不超范围)。15&363766277 = 9;
  为什么不直接采用与数组长度取模的方式,直接取得脚标值,而是先去异或,再与运算去计算脚标值?

 

    1.用位运算,效率更高

  2. hashCode 的高低位异或运算,让高低位更加均匀的混合到一起,可以使得在 put 元素时,可以减少哈希碰撞。

  失败的 hashCode 算法会导致 HashMap 的性能由数组下降为链表,所以想要避免发生碰撞,就要提高 hashCode 结果的均匀性。 

 
【数组的默认长度】
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16

技术图片

技术图片

 

   就是当 table 数组的长度为 null 或长度为 0 时,调用初始化resize()方法,然后在resize()方法中也做了判断,当table数组的长度为 0 时,将新数组的长度赋值为DEFAULT_INITIAL_CAPACITY, 所以 HashMap 中数组的初始化长度就是 DEFAULT_INITIAL_CAPACITY,等于1 << 4,等于 16。

【数组扩容的阈值】

  当数组的长度达到某个值或某个条件时,数组就开始扩容,而这里的某个值或某个条件就是我们所说的数组扩容的阈值。
 
 1    //这里put方法只调用了putVal方法,那我们就直接看putVal方法
 2     public V put(K key, V value) {
 3         return putVal(hash(key), key, value, false, true);
 4     }
 5 
 6     //我解释下这个方法里面,大概的操作
 7     final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
 8                    boolean evict) {
 9         Node<K,V>[] tab; Node<K,V> p; int n, i;
10         if ((tab = table) == null || (n = tab.length) == 0)
11             n = (tab = resize()).length; //这一步之前分析过了,就是判断数组为null或长度为0时,对数组进行扩容
12         if ((p = tab[i = (n - 1) & hash]) == null)
13             tab[i] = newNode(hash, key, value, null); //这一步其实也很清楚了,就是根据hash值计算出数组的脚标,然后判断数组的该脚标的元素是否为空,为空的话就把put进来的数据封装成节点赋值进数组
14         else {
15             //根据上面的两个判断,那么走到这里的代码就是说,数组不为空,而且put进来的key计算所得的脚标节点也不为空,走这一块逻辑(实际上这块逻辑也跟扩容的阈值无关,只是单纯的判断然后加节点的操作,但是我还是解释下这里的代码)
16             Node<K,V> e; K k;
17             if (p.hash == hash &&
18                 ((k = p.key) == key || (key != null && key.equals(k))))
19                 e = p;//这里的意思是说,如果hash值相同,key值也相同,那么就说明此时put操作的元素在数组从存在,这覆盖该节点
20             else if (p instanceof TreeNode)
21                 e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); //这里是判断节点类型是否是树类型,为什么会是树类型呢?不是说是HashMap是数组加链表吗?后面的章节会详细讲到,这里暂且跳过
22             else {
23                 //代码走到这里,就说明此时put进来的元素,对应的数组脚标是个链表
24                 for (int binCount = 0; ; ++binCount) {
25                     //此处的代码后面讲链表时会细讲,这里暂且跳过
26                     if ((e = p.next) == null) {
27                         p.next = newNode(hash, key, value, null);
28                         if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
29                             treeifyBin(tab, hash);
30                         break;
31                     }
32                     //这里判断hash值与key值是否都相同,如果是即说明map中存在该key-value,此时跳出循环
33                     if (e.hash == hash &&
34                         ((k = e.key) == key || (key != null && key.equals(k))))
35                         break;
36                     p = e;
37                 }
38             }
39             //此逻辑是判断hash值与key值是否都相同跳出循环后,将新值覆盖旧值,然后将旧值返回出去
40             if (e != null) { // existing mapping for key
41                 V oldValue = e.value;
42                 if (!onlyIfAbsent || oldValue == null)
43                     e.value = value;
44                 afterNodeAccess(e);
45                 return oldValue;
46             }
47         }
48         ++modCount;//hashMap内部维护的一个修改的次数,有兴趣了解的话可以看源码里面对这个属性的翻译
49         if (++size > threshold)
50             resize();//扩容,在此之前的代码,都是判断之后进行添加覆盖节点的操作,此处是插入新节点之后判断是否扩容,所以这里的条件就是我们找了这么久的扩容的阈值!!!
51         afterNodeInsertion(evict);
52         return null;
53     }

   走读完上面的代码,我们可以得知 if (++size > threshold),如下代码可知 size实际上就是HashMap集合的键值对数,即长度,所以就是说,当 size的大小超过 threshold时,开始进行扩容,也即 threshold就是进行扩容的阈值。那么这个阈值的大小是多少呢?

技术图片
1    /**
2      * The load factor used when none specified in constructor.
3      */
4     static final float DEFAULT_LOAD_FACTOR = 0.75f;
5     /**
6      * The default initial capacity - MUST be a power of two.
7      */
8     static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16

 

  当 HashMap 数组为 null 或长度为 0 时,初始化threshold的值,
DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITYDEFAULT_INITIAL_CAPACITY为数组的初始长度,DEFAULT_LOAD_FACTOR是阈值的计算因子,他的值是 0.75f,意思就是当 HashMap 的 size 超过数组长度的75%的时候,就进行扩容
 
我们可以继续走读源码来验证是否数组长度超过 75% 就进行扩容,还是上面那张图的源码,我把其中一段给抽离出来,如下:
 1  if (oldCap > 0) {
 2             if (oldCap >= MAXIMUM_CAPACITY) {
 3                 threshold = Integer.MAX_VALUE;
 4                 return oldTab;
 5             }
 6             //此处的意思是说,当数组的长度是大于0的时候,而且数组扩容一倍之后,小于默认配置的最大值时,并且大于初始化数组的长度,则执行if下面的代码,那就是说,扩容之后如果没超过最大值,就走这个逻辑
 7             else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
 8                      oldCap >= DEFAULT_INITIAL_CAPACITY)
 9                    //而这个逻辑的代码意思,就是阈值threshold增大一倍(左移一位)
10                    newThr = oldThr << 1; // double threshold
11   }

 

? 那么,我们就知道了,当数组扩容时,threshold的值也会增大一倍,那么下一次扩容时,也是HashMap的 size 超过数组长度的 75% 的时候,就进行扩容。

 

【数组扩容】

  HashMap 内数组的扩容是将数组的长度左移一位,在二进制运算中,左移一位实际上就是将数值扩大一倍。而且我们也知道,扩容的源码就是resize()这个方法,所以这一章节就来重点解读resize()方法的源码
 1     /**
 2      * Initializes or doubles table size.  If null, allocates in
 3      * accord with initial capacity target held in field threshold.
 4      * Otherwise, because we are using power-of-two expansion, the
 5      * elements from each bin must either stay at same index, or move
 6      * with a power of two offset in the new table.
 7      *
 8      * @return the table
 9      */
10     final Node<K,V>[] resize() {
11         Node<K,V>[] oldTab = table; //oldTab就是扩容前数组对象
12         int oldCap = (oldTab == null) ? 0 : oldTab.length; //oldCap就是扩容前数组的长度
13         int oldThr = threshold; //oldThr就是扩容前的阈值
14         int newCap, newThr = 0; //声明newCap-扩容后的数组长度,newThr-扩容后的阈值
15         if (oldCap > 0) {
16             //这一部分逻辑其实上一节已经讲过了,在这里我就大致说一下,就是如果这是扩容前数组长度已经达到了默认配置的最大值时,那么就不扩容了,直接返回原数组,否则,数组扩容一倍,阈值也增大一倍
17             if (oldCap >= MAXIMUM_CAPACITY) {
18                 threshold = Integer.MAX_VALUE;
19                 return oldTab;
20             }
21             else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
22                      oldCap >= DEFAULT_INITIAL_CAPACITY)
23                 newThr = oldThr << 1; // double threshold
24         }
25         else if (oldThr > 0) // initial capacity was placed in threshold
26             newCap = oldThr; //这个判断不是正常创建Map集合走的逻辑,这里可以跳过这句代码
27         else {               // zero initial threshold signifies using defaults
28             //这一步的代码前面也解释过了,就是当数组长度为0,初始化数组长度与扩容的阈值
29             newCap = DEFAULT_INITIAL_CAPACITY;
30             newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
31         }
32         if (newThr == 0) {
33             float ft = (float)newCap * loadFactor;
34             newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
35                       (int)ft : Integer.MAX_VALUE);
36         }
37         //将新的扩容后的阈值赋值给threshold
38         threshold = newThr;
39         @SuppressWarnings({"rawtypes","unchecked"})
40             Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; //创建新的数组
41         table = newTab;
42         if (oldTab != null) {
43             for (int j = 0; j < oldCap; ++j) {
44                 //遍历旧的数组,下面的内容就是将旧数组重新散列将数据保存到新数组
45                 Node<K,V> e;
46                 if ((e = oldTab[j]) != null) {
47                     oldTab[j] = null;
48                     //如果节点下没有下一个节点,就是说不是链表仅是单个节点走这个逻辑
49                     if (e.next == null) 
50                         newTab[e.hash & (newCap - 1)] = e; //根据hash值与新数组的长度进行与操作,获取新数组的脚标值,
将节点存储到新数组
51 else if (e instanceof TreeNode) //如果节点是树节点,走这个逻辑,后面讲链表的红黑树的时候会做解释,这里先跳过 52 ((TreeNode<K,V>)e).split(this, newTab, j, oldCap); 53 else { //所以这一部分的逻辑,就是如果节点是链表,走这里 54 Node<K,V> loHead = null, loTail = null; 55 Node<K,V> hiHead = null, hiTail = null; 56 Node<K,V> next; 57 do { 58 //遍历链表 59 next = e.next; //链表中的节点 60 if ((e.hash & oldCap) == 0) { 61 //这个判断是理解这整个链表遍历的关键,这里也涉及到了前面讲到的2^n-1对应二进制是0111xxxx的内容,
我们知道数组的长度总是2^n,所以oldCap的值实际上就是1000xxxx,然后hash & oldCap的操作,就是判断oldCap高位的1与对应hash那一位的值是否是1,
如果是0走这个逻辑,如果是1走下面的else代码
62 //这里,前面声明的4个变量loHead, loTail, hiHead, hiTail中,lo的指的是低位,hi的指的是高位,
走完这个do里面的逻辑,就是将oldCap高位的1与对应hash那一位的值是0的存到loTail这个链表中,高位是1的存到hiTail这个链表中!!!!
63 if (loTail == null) 64 loHead = e; 65 else 66 loTail.next = e; 67 loTail = e; 68 } 69 else { 70 if (hiTail == null) 71 hiHead = e; 72 else 73 hiTail.next = e; 74 hiTail = e; 75 } 76 } while ((e = next) != null); 77 if (loTail != null) { 78 //将上面遍历之后低位的loTail存放到新数组的原脚标处 79 loTail.next = null; 80 newTab[j] = loHead; 81 } 82 if (hiTail != null) { 83 //将上面遍历之后高位的hiTail存放到新数组的扩容后的脚标处 84 hiTail.next = null; 85 newTab[j + oldCap] = hiHead; 86 } 87 } 88 } 89 } 90 } 91 return newTab; 92 }

?   在上面的源码解读中,我们可能会留有一个问题,就是为什么扩容后,对数组中的链表还要做 (e.hash & oldCap) == 0的判断?

?   实际上这部分逻辑是为了提高HashMap的查询性能,因为数组扩容后,节点要重新散列,那么节点上面的链表当然也最好要做到均匀的分布,减少单个数组节点上的链表长度,变相的提高了查询性能。所以,源码的逻辑是在扩容后将低位的 loTail 存放到新数组的原脚标处,高位的 hiTail 存放到新数组的扩容后的脚标处(jdk1.8新设计)

技术图片

 

 【链表的扩容——红黑树】

  我们了解到,如果链表的长度越来越长,HashMap 的查询效率也会随之降低。所以单纯的对链表长度的增加,显然是不可取的。

?  所以在 HashMap 中,对于链表实际上并没有扩容操作。在本文开头列出的 Node 节点的源码中也可以看到,内部并没有维护一个size或者length的属性,也没有一个去获取 length 或 size 相关的方法,所以本章节主要阐述的内容,是链表结构向树状结构的转化

单链表-->红黑树

  在前面“数组扩容的阈值”章节的时候,我曾解读过 putVal 方法的代码,在解读过程中,我跳过了两次代码逻辑,在这一章节我就来详细的解读这两处逻辑

技术图片

  第一处之前未解读的源码,其实只是判断是树状结构时,将节点按照红黑树的规则,put进树中而已。

  接着我们看 for 循环遍历处的代码,此处的遍历的内容是 HashMap 是 put 操作节点是为链表时的逻辑:首先这里先判断链表的next节点是否为空,为空则将 put 操作的 key-value 封装为 node 对象,赋值给next节点,然后下一步的判断if (binCount >= TREEIFY_THRESHOLD - 1)是这里的关键,TREEIFY_THRESHOLD 这个是什么呢?THRESHOLD 这个单词是不是看着有点眼熟,在前面将数组扩容的阈值的时候,是不是用的这个单词,那在这里的TREEIFY_THRESHOLD 会不会就是链表结构转树状结构的阈值呢?

  通过上面这段代码的上下文,我们知道 binCount就是链表的长度(注意:这里是从 0 开始的),而TREEIFY_THRESHOLD 看下面的源码,默认值是 8,意思就是说当链表的长度,大于等于 8 时,就执行treeifyBin(tab, hash);

 /**
     * The bin count threshold for using a tree rather than list for a
     * bin.  Bins are converted to trees when adding an element to a
     * bin with at least this many nodes. The value must be greater
     * than 2 and should be at least 8 to mesh with assumptions in
     * tree removal about conversion back to plain bins upon
     * shrinkage.
     */
    static final int TREEIFY_THRESHOLD = 8;

 

 1  /**
 2      * Replaces all linked nodes in bin at index for given hash unless
 3      * table is too small, in which case resizes instead.
 4      */
 5     //翻译大概的意思就是,在给定hash的节点处替换节点类型,除非是数组的长度太小了,才进行resize操作
 6     //总结就是说,并不是链表的长度超过了默认的阈值8时,就一定转树状结构,还要判断数组的长度是否已经经过了扩容
 7     final void treeifyBin(Node<K,V>[] tab, int hash) {
 8         int n, index; Node<K,V> e;
 9         //这里就是上面翻译说的判断,MIN_TREEIFY_CAPACITY的值是64,就是说如果你的数组没有经过扩容操作的情况下,如果链表长度已经超过8了,
此时不转树状结构,而是进行数组扩容,数组扩容时会重新散列,将链表的节点均匀的分布,查询效率对比转树状结构也要好,不得不佩服设计者的设计。
10 if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY) 11 resize(); 12 else if ((e = tab[index = (n - 1) & hash]) != null) { 13 //此处代码就是找到给定的hash的节点,将此节点的链表转为红黑树,下面的代码主要是数据结构代码的内容,有兴趣的同学可以自己解读,由于时间原因,我就不解读这部分转红黑树的代码了 14 TreeNode<K,V> hd = null, tl = null; 15 do { 16 TreeNode<K,V> p = replacementTreeNode(e, null); 17 if (tl == null) 18 hd = p; 19 else { 20 p.prev = tl; 21 tl.next = p; 22 } 23 tl = p; 24 } while ((e = e.next) != null); 25 if ((tab[index] = hd) != null) 26 hd.treeify(tab); 27 } 28 }

 

技术图片

 

  我们知道链表的查询时间复杂度最坏的情况有可能是 O(n) ,当你想要找到节点刚好是在链表的最后一个时,你就必须得遍历完链表中所有的节点才能找到你要的值,查找效率太低。而红黑树的本质其实是一棵平衡二叉查找树,平衡二叉查找树的特点就是左子节点小于等于父节点,右子节点大于等于父节点,所以他的查询时间复杂度是 O(Log2n) ,比链表的 O(n) 效率就要高很多了。

 

2.HashTable

  在前面解读的 HashMap 中,已经将HashMap的数据结构,还有put操作、扩容做了详细的解读,而其实 HashTable,只是在 HashMap 的基础上,给各个操作都加上了 synchronized 关键字而已,这就是我们常说的 HashTable 是线程安全的,而 HashMap 是线程不安全的。
  而HashTable只是保证单个方法操作是原子性的,但在不保证原子性的复合操作下,HashTable 也存在线程安全问题。
技术图片

 

 

3.ConcurrentHashMap

  我们知道 HashTable 之所以性能低下,是因为其在 public 方法的实现上都加上了 synchronized 的关键字,即当任意一个 put 或 get 操作,都将整个 map 对象锁住,只有等待持有锁的线程操作结束,才有机会获得锁进行操作。

?   这里有一种场景,在 Map 的数组 table 中,线程1对 table[0] 进行 put 操作,而此时有线程2想对 table[1] 进行 put 操作,实际上两者的 put 操作互不干涉,而在 HashTable 的实现下,线程2只能等待线程1操作完成之后才能执行。那么,我们是否可以这样实现,当线程1对 table[0] 进行 put 操作时,对 table[0] 下的链表进行加锁,而操作 table[1] 时,对 table[1] 的链表进行加锁,各自那各自的锁,这样线程1在操作 table[0] 时,线程2也可以操作 table[1]。

  ConCurrentHashMap 在解决多线程场景下的线程安全问题,采用的是分段锁的技术。分段锁采用的就是这种思想,在ConCurrentHashMap中维护着Segment[]的数组,这种实现方式把原本 HashTable 粗粒度的锁实现,拆分成一段一段的Segment锁。

技术图片

 

 

 1 //jdk1.7的ConcurrentHashMap的源码
 2 public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
 3         implements ConcurrentMap<K, V>, Serializable {  
 4     /**
 5      * Mask value for indexing into segments. The upper bits of a
 6      * key‘s hash code are used to choose the segment.
 7      */
 8     final int segmentMask;
 9 
10     /**
11      * Shift value for indexing within segments.
12      */
13     final int segmentShift;
14 
15     /**
16      * The segments, each of which is a specialized hash table.
17      */
18     //Segment是继承了可重入锁的子类,所以在Segment的操作方法中,包含了tryLock、unLock等方法
19     final Segment<K,V>[] segments;
20     
21     /**
22      * Segments are specialized versions of hash tables.  This
23      * subclasses from ReentrantLock opportunistically, just to
24      * simplify some locking and avoid separate construction.
25      */
26     static final class Segment<K,V> extends ReentrantLock implements Serializable {
27         
28         /**
29          * The per-segment table. Elements are accessed via
30          * entryAt/setEntryAt providing volatile semantics.
31          */
32         transient volatile HashEntry<K,V>[] table;
33     }
34 }

 

  简单理解就是,ConcurrentHashMap 维护一个 Segment 数组,Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 Segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。
技术图片

 

?   如下,是 ConcurrentHashMap 的各个构造方法,但是实际上只有 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)该构造方法是真正完成初始化的方法,其他的都是方法重载

 1 public ConcurrentHashMap(int initialCapacity,
 2                          float loadFactor, int concurrencyLevel) {
 3         if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
 4             throw new IllegalArgumentException();
 5         if (concurrencyLevel > MAX_SEGMENTS)
 6             concurrencyLevel = MAX_SEGMENTS;
 7         // Find power-of-two sizes best matching arguments
 8         int sshift = 0;
 9         int ssize = 1;
10         // 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方
11         while (ssize < concurrencyLevel) {
12             ++sshift;
13             ssize <<= 1;
14         }
15         // 我们这里先不要那么烧脑,用默认值,concurrencyLevel 为 16,sshift 为 4
16         // 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值
17         this.segmentShift = 32 - sshift;
18         this.segmentMask = ssize - 1;
19 
20         if (initialCapacity > MAXIMUM_CAPACITY)
21             initialCapacity = MAXIMUM_CAPACITY;
22 
23         // initialCapacity 是设置整个 map 初始的大小,
24         // 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小
25         // 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个
26         int c = initialCapacity / ssize;
27         if (c * ssize < initialCapacity)
28             ++c;
29         // 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上,
30         // 插入一个元素不至于扩容,插入第二个的时候才会扩容
31         int cap = MIN_SEGMENT_TABLE_CAPACITY; 
32         while (cap < c)
33             cap <<= 1;
34 
35         // 创建 Segment 数组,
36         // 并创建数组的第一个元素 segment[0]
37         Segment<K,V> s0 =
38             new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
39                              (HashEntry<K,V>[])new HashEntry[cap]);
40         Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
41         // 往数组写入 segment[0]
42         UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
43         this.segments = ss;
44     }
45 
46     public ConcurrentHashMap(int initialCapacity, float loadFactor) {
47         this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
48     }
49 
50   
51     public ConcurrentHashMap(int initialCapacity) {
52         this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
53     }
54 
55     public ConcurrentHashMap() {
56         this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
57     }
58 
59     public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
60         this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
61                       DEFAULT_INITIAL_CAPACITY),
62              DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
63         putAll(m);
64     }

concurrencyLevel:并行级别、并发数、Segment 数,怎么翻译不重要,理解它。默认是 16,也就是说 ConcurrentHashMap 有 16 个 Segments,所以理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。

? 再具体到每个 Segment 内部,其实每个 Segment 很像前面介绍的 HashMap,不过它要保证线程安全,所以处理起来要麻烦些。

? initialCapacity:初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。

? loadFactor:负载因子,之前我们说了,Segment 数组不可以扩容,所以这个负载因子是给每个 Segment 内部使用的。

 在 jdk1.8 以下版本的 ConcurrentHashMap 为了保证线程安全又要提供高性能的情况下,采用锁分段的技术,而在java8中对于 ConcurrentHashMap 的实现又变成了另外一种方式----CAS

? CAS的全称是compare and swap,直译过来就是比较与替换。CAS的机制就相当于这种(非阻塞算法),CAS是由CPU硬件实现,所以执行相当快.CAS有三个操作参数:内存地址,期望值,要修改的新值,当期望值和内存当中的值进行比较不相等的时候,表示内存中的值已经被别线程改动过,这时候失败返回,当相等的时候,将内存中的值改为新的值,并返回成功。

? 下面的代码是摘自网上一篇文章的对 java8 中 ConcurrentHashMap 的源码分析,

 1 public V put(K key, V value) {
 2     return putVal(key, value, false);
 3 }
 4 final V putVal(K key, V value, boolean onlyIfAbsent) {
 5     if (key == null || value == null) throw new NullPointerException();
 6     // 得到 hash 值
 7     int hash = spread(key.hashCode());
 8     // 用于记录相应链表的长度
 9     int binCount = 0;
10     for (Node<K,V>[] tab = table;;) {
11         Node<K,V> f; int n, i, fh;
12         // 如果数组"空",进行数组初始化
13         if (tab == null || (n = tab.length) == 0)
14             // 初始化数组,后面会详细介绍
15             tab = initTable();
16  
17         // 找该 hash 值对应的数组下标,得到第一个节点 f
18         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
19             // 如果数组该位置为空,
20             // 用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了,可以拉到最后面了
21             // 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了
22             if (casTabAt(tab, i, null,
23                          new Node<K,V>(hash, key, value, null)))
24                 break;                   // no lock when adding to empty bin
25         }
26         // hash 居然可以等于 MOVED,这个需要到后面才能看明白,不过从名字上也能猜到,肯定是因为在扩容
27         else if ((fh = f.hash) == MOVED)
28             // 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了
29             tab = helpTransfer(tab, f);
30  
31         else { // 到这里就是说,f 是该位置的头结点,而且不为空
32  
33             V oldVal = null;
34             // 获取数组该位置的头结点的监视器锁
35             synchronized (f) {
36                 if (tabAt(tab, i) == f) {
37                     if (fh >= 0) { // 头结点的 hash 值大于 0,说明是链表
38                         // 用于累加,记录链表的长度
39                         binCount = 1;
40                         // 遍历链表
41                         for (Node<K,V> e = f;; ++binCount) {
42                             K ek;
43                             // 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
44                             if (e.hash == hash &&
45                                 ((ek = e.key) == key ||
46                                  (ek != null && key.equals(ek)))) {
47                                 oldVal = e.val;
48                                 if (!onlyIfAbsent)
49                                     e.val = value;
50                                 break;
51                             }
52                             // 到了链表的最末端,将这个新值放到链表的最后面
53                             Node<K,V> pred = e;
54                             if ((e = e.next) == null) {
55                                 pred.next = new Node<K,V>(hash, key,
56                                                           value, null);
57                                 break;
58                             }
59                         }
60                     }
61                     else if (f instanceof TreeBin) { // 红黑树
62                         Node<K,V> p;
63                         binCount = 2;
64                         // 调用红黑树的插值方法插入新节点
65                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
66                                                        value)) != null) {
67                             oldVal = p.val;
68                             if (!onlyIfAbsent)
69                                 p.val = value;
70                         }
71                     }
72                 }
73             }
74             // binCount != 0 说明上面在做链表操作
75             if (binCount != 0) {
76                 // 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
77                 if (binCount >= TREEIFY_THRESHOLD)
78                     // 这个方法和 HashMap 中稍微有一点点不同,那就是它不是一定会进行红黑树转换,
79                     // 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
80                     //    具体源码我们就不看了,扩容部分后面说
81                     treeifyBin(tab, i);
82                 if (oldVal != null)
83                     return oldVal;
84                 break;
85             }
86         }
87     }
88     // 
89     addCount(1L, binCount);
90     return null;
91 }

 

 

 

 

 

 

 

 

 

 
 
 
 
 
 
 
 
 
 
 
 
 
 

8. 多线程并发扩展(杂七杂八)

标签:超过   hang   collect   can   可重入   基础   扩展   ash   abs   

原文地址:https://www.cnblogs.com/qmillet/p/12103268.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!