
HashMap & ConcurrentHashMap

Driving HashMap into an Infinite Loop

The scenario below drives a multithreaded HashMap into an infinite loop; both get and put risk spinning forever:

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

public class HashMapDead {
    private static Logger logger = Logger.getLogger("hashMapDead");
    private static final long CYCLE = 100000;

    public static void main(String[] args) throws InterruptedException {
        Map<String,String> map = new HashMap<String,String>();
        CountDownLatch latch = new CountDownLatch(1);
        int i = 0;
        // three writer threads insert disjoint key ranges concurrently
        for (; i < 3; i++) {
            new Thread(new Write(map, i * CYCLE, latch)).start();
        }
        // one reader polls random keys while the writers trigger resizes
        new Thread(new Read(map, i * CYCLE, latch)).start();
        latch.countDown(); // release all threads at once to maximize contention
        System.out.println(" main end");
    }

    private static class Write implements Runnable {

        private Map<String, String> map;
        private long                startIdx;
        private CountDownLatch      latch;

        public Write(Map<String, String> map, long start, CountDownLatch latch) {
            this.map = map;
            this.startIdx = start;
            this.latch = latch;
        }

        /** 
         * @see java.lang.Runnable#run()
         */
        @Override
        public void run() {

            try {
                latch.await();
            } catch (InterruptedException e) {
                logger.warning(e.toString());
            }
            System.out.println(Thread.currentThread().getName() + " started");
            long end = startIdx + CYCLE;
            for (; startIdx < end; startIdx++) {
                String tmp = String.valueOf(startIdx);
                map.put(tmp, tmp);
            }
            System.out.println(Thread.currentThread().getName() + " done");
        }

    }

    private static class Read implements Runnable {

        private Map<String, String> map;
        private long                size;
        private CountDownLatch      latch;

        public Read(Map<String, String> map, long end, CountDownLatch latch) {
            this.map = map;
            this.size = end;
            this.latch = latch;
        }

        /** 
         * @see java.lang.Runnable#run()
         */
        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                logger.warning(e.toString());
            }
            System.out.println(Thread.currentThread().getName() + " started");
            Random random = new Random(); // hoisted out of the loop
            while (true) {
                // writers fill keys in [0, size); keep the probe non-negative and in range
                long l = Math.abs(random.nextLong() % size);
                String result = map.get(String.valueOf(l));
                System.out.println(System.currentTimeMillis() + ": " + result);
            }
        }
    }
}

ConcurrentHashMap

  • Internal structure (based on JDK 1.7)
    final Segment<K,V>[] segments;
    The map is partitioned across a segments[] array; each segment is essentially a small HashMap. The segments array never grows and defaults to 16 elements. Inside each segment the HashEntry[] table can grow, just like HashMap's resize, but a segment's rehash is safe and can never produce an infinite loop.

ConcurrentHashMap stores data into HashEntry chains via two rounds of hashing: the first locates the segment, the second locates the HashEntry bucket within that segment. The hash function largely determines ConcurrentHashMap's efficiency; the goal is to spread keys evenly across both segments and buckets.
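A minimal sketch of that two-level index computation (field names mirror the JDK 1.7 source quoted in get() below; the concrete shift/mask values assume the default concurrency level of 16 segments):

final class IndexSketch {
    static final int SEGMENT_SHIFT = 28; // 32 - log2(16 segments)
    static final int SEGMENT_MASK  = 15; // 16 - 1

    // first hash: the high bits of h pick the segment
    static int segmentIndex(int h) {
        return (h >>> SEGMENT_SHIFT) & SEGMENT_MASK;
    }

    // second hash: the low bits of h pick the bucket inside the segment's table
    static int bucketIndex(int h, int tableLength) {
        return (tableLength - 1) & h; // tableLength is always a power of two
    }
}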

put: writes take the lock

* First locate the target segment,
* then delegate to that segment's put (which locks; different segments lock independently)

Segment extends ReentrantLock, and every modification of a segment takes that segment's lock.
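For orientation, a structural sketch of those internals (field names match the JDK 1.7 source quoted throughout this post; all methods are elided, so this compiles but does nothing on its own):

import java.util.concurrent.locks.ReentrantLock;

final class StructureSketch<K,V> {
    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash; this.key = key; this.value = value; this.next = next;
        }
    }

    // Each segment is itself the lock that guards its table.
    static final class Segment<K,V> extends ReentrantLock {
        transient volatile HashEntry<K,V>[] table; // volatile so lock-free readers see writes
        transient int count;       // entries in this segment
        transient int modCount;    // structural modifications, read by size()
        transient int threshold;   // rehash once count exceeds this
        final float loadFactor;
        Segment(float loadFactor) { this.loadFactor = loadFactor; }
    }

    final Segment<K,V>[] segments = null; // fixed size, default 16
}

The JDK 1.7 Segment.put source (abridged by the author):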

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // locate the head of this key's HashEntry chain within the segment
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        // insert the key/value pair; putKeyValue() is a placeholder I extracted (body shown below)
        putKeyValue();
    } finally {
        unlock();
    }
    return oldValue;
}

scanAndLockForPut's job is to acquire the lock and, if the key is absent, speculatively create a HashEntry for later use. It only runs when the initial tryLock fails; my guess is that its purpose is precisely to pre-create the missing HashEntry while spinning for the lock.

The scanAndLockForPut source:

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {// first traversal: on finding the same key, or walking off the end, set retries to 0
            if (e == null) {// reached the end of the chain
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {// spun too long: block on the lock and yield the CPU
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {// the chain head changed (e.g. a resize): re-traverse. This works only because the table reads have volatile semantics, so the new head is visible here
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

By the time the putKeyValue body below runs, the thread already holds the segment lock:

for (HashEntry<K,V> e = first;;) {
    if (e != null) {
        K k;
        // found an entry with the same key
        if ((k = e.key) == key ||
            (e.hash == hash && key.equals(k))) {
            oldValue = e.value;
            // onlyIfAbsent supports putIfAbsent: don't overwrite an existing value
            if (!onlyIfAbsent) {
                e.value = value;
                ++modCount;
            }
            break;
        }
        e = e.next;
    }
    // traversed the whole chain without finding the key
    else {
        // node may already have been created speculatively in scanAndLockForPut
        if (node != null)
            node.setNext(first);// prepend to the chain; a linked list makes a natural stack
        else
            node = new HashEntry<K,V>(hash, key, value, first);
        int c = count + 1;
        // if this segment's count exceeds the threshold and the table can still
        // grow, rehash; rehash(node) also inserts node, which is not yet
        // reachable from the old HashEntry[]
        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
            rehash(node);
        else
            setEntryAt(tab, index, node);// the new chain head must be written back to slot index of HashEntry[]
        ++modCount;
        count = c;
        oldValue = null;
        break;
    }
}

rehash(node) doubles the segment's table and places node into the new table.
rehash runs under the lock (it is called from inside put), so it is safe; and it never mutates the old table's structure, which is why it cannot produce an infinite loop.

/**
 * Doubles size of table and repacks entries, also adding the
 * given node to new table
 */
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
    /*
     * Reclassify nodes in each list to new table.  Because we
     * are using power-of-two expansion, the elements from
     * each bin must either stay at same index, or move with a
     * power of two offset. We eliminate unnecessary node
     * creation by catching cases where old nodes can be
     * reused because their next fields won't change.
     * Statistically, at the default threshold, only about
     * one-sixth of them need cloning when a table
     * doubles. The nodes they replace will be garbage
     * collectable as soon as they are no longer referenced by
     * any reader thread that may be in the midst of
     * concurrently traversing table. Entry accesses use plain
     * array indexing because they are followed by volatile
     * table write.
     */
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1;// double the capacity
    threshold = (int)(newCapacity * loadFactor);// the next rehash triggers once count exceeds capacity * loadFactor
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null)   //  Single node on list
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                //find the trailing run of nodes that all land in the same new slot; it can be reused as-is
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                //re-add the rest of the chain to the new table (each old bucket splits across at most two new slots, because capacity exactly doubled)
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // the old table is fully rehashed; now prepend the new node at the head of its chain
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

remove

ConcurrentHashMap first locates the segment, then delegates to segment.remove (under the lock). The segment locates the HashEntry bucket, and the rest is ordinary linked-list surgery.
count and modCount are only ever modified under the lock, yet reads of them take no lock at all.

/**
 * Remove; match on key only if value null, else match both.
 */
final V remove(Object key, int hash, Object value) {
    if (!tryLock())
        scanAndLock(key, hash);// scanAndLock seems to add little here; a plain lock() might do just as well
    V oldValue = null;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> e = entryAt(tab, index);// locate the head of the bucket's chain
        HashEntry<K,V> pred = null;
        while (e != null) {
            K k;
            HashEntry<K,V> next = e.next;
            if ((k = e.key) == key ||
                (e.hash == hash && key.equals(k))) {
                V v = e.value;
                if (value == null || value == v || value.equals(v)) {
                    if (pred == null)// removing the head: make the second node the new chain head
                        setEntryAt(tab, index, next);
                    else
                        pred.setNext(next);
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }
            pred = e;
            e = next;
        }
    } finally {
        unlock();
    }
    return oldValue;
}

get: lock-free reads, weak consistency

First locate the segment, then the HashEntry bucket within it, then traverse the chain. Because reads take no lock, there is no guarantee that after thread A calls put(k1, v1), a subsequent get(k1) from thread B returns v1. This follows from ConcurrentHashMap's design: the locking is encapsulated inside the individual segments, and locking the whole ConcurrentHashMap on every access would simply turn it back into a Hashtable. For strong consistency the only options are Hashtable or Collections.synchronizedMap, a decorator.

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
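For contrast, a minimal sketch of the strongly consistent alternatives mentioned above; a synchronizedMap decorator serializes every call on one monitor, which is exactly the cost the per-segment locks avoid (class name is my own):

import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;

public class StrongConsistencyDemo {
    public static void main(String[] args) {
        // Every call synchronizes on the wrapper: one global lock for the whole map.
        Map<String, String> syncMap =
                Collections.synchronizedMap(new HashMap<String, String>());
        syncMap.put("k1", "v1");
        System.out.println(syncMap.get("k1")); // "v1", guaranteed visible

        // Hashtable behaves the same way: every public method is synchronized.
        Map<String, String> table = new Hashtable<String, String>();
        table.put("k1", "v1");
        System.out.println(table.get("k1")); // "v1"
    }
}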

size

size first tries to compute the total without locking, by summing every segment's count; if two consecutive passes yield the same modCount sum, the map is assumed unmodified within that window. Once the unlocked attempts exceed RETRIES_BEFORE_LOCK, it locks all segments and sums the counts directly.

/**
 * Returns the number of key-value mappings in this map.  If the
 * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
 * <tt>Integer.MAX_VALUE</tt>.
 *
 * @return the number of key-value mappings in this map
 */
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

containsXxx

1. containsKey
    works like get(): locate the segment, then the HashEntry bucket, then traverse
2. containsValue
    works like size(): searching by value requires scanning the entire map

The containsKey implementation, structurally identical to get:

public boolean containsKey(Object key) {
    Segment<K,V> s; // same as get() except no need for volatile value read
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return true;
        }
    }
    return false;
}

containsValue first sweeps the whole map without locking, possibly several times; if two consecutive sweeps observe the same modCount sum, the map is assumed unmodified and the result stands. Otherwise it locks every segment and sweeps once more. Again the design avoids locks wherever it can.

public boolean containsValue(Object value) {
    // Same idea as size()
    if (value == null)
        throw new NullPointerException();
    final Segment<K,V>[] segments = this.segments;
    boolean found = false;
    long last = 0;
    int retries = -1;
    try {
        outer: for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            long hashSum = 0L;
            int sum = 0;
            for (int j = 0; j < segments.length; ++j) {
                HashEntry<K,V>[] tab;
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null && (tab = seg.table) != null) {
                    for (int i = 0 ; i < tab.length; i++) {
                        HashEntry<K,V> e;
                        for (e = entryAt(tab, i); e != null; e = e.next) {
                            V v = e.value;
                            if (v != null && value.equals(v)) {
                                found = true;
                                break outer;
                            }
                        }
                    }
                    sum += seg.modCount;
                }
            }
            if (retries > 0 && sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return found;
}

Why HashEntry's resize is safe but HashMap's can loop

The HashMap resize code follows.
The resize criterion is whether the current size exceeds table.length * loadFactor; the default loadFactor of 0.75 means a HashMap is rehashed well before its table is full. I am not sure why; the usual rationale is that 0.75 trades a little space for short chains, since collision probability climbs steeply as the table fills. ConcurrentHashMap's segments follow the same design.
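A quick way to watch that trigger (a hypothetical sketch of mine, not from the original post; peeking at HashMap's private table field via reflection assumes a pre-module JDK, or --add-opens java.base/java.util=ALL-UNNAMED on a modern one):

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class ResizeTrigger {
    public static void main(String[] args) throws Exception {
        // capacity 16 * loadFactor 0.75 => threshold 12: around the 13th
        // insert the table doubles to 32 (the exact trigger condition
        // differs slightly between JDK 7 and JDK 8)
        Map<Integer, Integer> map = new HashMap<Integer, Integer>(16, 0.75f);
        for (int i = 1; i <= 13; i++) {
            map.put(i, i);
            System.out.println("size=" + i + " tableLength=" + tableLength(map));
        }
    }

    // read the private backing array; the field is named "table" in the JDK sources
    static int tableLength(Map<?, ?> map) throws Exception {
        Field f = HashMap.class.getDeclaredField("table");
        f.setAccessible(true);
        Object[] table = (Object[]) f.get(map);
        return table == null ? 0 : table.length;
    }
}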

The key resize code:

void resize(int newCapacity) {
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    if (oldCapacity == MAXIMUM_CAPACITY) {
        threshold = Integer.MAX_VALUE;
        return;
    }

    Entry[] newTable = new Entry[newCapacity];
    transfer(newTable, initHashSeedAsNeeded(newCapacity));
    table = newTable;
    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}


void transfer(Entry[] newTable, boolean rehash) {
    int newCapacity = newTable.length;
    for (Entry<K,V> e : table) {
        while(null != e) {
            Entry<K,V> next = e.next;
            if (rehash) {
                e.hash = null == e.key ? 0 : hash(e.key);
            }
            int i = indexFor(e.hash, newCapacity);
            e.next = newTable[i];// this relinks the old chain in place; if concurrent resizes interleave, the chain can form a cycle and every later traversal loops forever
            newTable[i] = e;
            e = next;
        }
    }
}

HashMap's Entry.next is not volatile, so there is no visibility guarantee. Making next volatile would prevent this infinite loop, and always allocating new entries (instead of relinking old ones) would likewise avoid cycles. But HashMap is meant for single-threaded use, and volatile is not free: in my own benchmark on my machine, a volatile read/write was roughly 45 times slower than a plain one. Finally, the table size is always an integral power of two so that the modulo can be computed as a bitwise AND, which is faster: 2^n - 1 is exactly n one-bits.
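A small demonstration of that mask trick (my own illustration; for power-of-two lengths and non-negative hashes the two expressions agree):

public class MaskVsMod {
    public static void main(String[] args) {
        int length = 16;       // a power of two: 2^4
        int mask = length - 1; // 0b1111: n one-bits
        for (int hash : new int[] {7, 16, 35, 12345}) {
            // hash % length == hash & mask whenever length is a power of two
            System.out.println(hash + ": " + (hash % length) + " == " + (hash & mask));
        }
    }
}

The HashMap.put source: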

public V put(K key, V value) {
    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    if (key == null)
        return putForNullKey(value);
    int hash = hash(key);
    int i = indexFor(hash, table.length);
    // check whether the key already exists; under concurrent writes this traversal too can spin forever if a resize has left a cycle
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }

    modCount++;
    addEntry(hash, key, value, i);
    return null;
}

So HashMap can dead-loop inside put as well, because put first checks whether the key already exists by traversing the chain at table[i].
When a ConcurrentHashMap segment rehashes, every node that cannot be reused is freshly allocated, so the order of the old table's chains is never disturbed and no cycle can form.

Iterator implementation: weak consistency

A base iterator is provided and shared by the key, value, and entry iterators. The basic idea is to walk all segments and, within each segment, all HashEntry chains, both in reverse order (i--); when a chain or a segment is exhausted, move on to the previous one. Because none of this is synchronized, an iteration may observe fewer entries than truly exist (when a rehash grows a table mid-iteration); iterating with i++ instead could even yield the same key/value pair twice, again because of rehashing.

abstract class HashIterator {

    int nextSegmentIndex;
    int nextTableIndex;
    HashEntry<K,V>[] currentTable;
    HashEntry<K, V> nextEntry;
    HashEntry<K, V> lastReturned;
    HashIterator() {
        nextSegmentIndex = segments.length - 1;
        nextTableIndex = -1;
        advance();
    }

    /**
     * Set nextEntry to first node of next non-empty table
     * (in backwards order, to simplify checks).
     */
    final void advance() {
        for (;;) {
            if (nextTableIndex >= 0) {
                if ((nextEntry = entryAt(currentTable,
                                         nextTableIndex--)) != null)
                    break;
            }
            else if (nextSegmentIndex >= 0) {
                Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);
                if (seg != null && (currentTable = seg.table) != null)
                    nextTableIndex = currentTable.length - 1;
            }
            else
                break;
        }
    }

    final HashEntry<K,V> nextEntry() {
        HashEntry<K,V> e = nextEntry;
        if (e == null)
            throw new NoSuchElementException();
        lastReturned = e; // cannot assign until after null check
        if ((nextEntry = e.next) == null)
            advance();
        return e;
    }

    public final boolean hasNext() { return nextEntry != null; }
    public final boolean hasMoreElements() { return nextEntry != null; }

    public final void remove() {
        if (lastReturned == null)
            throw new IllegalStateException();
        ConcurrentHashMap.this.remove(lastReturned.key);
        lastReturned = null;
    }
}

KeyIterator, ValueIterator, and EntryIterator all extend this base class; each only has to implement next() by delegating to the parent (essentially returning nextEntry().xxx). Inner classes can read the enclosing instance's fields, which is exactly what these iterators exploit; they are declared final, so they cannot be subclassed.
HashMap's iterator, by contrast, can be thought of as strongly consistent (fail-fast): if the map is modified during iteration it throws ConcurrentModificationException. Concretely, the base iterator's constructor snapshots the map's current modCount as expectedModCount, and every fetch of the next value checks that modCount still matches that snapshot.

 final Entry<K,V> nextEntry() {
    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
    Entry<K,V> e = next;
    if (e == null)
        throw new NoSuchElementException();

    if ((next = e.next) == null) {// this bucket's chain is exhausted; advance to the next non-empty bucket
        Entry[] t = table;
        while (index < t.length && (next = t[index++]) == null)
            ;
    }
    current = e;
    return e;
}
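A minimal demonstration of that fail-fast behavior (my own example; a ConcurrentHashMap iterator would tolerate the same modification silently):

import java.util.HashMap;
import java.util.Map;

public class FailFastDemo {
    public static void main(String[] args) {
        Map<String, String> map = new HashMap<String, String>();
        map.put("a", "1");
        map.put("b", "2");
        // Structurally modifying the map mid-iteration bumps modCount, so the
        // iterator's next call sees modCount != expectedModCount and throws
        // ConcurrentModificationException.
        for (String key : map.keySet()) {
            map.remove(key);
        }
    }
}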

sun.misc.Unsafe

Many of these operations go through the sun.misc.Unsafe class; detailed usage is left to be filled in later.
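Until that section is written, here is a sketch of the one pattern already visible in the code quoted above: volatile reads of array elements. TBASE and TSHIFT correspond to Unsafe's arrayBaseOffset and log2(arrayIndexScale), so baseOffset + (index << shift) is the element's byte offset; obtaining the Unsafe instance reflectively is a common workaround, since Unsafe.getUnsafe() rejects application callers:

import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class VolatileArrayRead {
    static final Unsafe UNSAFE;
    static final long TBASE;  // byte offset of element 0 in an Object[]
    static final int  TSHIFT; // log2(bytes per array element)

    static {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            UNSAFE = (Unsafe) f.get(null);
            TBASE = UNSAFE.arrayBaseOffset(Object[].class);
            int scale = UNSAFE.arrayIndexScale(Object[].class);
            TSHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    // Equivalent to tab[i], but with volatile read semantics: it observes the
    // latest element published by an ordered/volatile write such as setEntryAt.
    static Object entryAt(Object[] tab, int i) {
        return UNSAFE.getObjectVolatile(tab, ((long) i << TSHIFT) + TBASE);
    }

    public static void main(String[] args) {
        Object[] tab = {"a", "b", "c"};
        System.out.println(entryAt(tab, 1)); // b
    }
}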

Segment internal fields

volatile HashEntry<K,V>[] table; — volatile guarantees visibility for the one-writer/many-readers pattern: every mutation of table happens under the segment lock, which resolves the write/write races, while volatile makes each write immediately visible to reader threads.

int count; — these two fields might be better off volatile as well. Although the real computation has a retry safeguard, a non-volatile read is not guaranteed fresh; when the unlocked retries fail to agree, size() locks every segment, reads each count, and then unlocks.
int modCount; — the iterators ignore modCount entirely: inserts, removals, and updates take effect during an ongoing iteration silently and without notice, unlike HashMap, which throws an exception.


Original article: http://blog.csdn.net/carey_ooo/article/details/51619646
