标签:main 小数 info imp roc style 内存 名称 空间
前文探究了非阻塞算法的实现ConcurrentLinkedQueue安全队列,也说明了阻塞算法实现的两种方式,使用一把锁(出队和入队同一把锁ArrayBlockingQueue)和两把锁(出队和入队各一把锁LinkedBlockingQueue)来实现,今天来探究下ArrayBlockingQueue。
ArrayBlockingQueue是一个阻塞队列,底层使用数组结构实现,按照先进先出(FIFO)的原则对元素进行排序。
ArrayBlockingQueue是一个线程安全的集合,通过ReentrantLock锁来实现,在并发情况下可以保证数据的一致性。
此外,ArrayBlockingQueue的容量是有限的,数组的大小在初始化时就固定了,不会随着队列元素的增加而出现扩容的情况,也就是说ArrayBlockingQueue是一个“有界缓存区”。
从下图可以看出,ArrayBlockingQueue是使用一个数组存储元素的,当向队列插入元素时,首先会插入到数组下标索引为6的位置,再有新元素进来时插入到索引为7的位置,依次类推,如果满了就不会再插入。
当元素出队时,先移除索引为2的元素3,与入队一样,依次类推,移除索引3、4、5...上的元素。这也形成了“先进先出”。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //队列实现:数组 final Object[] items; //当读取元素时数组的下标(下一个被取出元素的索引) int takeIndex; //添加元素时数组的下标 (下一个被添加元素的索引) int putIndex; //队列中元素个数: int count; //可重入锁: final ReentrantLock lock; //入队操作时是否让线程等待 private final Condition notEmpty; //出队操作时是否让线程等待 private final Condition notFull; /** * 初始化队列容量构造:由于公平锁会降低队列的性能,因而使用非公平锁(默认)。 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } //带初始容量大小和公平锁队列(公平锁通过ReentrantLock实现): public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } }
Note however, that fairness of locks does not guarantee fairness of thread scheduling. Thus, one of many threads using a fair lock may obtain it multiple times in succession while other active threads are not progressing and not currently holding the lock.
A: Request Lock -> Release Lock -> Request Lock Again (Succeeds)
B: Request Lock (Denied)...
----------------------- Time --------------------------------->
synchronized(obj) { while (true) { // .... infinite loop }
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//获取锁 try { //队列中元素 == 数组长度(队列满了),则线程等待 while (count == items.length) notFull.await(); enqueue(e);//元素加入队列 } finally { lock.unlock();//释放锁 } }
public boolean add(E e) { return super.add(e);// AbstractQueue.add } public boolean add(E e) { if (offer(e))//调用实现接口 return true; else throw new IllegalStateException("Queue full"); } public boolean offer(E e) { checkNotNull(e);//检测是否有空指针异常 final ReentrantLock lock = this.lock;//获得锁对象 lock.lock();//加锁 try { //如果队列满了,返回false if (count == items.length) return false; else { //元素加入队列 enqueue(e); return true; } } finally { lock.unlock();//释放锁 } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; //获得数组 final Object[] items = this.items; //槽位填充元素 items[putIndex] = x; //获得下一个被添加元素的索引,如果值等于数组长度,表示到达尾部了,需要从头开始填充 if (++putIndex == items.length) putIndex = 0; count++;//数量+1 notEmpty.signal();//唤醒出队上的等待线程,表示有元素可以消费了 }
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e);//检测是否为空 long nanos = unit.toNanos(timeout);//转换成超时时间阀值 final ReentrantLock lock = this.lock; lock.lockInterruptibly();//加锁 try { //队列是否满了的判断 while (count == items.length) { if (nanos <= 0)//等待超时结束返回false return false; nanos = notFull.awaitNanos(nanos);//队列满了,等待出队有空位填充 } enqueue(e);//加入队列中 return true; } finally { lock.unlock();//释放锁 } }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //队列为空,进行等待 while (count == 0) notEmpty.await(); return dequeue();//返回出队元素 } finally { lock.unlock(); } }
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //队列为空,返回null,否则返回元素 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items;//获得队列 @SuppressWarnings("unchecked") E x = (E) items[takeIndex];//获得出队元素 items[takeIndex] = null;//出队槽位元素置为null //下一个被取出元素的索引+1,如果值等于长度,表示后面没有元素了,需要从头开始取出 if (++takeIndex == items.length) takeIndex = 0; count--;//数量-1 if (itrs != null)//迭代器不为空 itrs.elementDequeued();//同时更新迭代器中的元素数据 notFull.signal();//唤醒入队线程 return x;//返回出队元素 }
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout);//转换成超时时间阀值 final ReentrantLock lock = this.lock; lock.lockInterruptibly();//加锁 try { while (count == 0) {//队列空了,等待 if (nanos <= 0)//超时了返回null return null; nanos = notEmpty.awaitNanos(nanos);//等待入队填充元素 } return dequeue();//返回出队元素 } finally { lock.unlock();//释放锁 } }
public boolean remove(Object o) { //要移除的元素为空返回false if (o == null) return false; //获得队列数组 final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock();//加锁 try { //队列有元素 if (count > 0) { final int putIndex = this.putIndex;//获得下一个被添加元素的索引 int i = takeIndex;//下一个被取出元素的索引 do { if (o.equals(items[i])) {//从takeIndex下标开始,找到要被删除的元素 removeAt(i);//移除 return true; } if (++i == items.length)//下一个被取出元素的索引+1并判断是否等于队列长度,如果是,表示需要从头开始遍历 i = 0; } while (i != putIndex);//继续查找,直到找到最后一个元素 } return false; } finally { lock.unlock();//解锁 } } /** * 根据下标移除元素,那么会分成两种情况一个是移除的是队首元素,一个是移除的是非队首元素,移除队首元素,就相当于出队操作, * 移除非队首元素那么中间就有空位了,后面元素需要依次补上,然后如果是队尾元素,那么putIndex也就是插入操作的下标也就需要跟着移动。 */ void removeAt(final int removeIndex) { // assert lock.getHoldCount() == 1; // assert items[removeIndex] != null; // assert removeIndex >= 0 && removeIndex < items.length; final Object[] items = this.items;//获得队列 if (removeIndex == takeIndex) {//移除的是队首元素 // removing front item; just advance items[takeIndex] = null;//队首置为null if (++takeIndex == items.length)//下一个被取出元素的索引+1并判断是否等于队列长度 takeIndex = 0; count--;//数量-1 if (itrs != null)//迭代器不为空 itrs.elementDequeued();//更新迭代器元素 } else {//移除的不是队首元素,而是中间元素 // an "interior" remove // slide over all others up through putIndex. final int putIndex = this.putIndex;//下一个被添加元素的索引 for (int i = removeIndex;;) {//对队列进行遍历,因为是队列中间的值被移除了,所有后面的元素都要挨个迁移 int next = i + 1;//获取移除元素的下一个坐标 if (next == items.length)//判断是否等于队列长度 next = 0; if (next != putIndex) {//获取移除元素的下一个坐标!=下一个被添加元素的索引,表示移除元素的索引后面有值 items[i] = items[next];//当前要移除的元素置为后面的元素,即对后面的元素往前迁移,覆盖要移除的元素 i = next;//下一个迁移的索引 } else {//移除的元素是最后一个,后面没有值了 items[i] = null;//移除元素,直接置为null this.putIndex = i;//更新下一个被添加元素的索引 break;//结束 } } count--;//数量-1 if (itrs != null)//迭代器不为空 itrs.removedAt(removeIndex);//更新迭代器元素 } notFull.signal();//唤醒入队线程,可以添加元素了 }
public void clear() { final Object[] items = this.items;//获得队列 final ReentrantLock lock = this.lock; lock.lock(); try { int k = count;//获取元素数量 if (k > 0) {//有元素,表示队列不为空 final int putIndex = this.putIndex;//下一个被添加元素的索引 int i = takeIndex;//下一个被取出元素的索引 do { items[i] = null;//对每个有元素的槽位置为null if (++i == items.length) i = 0; } while (i != putIndex);//从有元素的第一个槽位开始遍历,直到槽位元素为null takeIndex = putIndex;//更新取出和添加的索引 count = 0;//数量更新为0 if (itrs != null)//迭代器不为空 itrs.queueIsEmpty();//更新迭代器为空 //若有等待notFull条件的线程,则逐一唤醒 for (; k > 0 && lock.hasWaiters(notFull); k--) notFull.signal();//唤醒入队线程,可以添加元素了 } } finally { lock.unlock(); } }
AbstractQueuedSynchronizer: //进行超时控制 public final long awaitNanos(long nanosTimeout) throws InterruptedException { //如果当前线程中断了抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); //当前线程加入到Condition队列中 Node node = addConditionWaiter(); //锁释放是否成功:释放当前线程的lock,从AQS的队列中移出 int savedState = fullyRelease(node); //到达等待时间点 final long deadline = System.nanoTime() + nanosTimeout; //中断标识 int interruptMode = 0; //当前节点是否在同步队列中,否表示不在,进入挂起判断操作,如果已经在Sync队列中,则退出循环 //那什么时候会把当前线程又加入到Sync队列中呢?当然是调用signal方法的时候,因为这里需要唤醒之前调用await方法的线程,唤醒之后进行下面的获取锁等操作 while (!isOnSyncQueue(node)) { //如果超时了,将线程挂起,然后停止遍历 if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } //如果等待时间间隔超过了1000,继续挂起 if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); //线程中断了停止遍历 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; //获得剩余的等待时间间隔 nanosTimeout = deadline - System.nanoTime(); } //结束挂起,acquireQueued自旋对当前线程的队列出队进行获取锁并返回线程是否中断 //如果线程被中断,并且中断的方式不是抛出异常,则设置中断后续的处理方式设置为REINTERRUPT if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;//中断标识更新为退出等待时重新中断 if (node.nextWaiter != null)//当前节点后面还有节点,多并发操作了 unlinkCancelledWaiters();//从头到尾遍历Condition队列,移除被cancel的节点 //如果线程已经被中断,则根据之前获取的interruptMode的值来判断是继续中断还是抛出异常 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime();//返回剩余等待时间 }
//最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定collection中 public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); } public int drainTo(Collection<? super E> c, int maxElements) { checkNotNull(c);//检查是否为空 if (c == this)//如果集合类型相同抛出参数异常 throw new IllegalArgumentException(); if (maxElements <= 0)//如果给定移除数量小于0,返回0,表示不做移除操作 return 0; final Object[] items = this.items;//获得队列 final ReentrantLock lock = this.lock; lock.lock();//加锁 try { int n = Math.min(maxElements, count);//获得元素的最小数量 int take = takeIndex;//下一个被取出元素的索引 int i = 0; try { while (i < n) {//遍历移除和添加 @SuppressWarnings("unchecked") E x = (E) items[take];//获得移除元素 c.add(x);//元素添加到直到集合中 items[take] = null;//元素原先队列位置置为null if (++take == items.length)//如果取出索引到达尾部,从头开始遍历取出 take = 0; i++;//移除的数量+1,如果达到了移除的最小数量,结束遍历 } return n;//返回一共移除并添加了多少个元素 } finally { // Restore invariants even if c.add() threw if (i > 0) {//如果有移除操作 count -= i;//队列元素数量-i takeIndex = take;//重置下一个被取出元素的索引 if (itrs != null) {//迭代器不为空 if (count == 0)//队列空了 itrs.queueIsEmpty();//迭代器清空 else if (i > take)//说明take中间变成0了,通知itr itrs.takeIndexWrapped(); } //唤醒在因为队列满而等待的入队线程,最多唤醒i个,避免线程被唤醒了因为队列又满了而阻塞 for (; i > 0 && lock.hasWaiters(notFull); i--) notFull.signal(); } } } finally { lock.unlock(); } }
<appender name="PROJECT" class="ch.qos.logback.core.FileAppender"> <file>project.log</file> <encoding>UTF-8</encoding> <append>true</append> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- daily rollover --> <fileNamePattern>project.log.%d{yyyy-MM-dd}</fileNamePattern> <!-- keep 7 days‘ worth of history --> <maxHistory>7</maxHistory> </rollingPolicy> <layout class="ch.qos.logback.classic.PatternLayout"> <pattern> <![CDATA[%n%-4r [%d{yyyy-MM-dd HH:mm:ss}] %X{productionMode} - %X{method} %X{requestURIWithQueryString} [ip=%X{remoteAddr}, ref=%X{referrer}, ua=%X{userAgent}, sid=%X{cookie.JSESSIONID}]%n %-5level %logger{35} - %m%n]]> </pattern> </layout> </appender> <appender name="asyncProject" class="ch.qos.logback.classic.AsyncAppender"> <discardingThreshold>0</discardingThreshold> <queueSize>1024</queueSize> <neverBlock>true</neverBlock> <appender-ref ref="PROJECT" /> </appender> <logger name="PROJECT_LOGGER" additivity="false"> <level value="WARN" /> <appender-ref ref="asyncProject" /> </logger>
public void start() { if (isStarted()) return; if (appenderCount == 0) { addError("No attached appenders found."); return; } if (queueSize < 1) { addError("Invalid queue size [" + queueSize + "]"); return; } // 创建一个ArrayBlockingQueue阻塞队列,queueSize默认为256,创建阻塞队列的原因是:防止生产者过多,造成队列中元素过多,产生OOM异常 blockingQueue = new ArrayBlockingQueue<E>(queueSize); // 如果discardingThreshold未定义的话,默认为queueSize的1/5 if (discardingThreshold == UNDEFINED) discardingThreshold = queueSize / 5; addInfo("Setting discardingThreshold to " + discardingThreshold); // 将工作线程设置为守护线程,即当jvm停止时,即使队列中有未处理的元素,也不会在进行处理 worker.setDaemon(true); // 为线程设置name便于调试 worker.setName("AsyncAppender-Worker-" + getName()); // make sure this instance is marked as "started" before staring the worker Thread // 启动线程 super.start(); worker.start(); }
protected void append(E eventObject) { // 判断队列中的元素数量是否小于discardingThreshold,如果小于的话,并且日志等级小于info的话,则直接丢弃这些日志任务 if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) { return; } preprocess(eventObject); // 日志入队 put(eventObject); } private boolean isQueueBelowDiscardingThreshold() { return (blockingQueue.remainingCapacity() < discardingThreshold); } // 子类重写的方法 判断日志等级 protected boolean isDiscardable(ILoggingEvent event) { Level level = event.getLevel(); return level.toInt() <= Level.INFO_INT; }
private void put(E eventObject) { // 判断是否阻塞(默认为false),则会调用阻塞队列的put方法 if (neverBlock) { blockingQueue.offer(eventObject); } else { putUninterruptibly(eventObject); } } // 可中断的阻塞put方法 private void putUninterruptibly(E eventObject) { boolean interrupted = false; try { while (true) { try { blockingQueue.put(eventObject); break; } catch (InterruptedException e) { interrupted = true; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } }
public void addAppender(Appender<E> newAppender) { if (appenderCount == 0) { appenderCount++; addInfo("Attaching appender named [" + newAppender.getName() + "] to AsyncAppender."); aai.addAppender(newAppender); } else { addWarn("One and only one appender may be attached to AsyncAppender."); addWarn("Ignoring additional appender named [" + newAppender.getName() + "]"); } }
class Worker extends Thread { public void run() { AsyncAppenderBase<E> parent = AsyncAppenderBase.this; AppenderAttachableImpl<E> aai = parent.aai; // loop while the parent is started 一直循环知道线程被中断 while (parent.isStarted()) { try {// 从阻塞队列中获取元素,交由给同步的appender将日志打印到磁盘 E e = parent.blockingQueue.take(); aai.appendLoopOnAppenders(e); } catch (InterruptedException ie) { break; } } addInfo("Worker thread will flush remaining events before exiting. "); //执行到这里说明该线程被中断,则把队列里边的剩余日志任务刷新到磁盘 for (E e : parent.blockingQueue) { aai.appendLoopOnAppenders(e); parent.blockingQueue.remove(e); } aai.detachAndStopAllAppenders(); } }
多线程高并发编程(12) -- 阻塞算法实现ArrayBlockingQueue源码分析(1)
标签:main 小数 info imp roc style 内存 名称 空间
原文地址:https://www.cnblogs.com/huangrenhui/p/13153667.html