码迷,mamicode.com
首页 > 编程语言 > 详细

【原创】Java并发编程系列27 | ConcurrentHashMap(下)

时间:2020-11-24 12:55:34      阅读:11      评论:0      收藏:0      [点我收藏+]

标签:其他   值方法   线程   tar   并发   mamicode   基础   建议   类加载机制   

【原创】Java并发编程系列27 | ConcurrentHashMap(下)
收录于话题
#进阶架构师 | 并发编程专题
12个

点击上方“java进阶架构师”,选择右上角“置顶公众号”
20大进阶架构专题每日送达
技术图片

2020年Java面试题库连载中
【000期】Java最全面试题库思维导图
【001期】JavaSE面试题(一):面向对象
【002期】JavaSE面试题(二):基本数据类型与访问修饰符
【003期】JavaSE面试题(三):JavaSE语法(1)
【004期】JavaSE面试题(四):JavaSE语法(3)
【005期】JavaSE面试题(五):String类
【006期】JavaSE面试题(六):泛型
【007期】JavaSE面试题(七):异常
更多内容,点击上面蓝字查看

正文

技术图片
上一篇详细分析了HashMap源码,介绍了HashMap的数据结构以及并发编程中HashMap的问题,这篇就来看下ConcurrentHashMap。因为ConcurrentHashMap与HashMap结构是一样的,本文将重点介绍ConcurrentHashMap在并发编程中如何保证线程安全:
关键属性
put()方法
扩容
如何保证线程线程安全
使用误区

1. 关键属性


table:用来存放Node结点数据;
Node:结点,保存key-value的数据结构;
nextTable:扩容时新生成的数据,数组为table的两倍;
ForwardingNode:特殊的Node结点,hash值为-1,其中存储nextTable的引用。扩容时,作为一个占位符放在table中表示当前结点为null或则已经被移动。
sizeCtl:控制标识符

  • 负数代表正在进行初始化或扩容操作
  • -1 代表正在初始化
  • -N 表示有N-1个线程正在进行扩容操作
  • 正数 如果当前数组为null,表示table在初始化过程中,sizeCtl表示为需要新建数组的长度
  • 正数 若table已经初始化了,表示临界值,数组的长度n乘以加载因子loadFactor;
    sun.misc.Unsafe U:使用CAS修改属性和做一些操作来保证线程安全性,例如:
/**
 * 利用CAS操作获取table数组中索引为i的Node
 */
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

/**
 * 利用CAS操作替换table数组中索引为i的元素
 */
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

/**
 * 利用CAS操作设置table数组中索引为i的元素
 */
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

2. put()方法


