标签:一个 void exec creat 任务 only 获得 exce notify
pulsar 实现了一个 RateLimiter 来限制 dispatch 的速率。
大体思路是:初始有 n 个令牌,当令牌被申请完了后,其他人就无法获得令牌了,每隔一段时间 t 会清零已分配的令牌数。
所以,记住这 2 个参数即可。
通过一个测试用例,观察 RateLimiter 的用法。
// org.apache.pulsar.common.util.RateLimiterTest#testMultipleAcquire public void testMultipleAcquire() throws Exception { // 每过 1000ms 重置令牌数 final long rateTimeMSec = 1000; // 令牌总数为 100 final int permits = 100; final int acquirePermist = 50; RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS); long start = System.currentTimeMillis(); for (int i = 0; i < permits / acquirePermist; i++) { // 1 次获取 50 个令牌,2 次申请完 100 个令牌 rate.acquire(acquirePermist); } long end = System.currentTimeMillis(); assertTrue((end - start) < rateTimeMSec); // 时间还不到 1000ms,令牌没有重置,则可用令牌仍为 0 assertEquals(rate.getAvailablePermits(), 0); rate.close(); }
接下来看下实现:
org.apache.pulsar.common.util.RateLimiter // 令牌总数 private long permits; // 当前已分配的令牌数 private long acquiredPermits; // 清理已分配令牌数的定时任务 1. 线程池 2. 定时任务 3. 间隔时间 private final ScheduledExecutorService executorService; private ScheduledFuture<?> renewTask; private long rateTime; // 提供了一个接口用来修改令牌总数 private Supplier<Long> permitUpdater;
申请 acquirePermit 个令牌,自旋模式
public synchronized void acquire(long acquirePermit) throws InterruptedException { checkArgument(!isClosed(), "Rate limiter is already shutdown"); checkArgument(acquirePermit <= this.permits, "acquiring permits must be less or equal than initialized rate =" + this.permits); // 如果还没创建清理定时任务,则创建它 if (renewTask == null) { renewTask = createTask(); } boolean canAcquire = false; do { // 如果已分配令牌数小于总令牌数,可以分配 canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; if (!canAcquire) { // 阻塞当前线程 wait(); } else { // 增加已分配令牌数 acquiredPermits += acquirePermit; } } while (!canAcquire); }
申请 acquirePermit 个令牌,快速失败模式
public synchronized boolean tryAcquire(long acquirePermit) { checkArgument(!isClosed(), "Rate limiter is already shutdown"); // lazy init and start task only once application start using it if (renewTask == null) { renewTask = createTask(); } // acquired-permits can‘t be larger than the rate if (acquirePermit > this.permits) { acquiredPermits = this.permits; return false; } // 这里并没有严格限制,无所谓,没必要太精确 boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; if (canAcquire) { acquiredPermits += acquirePermit; } return canAcquire; }
创建清理分配令牌数的定时任务
protected ScheduledFuture<?> createTask() { return executorService.scheduleAtFixedRate(this::renew, this.rateTime, this.rateTime, this.timeUnit); }
清理已分配令牌数
synchronized void renew() { // 直接重置为 0 acquiredPermits = 0; // 如果提供了这个对象,则用它的值来设置总令牌数 if (permitUpdater != null) { long newPermitRate = permitUpdater.get(); if (newPermitRate > 0) { setRate(newPermitRate); } } // 唤醒所有等待当前对象的线程 notifyAll(); }
标签:一个 void exec creat 任务 only 获得 exce notify
原文地址:https://www.cnblogs.com/allenwas3/p/12764839.html