码迷,mamicode.com
首页 > 其他好文 > 详细

[源码]源代码解析 SynchronousQueue

时间:2014-06-25 19:19:39      阅读:397      评论:0      收藏:0      [点我收藏+]

标签:concurrent   synchronousqueue   

 

简析SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue

三者都是blockingQueue.

LinkedBlockingQueue,ArrayBlockingQueue

有界,默认是Integer.Max;

SynchronousQueue没什么界不界的概念.之所以这么说.是因为它的操作必须成对.

注记方案:

oppo(oppo手机)是一对,offer和pool不阻塞

ppt是一对.put和take都阻塞.


     1. 里面没有任何数据.调用offer()无法成功,返回flase,表示填充失败.调用put被阻塞,直到有人take或者poll, 反之亦然,如下.


     2. 先take,被阻塞,直到有一个线程来offer,或者put. 

两个不同的互补碰撞发生匹配完成(fullfill). 之前的take的线程被唤醒获得offer的提供的数据.

public static void main(String[] args) throws InterruptedException {
	final	SynchronousQueue<Long> workQueue = new SynchronousQueue<Long>();
		boolean offer = workQueue.offer(2L);
		System.out.println("main thread: offer="+offer);
		ExecutorService newCachedThreadPool =Executors.newCachedThreadPool();
// 内部实现是 <span style="font-family: 'Microsoft YaHei';">new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); </span>
		newCachedThreadPool.execute(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("take thread: begin take and thread will be blocked by call  park(await) method");
					Long take = workQueue.take();
					System.out.println("take thread:   take suceffull , take object="+take);


				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}				
			}
		});
		Thread.sleep(2000);
		System.out.println("main thread:  after sleep ");
	        newCachedThreadPool.execute(new Runnable() {
			@Override
			public void run() {
					long object = 123L;
					System.out.println("offer thead: begin offer "+object);
					boolean offer = workQueue.offer(object);
					System.out.println("offer thead: finish offer , is sucefully ? "+offer+" , if true, SynchronousQueue will unpark(notify) the take thread ");
			}
		});
	newCachedThreadPool.shutdown();
	}
输出:

main thread: offer=false
take thread: begin take and thread will be blocked by call  park(await) method
main thread:  after sleep 
offer thead: begin offer 123
take thread:   take suceffull , take object=123
offer thead: finish offer , is sucefully ? true , if true, SynchronousQueue will unpark(notify) the take thread 

关键术语解析:

   和其他queue不同的是SynchronousQueue的take()函数调用也有可能被添加到queue里,变成一个节点(未匹配时.)
   Node类型一共分层两种. 一种是 isDate=true. (对应offer , put 函数) 一种是isDate=false (对应 take函数)

dual queue:dual的含义就好理解了,因为只有两类,可以当isDate=true和isDate=false遇到时会匹配.可翻译为

成双的,对偶的. 对偶队列.

same mode: 相同的模式(isDate都=true,或者isDate都=false).比如take产生的Node和前面已经放入到队列中的take动作Node就属于同一个模式

fulfill(从SynchronousQueue下面的一个注释我们可以理解,具体见本文下载的摘抄中的红体字): 

字面英文翻译,完成.具体到算法里的含义是一个动作和之前的complementary(译为互补)的动作得到匹配.

complementary :互补的.比如先take,放到队列中.后面来一个offer动作就是complementary (互补)

=============

 SynchronousQueue下面的一个部分注释部分翻译.

/*
     * This class implements extensions of the dual stack and dual
     * queue algorithms described in "Nonblocking Concurrent Objects
     * with Condition Synchronization", by W. N. Scherer III and
     * M. L. Scott.  18th Annual Conf. on Distributed Computing,
     * Oct. 2004 (see also
     *

http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html

).参照的算法
     * The (Lifo) stack is used for non-fair mode, and the (Fifo)
     * queue for fair mode. The performance of the two is generally
     * similar. Fifo usually supports higher throughput under
     * contention but Lifo maintains higher thread locality in common
     * applications.
     *
     * A dual queue (and similarly stack) is one that at any given
     * time either holds "data" -- items provided by put operations,
     * or "requests" -- slots representing take operations, or is
     * empty. A call to "

fulfill

" 翻译: 一个目的是为了"完成"的调用 (i.e., a call requesting an item
     * from a queue holding data or vice versa 翻译: 当queue里有数据时,一个调用请求数据,就会得到匹配. 这样的调用称为实现完成的调用 ) dequeues a
     *

complementary

node.  (翻译: 会让一个互补的节点退出队列)The most interesting feature of these
     * queues is that any operation can figure out which mode the
     * queue is in, and act accordingly without needing locks.


   下面还有一些注释未剪贴上来,比较了java实现的算法和借鉴的算法(见注释中网址)有何区别
* The algorithms here differ from the versions in the above paper
     * in extending them for use in synchronous queues, as well as
     * dealing with cancellation. The main differences include:
     *
     *  1. The original algorithms used bit-marked pointers, but
     *     the ones here use mode bits in nodes, leading to a number
     *     of further adaptations.
     *  2. SynchronousQueues must block threads waiting to become
     *     fulfilled.
     *  3. Support for cancellation via timeout and interrupts,
     *     including cleaning out cancelled nodes/threads
     *     from lists to avoid garbage retention and memory depletion.

=============再来看看SynchronousQueue.TransferQueue.transfer下面的注释.=========

 /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *    了解了上面的几个术语概念.就很容易明白这句话的含义.

  当队列是空,或者是同一种Mode时,直接放入到列队尾.不会完成(fullfill)

             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to

fulfill

by CAS‘ing
             *    item field of waiting node and

dequeuing

it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */


            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);


            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin


                if (h == t || t.isData == isData) { // empty or same-mode

//楼主注:①
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can‘t wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;


                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos); // 楼主注: 会被阻塞,等待其他线程互补操作时唤醒
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }


                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null)? x : e; // 楼主注: 因为是互补匹配的.要么x=null 要么 e=null  返回一个非null的值即可.不管该线程调用的是take还是put,都返回数据.


                } else {                            // complementary-mode  //楼主注:②
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read


                    Object x = m.item;
                    if (isData == (x != null) ||    

// m already fulfilled 
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }


                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null)? x : e;    // 楼主注: 因为是互补匹配的.要么x=null 要么 e=null  返回一个非null的值即可.不管该线程调用的是take还是put,都返回数据.
                }
            }
        }

我一开始没理解的一点是:
    当一个head是 isDate=false , tail是isDate=true时, 一个线程进来的操作是isDate=false时.
不会进入①,进入②后看代码又无法和head完成匹配(fullfill).
后来想明白了,这种情况不会发生.因为tail是isDate=true,这个会与head完成匹配(fullfill).换句话说.
队列里tail和head肯定是same mode.所以当①判断失败,进入②后,肯定能和head完成匹配(fulfill)

[源码]源代码解析 SynchronousQueue,布布扣,bubuko.com

[源码]源代码解析 SynchronousQueue

标签:concurrent   synchronousqueue   

原文地址:http://blog.csdn.net/fei33423/article/details/33351907

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!