标签:其他 值方法 线程 tar 并发 mamicode 基础 建议 类加载机制
【原创】Java并发编程系列27 | ConcurrentHashMap(下)收录于话题
#进阶架构师 | 并发编程专题
12个
点击上方“java进阶架构师”,选择右上角“置顶公众号”
20大进阶架构专题每日送达
2020年Java面试题库连载中
【000期】Java最全面试题库思维导图
【001期】JavaSE面试题(一):面向对象
【002期】JavaSE面试题(二):基本数据类型与访问修饰符
【003期】JavaSE面试题(三):JavaSE语法(1)
【004期】JavaSE面试题(四):JavaSE语法(3)
【005期】JavaSE面试题(五):String类
【006期】JavaSE面试题(六):泛型
【007期】JavaSE面试题(七):异常
更多内容,点击上面蓝字查看
上一篇详细分析了HashMap源码,介绍了HashMap的数据结构以及并发编程中HashMap的问题,这篇就来看下ConcurrentHashMap。因为ConcurrentHashMap与HashMap结构是一样的,本文将重点介绍ConcurrentHashMap在并发编程中如何保证线程安全:
关键属性
put()方法
扩容
如何保证线程线程安全
使用误区
table:用来存放Node结点数据;
Node:结点,保存key-value的数据结构;
nextTable:扩容时新生成的数据,数组为table的两倍;
ForwardingNode:特殊的Node结点,hash值为-1,其中存储nextTable的引用。扩容时,作为一个占位符放在table中表示当前结点为null或则已经被移动。
sizeCtl:控制标识符
/**
* 利用CAS操作获取table数组中索引为i的Node
*/
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
/**
* 利用CAS操作替换table数组中索引为i的元素
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
/**
* 利用CAS操作设置table数组中索引为i的元素
*/
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
由于数据结构一样,put()、get()、remove()方法大致步骤也一致,我们通过put()方法的源码来看下ConcurrentHashMap是如何保证线程安全的。get()、remove()这些数据操作保证线程安全的方式跟put()方法是一样的,理解了put方法,其他的也就明白了。
put()方法大致步骤:
数组下标没有对应hash值,直接newNode()添加
数组下标有对应hash值,添加到链表最后
链表超过最大长度(8),将链表改为红黑树再添加元素
结点在table数组中的位置计算:table[(length - 1) & hash] 。
put()方法源码:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());// 得到 hash 值
int binCount = 0;// 用于记录相应链表的长度
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组空,进行数组初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 找该 hash 值对应的数组下标,得到第一个节点 f
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果数组该位置为空,利用 CAS 操作将这个新值放入其中即可
// 如果 CAS 失败,那就是有并发操作,进到下一个循环再put
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// hash 等于 MOVED(-1),数组正在扩容,帮助数据迁移
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);// 帮助数据迁移
// 到这里就是说,f 是该位置的头结点,而且不为空
else {
V oldVal = null;
// 获取数组该位置的头结点的监视器锁
synchronized (f) {
if (tabAt(tab, i) == f) {
// 头结点的 hash 值大于 0,说明是链表
if (fh >= 0) {
binCount = 1;// 用于累加,记录链表的长度
// 遍历链表添加node
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 调用红黑树的插值方法插入新节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 判断是否要将链表转换为红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
put()方法是如何保证线程安全的?
table用volatile修饰,保证数组数据修改的可见性
获取结点使用tabAt(),设置结点使用casTabAt(),利用CAS进行数据操作,使用乐观锁,如果因为其他线程修改数据导致当前线程操作失败,自旋重试直到成功。
举例说明:
HashMap中,线程A线程B可以同时检查得到tab[1]==null,然后线程A设置了tab[1]=Node(A),马上线程B又设置tab[1]=Node(B),那么线程A设置的数据就丢失了;
ConcurrentHashMap中,当线程A设置了tab[1]=Node(A)后,线程B又设置tab[1]=Node(B)就会失败,然后自旋进下一次循环,设置Node(A).next=Node(B)。
操作链表上的数据和红黑树上的数据时加synchronized锁,将第一个结点作为监视器锁。同样避免了因为多线程操作对数据结构的破坏和数据的丢失。
不再使用++size这种操作记录结点数量,而是用volatile变量baseCount和volatile数组CounterCell[]来记录。CAS操作成功用baseCount记录,CAS失败用CounterCell[]记录,最终将两个变量结合计算出总结点数量。
ConcurrentHashMap单线程扩容过程与HashMap类似,大致过程如下:
newTab = new Node[2*length],创建一个两倍于原来数组oldTab的新数组newTab,遍历oldTable,将oldTab中的结点转移到newTab中。
如果桶中oldTab[i]只有一个元素node,直接将node放入newTab[node.hash & (newCap - 1)]中。
如果桶中oldTab[i]是链表,分成两个链表分别放入newTab[i]和newTab[i+oldTab.length]。
如果桶中oldTab[i]是树,树打散成两颗树插入到新桶中去。
ConcurrentHashMap支持多线程扩容:
当一个线程发现数组结点到达阈值时,调用transfer(tab, null)进行扩容并迁移数据,会创建一个2倍长度的新数组nextTable。
当另一个线程要操作数据时发现table数组正在扩容,就会调用transfer(tab, nextTable)帮忙迁移数据。
多个线程同时迁移数据怎么实现呢?设置一个步长stride,每个线程负责一个步长的数据迁移。例:table.length==64,步长stride=16,每个线程每次负责迁移16个桶,如果当前线程16个桶迁移结束再去申请16个桶迁移。
扩容如何保证线程安全呢?
table和nextTable的修改都是通过CAS操作,失败后自旋重试,不会造成数据丢失和错误。
链表和红黑树的操作都是将第一个结点作为监视器锁加synchronized锁,同步处理,保证线程安全。
扩容源码(源码较长,还是建议看下,实在没时间阅读源码的话可以直接看节的总结):
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride步长,在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果 nextTab 为 null,先进行一次初始化
// 第一个发起迁移的线程调用此方法时,参数 nextTab 为 null
// 之后参与迁移的线程调用此方法时,nextTab 为 nextTable
if (nextTab == null) {
try {
// 容量翻倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab; // nextTable 是 ConcurrentHashMap 中的属性
transferIndex = n;// transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置
}
int nextn = nextTab.length;
// ForwardingNode表示已经迁移过的结点,hash值为MOED(-1)
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;// advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {// i 是位置索引,bound 是边界,
Node<K,V> f; int fh;
// i 指向了 transferIndex,bound 指向了 transferIndex-stride
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
// 将 transferIndex 值赋给 nextIndex
// 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;// nextBound 是这次迁移任务的边界
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 所有的迁移操作已经完成
if (finishing) {
nextTable = null;
table = nextTab;// 将新的 nextTab 赋值给 table 属性,完成迁移
sizeCtl = (n << 1) - (n >>> 1);// 重新计算 sizeCtl:n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
return;
}
// 修改sizeCtl
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 任务结束,方法退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 该位置处是一个 ForwardingNode,代表该位置已经迁移过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 头结点的 hash 大于 0,说明是链表的 Node 节点
if (fh >= 0) {
/*
* 将链表一分为二
* 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
* astRun 之前的节点需要进行克隆,然后分到两个链表中
*/
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 其中的一个链表放在新数组的位置 i
setTabAt(nextTab, i, ln);
// 另一个链表放在新数组的位置 i+n
setTabAt(nextTab, i + n, hn);
// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
setTabAt(tab, i, fwd);
// advance 设置为 true,代表该位置已经迁移完毕
advance = true;
}
else if (f instanceof TreeBin) {
// 红黑树的迁移
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果一分为二后,节点数少于 8,那么将红黑树转换回链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 将 ln 放置在新数组的位置 i
setTabAt(nextTab, i, ln);
// 将 hn 放置在新数组的位置 i+n
setTabAt(nextTab, i + n, hn);
// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
setTabAt(tab, i, fwd);
// advance 设置为 true,代表该位置已经迁移完毕
advance = true;
}
}
}
}
}
}
CAS操作数据:table数组的取值/设置值、链表的数值操作、sizeCtl修改都利用CAS来完成,当因为其他线程修改了数据导致操作失败后,会自旋重试直到成功,保证了在多线程环境下的数据安全。
synchronized互斥锁:操作链表/树种的元素时,使用synchronized锁,将第一个结点作为监视器锁,保证线程安全。
volatile修饰变量:table、baseCount、CounterCell[]、sizeCtl等变量用volatile修饰,保证了多线程环境下数据读写的可见性。
线程安全的容器只能保证自身的数据不被破坏,但无法保证业务的行为是否正确。
ConcurrentHashMap只是保证put进容器的数据正确保存,get时可以正确获取,但并发容器并不是锁,并不能保证业务上的线程安全。
举例说明:启动100个线程统计字符串"a"出现的次数,正确结果应该是100,但是每次执行完结果都小于100。
ConcurrentHashMap能保证的情况:
put("a1", 1)和put("a2", 2),如果"a1"和"a2"的hash值相等,但并不equals,在并发环境中出问题。
get("a")在多线程环境中可以取到正确的值。
ConcurrentHashMap不能解决的情况:两个线程同时get("a")=1,然后又同时put("a", 2)。这个问题不是ConcurrentHashMap本身的问题,因为get得到的是最新值没问题,put的值也已经保存了,而是业务代码的线程安全问题,对一个共享的集合操作时没有同步处理,需要加锁解决。
public class ConcurrentHashMapTest {
private static final Map<String, Long> wordCounts = new ConcurrentHashMap<String, Long>();
public static void main(String[] args) {
for (int i = 0; i <= 99; i++) {
new Thread(){
public void run() {
Long oldValue = wordCounts.get("a");
Long newValue = (oldValue == null) ? 1L : oldValue + 1;
wordCounts.put("a", newValue);
};
}.start();
}
System.out.println(wordCounts);
}
}
解决:synchronized加锁
public class ConcurrentHashMapTest {
private static final Map<String, Long> wordCounts = new ConcurrentHashMap<String, Long>();
public static void main(String[] args) {
for (int i = 0; i <= 99; i++) {
new Thread(){
public void run() {
synchronized (wordCounts) {
Long oldValue = wordCounts.get("a");
Long newValue = (oldValue == null) ? 1L : oldValue + 1;
wordCounts.put("a", newValue);
}
};
}.start();
}
System.out.println(wordCounts);
}
}
【原创】01|开篇获奖感言
【原创】02|并发编程三大核心问题
【原创】03|重排序-可见性和有序性问题根源
【原创】04|Java 内存模型详解
【原创】05|深入理解 volatile
【原创】06|你不知道的 final
【原创】07|synchronized 原理
【原创】08|synchronized 锁优化
【原创】09|基础干货
【原创】10|线程状态
【原创】11|线程调度
【原创】12|揭秘 CAS
【原创】13|LockSupport
【原创】14|AQS 源码分析
【原创】15|重入锁 ReentrantLock
【原创】16|公平锁与非公平锁
【原创】17|读写锁八讲(上)
【原创】18|读写锁八讲(下)
【原创】19|JDK8新增锁StampedLock
【原创】20|StampedLock源码解析
【原创】21|Condition-Lock的等待通知
【原创】22|倒计时器CountDownLatch
【原创】22|倒计时器CountDownLatch
【原创】23|循环屏障CyclicBarrier
【原创】24|信号量Semaphore
之前,给大家发过四份Java面试宝典,这次新增了更全面的资料,相信在跳槽前准备准备,基本没大问题。
《java基础:设计模式等》(初中级)
《JVM:整理BAT最新题库》《并发编程》(中高级)
《分布式微服务架构》《架构|软技能》(资深)
《一线互联网公司面试指南》(资深)
学习视频包含深入运行时数据区、垃圾回收、详解类装载过程及类加载机制、手写Spring-IOC容器、redis入门到高性能缓存组件等等
获取方式:点“在看”,在公众号后台打开,回复 【666】即可领取,资料持续更新。
看到这里,证明有所收获
必须点个在看支持呀,喵
【原创】Java并发编程系列27 | ConcurrentHashMap(下)
标签:其他 值方法 线程 tar 并发 mamicode 基础 建议 类加载机制
原文地址:https://blog.51cto.com/15009303/2552587