标签:读取 jdk ioc use complete 继承 group res static
ChannelPipeline = Channel + Pipeline,也就是说首先它与Channel绑定,然后它是起到类似于管道的作用:字节流在ChannelPipeline上流动,流动的过程中被ChannelHandler修饰,最终输出。
ChannelPipeline只有两个子类,直接一起放上来好了,其中EmbeddedChannelPipeline主要用于测试,本文只介绍DefaultChannelPipeline
跟踪一下DefaultChannelPipeline的构造方法就能发现
ChannelPipeline是在AbstractChannel的构造方法中被初始化的,而AbstractChannel的构造方法有两个,我只选取其中一个做分析了:
AbstractChannel() protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } AbstractChannel.newChannelPipeline() protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); } protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
可以看到,DefaultChannelPipeline中维护了对关联的Channel的引用
而且,Pipeline内部维护了一个双向链表,head是链表的头,tail是链表的尾,链表指针是AbstractChannelHandlerContext类型。
在DefaultChannelPipeline初始化完成的时候,其内部结构是下面这个样子的:
HeadContext <----> TailContext
HeadContext与TailContext都继承于AbstractChannelHandlerContext,基本上是起到占位符的效果,没有什么功能性的作用。
举一个很简单的例子:
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new Decoder());//解码 p.addLast(new BusinessHandler())//业务逻辑 p.addLast(new Encoder());//编码 } });
在pipeline中,从网卡收到的数据流先被Decoder解码,然后被BusinessHandler处理,然后再被Encoder编码,最后写回到网卡中。
此时pipeline的内部结构为:
HeadContext <----> Decoder <----> BusinessHandler <----> Encoder <----> TailContext
pipeline的修改,是在Channel初始化时,由ChannelInitializer进行的。
ChannelInitializer调用用户自定义的initChannel方法,然后调用Pipeline.addLast()方法修改pipeline的结构,关键代码位于DefaultChannelPipeline.addLast()中:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) {//很重要,用当前的DefaultChannelPipeline作为同步对象,使pipeline的addLast方法串行化 checkMultiplicity(handler);//禁止非Sharable的handler被重复add到不同的pipeline中 newCtx = newContext(group, filterName(name, handler), handler);//将Handler包装成DefaultChannelHandlerContext并插入pipeline中 addLast0(newCtx);//将ChannelHandlerContext插入pipeline // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) {//如果channel没有与eventloop绑定,则创建一个任务,这个任务会在channel被register的时候调用 newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; } private void addLast0(AbstractChannelHandlerContext newCtx) {//向双链表插入节点 AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; } private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { ctx.handler().handlerAdded(ctx);//触发handler的handlerAdded回调函数 ctx.setAddComplete(); } catch (Throwable t) {//异常处理 boolean removed = false; try { remove0(ctx); try { ctx.handler().handlerRemoved(ctx); } finally { ctx.setRemoved(); } removed = true; } catch (Throwable t2) { if (logger.isWarnEnabled()) { logger.warn("Failed to remove a handler: " + ctx.name(), t2); } } if (removed) { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", t)); } else { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", t)); } } }
小结:
a. addLast方法的作用就是将传入的handler添加到当前Pipeline的双向链表中
b. 在后续处理的时候,可以通过遍历链表,找到channel关联的pipeline上注册的所有handler
c. 贴出的代码中没有涉及,但又很重要的一点是:在添加handler的过程中,会根据handler继承于ChannelInboundHandler或者ChannelOutboundHandler来判定这个handler是用于处理in事件还是out事件的,然后会以此为依据来设置AbstractChannelHandlerContext的inbound和outbound位。
在第三节的示例中,如果一个已经register完毕的Channel收到一个数据包,会发生什么事情呢?
首先,这个Channel必然是与某个NioEventLoop绑定的,这个Channel上的可读事件会触发NioEventLoop.processSelectedKey方法:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ........... try{ // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();//触发 } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
可以看到这会触发AbstractNioChannel.NioUnsafe的read方法,其实现位于AbstractNioByteChannel.NioByteUnsafe中:
@Override public final void read() { ........... do { byteBuf = allocHandle.allocate(allocator);//创建一个ByteBuf作为缓冲区 allocHandle.lastBytesRead(doReadBytes(byteBuf));//读取数据到ByteBuf if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf);//触发pipeline的读事件 byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); ............. } }
fireChannelRead的实现位于DefaultChannelPipeline中:
DefaultChannelPipeline.fireChannelRead() @Override s0. public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg);//这里传入的是pipeline的head节点 return this; } AbstractChannelHandlerContext.invokeChannelRead() s1. static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);//这行代码可能是为了检查内存泄漏 EventExecutor executor = next.executor(); if (executor.inEventLoop()) {//同步或者异步的调用传入的AbstractChannelHandlerContext的invokeChannelRead方法, next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } s2. private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg);//第一次调用时,handler为HeadContext,后续调用时为pipeline中自定义的类型为inbound的handler } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } HeadContext.channelRead() @Override s3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } AbstractChannelHandlerContext.fireChannelRead() @Override s4. public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg);//先找到pipeline上当前AbstractChannelHandlerContext节点之后的第一个inbound类型的AbstractChannelHandlerContext,然后调用其invokeChannelRead()方法,这样就又调转回到s1了 return this; } //从pipeline的当前AbstractChannelHandlerContext向后遍历,找到第一个类型为inbound的AbstractChannelHandlerContext节点 s5. private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
可以看出,流入的msg会先被送到HeadContext中,然后HeadContext会将其转发到pipeline中的下一个类型为inbound的AbstractChannelHandlerContext,然后调用关联的Handler来处理数据包
如果Handler的channelRead方法中又调用了ctx.fireChannelRead(msg),那么这个msg会继续被转发到pipeline中下一个类型为inbound的AbstractChannelHandlerContext中进行处理
那么问题来了,如果pipeline中的最后一个自定义的类型为inbound的AbstractChannelHandlerContext中接着调用ctx.fireChannelRead(msg),会发生什么呢?
只需要查看pipeline链表的真正尾结点TailContext的源码就行了:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); } /** * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point. */ protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg);//释放内存 } }
原来只是使用debug级别输出一行日志罢了。
小结:
a. 读事件会触发Channel所关联的EventLoop的processSelectedKey方法
b. 触发AbstractNioByteChannel.NioByteUnsafe的read方法,其中会调用JDK底层提供的nio方法,将从网卡上读取到的数据包装成ByteBuf类型的消息msg
c. 触发Channel关联的DefaultChannelPipeline的fireChannelRead方法
d. 触发DefaultChannelPipeline中维护的双向链表的头结点HeadContext的invokeChannelRead方法
e. 触发DefaultChannelPipeline中维护的双向链表的后续类型为inBound的AbstractChannelHandlerContext的invokeChannelRead方法
f. 如果用户自定义的Handler的channelRead方法中又调用了ctx.fireChannelRead(msg),那么这个msg会继续沿着pipeline向后传播
g. 如果TailContext的channelRead收到了msg,则以debug级别输出日志
5. 一个写事件在pipeline中的流转过程
标签:读取 jdk ioc use complete 继承 group res static
原文地址:http://www.cnblogs.com/stevenczp/p/7615903.html