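CountDownLatch is a flexible latch implementation. It lets one or more threads wait for a set of events to occur.
The latch state consists of a counter initialized to a positive number, which represents the number of events to wait for. The countDown() method decrements the counter, indicating that one event has occurred, while the await methods wait for the counter to reach zero, which means every event being waited for has occurred. If the counter is nonzero, await blocks until the counter reaches zero, the waiting thread is interrupted, or the wait times out.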
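To make the counter semantics concrete, here is a minimal sketch. The class name LatchBasicsDemo, the method runWorkers, and the worker count are made up for illustration; only CountDownLatch and its methods come from the JDK.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; names and the worker count are illustrative only.
class LatchBasicsDemo {

    // Starts `workers` threads that each signal one event, waits for all of
    // them, and returns the latch count observed after await() returns.
    static long runWorkers(int workers) {
        final CountDownLatch done = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    done.countDown(); // one event has happened
                }
            }).start();
        }
        try {
            done.await(); // blocks until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return done.getCount();
    }

    public static void main(String[] args) {
        System.out.println("count after await: " + runWorkers(3)); // prints "count after await: 0"
    }
}
```

Once await() has returned normally the count is zero and stays zero, because a CountDownLatch cannot be reset.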
package com.citi.test.mutiplethread.demo0503;

import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class TestHarness {
    public static void main(String[] args) {
        int counts = 5;
        // startGate releases all workers at once; endGate waits for all of them to finish.
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(counts);
        for (int i = 0; i < counts; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " is starting");
                    try {
                        Thread.sleep(1000);
                        startGate.await();   // wait for the main thread to open the gate
                        Thread.sleep(3000);  // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // restore the interrupt status
                    } finally {
                        endGate.countDown(); // always signal completion
                    }
                }
            }).start();
        }
        long start = System.nanoTime();
        System.out.println(Thread.currentThread().getName() + " is running:" + new Date());
        startGate.countDown();
        System.out.println(Thread.currentThread().getName() + new Date());
        try {
            endGate.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long end = System.nanoTime();
        System.out.println(end - start); // elapsed time in nanoseconds
    }
}
How it works:
Internally, CountDownLatch is built on AQS (AbstractQueuedSynchronizer). The latch's count is simply the AQS state.
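The idea of keeping the count in the AQS state can be sketched with a small latch of our own. SimpleLatch and its remaining() helper are hypothetical names used only for illustration. The sketch follows the same approach as the JDK class but should not be read as its source.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class for illustration; not the JDK's CountDownLatch.
class SimpleLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count); // the AQS state holds the remaining count
        }

        int remaining() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result means "acquired": succeed only at count zero.
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            // Decrement with a CAS loop; return true only on the 1 -> 0 transition.
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false; // already open, nothing to do
                }
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0;
                }
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        sync = new Sync(count);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void countDown() {
        sync.releaseShared(1);
    }

    int getCount() {
        return sync.remaining();
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleLatch latch = new SimpleLatch(2);
        latch.countDown();
        latch.countDown();
        latch.await(); // returns immediately because the state is now 0
        System.out.println("remaining: " + latch.getCount()); // prints "remaining: 0"
    }
}
```

The acquire and release arguments are ignored here because the only thing that matters is whether the state has reached zero.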
CountDownLatch has a few key methods: await(), await(long timeout, TimeUnit unit), and countDown().
/**
 * Constructs a {@code CountDownLatch} initialized with the given count.
 *
 * @param count the number of times {@link #countDown} must be invoked
 *        before threads can pass through {@link #await}
 * @throws IllegalArgumentException if {@code count} is negative
 */
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 * (Note: this is where we can see that the AQS state represents the count.)
 */
private static final class Sync extends AbstractQueuedSynchronizer {} // body omitted here

/**
 * Causes the current thread to wait until the latch has counted down to
 * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
 *
 * <p>If the current count is zero then this method returns immediately.
 *
 * <p>If the current count is greater than zero then the current
 * thread becomes disabled for thread scheduling purposes and lies
 * dormant until one of two things happen:
 * <ul>
 * <li>The count reaches zero due to invocations of the
 * {@link #countDown} method; or
 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 * the current thread.
 * </ul>
 *
 * <p>If the current thread:
 * <ul>
 * <li>has its interrupted status set on entry to this method; or
 * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
 * </ul>
 * then {@link InterruptedException} is thrown and the current thread's
 * interrupted status is cleared.
 *
 * (Note: in short, await() returns at once if the count is already zero,
 * otherwise it sleeps until countDown() brings the count to zero or another
 * thread interrupts the waiter. An interrupt on entry or while waiting
 * raises InterruptedException and clears the interrupt status.)
 *
 * @throws InterruptedException if the current thread is interrupted
 *         while waiting
 */
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// In AbstractQueuedSynchronizer:
/**
 * Acquires in shared mode, aborting if interrupted. Implemented
 * by first checking interrupt status, then invoking at least once
 * {@link #tryAcquireShared}, returning on success. Otherwise the
 * thread is queued, possibly repeatedly blocking and unblocking,
 * invoking {@link #tryAcquireShared} until success or the thread
 * is interrupted.
 * @param arg the acquire argument.
 * This value is conveyed to {@link #tryAcquireShared} but is
 * otherwise uninterpreted and can represent anything
 * you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

// In Thread: tests and clears the current thread's interrupt status.
public static boolean interrupted() {
    return currentThread().isInterrupted(true);
}

// AQS default implementation. CountDownLatch.Sync overrides it to return 1
// when getState() == 0 and -1 otherwise.
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
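The interrupt handling described in the await() documentation can be observed from the caller's side. The following is a small sketch; AwaitInterruptDemo and its method name are hypothetical, and the demo interrupts the current thread itself purely to trigger the "interrupted on entry" case.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; names are illustrative only.
class AwaitInterruptDemo {

    // Returns true if await() threw InterruptedException and the interrupt
    // status was cleared by the time the exception was caught.
    static boolean awaitThrowsWhenAlreadyInterrupted() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread.currentThread().interrupt(); // set the interrupt status before entry
        try {
            latch.await();
            return false; // only reached if await() returned without throwing
        } catch (InterruptedException e) {
            // The interrupt check on entry to await() clears the status.
            return !Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitThrowsWhenAlreadyInterrupted()); // prints "true"
    }
}
```

Because the status is cleared when the exception is thrown, code that catches InterruptedException and cannot rethrow it usually calls Thread.currentThread().interrupt() to restore it.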
/**
 * Decrements the count of the latch, releasing all waiting threads if
 * the count reaches zero.
 *
 * <p>If the current count is greater than zero then it is decremented.
 * If the new count is zero then all waiting threads are re-enabled for
 * thread scheduling purposes.
 *
 * <p>If the current count equals zero then nothing happens.
 *
 * (Note: each call decrements the count; the call that brings it to zero
 * releases every waiting thread, and calls made when the count is already
 * zero do nothing.)
 */
public void countDown() {
    sync.releaseShared(1);
}

/**
 * Releases in shared mode. Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument. This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

/**
 * Release action for shared mode -- signal successor and ensure
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases. This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
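The rule that countDown() does nothing once the count is zero is easy to check from outside. A minimal sketch follows; CountDownDemo and countsAfterEachCall are hypothetical names chosen for this example.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; names are illustrative only.
class CountDownDemo {

    // Calls countDown() `calls` times on a latch created with `initial`
    // and records getCount() after each call.
    static long[] countsAfterEachCall(int initial, int calls) {
        CountDownLatch latch = new CountDownLatch(initial);
        long[] seen = new long[calls];
        for (int i = 0; i < calls; i++) {
            latch.countDown();
            seen[i] = latch.getCount();
        }
        return seen;
    }

    public static void main(String[] args) {
        // Two extra calls after the count hits zero leave it at zero.
        System.out.println(Arrays.toString(countsAfterEachCall(2, 4))); // prints "[1, 0, 0, 0]"
    }
}
```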
// Note: the method below is AbstractQueuedSynchronizer.ConditionObject.await(long, TimeUnit),
// the timed wait used by Condition objects. It is not what CountDownLatch calls:
// CountDownLatch.await(long timeout, TimeUnit unit) delegates to
// sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)).
/**
 * Implements timed condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with
 *      saved state as argument, throwing
 *      IllegalMonitorStateException if it fails.
 * <li> Block until signalled, interrupted, or timed out.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * <li> If timed out while blocked in step 4, return false, else true.
 * </ol>
 */
public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    if (unit == null)
        throw new NullPointerException();
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    long lastTime = System.nanoTime();
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        long now = System.nanoTime();
        nanosTimeout -= now - lastTime;
        lastTime = now;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}
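From the caller's side, the timed await on a CountDownLatch reports through its return value whether the count reached zero in time. A minimal sketch, where TimedAwaitDemo, awaitFor, and the 50 ms timeout are illustrative choices rather than anything from the original post:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and timeouts are illustrative only.
class TimedAwaitDemo {

    // Waits up to timeoutMillis on the latch and returns await's result:
    // true if the count reached zero, false if the timeout elapsed first.
    static boolean awaitFor(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        System.out.println("before countDown: " + awaitFor(latch, 50)); // prints "before countDown: false"
        latch.countDown();
        System.out.println("after countDown: " + awaitFor(latch, 50));  // prints "after countDown: true"
    }
}
```

Treating an interrupt as a failed wait is a simplification made for this demo. Real code may prefer to propagate the InterruptedException.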
Original article: https://www.cnblogs.com/liumy/p/11602932.html