码迷,mamicode.com
首页 > 其他好文 > 详细

JDK 源码解析 —— Semaphore

时间:2016-05-03 12:46:27      阅读:362      评论:0      收藏:0      [点我收藏+]

标签:

零. 简介
这是一个用来对并发计数的信号量,并发量超过一定数值则只能等待。从概念上来说,semaphore 维持着一组许可证。获取锁的时候,需要先获得 semaphore 的许可才行。



一. 从 Demo 解析源码

package com.wenniuwuren.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Created by zhuyb on 16/5/1.
 */
public class SemaphoreTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        Semaphore semaphore = new Semaphore(5);

        for (int i = 0; i < 10; i++) {
            int count = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire(); // 获取许可
                        System.out.println("当前循环:" + count);
                        System.out.println("当前还剩多少许可数量:" + semaphore.availablePermits());
                        Thread.sleep(10000);
                        semaphore.release(); // 释放占用的许可
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }
            };
            executorService.execute(runnable);
        }
        executorService.shutdown();
    }
}

(1) 先看 Semaphore semaphore = new Semaphore(5) 的构造函数
默认使用非公平锁,调用的是继承自 AQS 的内部类 NonfairSync 
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

看下 NonfairSync 的具体实现:构造函数调用父类来初始化,其实就是 AQS 的 Sync 构造函数

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}


AQS 的 Sync 构造函数:设置 AQS 的同步状态 stat

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
        setState(permits);
    }

// 省略无用代码
}


AQS 的状态值由具体调用的类来定义 stat 的含义,对于 Semaphore 来说 stat 的数量含义就是可以有多少个线程并发使用某个资源

protected final void setState(int newState) {
    state = newState;
}

(2)semaphore.acquire(); // 获取许可
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}


acquireSharedInterruptibly 从代码看出来是响应中断的,再看下 tryAcquireShared(arg):AQS 中 tryAcquireShared 没有具体实现,交给具体的继承类去实现

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

下面是 Semaphore 的 tryAcquireShared 具体实现:可以看到用当前的 stat 值减去传入的 1。举个具体的例子,如果是第一次调用 semaphore.acquire(), 则 available 就等于初始化  Semaphore 时候的值,然后减去 acquires=1,如果小于零就要调用 doAcquireSharedInterruptibly(arg) 这个方法是在 AQS 的 FIFO 队列中排队;如果大于零则 CAS 更新 AQS 的 stat 值,说明线程获得了 Semaphore 的许可,可以成功执行
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}


(3)semaphore.release(); // 释放占用的许可

public void release() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}


tryReleaseShared(arg) 和 tryAcquireShared() 一样都是需要具体类自己实现的,这样才能由该类定义 stat 的具体含义:参数 releases =1 表示释放一个 Semaphore, CAS 设置新的 stat 值

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}


接下来看 tryReleaseShared 返回 true 后进入的 doReleaseShared():与互斥锁不同的是,共享锁在释放锁的时候会将共享锁释放信息向 AQS 的队列中传播这个共享锁已经释放的 SIGNAL,这样等待这个共享锁的线程就能较快地脱离 AQS 的等待队列。从 ws == Node.SIGNAL 分支执行的就是共享锁的向后传递 SIGNAL 方法 unparkSuccessor(h)。最外面的 for(;;)就是为了保证释放锁的正常进行,异常就是循环重试
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}



二. 总结
Semaphore 是借助 AQS(AbstractQueuedSynchronizer) 这个同步控制器来实现共享锁的获取和释放,AQS 中的同步变量 stat 在 Semaphore 的意思就是并发的数量,线程并发数量超过这个 stat 总数,之后的线程只能进入 AQS 的等待队列直到其他线程释放这个 stat,然后公平地排队获取锁或者非公平地抢占锁。 




JDK 源码解析 —— Semaphore

标签:

原文地址:http://blog.csdn.net/wenniuwuren/article/details/51302745

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!