码迷,mamicode.com
首页 > 其他好文 > 详细

15.并发容器之ConcurrentLinkedQueue

时间:2019-02-08 20:02:34      阅读:141      评论:0      收藏:0      [点我收藏+]

标签:hot   总结   程序   fifo   常用   速度慢   修改   another   boolean   

 

1.ConcurrentLinkedQueue简介

在单线程编程中我们会经常用到一些集合类,比如ArrayList,HashMap等,但是这些类都不是线程安全的类。在面试中也经常会有一些考点,比如ArrayList不是线程安全的,Vector是线程安全。而保障Vector线程安全的方式,是非常粗暴的在方法上用synchronized独占锁,将多线程执行变成串行化。要想将ArrayList变成线程安全的也可以使用Collections.synchronizedList(List<T> list)将ArrayList转换成线程安全的,但这种转换方式依然是通过synchronized修饰方法实现的,很显然这不是一种高效的方式,同时,队列也是我们常用的一种数据结构,为了解决线程安全的问题,Doug Lea大师为我们准备了ConcurrentLinkedQueue这个线程安全的队列。从类名就可以看的出来实现队列的数据结构是链式。

1.1 Node

要想先学习ConcurrentLinkedQueue自然而然得先从它的节点类看起,明白它的底层数据结构。Node类的源码为:

private static class Node<E> {
       volatile E item;
       volatile Node<E> next;
.......
}

Node节点主要包含了两个域:一个是数据域item,另一个是next指针,用于指向下一个节点从而构成链式队列。并且都是用volatile进行修饰的,以保证内存可见性(关于volatile可以看这篇文章)。另外ConcurrentLinkedQueue含有这样两个成员变量:

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

说明ConcurrentLinkedQueue通过持有头尾指针进行管理队列。当我们调用无参构造器时,其源码为:

public ConcurrentLinkedQueue() {
   head = tail = new Node<E>(null);
}

head和tail指针会指向一个item域为null的节点,此时ConcurrentLinkedQueue状态如下图所示:

如图,head和tail指向同一个节点Node0,该节点item域为null,next域为null。

 

技术图片

 

1.2 操作Node的几个CAS操作

在队列进行出队入队的时候免不了对节点需要进行操作,在多线程就很容易出现线程安全的问题。可以看出在处理器指令集能够支持CMPXCHG指令后,在java源码中涉及到并发处理都会使用CAS操作(关于CAS操作可以看这篇文章的第3.1节),那么在ConcurrentLinkedQueue对Node的CAS操作有这样几个:

