标签:netty nio 连接中断处理 流量整形 trafficcounter
在客户端和服务端建立起连接之后,如果连接发生了意外中断,Netty也会及时释放连接句柄资源(因为TCP是全双工协议,通信双方都需要关闭和释放Socket句柄才不会发生句柄的泄漏,如不经过特殊处理是会发生句柄泄露的),原理如下:
在读取数据时会调用io.netty.buffer.AbstractByteBuf.writeBytes(ScatteringByteChannel, int),然后调用io.netty.buffer.ByteBuf.setBytes(int, ScatteringByteChannel, int),setBytes方法调用nio.channel.read,如果当前连接已经意外中断,会收到JDK NIO层抛出的ClosedChannelException异常,setBytes方法捕获该异常之后直接返回-1,
在NioByteUnsafe.read方法中,发现当前读取到的字节长度为-1,即调用io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.closeOnRead(ChannelPipeline)方法,然后调用io.netty.channel.AbstractChannel.AbstractUnsafe.close(ChannelPromise)关闭连接释放句柄资源。参考相关的代码:
//NioByteUnsafe.read方法 public void read() { ... boolean close = false; try { ... do { ... int localReadAmount = doReadBytes(byteBuf); if (localReadAmount <= 0) { ... close = localReadAmount < 0; break; } ... } while (...); ... if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { ... } finally { ... } } //NioSocketChannel.doReadBytes方法 protected int doReadBytes(ByteBuf byteBuf) throws Exception { return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes()); } //AbstractByteBuf.writeBytes方法 public int writeBytes(ScatteringByteChannel in, int length) throws IOException { ensureWritable(length); int writtenBytes = setBytes(writerIndex, in, length); if (writtenBytes > 0) { writerIndex += writtenBytes; } return writtenBytes; } //UnpooledHeapByteBuf.setBytes方法 public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { ensureAccessible(); try { return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length)); } catch (ClosedChannelException e) { return -1; } }
一般大型的系统都包含多个模块,在部署时不同的模块可能部署在不同的机器上,比如我司的项目,至少5个部件起,少了都不好意思拿出去见人。这种情况下系统运行时会涉及到大量的上下游部件的通信,但是由于不同服务器无论是从硬件配置,还是系统模块的业务特性都会存在差异,这就导致到服务器的处理能力,以及不同时间段服务器的负载都是有差异的,这就可能会导致问题:上下游消息的传递速度和下游部件的消息处理速度失去平衡,下游部件接收到的消息量远远超过了它的处理能力,导致大量的业务无法被及时的处理,甚至可能导致下游服务器被压垮。
在Netty框架中提供了流量整形处理机制来应付这种场景,通过控制服务器单位时间内发送/接收消息的字节数来使上下游服务器处理相对平衡的状态。Netty中的流量整形包含了两种:一种是针对单个连接的流量整形,另一种是针对全局即所有连接的流量整形。这两种方式的流量整形原理是类似的,只是流量整形器的作用域不同,一个是全局的,一个是连接建立后创建,连接关闭后被回收。GlobalTrafficShapingHandler处理全局流量整形,ChannelTrafficShapingHandler处理单链路流量整形,流量整形处理有三个重要的参数:
以读操作为例,流量整形的工作过程大致如下:
下面是相关的代码:
//AbstractTrafficShapingHandler.channelRead方法 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { long size = calculateSize(msg); long curtime = System.currentTimeMillis(); if (trafficCounter != null) { //增加字节累计数 trafficCounter.bytesRecvFlowControl(size); if (readLimit == 0) { // no action ctx.fireChannelRead(msg); return; } // compute the number of ms to wait before reopening the channel long wait = getTimeToWait(readLimit, trafficCounter.currentReadBytes(), trafficCounter.lastTime(), curtime); if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal // time in order to // try to limit the traffic if (!isSuspended(ctx)) { ctx.attr(READ_SUSPENDED).set(true); // Create a Runnable to reactive the read if needed. If one was create before it will just be // reused to limit object creation Attribute<Runnable> attr = ctx.attr(REOPEN_TASK); Runnable reopenTask = attr.get(); if (reopenTask == null) { reopenTask = new ReopenReadTimerTask(ctx); attr.set(reopenTask); } ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS); } else { // Create a Runnable to update the next handler in the chain. If one was create before it will // just be reused to limit object creation Runnable bufferUpdateTask = new Runnable() { @Override public void run() { ctx.fireChannelRead(msg); } }; ctx.executor().schedule(bufferUpdateTask, wait, TimeUnit.MILLISECONDS); return; } } } ctx.fireChannelRead(msg); } //AbstractTrafficShapingHandler.getTimeToWait方法 private static long getTimeToWait(long limit, long bytes, long lastTime, long curtime) { long interval = curtime - lastTime; if (interval <= 0) { // Time is too short, so just lets continue return 0; } return (bytes * 1000 / limit - interval) / 10 * 10; } private static class TrafficMonitoringTask implements Runnable { ... @Override public void run() { if (!counter.monitorActive.get()) { return; } long endTime = System.currentTimeMillis(); //还原累计字节数,lastTime等变量 counter.resetAccounting(endTime); if (trafficShapingHandler1 != null) { trafficShapingHandler1.doAccounting(counter); } counter.scheduledFuture = counter.executor.schedule(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS); } }
标签:netty nio 连接中断处理 流量整形 trafficcounter
原文地址:http://blog.csdn.net/pentiumchen/article/details/45133189