码迷,mamicode.com
首页 > Web开发 > 详细

Netty源码学习(六)ChannelPipeline

时间:2017-09-30 21:06:23      阅读:746      评论:0      收藏:0      [点我收藏+]

标签:读取   jdk   ioc   use   complete   继承   group   res   static   

0. ChannelPipeline简介

ChannelPipeline = Channel + Pipeline,也就是说首先它与Channel绑定,然后它是起到类似于管道的作用:字节流在ChannelPipeline上流动,流动的过程中被ChannelHandler修饰,最终输出。

 

1. ChannelPipeline类图

技术分享

ChannelPipeline只有两个子类,直接一起放上来好了,其中EmbeddedChannelPipeline主要用于测试,本文只介绍DefaultChannelPipeline

 

2. ChannelPipeline的初始化

跟踪一下DefaultChannelPipeline的构造方法就能发现

ChannelPipeline是在AbstractChannel的构造方法中被初始化的,而AbstractChannel的构造方法有两个,我只选取其中一个做分析了:

AbstractChannel()
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

AbstractChannel.newChannelPipeline()
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }


    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

可以看到,DefaultChannelPipeline中维护了对关联的Channel的引用

而且,Pipeline内部维护了一个双向链表,head是链表的头,tail是链表的尾,链表指针是AbstractChannelHandlerContext类型。

在DefaultChannelPipeline初始化完成的时候,其内部结构是下面这个样子的:

HeadContext  <---->  TailContext

HeadContext与TailContext都继承于AbstractChannelHandlerContext,基本上是起到占位符的效果,没有什么功能性的作用。

 

 

3. 向pipeline添加节点

举一个很简单的例子:

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new Decoder());//解码
         p.addLast(new BusinessHandler())//业务逻辑
         p.addLast(new Encoder());//编码
     }
});

在pipeline中,从网卡收到的数据流先被Decoder解码,然后被BusinessHandler处理,然后再被Encoder编码,最后写回到网卡中。

此时pipeline的内部结构为:

HeadContext  <---->  Decoder  <---->   BusinessHandler  <---->   Encoder  <---->    TailContext

pipeline的修改,是在Channel初始化时,由ChannelInitializer进行的。

ChannelInitializer调用用户自定义的initChannel方法,然后调用Pipeline.addLast()方法修改pipeline的结构,关键代码位于DefaultChannelPipeline.addLast()中:

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {//很重要,用当前的DefaultChannelPipeline作为同步对象,使pipeline的addLast方法串行化
            checkMultiplicity(handler);//禁止非Sharable的handler被重复add到不同的pipeline中

            newCtx = newContext(group, filterName(name, handler), handler);//将Handler包装成DefaultChannelHandlerContext并插入pipeline中

            addLast0(newCtx);//将ChannelHandlerContext插入pipeline

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {//如果channel没有与eventloop绑定,则创建一个任务,这个任务会在channel被register的时候调用
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {//向双链表插入节点
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.handler().handlerAdded(ctx);//触发handler的handlerAdded回调函数
            ctx.setAddComplete();
        } catch (Throwable t) {//异常处理
            boolean removed = false;
            try {
                remove0(ctx);
                try {
                    ctx.handler().handlerRemoved(ctx);
                } finally {
                    ctx.setRemoved();
                }
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }

小结:

a. addLast方法的作用就是将传入的handler添加到当前Pipeline的双向链表中

b. 在后续处理的时候,可以通过遍历链表,找到channel关联的pipeline上注册的所有handler

c. 贴出的代码中没有涉及,但又很重要的一点是:在添加handler的过程中,会根据handler继承于ChannelInboundHandler或者ChannelOutboundHandler来判定这个handler是用于处理in事件还是out事件的,然后会以此为依据来设置AbstractChannelHandlerContext的inbound和outbound位。

 

4. 一个读事件在pipeline中的流转过程

在第三节的示例中,如果一个已经register完毕的Channel收到一个数据包,会发生什么事情呢?

首先,这个Channel必然是与某个NioEventLoop绑定的,这个Channel上的可读事件会触发NioEventLoop.processSelectedKey方法:

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ...........
        try{
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();//触发
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

可以看到这会触发AbstractNioChannel.NioUnsafe的read方法,其实现位于AbstractNioByteChannel.NioByteUnsafe中:

        @Override
        public final void read() {
            ...........
                do {
                    byteBuf = allocHandle.allocate(allocator);//创建一个ByteBuf作为缓冲区
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));//读取数据到ByteBuf
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);//触发pipeline的读事件
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
           .............
        }
    }

fireChannelRead的实现位于DefaultChannelPipeline中:

DefaultChannelPipeline.fireChannelRead()
    @Override
s0. public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);//这里传入的是pipeline的head节点
        return this;
    }

AbstractChannelHandlerContext.invokeChannelRead()
s1. static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);//这行代码可能是为了检查内存泄漏
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {//同步或者异步的调用传入的AbstractChannelHandlerContext的invokeChannelRead方法,
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

s2. private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);//第一次调用时,handler为HeadContext,后续调用时为pipeline中自定义的类型为inbound的handler
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

HeadContext.channelRead()
            @Override
s3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.fireChannelRead(msg);
        }

AbstractChannelHandlerContext.fireChannelRead()
    @Override
s4. public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);//先找到pipeline上当前AbstractChannelHandlerContext节点之后的第一个inbound类型的AbstractChannelHandlerContext,然后调用其invokeChannelRead()方法,这样就又调转回到s1了
        return this;
    }

    //从pipeline的当前AbstractChannelHandlerContext向后遍历,找到第一个类型为inbound的AbstractChannelHandlerContext节点
s5. private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

可以看出,流入的msg会先被送到HeadContext中,然后HeadContext会将其转发到pipeline中的下一个类型为inbound的AbstractChannelHandlerContext,然后调用关联的Handler来处理数据包

如果Handler的channelRead方法中又调用了ctx.fireChannelRead(msg),那么这个msg会继续被转发到pipeline中下一个类型为inbound的AbstractChannelHandlerContext中进行处理

那么问题来了,如果pipeline中的最后一个自定义的类型为inbound的AbstractChannelHandlerContext中接着调用ctx.fireChannelRead(msg),会发生什么呢?

只需要查看pipeline链表的真正尾结点TailContext的源码就行了:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }

    /**
     * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
     * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
     * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
     */
    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);//释放内存
        }
    }

原来只是使用debug级别输出一行日志罢了。

小结:

a. 读事件会触发Channel所关联的EventLoop的processSelectedKey方法

b. 触发AbstractNioByteChannel.NioByteUnsafe的read方法,其中会调用JDK底层提供的nio方法,将从网卡上读取到的数据包装成ByteBuf类型的消息msg

c. 触发Channel关联的DefaultChannelPipeline的fireChannelRead方法

d. 触发DefaultChannelPipeline中维护的双向链表的头结点HeadContext的invokeChannelRead方法

e. 触发DefaultChannelPipeline中维护的双向链表的后续类型为inBound的AbstractChannelHandlerContext的invokeChannelRead方法

f. 如果用户自定义的Handler的channelRead方法中又调用了ctx.fireChannelRead(msg),那么这个msg会继续沿着pipeline向后传播

g. 如果TailContext的channelRead收到了msg,则以debug级别输出日志

 

5. 一个写事件在pipeline中的流转过程

 

Netty源码学习(六)ChannelPipeline

标签:读取   jdk   ioc   use   complete   继承   group   res   static   

原文地址:http://www.cnblogs.com/stevenczp/p/7615903.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!