标签:syn val available static init 不能 end set public
(1) 背景
HashMap死循环:HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry.
HashTable效率低下:HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下.因为当一个线程访问HashTable的同步方法,其它线程也访问HashTable的同步方法时,会进入阻塞或轮询状态.如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法获取元素,所以竞争越激烈效率越低.
(2) 简介
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么多线程访问容器里不同的数据段时,线程间不会存在竞争,从而可以有效提高并发访问效率,这就是ConcurrentHash所使用的锁分段技术.首先将数据分成一段一段地储存,然后给每一段配一把锁,当一个线程占用锁访问其中一段数据时,其它段的数据也能被其它线程访问.
结构
ConcurrentHash是由Segments数组结构和HashEntry数组结构组成.Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的色;HashEntry则用于存储键值对数据.一个ConcurrentHashMap里包含一个Segment组.Segment的结构和HashMap类似,是一种数组加链表的结构.一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护者一个HashEntry数组里面的元素,当对HashEntry数组的数据进行修改时,必须先获得与它对应的Segment锁,如下图所示.
基本成员
default_initial_capacitymap默认容量,必须是2的冥
/**
* 默认的初始容量 16
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;
default_load_factor默认负载因子(存储的比例)
/**
* 默认的负载因子
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
default_concurrency_level默认并发数量,segments数组量
/**
* 默认的并发数量,会影响segments数组的长度
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
maximum_capacitymap最大容量
/**
* 最大容量,构造ConcurrentHashMap时指定的值超过,就用该值替换
* ConcurrentHashMap大小必须是2^n,且小于等于2^30
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
min_segment_table_capacityHashEntry[]默认容量
/**
* 每个segment中table数组的长度,必须是2^n,至少为2
*/
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
max_segments最大并发数,segments数组最大量
/**
* 允许最大segment数量,用于限定concurrencyLevel的边界,必须是2^n
*/
static final int MAX_SEGMENTS = 1 << 16;
retries_before_lock重试次数,在加锁之前
/**
* 非锁定情况下调用size和contains方法的重试次数,避免由于table连续被修改导致无限重试
*/
static final int RETRIES_BEFORE_LOCK = 2;
segmentMask计算segment位置的掩码(segments.length-1)
/**
* 用于segment的掩码值,用于与hash的高位进行取&
*/
final int segmentMask;
segmentShift
/**
* 用于算segment位置时,hash参与运算的位数
*/
final int segmentShift;
segmentssegment数组
/**
* segments数组
*/
final Segment<K,V>[] segments;
HashEntry存储数据的链式结构
static final class HashEntry<K,V> {
// hash值
final int hash;
// key
final K key;
// 保证内存可见性,每次从内存中获取
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
/**
* 使用volatile语义写入next,保证可见性
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
Segment继承ReentrantLock锁,用于存放HashEntry[]
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
/**
* 对segment加锁时,在阻塞之前自旋的次数
*
*/
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/**
* 每个segment的HashEntry table数组,访问数组元素可以通过entryAt/setEntryAt提供的volatile语义来完成
* volatile保证可见性
*/
transient volatile HashEntry<K,V>[] table;
/**
* 元素的数量,只能在锁中或者其他保证volatile可见性之间进行访问
*/
transient int count;
/**
* 当前segment中可变操作发生的次数,put,remove等,可能会溢出32位
* 它为chm isEmpty() 和size()方法中的稳定性检查提供了足够的准确性.
* 只能在锁中或其他volatile读保证可见性之间进行访问
*/
transient int modCount;
/**
* 当table大小超过阈值时,对table进行扩容,值为(int)(capacity *loadFactor)
*/
transient int threshold;
/**
* 负载因子
*/
final float loadFactor;
/**
* 构造方法
*/
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
/**
* ConcurrentHashMap 构造方法
* @param initialCapacity 初始化容量
* @param loadFactor 负载因子
* @param concurrencyLevel 并发segment,segments数组的长度
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 大于最大segments容量,取最大容量
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
// 2^sshift = ssize 例如:sshift = 4,ssize = 16
// 根据concurrencyLevel计算出ssize为segments数组的长度
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) { // 第一次 满足
++sshift; // 第一次 1
ssize <<= 1; // 第一次 ssize = ssize << 1 (1 * 2^1)
}
// segmentShift和segmentMask的定义
this.segmentShift = 32 - sshift; // 用于计算hash参与运算位数
this.segmentMask = ssize - 1; // segments位置范围
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 计算每个segment中table的容量
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// HashEntry[]默认 容量
int cap = MIN_SEGMENT_TABLE_CAPACITY;
// 确保cap是2^n
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// 创建segments并初始化第一个segment数组,其余的segment延迟初始化
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
无参构造使用默认参数
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
一些UNSAFE方法
HashEntry
setNext
/**
* 使用volatile语义写入next,保证可见性
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
entryAt get HashEntry
/**
* 获取给定table的第i个元素,使用volatile读语义
*/
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
setEntryAt set HashEntry
/**
* 设置给定的table的第i个元素,使用volatile写语义
*/
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
HashEntry<K,V> e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
put 插入元素
标签:syn val available static init 不能 end set public
原文地址:https://blog.51cto.com/14220760/2364724