由于数据结构一样,put()、get()、remove()方法大致步骤也一致,我们通过put()方法的源码来看下ConcurrentHashMap是如何保证线程安全的。get()、remove()这些数据操作保证线程安全的方式跟put()方法是一样的,理解了put方法,其他的也就明白了。
put()方法大致步骤:
数组下标没有对应hash值,直接newNode()添加
数组下标有对应hash值,添加到链表最后
链表超过最大长度(8),将链表改为红黑树再添加元素
结点在table数组中的位置计算:table[(length - 1) & hash] 。
put()方法源码:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());// 得到 hash 值
    int binCount = 0;// 用于记录相应链表的长度
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果数组空,进行数组初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();

        // 找该 hash 值对应的数组下标,得到第一个节点 f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果数组该位置为空,利用 CAS 操作将这个新值放入其中即可
            // 如果 CAS 失败,那就是有并发操作,进到下一个循环再put
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }

        // hash 等于 MOVED(-1),数组正在扩容,帮助数据迁移
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);// 帮助数据迁移

        // 到这里就是说,f 是该位置的头结点,而且不为空
        else {
            V oldVal = null;
            // 获取数组该位置的头结点的监视器锁
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // 头结点的 hash 值大于 0,说明是链表
                    if (fh >= 0) {
                        binCount = 1;// 用于累加,记录链表的长度
                        // 遍历链表添加node
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 红黑树
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // 调用红黑树的插值方法插入新节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }

            // 判断是否要将链表转换为红黑树
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

put()方法是如何保证线程安全的?

table用volatile修饰,保证数组数据修改的可见性
获取结点使用tabAt(),设置结点使用casTabAt(),利用CAS进行数据操作,使用乐观锁,如果因为其他线程修改数据导致当前线程操作失败,自旋重试直到成功。
举例说明:
HashMap中,线程A线程B可以同时检查得到tab[1]==null,然后线程A设置了tab[1]=Node(A),马上线程B又设置tab[1]=Node(B),那么线程A设置的数据就丢失了;
ConcurrentHashMap中,当线程A设置了tab[1]=Node(A)后,线程B又设置tab[1]=Node(B)就会失败,然后自旋进下一次循环,设置Node(A).next=Node(B)。
操作链表上的数据和红黑树上的数据时加synchronized锁,将第一个结点作为监视器锁。同样避免了因为多线程操作对数据结构的破坏和数据的丢失。
不再使用++size这种操作记录结点数量,而是用volatile变量baseCount和volatile数组CounterCell[]来记录。CAS操作成功用baseCount记录,CAS失败用CounterCell[]记录,最终将两个变量结合计算出总结点数量。

3. 扩容


ConcurrentHashMap单线程扩容过程与HashMap类似,大致过程如下:
newTab = new Node[2*length],创建一个两倍于原来数组oldTab的新数组newTab,遍历oldTable,将oldTab中的结点转移到newTab中。
如果桶中oldTab[i]只有一个元素node,直接将node放入newTab[node.hash & (newCap - 1)]中。
如果桶中oldTab[i]是链表,分成两个链表分别放入newTab[i]和newTab[i+oldTab.length]。
如果桶中oldTab[i]是树,树打散成两颗树插入到新桶中去。
ConcurrentHashMap支持多线程扩容:
当一个线程发现数组结点到达阈值时,调用transfer(tab, null)进行扩容并迁移数据,会创建一个2倍长度的新数组nextTable。
当另一个线程要操作数据时发现table数组正在扩容,就会调用transfer(tab, nextTable)帮忙迁移数据。
多个线程同时迁移数据怎么实现呢?设置一个步长stride,每个线程负责一个步长的数据迁移。例:table.length==64,步长stride=16,每个线程每次负责迁移16个桶,如果当前线程16个桶迁移结束再去申请16个桶迁移。

扩容如何保证线程安全呢?

table和nextTable的修改都是通过CAS操作,失败后自旋重试,不会造成数据丢失和错误。
链表和红黑树的操作都是将第一个结点作为监视器锁加synchronized锁,同步处理,保证线程安全。
扩容源码(源码较长,还是建议看下,实在没时间阅读源码的话可以直接看节的总结):

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;

    // stride步长,在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range

    // 如果 nextTab 为 null,先进行一次初始化
    // 第一个发起迁移的线程调用此方法时,参数 nextTab 为 null
    // 之后参与迁移的线程调用此方法时,nextTab 为 nextTable
    if (nextTab == null) {
        try {
            // 容量翻倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab; // nextTable 是 ConcurrentHashMap 中的属性
        transferIndex = n;// transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置
    }

    int nextn = nextTab.length;

    // ForwardingNode表示已经迁移过的结点,hash值为MOED(-1)
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

    boolean advance = true;// advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了
    boolean finishing = false; // to ensure sweep before committing nextTab

    for (int i = 0, bound = 0;;) {// i 是位置索引,bound 是边界,
        Node<K,V> f; int fh;

        // i 指向了 transferIndex,bound 指向了 transferIndex-stride
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;

            // 将 transferIndex 值赋给 nextIndex
            // 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;// nextBound 是这次迁移任务的边界
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 所有的迁移操作已经完成
            if (finishing) {
                nextTable = null;
                table = nextTab;// 将新的 nextTab 赋值给 table 属性,完成迁移
                sizeCtl = (n << 1) - (n >>> 1);// 重新计算 sizeCtl:n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
                return;
            }

            // 修改sizeCtl
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 任务结束,方法退出
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 该位置处是一个 ForwardingNode,代表该位置已经迁移过了
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        // 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 头结点的 hash 大于 0,说明是链表的 Node 节点
                    if (fh >= 0) {
                        /*
                         * 将链表一分为二
                         * 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
                         * astRun 之前的节点需要进行克隆,然后分到两个链表中
                         */
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 其中的一个链表放在新数组的位置 i
                        setTabAt(nextTab, i, ln);
                        // 另一个链表放在新数组的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
                        // 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
                        setTabAt(tab, i, fwd);
                        // advance 设置为 true,代表该位置已经迁移完毕
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        // 红黑树的迁移
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 如果一分为二后,节点数少于 8,那么将红黑树转换回链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;

                        // 将 ln 放置在新数组的位置 i
                        setTabAt(nextTab, i, ln);
                        // 将 hn 放置在新数组的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
                        // 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
                        setTabAt(tab, i, fwd);
                        // advance 设置为 true,代表该位置已经迁移完毕
                        advance = true;
                    }
                }
            }
        }
    }
}