//更改Node中的数据域item   
boolean casItem(E cmp, E val) {
   return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//更改Node中的指针域next
void lazySetNext(Node<E> val) {
   UNSAFE.putOrderedObject(this, nextOffset, val);
}
//更改Node中的指针域next
boolean casNext(Node<E> cmp, Node<E> val) {
   return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

可以看出这些方法实际上是通过调用UNSAFE实例的方法,UNSAFE为sun.misc.Unsafe类,该类是hotspot底层方法,目前为止了解即可,知道CAS的操作归根结底是由该类提供就好。

2.offer方法

对一个队列来说,插入满足FIFO特性,插入元素总是在队列最末尾的地方进行插入,而取(移除)元素总是从队列的队头。所有要想能够彻底弄懂ConcurrentLinkedQueue自然而然是从offer方法和poll方法开始。那么为了能够理解offer方法,采用debug的方式来一行一行的看代码走。另外,在看多线程的代码时,可采用这样的思维方式:

单个线程offer 多个线程offer 部分线程offer,部分线程poll ----offer的速度快于poll --------队列长度会越来越长,由于offer节点总是在对队列队尾,而poll节点总是在队列对头,也就是说offer线程和poll线程两者并无“交集”,也就是说两类线程间并不会相互影响,这种情况站在相对速率的角度来看,也就是一个"单线程offer" ----offer的速度慢于poll --------poll的相对速率快于offer,也就是队头删的速度要快于队尾添加节点的速度,导致的结果就是队列长度会越来越短,而offer线程和poll线程就会出现“交集”,即那一时刻就可以称之为offer线程和poll线程同时操作的节点为 临界点 ,且在该节点offer线程和poll线程必定相互影响。根据在临界点时offer和poll发生的相对顺序又可从两个角度去思考:1. 执行顺序为offer-->poll-->offer,即表现为当offer线程在Node1后插入Node2时,此时poll线程已经将Node1删除,这种情况很显然需要在offer方法中考虑; 2.执行顺序可能为:poll-->offer-->poll,即表现为当poll线程准备删除的节点为null时(队列为空队列),此时offer线程插入一个节点使得队列变为非空队列

先看这么一段代码:

1. ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
2. queue.offer(1);
3. queue.offer(2);

创建一个ConcurrentLinkedQueue实例,先offer 1,然后再offer 2。offer的源码为:

public boolean offer(E e) {
   checkNotNull(e);
   // 新建一个node
   final Node<E> newNode = new Node<E>(e);
   
   // 不断重试(for只有初始化条件,没有判断条件),直到将node加入队列
   // 初始化p、t都是指向tail
   // 循环过程中一直让p指向最后一个节点。让t指向tail
   for (Node<E> t = tail, p = t;;) {
       // q一直指向p的下一个
       Node<E> q = p.next;
       if (q == null) {
           // p is last node
           // 如果q为null表示p是最后一个元素,尝试加入队列
           // 如果失败,表示其他线程已经修改了p指向的节点
           if (p.casNext(null, newNode)) {
               // Successful CAS is the linearization point
               // for e to become an element of this queue,
               // and for newNode to become "live".
               // node加入队列之后,tail距离最后一个节点已经相差大于一个了,需要更新tail
               if (p != t) // hop two nodes at a time
                   // 这儿允许设置tail为最新节点的时候失败,因为添加node的时候是根据p.next是不是为null判断的,
                   casTail(t, newNode);  // Failure is OK.
               return true;
          }
           // Lost CAS race to another thread; re-read next
      }
       else if (p == q)
           // 虽然q是p.next,但是因为是多线程,在offer的同时也在poll,如offer的时候正好p被poll了,那么在poll方法中的updateHead方法会将head指向当前的q,而把p.next指向自己,即:p.next == p
           // 这个时候就会造成tail在head的前面,需要重新设置p
           // 如果tail已经改变,将p指向tail,但这个时候tail依然可能在head前面
           // 如果tail没有改变,直接将p指向head
           // We have fallen off list. If tail is unchanged, it
           // will also be off-list, in which case we need to
           // jump to head, from which all live nodes are always
           // reachable. Else the new tail is a better bet.
           p = (t != (t = tail)) ? t : head;
       else
           // Check for tail updates after two hops.
           // tail已经不是最后一个节点,将p指向最后一个节点(单线程下这个永远不会成立)
           p = (p != t && t != (t = tail)) ? t : q;
  }
}

单线程执行角度分析

先从单线程执行的角度看起,分析offer 1的过程。第1行代码会对是否为null进行判断,为null的话就直接抛出空指针异常,第2行代码将e包装成一个Node类,第3行为for循环,只有初始化条件没有循环结束条件,这很符合CAS的“套路”,在循环体CAS操作成功会直接return返回,如果CAS操作失败的话就在for循环中不断重试直至成功。这里实例变量t被初始化为tail,p被初始化为t即tail。为了方便下面的理解,p被认为队列真正的尾节点,tail不一定指向对象真正的尾节点,因为在ConcurrentLinkedQueue中tail是被延迟更新的,具体原因我们慢慢来看。代码走到第3行的时候,t和p都分别指向初始化时创建的item域为null,next域为null的Node0。第4行变量q被赋值为null,第5行if判断为true,在第7行使用casNext将插入的Node设置成当前队列尾节点p的next节点,如果CAS操作失败,此次循环结束在下次循环中进行重试。CAS操作成功走到第8行,此时p==t,if判断为false,直接return true返回。如果成功插入1的话,此时ConcurrentLinkedQueue的状态如下图所示:

 

技术图片

如图,此时队列的尾节点应该为Node1,而tail指向的节点依然还是Node0,因此可以说明tail是延迟更新的。那么我们继续来看offer 2的时候的情况,很显然此时第4行q指向的节点不为null了,而是指向Node1,第5行if判断为false,第11行if判断为false,代码会走到第13行。好了,再插入节点的时候我们会问自己这样一个问题?上面已经解释了tail并不是指向队列真正的尾节点,那么在插入节点的时候,我们是不是应该最开始做的就是找到队列当前的尾节点在哪里才能插入?那么第13行代码就是找出队列真正的尾节点

定位队列真正的对尾节点

p = (p != t && t != (t = tail)) ? t : q;

我们来分析一下这行代码,如果这段代码在单线程环境执行时,很显然由于p==t,此时p会被赋值为q,而q等于Node<E> q = p.next,即Node1。在第一次循环中指针p指向了队列真正的队尾节点Node1,那么在下一次循环中第4行q指向的节点为null,那么在第5行中if判断为true,那么在第7行依然通过casNext方法设置p节点的next为当前新增的Node,接下来走到第8行,这个时候p!=t,第8行if判断为true,会通过casTail(t, newNode)将当前节点Node设置为队列的队尾节点,此时的队列状态示意图如下图所示:

 

技术图片

tail指向的节点由Node0改变为Node2,这里的casTail失败不需要重试的原因是,offer代码中主要是通过p的next节点q(Node<E> q = p.next)决定后面的逻辑走向的,当casTail失败时状态示意图如下:

 

技术图片

如图,如果这里casTail设置tail失败即tail还是指向Node0节点的话,无非就是多循环几次通过13行代码定位到队尾节点

通过对单线程执行角度进行分析,我们可以了解到poll的执行逻辑为:

  1. 如果tail指向的节点的下一个节点(next域)为null的话,说明tail指向的节点即为队列真正的队尾节点,因此可以通过casNext插入当前待插入的节点,但此时tail并未变化,如图2;

  2. 如果tail指向的节点的下一个节点(next域)不为null的话,说明tail指向的节点不是队列的真正队尾节点。通过q(Node<E> q = p.next)指针往前递进去找到队尾节点,然后通过casNext插入当前待插入的节点,并通过casTail方式更改tail,如图3

我们回过头再来看p = (p != t && t != (t = tail)) ? t : q;这行代码在单线程中,这段代码永远不会将p赋值为t,那么这么写就不会有任何作用,那我们试着在多线程的情况下进行分析。

多线程执行角度分析

多个线程offer

很显然这么写另有深意,其实在多线程环境下这行代码很有意思的。 t != (t = tail)这个操作并非一个原子操作,有这样一种情况:

 

技术图片

如图,假设线程A此时读取了变量t,线程B刚好在这个时候offer一个Node后,此时会修改tail指针,那么这个时候线程A再次执行t=tail时t会指向另外一个节点,很显然线程A前后两次读取的变量t指向的节点不相同,即t != (t = tail)为true,并且由于t指向节点的变化p != t也为true,此时该行代码的执行结果为p和t最新的t指针指向了同一个节点,并且此时t也是队列真正的对尾节点。那么,现在已经定位到队列真正的队尾节点,就可以执行offer操作了。

offer->poll->offer

那么还剩下第11行的代码我们没有分析,大致可以猜想到应该就是回答一部分线程offer,一部分poll的这种情况。当if (p == q)为true时,说明p指向的节点的next也指向它自己,这种节点称之为哨兵节点这种节点在队列中存在的价值不大,一般表示为要删除的节点或者是空节点。为了能够很好的理解这种情况,我们先看看poll方法的执行过程后,再回过头来看,总之这是一个很有意思的事情 :)。

3.poll方法

poll方法源码如下:

public E poll() {
   restartFromHead:
   1. for (;;) {
   2.    for (Node<E> h = head, p = h, q;;) {
   3.        E item = p.item;
?
   4.        if (item != null && p.casItem(item, null)) {
               // Successful CAS is the linearization point
               // for item to be removed from this queue.
   5.            if (p != h) // hop two nodes at a time
   6.                updateHead(h, ((q = p.next) != null) ? q : p);
   7.            return item;
          }
   8.        else if ((q = p.next) == null) {
   9.            updateHead(h, p);
   10.            return null;
          }
   11.        else if (p == q)
   12.            continue restartFromHead;
           else
   13.            p = q;
      }
  }
}

我们还是先站在单线程的角度去理清该方法的基本逻辑。假设ConcurrentLinkedQueue初始状态如下图所示:

技术图片

参数offer时的定义,我们还是先将变量p作为队列要删除真正的队头节点,h(head)指向的节点并不一定是队列的队头节点。先来看poll出Node1时的情况,由于p=h=head,参照上图,很显然此时p指向的Node1的数据域不为null,在第4行代码中item!=null判断为true后接下来通过casItem将Node1的数据域设置为null。如果CAS设置失败则此次循环结束等待下一次循环进行重试。若第4行执行成功进入到第5行代码,此时p和h都指向Node1,第5行if判断为false,然后直接到第7行return回Node1的数据域1,方法运行结束,此时的队列状态如下图。

 

技术图片

下面继续从队列中poll,很显然当前h和p指向的Node1的数据域为null,那么第一件事就是要定位准备删除的队头节点(找到数据域不为null的节点)

定位删除的队头节点

继续看,第三行代码item为null,第4行代码if判断为false,走到第8行代码(q = p.next)if也为false,由于q指向了Node2,在第11行的if判断也为false,因此代码走到了第13行,这个时候p和q共同指向了Node2,也就找到了要删除的真正的队头节点。可以总结出,定位待删除的队头节点的过程为:如果当前节点的数据域为null,很显然该节点不是待删除的节点,就用当前节点的下一个节点去试探。在经过第一次循环后,此时状态图为下图:

 

技术图片

进行下一次循环,第4行的操作同上述,当前假设第4行中casItem设置成功,由于p已经指向了Node2,而h还依旧指向Node1,此时第5行的if判断为true,然后执行updateHead(h, ((q = p.next) != null) ? q : p),此时q指向的Node3,所有传入updateHead方法的分别是指向Node1的h引用和指向Node3的q引用。updateHead方法的源码为:

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

该方法主要是通过casHead将队列的head指向Node3,并且通过 h.lazySetNext将Node1的next域指向它自己。最后在第7行代码中返回Node2的值。此时队列的状态如下图所示:

技术图片

 

Node1的next域指向它自己,head指向了Node3。如果队列为空队列的话,就会执行到代码的第8行(q = p.next) == null,if判断为true,因此在第10行中直接返回null。以上的分析是从单线程执行的角度去看,也可以让我们了解poll的整体思路,现在来做一个总结:

  1. 如果当前head,h和p指向的节点的Item不为null的话,说明该节点即为真正的队头节点(待删除节点),只需要通过casItem方法将item域设置为null,然后将原来的item直接返回即可。

  2. 如果当前head,h和p指向的节点的item为null的话,则说明该节点不是真正的待删除节点,那么应该做的就是寻找item不为null的节点。通过让q指向p的下一个节点(q = p.next)进行试探,若找到则通过updateHead方法更新head指向的节点以及构造哨兵节点(通过updateHead方法的h.lazySetNext(h)

接下来,按照上面分析offer的思维方式,下面来分析一下多线程的情况,第一种情况是;

多线程执行情况分析:

多个线程poll

现在回过头来看poll方法的源码,有这样一部分:

else if (p == q)
    continue restartFromHead;

这一部分就是处理多个线程poll的情况,q = p.next也就是说q永远指向的是p的下一个节点,那么什么情况下会使得p,q指向同一个节点呢?根据上面我们的分析,只有p指向的节点在poll的时候转变成了哨兵节点(通过updateHead方法中的h.lazySetNext)。当线程A在判断p==q时,线程B已经将执行完poll方法将p指向的节点转换为哨兵节点并且head指向的节点已经发生了改变,所以就需要从restartFromHead处执行,保证用到的是最新的head。

poll->offer->poll

试想,还有这样一种情况,如果当前队列为空队列,线程A进行poll操作,同时线程B执行offer,然后线程A在执行poll,那么此时线程A返回的是null还是线程B刚插入的最新的那个节点呢?我们来写一代demo:

public static void main(String[] args) {
    Thread thread1 = new Thread(() -> {
        Integer value = queue.poll();
        System.out.println(Thread.currentThread().getName() + " poll 的值为:" + value);
        System.out.println("queue当前是否为空队列:" + queue.isEmpty());
    });
    thread1.start();
    Thread thread2 = new Thread(() -> {
        queue.offer(1);
    });
    thread2.start();
}

输出结果为:

Thread-0 poll 的值为:null queue当前是否为空队列:false

通过debug控制线程thread1和线程thread2的执行顺序,thread1先执行到第8行代码if ((q = p.next) == null),由于此时队列为空队列if判断为true,进入if块,此时先让thread1暂停,然后thread2进行offer插入值为1的节点后,thread2执行结束。再让thread1执行,这时thread1并没有进行重试,而是代码继续往下走,返回null,尽管此时队列由于thread2已经插入了值为1的新的节点。所以输出结果为thread0 poll的为null,然队列不为空队列。因此,在判断队列是否为空队列的时候是不能通过线程在poll的时候返回为null进行判断的,可以通过isEmpty方法进行判断

4. offer方法中部分线程offer部分线程poll

在分析offer方法的时候我们还留下了一个问题,即对offer方法中第11行代码的理解。

offer->poll->offer

在offer方法的第11行代码if (p == q),能够让if判断为true的情况为p指向的节点为哨兵节点,而什么时候会构造哨兵节点呢?在对poll方法的讨论中,我们已经找到了答案,即当head指向的节点的item域为null时会寻找真正的队头节点,等到待插入的节点插入之后,会更新head,并且将原来head指向的节点设置为哨兵节点。假设队列初始状态如下图所示: 技术图片 因此在线程A执行offer时,线程B执行poll就会存在如下一种情况: 技术图片

 

 

 

如图,线程A的tail节点存在next节点Node1,因此会通过引用q往前寻找队列真正的队尾节点,当执行到判断if (p == q)时,此时线程B执行poll操作,在对线程B来说,head和p指向Node0,由于Node0的item域为null,同样会往前递进找到队列真正的队头节点Node1,在线程B执行完poll之后,Node0就会转换为哨兵节点,也就意味着队列的head发生了改变,此时队列状态为下图。

 

技术图片

此时线程A在执行判断if (p == q)时就为true,会继续执行p = (t != (t = tail)) ? t : head;,由于tail指针没有发生改变所以p被赋值为head,重新从head开始完成插入操作。

5. HOPS的设计

通过上面对offer和poll方法的分析,我们发现tail和head是延迟更新的,两者更新触发时机为:

tail更新触发时机:当tail指向的节点的下一个节点不为null的时候,会执行定位队列真正的队尾节点的操作,找到队尾节点后完成插入之后才会通过casTail进行tail更新;当tail指向的节点的下一个节点为null的时候,只插入节点不更新tail

head更新触发时机:当head指向的节点的item域为null的时候,会执行定位队列真正的队头节点的操作,找到队头节点后完成删除之后才会通过updateHead进行head更新;当head指向的节点的item域不为null的时候,只删除节点不更新head。

并且在更新操作时,源码中会有注释为:hop two nodes at a time。所以这种延迟更新的策略就被叫做HOPS的大概原因是这个(猜的 :)),从上面更新时的状态图可以看出,head和tail的更新是“跳着的”即中间总是间隔了一个。那么这样设计的意图是什么呢?

如果让tail永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,如果大量的入队操作,每次都要执行CAS进行tail的更新,汇总起来对性能也会是大大的损耗。如果能减少CAS更新的操作,无疑可以大大提升入队的操作效率,所以doug lea大师每间隔1次(tail和队尾节点的距离为1)进行才利用CAS更新tail。对head的更新也是同样的道理,虽然,这样设计会多出在循环中定位队尾节点,但总体来说读的操作效率要远远高于写的性能,因此,多出来的在循环中定位尾节点的操作的性能损耗相对而言是很小的。

6.总结:

<1>offer

public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                if (p.casNext(null, newNode)) {
                    if (p != t) 
                        //更新tail
                        casTail(t, newNode);  
                    return true;
                }
            }
            else if (p == q)
                //哨兵重新找
                //哨兵节点是直接看别人写的,感觉比较适合,这里也这么称呼
                //这个节点的下一个节点是指向自己的
                p = (t != (t = tail)) ? t : head;
            else
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
   offer操作的代码可以说非常精简。offer先把数据建立成一个节点,然后使用到t,p两个变量同时指向tail的位置,q是p的next,只要q为null就进入cas环节。这里都好理解,但是更新tail的时候需要做p!=t的判定,这个就是比较有讲究的地方了,具体下面说,哨兵节点是在poll产生的,具体在Poll中结合的说。
        p = (p != t && t != (t = tail)) ? t : q;

