标签:多线程
ConcurrentHashMap是Java5中新增加的一个线程安全的Map集合,可以用来替代HashTable。HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
ConcurrentHashMap的结构
ConcurrentHashMap 类中包含两个静态内部类 HashEntry 和 Segment。HashEntry 用来封装映射表的键 / 值对;Segment 用来充当锁的角色,每个 Segment 对象守护整个散列映射表的若干个桶。每个桶是由若干个 HashEntry 对象链接起来的链表。一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组。ConcurrentHashMap的类图如下所示:
ConcurrentHashMap的内部结构:
HashEntry 类:
HashEntry 用来封装散列映射表中的键值对。在 HashEntry 类中,key,hash 和 next 域都被声明为 final 型,value 域被声明为 volatile 型。
static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; HashEntry(K key, int hash, HashEntry<K,V> next, V value) { this.key = key; this.hash = hash; this.next = next; this.value = value; } @SuppressWarnings("unchecked") static final <K,V> HashEntry<K,V>[] More ...newArray(int i) { return new HashEntry[i]; } }
static final class Segment<K,V> extends ReentrantLock implements Serializable { /** * 在本 segment 范围内,包含的 HashEntry 元素的个数 * 该变量被声明为 volatile 型 */ transient volatile int count; /** * table 被更新的次数 */ transient int modCount; /** * 当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列 */ transient int threshold; /** * table 是由 HashEntry 对象组成的数组 * 如果散列时发生碰撞,碰撞的 HashEntry 对象就以链表的形式链接成一个链表 * table 数组的数组成员代表散列映射表的一个桶 * 每个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分 * 如果并发级别为 16,table 则守护 ConcurrentHashMap 包含的桶总数的 1/16 */ transient volatile HashEntry<K,V>[] table; /** * 装载因子 */ final float loadFactor; Segment(int initialCapacity, float lf) { loadFactor = lf; setTable(HashEntry.<K,V>newArray(initialCapacity)); } /** * 设置 table 引用到这个新生成的 HashEntry 数组 * 只能在持有锁或构造函数中调用本方法 */ void setTable(HashEntry<K,V>[] newTable) { // 计算临界阀值为新数组的长度与装载因子的乘积 threshold = (int)(newTable.length * loadFactor); table = newTable; } /** * 根据 key 的散列值,找到 table 中对应的那个桶(table 数组的某个数组成员) */ HashEntry<K,V> getFirst(int hash) { HashEntry<K,V>[] tab = table; // 把散列值与 table 数组长度减 1 的值相“与”, // 得到散列值对应的 table 数组的下标 // 然后返回 table 数组中此下标对应的 HashEntry 元素 return tab[hash & (tab.length - 1)]; } }
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { /** * 散列映射表的默认初始容量为 16,即初始默认为 16 个桶 * 在构造函数中没有指定这个参数时,使用本参数 */ static final int DEFAULT_INITIAL_CAPACITY= 16; /** * 散列映射表的默认装载因子为 0.75,该值是 table 中包含的 HashEntry 元素的个数与 * table 数组长度的比值 * 当 table 中包含的 HashEntry 元素的个数超过了 table 数组的长度与装载因子的乘积时, * 将触发 再散列 * 在构造函数中没有指定这个参数时,使用本参数 */ static final float DEFAULT_LOAD_FACTOR= 0.75f; /** * 散列表的默认并发级别为 16。该值表示当前更新线程的估计数 * 在构造函数中没有指定这个参数时,使用本参数 */ static final int DEFAULT_CONCURRENCY_LEVEL= 16; /** * segments 的掩码值 * key 的散列码的高位用来选择具体的 segment */ final int segmentMask; /** * 偏移量 */ final int segmentShift; /** * 由 Segment 对象组成的数组 */ final Segment<K,V>[] segments; /** * 创建一个带有指定初始容量、加载因子和并发级别的新的空映射。 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if(!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if(concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 寻找最佳匹配参数(不小于给定参数的最接近的 2 次幂) int sshift = 0; int ssize = 1; while(ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; // 偏移量值 segmentMask = ssize - 1; // 掩码值 this.segments = Segment.newArray(ssize); // 创建数组 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if(c * ssize < initialCapacity) ++c; int cap = 1; while(cap < c) cap <<= 1; // 依次遍历每个数组元素 for(int i = 0; i < this.segments.length; ++i) // 初始化每个数组元素引用的 Segment 对象 this.segments[i] = new Segment<K,V>(cap, loadFactor); } /** * 创建一个带有默认初始容量 (16)、默认加载因子 (0.75) 和 默认并发级别 (16) * 的空散列映射表。 */ public ConcurrentHashMap() { // 使用三个默认参数,调用上面重载的构造函数来创建空散列映射表 this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); }
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; segmentMask = ssize - 1; this.segments = Segment.newArray(ssize); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFactor); }
用分离锁实现多个线程间的并发写操作
在 ConcurrentHashMap 中,线程对映射表做读操作时,一般情况下不需要加锁就可以完成,对容器做结构性修改的操作才需要加锁。下面以 put 操作为例说明对 ConcurrentHashMap 做结构性修改的过程。
首先,根据 key 计算出对应的 hash 值:
Put 方法的实现:
public V put(K key, V value) { if (value == null) //ConcurrentHashMap 中不允许用 null 作为映射值 throw new NullPointerException(); int hash = hash(key.hashCode()); // 计算键对应的散列码 // 根据散列码找到对应的 Segment return segmentFor(hash).put(key, hash, value, false); }
/** * 使用 key 的散列码来得到 segments 数组中对应的 Segment */ final Segment<K,V> segmentFor(int hash) { // 将散列值右移 segmentShift 个位,并在高位填充 0 // 然后把得到的值与 segmentMask 相“与” // 从而得到 hash 值对应的 segments 数组的下标值 // 最后根据下标值返回散列码对应的 Segment 对象 return segments[(hash >>> segmentShift) & segmentMask]; }
V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); // 加锁,这里是锁定某个 Segment 对象而非整个 ConcurrentHashMap try { int c = count; if (c++ > threshold) // 如果超过再散列的阈值 rehash(); // 执行再散列,table 数组的长度将扩充一倍 HashEntry<K,V>[] tab = table; // 把散列码值与 table 数组的长度减 1 的值相“与” // 得到该散列码对应的 table 数组的下标值 int index = hash & (tab.length - 1); // 找到散列码对应的具体的那个桶 HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { // 如果键 / 值对以经存在 oldValue = e.value; if (!onlyIfAbsent) e.value = value; // 设置 value 值 } else { // 键 / 值对不存在 oldValue = null; ++modCount; // 要添加新节点到链表中,所以 modCont 要加 1 // 创建新节点,并添加到链表的头部 tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // 写 count 变量 } return oldValue; } finally { unlock(); // 解锁 } }
ConcurrentHashMap的get操作
这里的ConcurrentHashMap的get操作是不用加锁的有别于put的实现,但都要经过两次查找的过程,我们这里看一下其实现:
public V get(Object key) { int hash = hash(key.hashCode()); //segmentFor这个函数用于确定操作应该在哪一个segment中进行,几乎对ConcurrentHashMap的所有操作都需要用到这个函数 return segmentFor(hash).get(key, hash); }
final Segment<K,V> segmentFor(int hash) { //根据传入的hash值向右无符号右移segmentShift位,然后和segmentMask进行与操作,<span style="font-family: Arial, Helvetica, sans-serif;">结合我们之前说的segmentShift和segmentMask的值,就可以得出以下结论:</span> //假设Segment的数量是2的n次方,根据元素的hash值的高n位就可以确定元素到底在哪一个Segment中。 return segments[(hash >>> segmentShift) & segmentMask]; }
V get(Object key, int hash) { //count是volatile的,实际上这里里面利用了volatile的语义: //对volatile字段的写入操作happens-before于每一个后续的同一个字段的读操作。 //因为实际上put、remove等操作也会更新count的值,所以当竞争发生的时候,volatile的语义可以保证写操作在读操作之前, //也就保证了写操作对后续的读操作都是可见的,这样后面get的后续操作就可以拿到完整的元素内容。 if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; }
HashEntry<K,V> getFirst(int hash) { HashEntry<K,V>[] tab = table; return tab[hash & (tab.length - 1)]; }
同样,这里也是用位操作来确定链表的头部,hash值和HashTable的长度减一做与操作,最后的结果就是hash值的低n位,其中n是HashTable的长度以2为底的结果。在确定了链表的头部以后,就可以对整个链表进行遍历,取出key对应的value的值,如果拿出的value的值是null,则可能这个key,value对正在put的过程中,如果出现这种情况,那么就加锁来保证取出的value是完整的,如果不是null,则直接返回value。
remove操作:
V remove(Object key, int hash, Object value) { lock(); // 加锁 try{ int c = count - 1; HashEntry<K,V>[] tab = table; // 根据散列码找到 table 的下标值 int index = hash & (tab.length - 1); // 找到散列码对应的那个桶 HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while(e != null&& (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if(e != null) { V v = e.value; if(value == null|| value.equals(v)) { // 找到要删除的节点 oldValue = v; ++modCount; // 所有处于待删除节点之后的节点原样保留在链表中 // 所有处于待删除节点之前的节点被克隆到新链表中 HashEntry<K,V> newFirst = e.next;// 待删节点的后继结点 for(HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); // 把桶链接到新的头结点 // 新的头结点是原链表中,删除节点之前的那个节点 tab[index] = newFirst; count = c; // 写 count 变量 } } return oldValue; } finally{ unlock(); // 解锁 } }
总结
ConcurrentHashMap 是一个并发散列映射表的实现,它允许完全并发的读取,并且支持给定数量的并发更新。相比于 HashTable 和用同步包装器包装的 HashMap(Collections.synchronizedMap(new HashMap())),ConcurrentHashMap 拥有更高的并发性。在 HashTable 和由同步包装器包装的 HashMap 中,使用一个全局的锁来同步不同线程间的并发访问。同一时间点,只能有一个线程持有锁,也就是说在同一时间点,只能有一个线程能访问容器。这虽然保证多线程间的安全并发访问,但同时也导致对容器的访问变成串行化的了。
在使用锁来协调多线程间并发访问的模式下,减小对锁的竞争可以有效提高并发性。有两种方式可以减小对锁的竞争:
减小请求 同一个锁的 频率。
减少持有锁的 时间。
ConcurrentHashMap 的高并发性主要来自于三个方面:
用分离锁实现多个线程间的更深层次的共享访问。
用 HashEntery 对象的不变性来降低执行读操作的线程在遍历链表期间对加锁的需求。
通过对同一个 Volatile 变量的写 / 读访问,协调不同线程间读 / 写操作的内存可见性。
使用分离锁,减小了请求 同一个锁的频率。
通过 HashEntery 对象的不变性及对同一个 Volatile 变量的读 / 写来协调内存可见性,使得 读操作大多数时候不需要加锁就能成功获取到需要的值。由于散列映射表在实际应用中大多数操作都是成功的 读操作,所以 2 和 3 既可以减少请求同一个锁的频率,也可以有效减少持有锁的时间。
通过减小请求同一个锁的频率和尽量减少持有锁的时间 ,使得 ConcurrentHashMap 的并发性相对于 HashTable 和用同步包装器包装的 HashMap有了质的提高。
标签:多线程
原文地址:http://blog.csdn.net/maoyeqiu/article/details/46663859