4. 总结:如何保证线程线程安全


CAS操作数据:table数组的取值/设置值、链表的数值操作、sizeCtl修改都利用CAS来完成,当因为其他线程修改了数据导致操作失败后,会自旋重试直到成功,保证了在多线程环境下的数据安全。
synchronized互斥锁:操作链表/树种的元素时,使用synchronized锁,将第一个结点作为监视器锁,保证线程安全。
volatile修饰变量:table、baseCount、CounterCell[]、sizeCtl等变量用volatile修饰,保证了多线程环境下数据读写的可见性。

5. 使用注意


线程安全的容器只能保证自身的数据不被破坏,但无法保证业务的行为是否正确。
ConcurrentHashMap只是保证put进容器的数据正确保存,get时可以正确获取,但并发容器并不是锁,并不能保证业务上的线程安全。
举例说明:启动100个线程统计字符串"a"出现的次数,正确结果应该是100,但是每次执行完结果都小于100。
ConcurrentHashMap能保证的情况:
put("a1", 1)和put("a2", 2),如果"a1"和"a2"的hash值相等,但并不equals,在并发环境中出问题。
get("a")在多线程环境中可以取到正确的值。
ConcurrentHashMap不能解决的情况:两个线程同时get("a")=1,然后又同时put("a", 2)。这个问题不是ConcurrentHashMap本身的问题,因为get得到的是最新值没问题,put的值也已经保存了,而是业务代码的线程安全问题,对一个共享的集合操作时没有同步处理,需要加锁解决。

public class ConcurrentHashMapTest {
    private static final Map<String, Long> wordCounts = new ConcurrentHashMap<String, Long>();
    public static void main(String[] args) {
        for (int i = 0; i <= 99; i++) {
            new Thread(){
                public void run() {
                    Long oldValue = wordCounts.get("a");
                    Long newValue = (oldValue == null) ? 1L : oldValue + 1;
                    wordCounts.put("a", newValue);
                };
            }.start();
        }
        System.out.println(wordCounts);
    }
}

解决:synchronized加锁


public class ConcurrentHashMapTest {
    private static final Map<String, Long> wordCounts = new ConcurrentHashMap<String, Long>();
    public static void main(String[] args) {
        for (int i = 0; i <= 99; i++) {
            new Thread(){
                public void run() {
                    synchronized (wordCounts) {
                        Long oldValue = wordCounts.get("a");
                        Long newValue = (oldValue == null) ? 1L : oldValue + 1;
                        wordCounts.put("a", newValue);
                    }
                };
            }.start();
        }
        System.out.println(wordCounts);
    }
}

并发系列文章汇总


【原创】01|开篇获奖感言
【原创】02|并发编程三大核心问题
【原创】03|重排序-可见性和有序性问题根源
【原创】04|Java 内存模型详解
【原创】05|深入理解 volatile
【原创】06|你不知道的 final
【原创】07|synchronized 原理
【原创】08|synchronized 锁优化
【原创】09|基础干货
【原创】10|线程状态
【原创】11|线程调度
【原创】12|揭秘 CAS
【原创】13|LockSupport
【原创】14|AQS 源码分析
【原创】15|重入锁 ReentrantLock
【原创】16|公平锁与非公平锁
【原创】17|读写锁八讲(上)
【原创】18|读写锁八讲(下)
【原创】19|JDK8新增锁StampedLock
【原创】20|StampedLock源码解析
【原创】21|Condition-Lock的等待通知
【原创】22|倒计时器CountDownLatch
【原创】22|倒计时器CountDownLatch
【原创】23|循环屏障CyclicBarrier
【原创】24|信号量Semaphore

之前,给大家发过四份Java面试宝典,这次新增了更全面的资料,相信在跳槽前准备准备,基本没大问题。
《java基础:设计模式等》(初中级)
《JVM:整理BAT最新题库》《并发编程》(中高级)
《分布式微服务架构》《架构|软技能》(资深)
《一线互联网公司面试指南》(资深)
学习视频包含深入运行时数据区、垃圾回收、详解类装载过程及类加载机制、手写Spring-IOC容器、redis入门到高性能缓存组件等等
技术图片

获取方式:点“在看”,在公众号后台打开,回复 【666】即可领取,资料持续更新。

看到这里,证明有所收获
必须点个在看支持呀,喵

【原创】Java并发编程系列27 | ConcurrentHashMap(下)

标签:其他   值方法   线程   tar   并发   mamicode   基础   建议   类加载机制   

原文地址:https://blog.51cto.com/15009303/2552587

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!