标签:实现 并发 请求 设置 count image tor 原子性 初始
Semaphore 是一个计数信号量,必须由获取它的线程释放。用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。
Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。
Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。
常用于限制可以访问某些资源的线程数量,例如通过 Semaphore 限流;实现一个文件允许的并发访问数。
Semaphore 可以用来限流(流量控制),在一些公共资源有限的场景下,Semaphore 可以派上用场。
比如在做日志清洗时,可能有几十个线程在并发清洗,但是将清洗的数据存入到数据库时,可能只给数据库分配了 10 个连接池,
这样两边的线程数就不对等了,必须保证同时只能有 10 个线程获取数据库链接,否则就会存在大量线程无法连接上数据库。
以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。
这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。
这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。
这个停车系统中,每辆车就好比一个线程,看门人就好比一个信号量,看门人限制了可以活动的线程。
假如里面依然是三个车位,但是看门人改变了规则,要求每次只能停两辆车,那么一开始进入两辆车,后面得等到有车离开才能有车进入,但是得保证最多停两辆车。
对于Semaphore类而言,就如同一个看门人,限制了可活动的线程数。
Semaphore的主要方法摘要:
(1)void acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
(2)void release():释放一个许可,将其返回给信号量。
(3)int availablePermits():返回此信号量中当前可用的许可数。
(4)boolean hasQueuedThreads():查询是否有线程正在等待获取。
Semaphore 跟锁(synchronized、Lock)有点相似,不同的地方是,锁同一时刻只允许一个线程访问某一资源,而 Semaphore 则可以控制同一时刻多个线程访问某一资源。
Semaphore(信号量)并不是 Java 语言特有的,几乎所有的并发语言都有。所以也就存在一个信号量模型的概念,如下图所示:
信号量模型比较简单,可以概括为:一个计数器、一个队列、三个方法。
计数器:记录当前还可以运行多少个资源访问资源。
队列:待访问资源的线程
Semaphore 只有3个操作:初始化;增加;减少。
三个方法:
init():初始化计数器的值,可就是允许多少线程同时访问资源。
up():计数器加1,有线程归还资源时,如果计数器的值大于或者等于 0 时,从等待队列中唤醒一个线程。
down():计数器减 1,有线程占用资源时,如果此时计数器的值小于 0 ,线程将被阻塞。这三个方法都是原子性的,由实现方保证原子性。
Semaphore 是基于 AbstractQueuedSynchronizer 接口实现信号量模型的。
AbstractQueuedSynchronizer 提供了一个基于 FIFO 队列,可以用于构建锁或者其他相关同步装置的基础框架,利用了一个 int 来表示状态,通过类似 acquire 和 release 的方式来操纵状态。
在 Semaphore 类中,实现了两种信号量:公平的信号量和非公平的信号量,公平的信号量就是大家排好队,先到先进,非公平的信号量就是不一定先到先进,允许插队。
非公平的信号量效率会高一些,所以默认使用的是非公平信号量。具体的可以查看 Semaphore 类实现源码。
Semaphore用于管理信号量,在并发编程中,可以控制返访问同步代码的线程数量。Semaphore在实例化时传入一个int值,也就是指明信号数量。
主要方法有两个:acquire()和release()。acquire()用于请求信号,每调用一次,信号量便少一个。
release()用于释放信号,调用一次信号量加一个。信号量用完以后,后续使用acquire()方法请求信号的线程便会加入阻塞队列挂起。
Semaphore对于信号量的控制是基于AQS(AbstractQueuedSynchronizer)来做的。Semaphore有一个内部类Sync继承了AQS。
而且Semaphore中还有两个内部类FairSync和NonfairSync继承Sync,也就是说Semaphore有公平锁和非公平锁之分。
在 Semaphore 类中,实现了两种信号量:公平的信号量和非公平的信号量,公平的信号量就是大家排好队,先到先进,非公平的信号量就是不一定先到先进,允许插队。
非公平的信号量效率会高一些,所以默认使用的是非公平信号量。
以下是Semaphore中内部类的结构:
例子一:
每个人的个人信息,那么一个人占用一个线程,并用Semphore类创建对象从而初始化信号量,控制可活动的线程数。
具体代码如下:
package concurrent; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; public class SemaphoreDemo {
private static final Semaphore semaphore=new Semaphore(3);
private static final ThreadPoolExecutor threadPool=new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()); private static class InformationThread extends Thread{ private final String name; private final int age; public InformationThread(String name,int age) { this.name=name; this.age=age; } public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+":大家好,我是"+name+"我今年"+age+"岁当前时间为:"+System.currentTimeMillis()); Thread.sleep(1000); System.out.println(name+"要准备释放许可证了,当前时间为:"+System.currentTimeMillis()); System.out.println("当前可使用的许可数为:"+semaphore.availablePermits()); semaphore.release(); } catch(InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { String[] name= {"李明","王五","张杰","王强","赵二","李四","张三"}; int[] age= {26,27,33,45,19,23,41}; for(int i=0;i<7;i++) { Thread t1=new InformationThread(name[i],age[i]); threadPool.execute(t1); } } }
运行结果:
pool-1-thread-3:大家好,我是张杰我今年33岁当前时间为:1520424000186 pool-1-thread-1:大家好,我是李明我今年26岁当前时间为:1520424000186 pool-1-thread-2:大家好,我是王五我今年27岁当前时间为:1520424000186 张杰要准备释放许可证了,当前时间为:1520424001187 李明要准备释放许可证了,当前时间为:1520424001187 王五要准备释放许可证了,当前时间为:1520424001187 当前可使用的许可数为:0 当前可使用的许可数为:0 当前可使用的许可数为:0 pool-1-thread-4:大家好,我是王强我今年45岁当前时间为:1520424001187 pool-1-thread-2:大家好,我是张三我今年41岁当前时间为:1520424001187 pool-1-thread-1:大家好,我是李四我今年23岁当前时间为:1520424001187 李四要准备释放许可证了,当前时间为:1520424002187 王强要准备释放许可证了,当前时间为:1520424002187 当前可使用的许可数为:0 张三要准备释放许可证了,当前时间为:1520424002187 pool-1-thread-5:大家好,我是赵二我今年19岁当前时间为:1520424002187 当前可使用的许可数为:0 当前可使用的许可数为:0 赵二要准备释放许可证了,当前时间为:1520424003188 当前可使用的许可数为:2
以上是非公平信号量,将建立Semaphore对象的语句改为如下语句:
private static final Semaphore semaphore=new Semaphore(3,true);
运行结果:
pool-1-thread-2:大家好,我是王五我今年27岁当前时间为:1520424286454 pool-1-thread-3:大家好,我是张杰我今年33岁当前时间为:1520424286454 pool-1-thread-1:大家好,我是李明我今年26岁当前时间为:1520424286454 pool-1-thread-1:李明要准备释放许可证了,当前时间为:1520424287455 当前可使用的许可数为:0 pool-1-thread-2:王五要准备释放许可证了,当前时间为:1520424287455 pool-1-thread-3:张杰要准备释放许可证了,当前时间为:1520424287455 当前可使用的许可数为:0 当前可使用的许可数为:1 pool-1-thread-1:大家好,我是李四我今年23岁当前时间为:1520424287455 pool-1-thread-5:大家好,我是赵二我今年19岁当前时间为:1520424287455 pool-1-thread-4:大家好,我是王强我今年45岁当前时间为:1520424287455 pool-1-thread-4:王强要准备释放许可证了,当前时间为:1520424288456 当前可使用的许可数为:0 pool-1-thread-1:李四要准备释放许可证了,当前时间为:1520424288456 pool-1-thread-3:大家好,我是张三我今年41岁当前时间为:1520424288456 pool-1-thread-5:赵二要准备释放许可证了,当前时间为:1520424288456 当前可使用的许可数为:0 当前可使用的许可数为:0 pool-1-thread-3:张三要准备释放许可证了,当前时间为:1520424289456 当前可使用的许可数为:2
将创建信号量对象语句修改如下:
private static final Semaphore semaphore=new Semaphore(1);
运行结果:
pool-1-thread-1:大家好,我是李明我今年26岁当前时间为:1520424379699 pool-1-thread-1:李明要准备释放许可证了,当前时间为:1520424380700 当前可使用的许可数为:0 pool-1-thread-2:大家好,我是王五我今年27岁当前时间为:1520424380700 pool-1-thread-2:王五要准备释放许可证了,当前时间为:1520424381701 当前可使用的许可数为:0 pool-1-thread-3:大家好,我是张杰我今年33岁当前时间为:1520424381701 pool-1-thread-3:张杰要准备释放许可证了,当前时间为:1520424382702 当前可使用的许可数为:0 pool-1-thread-4:大家好,我是王强我今年45岁当前时间为:1520424382702 pool-1-thread-4:王强要准备释放许可证了,当前时间为:1520424383702 当前可使用的许可数为:0 pool-1-thread-5:大家好,我是赵二我今年19岁当前时间为:1520424383702 pool-1-thread-5:赵二要准备释放许可证了,当前时间为:1520424384702 当前可使用的许可数为:0 pool-1-thread-1:大家好,我是李四我今年23岁当前时间为:1520424384702 pool-1-thread-1:李四要准备释放许可证了,当前时间为:1520424385702 当前可使用的许可数为:0 pool-1-thread-2:大家好,我是张三我今年41岁当前时间为:1520424385702 pool-1-thread-2:张三要准备释放许可证了,当前时间为:1520424386703 当前可使用的许可数为:0
如上可知,如果将给定许可数设置为1,就如同一个单例模式。
例子二:
假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,
而数据库的连接数只有10个,这时必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。
代码如下:
public class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); public static void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { s.acquire(); System.out.println("save data"); s.release(); } catch (InterruptedException e) { } } }); } threadPool.shutdown(); } }
在代码中,虽然有30个线程在执行,但是只允许10个并发的执行。Semaphore的构造方法Semaphore(int permits) 接受一个整型的数字,表示可用的许可证数量。
Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。
Semaphore的用法也很简单,首先线程使用Semaphore的acquire()获取一个许可证,使用完之后调用release()归还许可证。还可以用tryAcquire()方法尝试获取许可证。
标签:实现 并发 请求 设置 count image tor 原子性 初始
原文地址:https://www.cnblogs.com/ZJOE80/p/12890514.html