接上一篇( http://my.oschina.net/haogrgr/blog/490266 )
8. Worker代码走读.
//主要负责累加tick, 执行到期任务等. private final class Worker implements Runnable { private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); private long tick; @Override public void run() { //初始化startTime, startTime只是一个起始时间的标记, 任务的deadline是相对这个时间点来的. startTime = System.nanoTime(); //因为nanoTime返回值可能为0, 甚至负数, 所以这时赋值为1, Timer中start方法会判断该值, 直到不为0才跳出循环. if (startTime == 0) { startTime = 1; } //唤醒阻塞在Timer.start()方法上的线程, 表示已经启动完成. startTimeInitialized.countDown(); //只要还是启动状态, 就一直循环 do { //waitForNextTick方法主要是计算下次tick的时间, 然后sleep到下次tick //返回值就是System.nanoTime() - startTime, 也就是Timer启动后到这次tick, 所过去的时间 final long deadline = waitForNextTick(); if (deadline > 0) {//可能溢出, 所以小于等于0不管 //获取index, 原理见Timer的构造方法注释, 等价于 tick % wheel.length int idx = (int) (tick & mask); //移除cancel了的task, 具体可以见HashedWheelTimeout.cancel()方法注释 processCancelledTasks(); //当前tick对应的wheel HashedWheelBucket bucket = wheel[idx]; //因为添加任务是先加入到timeouts队列中, 而这里就是将任务从队列中取出, 放到对应的bucket中 transferTimeoutsToBuckets(); //见上篇HashedWheelBucket.expireTimeouts()方法的注释 //具体是根据当前的deadline, 判断bucket中的人物是否到期, 到期的任务就执行, 没到期的, 就将人物轮数减一. //正常情况下, 一个bucket在一轮中, 只会执行一次expireTimeouts方法. bucket.expireTimeouts(deadline); //累加tick tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); //返回调用stop()时, 还未处理的任务. for (HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } //加上还没来得及放入bucket中的任务 for (;;) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } //最好移除下cancel了的task processCancelledTasks(); } //将Timer.newTimeout()调用放入到timeouts时的任务放入到对应的bucket中 private void transferTimeoutsToBuckets() { //一次tick, 最多放入10w任务, 防止太多了, 造成worker线程在这里停留太久. for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { //全部处理完了, 退出循环 break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { //还没加入到bucket中, 就取消了, 继续. continue; } //calculated 表示任务要经过多少个tick long calculated = timeout.deadline / tickDuration; //设置任务要经过的轮数 timeout.remainingRounds = (calculated - tick) / wheel.length; //如果任务在timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前bucket, 于是方法调用完后就会执行. final long ticks = Math.max(calculated, tick); int stopIndex = (int) (ticks & mask);//同样, 类似于ticks % wheel.length //这时任务所在的bucket在wheel中的位置就表示, 经过n轮后, 还需要多少次tick才执行. HashedWheelBucket bucket = wheel[stopIndex]; bucket.addTimeout(timeout);//将timeout加入到链表 } } //将cancel任务从队列中取出, 并执行cancel操作, 具体可以见HashedWheelTimeout.cancel()方法注释. private void processCancelledTasks() { for (;;) { Runnable task = cancelledTimeouts.poll(); if (task == null) { // all processed break; } try { task.run(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while process a cancellation task", t); } } } } /** * calculate goal nanoTime from startTime and current tick number, * then wait until that goal has been reached. * @return Long.MIN_VALUE if received a shutdown request, * current time otherwise (with Long.MIN_VALUE changed by +1) */ //sleep, 直到下次tick到来, 然后返回该次tick和启动时间之间的时长 private long waitForNextTick() { //下次tick的时间点, 用于计算需要sleep的时间 long deadline = tickDuration * (tick + 1); //循环, 直到HashedWheelTimer被stop, 或者到了下个tick for (;;) { //计算需要sleep的时间, 之所以加9999999后再除10000000, 是因为保证为10毫秒的倍数. final long currentTime = System.nanoTime() - startTime; long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; if (sleepTimeMs <= 0) {//小于等于0, 表示本次tick已经到了, 返回. if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; //不懂不懂, 我不懂...估计又是nanoTime的问题. } else { return currentTime; //返回过去的时间. } } // Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect // the JVM if it runs on windows. // // See https://github.com/netty/netty/issues/356 if (PlatformDependent.isWindows()) {//不多说, 一个字, 屌 sleepTimeMs = sleepTimeMs / 10 * 10; } try { Thread.sleep(sleepTimeMs);//睡吧 } catch (InterruptedException ignored) { //当调用Timer.stop时, 退出 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } } public Set<Timeout> unprocessedTimeouts() { return Collections.unmodifiableSet(unprocessedTimeouts); } }
终于搞完了, 具体看注释吧, 很详细的注释.
9. 备注.
在看代码的时候, 大部分都好, 但是有些代码看的很困惑, 比如说
startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it‘s not 0 when initialized. startTime = 1; } //startTime 是volatile的, 然后没有其他地方修改startTime, 为什么这里还要判断下是否为0...
然后我就去群里问了问, 最后定位到是nanoTime的问题, API文档说, 它连负数都可能返回~~~~
jlong os::javaTimeNanos() { if (Linux::supports_monotonic_clock()) { struct timespec tp; int status = Linux::clock_gettime(CLOCK_MONOTONIC, &tp); assert(status == 0, "gettime error"); jlong result = jlong(tp.tv_sec) * (1000 * 1000 * 1000) + jlong(tp.tv_nsec); return result; } else { timeval time; int status = gettimeofday(&time, NULL); assert(status != -1, "linux error"); jlong usecs = jlong(time.tv_sec) * (1000 * 1000) + jlong(time.tv_usec); return 1000 * usecs; } }
北京-菜鸟多年 : 看了下 代码 原来这个 clock_gettime函数 可能会发生时间回绕
北京-菜鸟多年 : 然后 获得的纳秒 就变成0了
北京-菜鸟多年 : 不过, 需要很长时间
北京-菜鸟多年 : 时间是递增的 递增到一定地步就溢出了 然后就从0开始
北京-菜鸟多年 : timespec 这里面 秒和纳秒分开存储的
北京-菜鸟多年 : 就是为了延长回绕出现的几率
北京-菜鸟多年 : else那个分支 就是 currentTimeMillis() 的实现
北京-菜鸟多年 : 不过 是 纳秒级别
北京-菜鸟多年 : 和currentTimeMillis算法一样的, 性能要慢些
10. 总结.
任务里不要有太耗时的操作, 否则会阻塞Worker线程, 导致tick不准.
Wheel Timer, 确实是很精巧的算法, Netty实现的HashedWheelTimer也是经过大神们极致的优化而来的.