这段代码利用了java的特性,就是解析成指令后,按照指令从左往右执行。

    	int t=6;
        System.out.println(t!=(t=5));
   例如上面这个程序,遇到t的时候就把6取出来,然后t=5赋值,此时表达式就变成6!=5。 以上程序,用c++重写后,结果就不是这个样子了。 

   这一句简单的话,就是在tail更新后使用新的tail的值,tail没有被其他线程更新的话,就直接让p移动到p的下一个节点的位置。以此找到正确的tail。哨兵节点的处理方式同理。

   offer方法如果返回只会返回true,所以不能用返回值来做逻辑判断。

 

tail是有条件更新的

   很多人都有这个疑问,p和t原来都是指向tail的,p经过查找,表示的是最后一个节点的位置,t经过查找,找到的是tail的位置,也就是说最后一个节点和tail节点不相同的时候,就更新tail。

  p!=t的情况发生在两种情况下。

  条件1,由于tail是有条件更新的,当tail指向最后一个节点的时候,此时添加节点后,t和p都没有变动,不会触发tail的更新,此时tail就是倒数第二个节点,等再次添加的时候,进入p!=t的条件,开始更新tail的位置。

 条件2,tail被其他线程更新了,p指向的tail,已经不是tail指向的位置了。

 

tail的极限位置

  由于这个条件,tail指向的可能是最后一个节点,可能是倒数第二个,可能是倒数第三个(就是在新加了元素后,还没执行更新tail的时候,tail此时就是节点的倒数第三个位置),在并发大的情况,下tail还可能极限到倒数第四个。最后和倒数第二个的情况,大家可以推导出来,这里再说一下倒数第四个的情况。触发条件是,tail是倒数第二的情况,然后同时两个线程一起并发添加,第一个线程使用cas添加成功,此时开始走p!=t的情况,与此同时,另外一个线程虽然cas失败,但是p的节点找到当时的最后一个,此时第一个线程添加节点成功,然后通过重新查找找到下一个节点,开始添加成功,而此时tail的cas开始执行,这个瞬间,tail还保持到原来的位置,但是已经新添加了2个节点。这个状态的时间非常短,但是有可能出现。

  为什么说极限是倒数第四个,没有倒数第五个。因为cas操作是原子的,能成功的只有一个其他都是失败。所以以下代码最多容纳两个线程,一个casnext成功,进入castail的流程,另外的一个线程是casnext刚失败,然后往后找了一个节点,执行casNext成功,就算并发量再大,其他casnext都是失败的,一个时间点上只允许一个成功,此时casTail处于刚要成功的位置,加上原来正常的倒数第二个节点,正好是倒数第四个。
                if (p.casNext(null, newNode)) {
                    if (p != t) 
                        casTail(t, newNode);

 

tail为什么不作为最后一个节点

     很多人都问这个问题,如果是正常操作的话,应该让tail成为最后一个节点,这样每次都可以直接取出tail,然后往后cas添加。以上这个思维其实是加锁的一种做法,毕竟casNext和casTail是两个操作,在不加锁的情况下,多线程跑起来,没法保证这个操作。

     我在网上找到了类似的算法,那个算法就是保证线程都结束后,tail就是最后一个节点。
p=new node();
while(true){
   //获取最后节点
   tail=getTail()
   if(tail->next==null){
      //添加节点
      if(casAddNode()){
        break;
      }

   }else{
      //更新tail节点
      cas(tail,tail->next)
   }
}
//更新tail
castail(tail,q)
     为了保证tail是最后一个,每个线程都在做tail的维护工作。但是在并发高的情况下,tail也只是保证在当前一个时间点上是最后一个元素,你在更新的时候别人还在加元素,你刚更新完tail,另外的线程就加了一个元素,所以在多线程添加元素的情况下,也只能是依靠现有的tail去查找到最后一个元素。相对于每次添加元素而言,tail并不一定是最后一个元素,而且还加入了频繁的casTail操作。

   后面又提出了一种优化方式,就是对tail的定义为接近最后一个元素的位置。
p=new node();
while(true){
   //获取最后节点
   tail=getTail()
   if(tail->next==null){
      //添加节点
      if(casAddNode()){
        break;
      }

   }else{
      //更新tail节点
      while(tail->next != null)
          tail = tail->next;
   }
}
//更新tail
castail(tail,q)
   这个是对第一种的改良,用while替换了cas操作,效率上高了很多,存在的问题就是tail的更新在完成之后,那么在并发高的情况下,还没更新tail的时候,可能已经加入了很多节点,每个都是拿着当前变量去查找最后一个节点,就算更新了tail也不会使用tail的值。每个线程找真正最后一个节点的遍历次数可能非常多。

   于是新的算法被提出来Michael & Scott的改良。ConcurrentLinkedQueue就是用了这个算法。就是tail本身定义就是接近结尾的元素。相比第二个算法的改良,就是使用了tail的更新,并不是单纯一个局部变量去更新,p = (p != t && t != (t = tail)) ? t : q;再有人更新tail就使用了tail,第二种的while循环则要和加入的速度竞争,效果和一个水缸,一个口在放水,一个口在加水一样。

 

为什么还要加入p!=t的条件,每次都更新不好吗

    Michael & Scott的改良本身对tail的定义就不是最后一个节点,所以更新只是为了防止距离太远,如果距离可以接近,那可以省去这个cas操作。只有一种情况会出现不更新,就是tail节点就是原来添加元素的最后一个节点,在并发高的情况下,p!=t的条件很容易满足,只要多线程开始处理,很多情况下都是p!=t的,都会尝试更新tail节点。慢慢的就退化成上述的第二种情况,每次都会去更新tail。

 

总结:

从源代码角度来看整个入队过程主要做二件事情。第一是定位出尾节点,第二是使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试。

第一步定位尾节点。tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点,尾节点可能就是tail节点,也可能是tail节点的next节点。代码中循环体中的第一个if就是判断tail是否有next节点,有则表示next节点可能是尾节点。获取tail节点的next节点需要注意的是p节点等于p的next节点的情况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加第一次节点,所以需要返回head节点

第二步设置入队节点为尾节点。p.casNext(null, n)方法用于将入队节点设置为当前队列尾节点的next节点,p如果是null表示p是当前队列的尾节点,如果不为null表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。

<2>poll()

public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                if (item != null && p.casItem(item, null)) {
                    if (p != h) 
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                //哨兵节点
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
     poll的操作相对offer,要稍微好看一些,直接从head开始遍历,初始化的时候head是一个空节点,head表示的也不一定是第一个元素,起码初始化的时候head是一个空节点,p表示的是一个有数据的节点,h表示的就是head的位置,p!=h只有一种情况下出现,就是head是一个空节点而不是第一个元素节点,在p!=h的情况下,如果p有下一个节点,那么p的下一个节点作为head,否则p本身作为head。p的next作为head的时候,head就是指向了第一个元素,再次删除的时候,就只会把一个元素的值设置为null,并不会更新head,此时head又成为了一个表头,他的下一个元素才是第一个数据节点。

 

哨兵的产生以及相遇

    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }
    在更新头节点的时候原来的head就会自己指向自己,成为哨兵。

    当添加入第一个元素后,head和tail都指向了头结点(不是第一个元素),此时添加的时候获取tail,poll正好把head做成哨兵节点,此时找head的下一个节点的时候就会无限循环下去。

总结:

并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。这种做法也是通过hops变量来减少使用CAS更新head节点的消耗,从而提高出队效率。

首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,需要更新head节点,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。

<3>总结:

ConcurrentLinkedQueue通过无锁来做到了更高的并发量,是个高性能的队列,但是使用场景相对不如阻塞队列常见,毕竟取数据也要不停的去循环,不如阻塞的逻辑好设计,但是在并发量特别大的情况下,是个不错的选择,性能上好很多,而且这个队列的设计也是特别费力,尤其的使用的改良算法和对哨兵的处理。整体的思路都是比较严谨的,这个也是使用了无锁造成的,我们自己使用无锁的条件的话,这个队列是个不错的参考。

 

参考资料

《java并发编程的艺术》 《Java高并发程序设计》 ConcurrentLinkedQueue博文:https://www.cnblogs.com/sunshine-2015/p/6067709.html

15.并发容器之ConcurrentLinkedQueue

标签:hot   总结   程序   fifo   常用   速度慢   修改   another   boolean   

原文地址:https://www.cnblogs.com/itxiaok/p/10356587.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!