标签:oid 数据 多线程 cte release 延迟 请求 on() 远程服务
Java 中的线程池(ThreadPoolExecutor)我们都知道(不知道请自行搜索),它的执行机制简单讲就是多个线程不停的从队列里面取任务执行。但是我们可能遇到下面这样的场景:
我有一批数据要通过线程池来处理,处理过程中需要调用某个远程服务。但该服务存在调用频率限制,比如每秒钟最多调用 50 次,超过这个阈值将返回错误信息。
这是否意味着我们不应该用多线程了呢?不是,在这个场景中,我们要保证的是以间隔不低于 20ms 的频率发起请求,至于处理时间,不管是几百甚至几千毫秒,都不影响发起请求的频率,因此多线程是必要的。
默认的线程池(ThreadPoolExecutor)没有按固定频率执行任务的特性,有的同学可能会想到 ScheduledThreadPoolExecutor,但是很可惜这个类也不能用,别看它名字里面带了计划任务的特性,但这个是用来反复执行同一个任务的,而我们的场景是一个任务只执行一次。
当然也有的同学会想到一种方案,依旧使用 ScheduledThreadPoolExecutor,但是将任务队列外部化(即不使用 ScheduledThreadPoolExecutor 的内部任务队列),然后 ScheduledThreadPoolExecutor 的任务本身就是从外部队列取任务执行。
这种方案是可行的,但是抛开实现起来过于复杂不说,线程池的执行机制也会遭到破坏,比如说我们本来可以通过 shutdown()
和 awaitTermination()
来等待线程池队列全部执行完,令线程池安全关闭;但若任务队列外部化,这点就做不到了,因为线程池会立刻关闭,不会再处理外部队列中的剩余任务。
这里有一个相对简单的解决方案。好在 ThreadPoolExecutor 给我们提供了 beforeExecute()
这样一个扩展点,我们可以通过继承 ThreadPoolExecutor,覆写这个方法来实现执行频率的限制:
由此可见,这样的设计既实现了执行频率限制,又保持了任务执行本身的并行性,同时线程池的执行机制没有受到影响。
代码实现起来不复杂,如下:
public class FundThreadPoolExecutor extends ThreadPoolExecutor { private int fixedRateMillis; private final Semaphore fixedRateSemaphore = new Semaphore(1); // 设置执行频率限制的延迟时间(ms) public void setFixedRateMillis(int fixedRateMillis) { this.fixedRateMillis = fixedRateMillis; } public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override protected void beforeExecute(Thread t, Runnable r) { if (this.fixedRateMillis > 0) { try { this.fixedRateSemaphore.acquire(); Thread.sleep(this.fixedRateMillis); } catch (InterruptedException e) { // ignore this } finally { this.fixedRateSemaphore.release(); } } } }
标签:oid 数据 多线程 cte release 延迟 请求 on() 远程服务
原文地址:https://www.cnblogs.com/kehoudaanxiangjie/p/12918405.html