标签:定时 实践 def lse invoke send policy lease row
inBound事件的传播
何为inBound事件以及ChannelInboundHandler
ChannelRead事件的传播 ChannelRead是典型的inbound事件,以他为例了解inbound事件的传播
SimpleInBoundHandler处理器
何为inBound事件以及ChannelInboundHandler
ChannelHandler的继承关系
ChannelHandler定义了哪些功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public interface { void handlerAdded (ChannelHandlerContext ctx) throws Exception ; void handlerRemoved (ChannelHandlerContext ctx) throws Exception ; void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception ; @Inherited @Documented @Target (ElementType.TYPE) @Retention (RetentionPolicy.RUNTIME) @interface Sharable { } }
ChannelInboundHandler在ChannelHandler的基础上扩展了什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public interface ChannelInboundHandler extends { void channelRegistered (ChannelHandlerContext ctx) throws Exception ; void channelUnregistered (ChannelHandlerContext ctx) throws Exception ; void channelActive (ChannelHandlerContext ctx) throws Exception ; void channelInactive (ChannelHandlerContext ctx) throws Exception ; void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception ; void channelReadComplete (ChannelHandlerContext ctx) throws Exception ; void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception ; void channelWritabilityChanged (ChannelHandlerContext ctx) throws Exception ; @Override @SuppressWarnings ("deprecation" ) void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception ; }
这次由ChannelRead为例,看一下inbound事件是如何传播的.
ChannelRead事件的传播 这里创建3个自定义的InboundHandler测试ChannelRead事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class InBoundHandlerA extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandlerA: " + msg); ctx.fireChannelRead(msg); } } public class InBoundHandlerB extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandlerB: " + msg); ctx.fireChannelRead(msg); } @Override public void channelActive (ChannelHandlerContext ctx) { ctx.channel().pipeline().fireChannelRead("hello world" ); } } public class InBoundHandlerC extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandlerC: " + msg); ctx.fireChannelRead(msg); } }
服务端启动代码添加childHandler的部分为
1 2 3 4 5 6 7 8 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel (SocketChannel ch) { ch.pipeline().addLast(new InBoundHandlerA()); ch.pipeline().addLast(new InBoundHandlerB()); ch.pipeline().addLast(new InBoundHandlerC()); } });
此时启动后在控制台执行telnet 127.0.0.1 8888
,它的后台输出结果为
1 2 3 InBoundHandlerA: hello world InBoundHandlerB: hello world InBoundHandlerC: hello world
如果把添加childHandler的部分改为下面的顺序
1 2 3 ch.pipeline().addLast(new InBoundHandlerA()); ch.pipeline().addLast(new InBoundHandlerC()); ch.pipeline().addLast(new InBoundHandlerB());
那么输出是
1 2 3 InBoundHandlerA: hello world InBoundHandlerC: hello world InBoundHandlerB: hello world
可以推测出,inbound和添加顺序相关.在InboundHandlerB
的channelActive()
中打个断点.看一下它的fireChannelRead("hello world")
的逻辑.会看到它会调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override public final ChannelPipeline fireChannelRead (Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this ; } --- static void invokeChannelRead (final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg" ), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run () { next.invokeChannelRead(m); } }); } }
也就是说,当遇到channelRead事件时它会从调用head的invokeChannelRead.继续看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private void invokeChannelRead (Object msg) { if (invokeHandler()) { ((ChannelInboundHandler) handler()).channelRead(this , msg); } else { fireChannelRead(msg); } } --- @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); }
这个fireChannelRead是io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
.它会调用findContextInbound()
来获取下一个inboundHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public ChannelHandlerContext fireChannelRead (final Object msg) { invokeChannelRead(findContextInbound(), msg); return this ; } --- private AbstractChannelHandlerContext findContextInbound () { AbstractChannelHandlerContext ctx = this ; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
获取到下一个inboundHandler,也就是InboundHandlerA后它会调用io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead
,这个和之前head调用的是完全相同的方法,他回去调用当前inboundHandler,也就是InboundHandlerA的readChannel方法,它就是我们在用户代码中定义的部分.
1 2 3 4 5 6 7 8 public class InBoundHandlerA extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandlerA: " + msg); ctx.fireChannelRead(msg); } }
然后InBoundHandlerA
的channelRead()
又会调用下一个InBoundHandlerB
的channelRead()
,它又会接着调用InBoundHandlerC
的channelRead()
此时回顾一下InBoundHandlerB
中的逻辑,会发现.pipeline中inBound的传播方式为:
1 2 ctx.fireChannelRead(msg); ctx.channel().pipeline().fireChannelRead("hello world" );
当传播到最后一个节点(C)时,它会传播到最后的Tail节点,调用它的channelRead()
.之前说过tial是用来做一些收尾工作,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); } --- protected void onUnhandledInboundMessage (Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration." , msg); } finally { ReferenceCountUtil.release(msg); } }
SimpleInBoundHandler处理器 以下面的自定义Handler为例,看一下它的使用场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class AuthHandler extends SimpleChannelInboundHandler <ByteBuf > { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { } @Override protected void channelRead0 (ChannelHandlerContext ctx, ByteBuf password) throws Exception { if (paas(password)) { ctx.pipeline().remove(this ); } else { ctx.close(); } } private boolean paas (ByteBuf password) { return false ; } }
看一下SimpleChannelInboundHandler
的channelRead()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true ; try { if (acceptInboundMessage(msg)) { I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false ; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } }
也就是说使用SimpleChannelInboundHandler
时不需要管channelRead()
而是通过channelRead0()
来做原本在channelRead()
中的逻辑.因为SimpleChannelInboundHandler的channelRead()
中定义了从执行channelRead0()
直到释放的过程,所以当channelRead0()
被执行后它会自动帮你去释放
outBound事件的传播
何为outBound事件以及ChannelOutBoundHandler
write()事件的传播 典型的outBound事件
何为outBound事件以及ChannelOutBoundHandler ChannelOutboundHandler在ChannelHandler的基础上扩展了什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public interface ChannelOutboundHandler extends { void bind (ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception ; void connect ( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception ; void disconnect (ChannelHandlerContext ctx, ChannelPromise promise) throws Exception ; void close (ChannelHandlerContext ctx, ChannelPromise promise) throws Exception ; void deregister (ChannelHandlerContext ctx, ChannelPromise promise) throws Exception ; void read (ChannelHandlerContext ctx) throws Exception ; void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception ; void flush (ChannelHandlerContext ctx) throws Exception ; }
相对于InBounder,outBound更像是用户主动发起的操作.而InBounder更类似于事件触发
write()事件的传播 这里创建3个自定义的OutboundHandler测试write事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println("OutBoundHandlerA: " + msg); ctx.write(msg, promise); } } --- public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println("OutBoundHandlerB: " + msg); ctx.write(msg, promise); } @Override public void handlerAdded (final ChannelHandlerContext ctx) { ctx.executor().schedule(() -> { ctx.channel().write("hello world" ); }, 3 , TimeUnit.SECONDS); } } 大专栏 事件和异常的传播 · 农场主的黑科技. pan class="line">---public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println("OutBoundHandlerC: " + msg); ctx.write(msg, promise); } }
服务端创建代码
1 2 3 4 5 6 7 8 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel (SocketChannel ch) { ch.pipeline().addLast(new OutBoundHandlerA()); ch.pipeline().addLast(new OutBoundHandlerB()); ch.pipeline().addLast(new OutBoundHandlerC()); } });
输出为以下,和添加顺序正好相反 .也就是说outBound的添加顺序和pipeline中的传播顺序是相反的
1 2 3 OutBoundHandlerC: hello world OutBoundHandlerB: hello world OutBoundHandlerA: hello world
下面看看write()
的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override public ChannelFuture write (Object msg) { return pipeline.write(msg); } --- @Override public final ChannelFuture write (Object msg) { return tail.write(msg); } --- @Override public ChannelFuture write (Object msg) { return write(msg, newPromise()); }
这个pipeline的write()实际会调用tail的write,这里的newPromise()
是一个回调
1 2 3 4 5 6 7 8 9 10 @Override public ChannelFuture write (final Object msg, final ChannelPromise promise) { write(msg, false , promise); return promise; } --- private void write (Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); }
通过findContextOutbound()
找到下一个传播对象,看一下它的源码
1 2 3 4 5 6 7 private AbstractChannelHandlerContext findContextOutbound () { AbstractChannelHandlerContext ctx = this ; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
和inbound的时候相似,通过while找前面那个outbound节点,此时会返回最后添加的C节点,继续看这个write()
的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private void write (Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { }
也就是说write()
会调用下一个outbound节点的invokeWrite()
1 2 3 4 5 6 7 8 9 10 11 12 private void invokeWrite (Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } } --- private void invokeWrite0 (Object msg, ChannelPromise promise) { ((ChannelOutboundHandler) handler()).write(this , msg, promise); }
也就是会调用用户代码自定义的write()
1 2 3 4 5 6 7 8 public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println("OutBoundHandlerC: " + msg); ctx.write(msg, promise); } }
这个write会从当前节点调用io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
,寻找下一个传播对象,并调用它的write
1 2 3 4 5 6 7 8 9 10 11 @Override public ChannelFuture write (final Object msg, final ChannelPromise promise) { write(msg, false , promise); return promise; } --- private void write (Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); }
后面节点B又会通过相同逻辑调用A.A节点又会调用相同逻辑,但此时返回的next是head节点.也就是head节点的wtite()方法.
1 2 3 4 @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { unsafe.write(msg, promise); }
总结一下就是
1 2 ctx.channel().write("hello world"); //从tail开始传播 ctx.write(msg, promise); //从当前节点开始往下传播
异常的传播
异常的触发链 在服务端启动代码中添加6个Handler
1 2 3 4 5 6 7 8 9 10 11 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel (SocketChannel ch) { ch.pipeline().addLast(new InBoundHandlerA()); ch.pipeline().addLast(new InBoundHandlerB()); ch.pipeline().addLast(new InBoundHandlerC()); ch.pipeline().addLast(new OutBoundHandlerA()); ch.pipeline().addLast(new OutBoundHandlerB()); ch.pipeline().addLast(new OutBoundHandlerC()); } });
把InBoundHandlerB
改为以下
1 2 3 4 5 6 7 8 9 10 11 12 public class InBoundHandlerB extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { throw new BusinessException("from InBoundHandlerB" ); } @Override public void channelActive (ChannelHandlerContext ctx) { ctx.channel().pipeline().fireChannelRead("hello world" ); } }
在InBoundHandlerA
,InBoundHandlerC
,OutBoundHandlerA
,OutBoundHandlerB
,OutBoundHandlerC
中加入以下逻辑
1 2 3 4 5 @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("InBoundHandlerA.exceptionCaught()" ); ctx.fireExceptionCaught(cause); }
通过telnet写入数据,此时会抛出异常
1 2 $ telnet 127.0.0.1 8888 $ send ayt
服务端的输出为
1 2 3 4 5 6 InBoundHandlerB.exceptionCaught() InBoundHandlerC.exceptionCaught() OutBoundHandlerA.exceptionCaught() OutBoundHandlerB.exceptionCaught() OutBoundHandlerC.exceptionCaught() //..以下异常信息
可以看出inBound的输出顺序和inbound事件的传播顺序很相似,但outBound的异常传播顺序和之前的outBound事件传播顺序是相反的.
跟踪源码看一下,抛出异常的InBoundHandlerB#channelRead
的调用顺序是怎样的
1 2 3 4 5 6 7 8 9 10 11 12 private void invokeChannelRead (Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this , msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }
跟踪抛出异常时会执行的notifyHandlerException(t)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private void notifyHandlerException (Throwable cause) { invokeExceptionCaught(cause); } --- private void invokeExceptionCaught (final Throwable cause) { if (invokeHandler()) { try { handler().exceptionCaught(this , cause); } catch (Throwable error) { } } else { fireExceptionCaught(cause); } }
也就是说在InBoundHandlerB#channelRead
抛出异常时它会回调InBoundHandlerB#exceptionCaught
,
它里面通过ctx.fireExceptionCaught(cause);
把异常事件继续进行传播.接着看
1 2 3 4 5 6 7 @Override public ChannelHandlerContext fireExceptionCaught (final Throwable cause) { invokeExceptionCaught(next, cause); return this ; }
InBoundHandlerB会调用C的异常捕获
1 2 3 4 5 6 7 8 static void invokeExceptionCaught (final AbstractChannelHandlerContext next, final Throwable cause) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeExceptionCaught(cause); } else { } }
接下来就和InB调用同一个方法,执行C的exceptionCaught()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void invokeExceptionCaught (final Throwable cause) { if (invokeHandler()) { try { handler().exceptionCaught(this , cause); } catch (Throwable error) { } } else { fireExceptionCaught(cause); } } --- @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("InBoundHandlerC.exceptionCaught()" ); ctx.fireExceptionCaught(cause); }
而InBoundHandlerC
的下一个节点则是OutBoundHandlerA
,再下一个则是OutBoundHandlerB
那么OutBoundHandlerC
调用ctx.fireExceptionCaught(cause);
继续往下传播异常时会传播到哪?Tail节点的exceptionCaught()
Tail节点的exceptionCaught()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { onUnhandledInboundException(cause); } --- protected void onUnhandledInboundException (Throwable cause) { try { logger.warn( "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " + "It usually means the last handler in the pipeline did not handle the exception." , cause); } finally { ReferenceCountUtil.release(cause); } }
提醒用户代码异常尚未被处理.上面的示例程序中它会打印
1 2 3 4 5 6 7 8 Jan 07, 2019 10:00:35 AM io.netty.channel.DefaultChannelPipeline onUnhandledInboundException WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception. com.imooc.netty.ch6.BusinessException: from InBoundHandlerB at com.imooc.netty.ch6.InBoundHandlerB.channelRead(InBoundHandlerB.java:12) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) //...
以上就是异常的触发链.可以发现它的异常触发和是inBound还是outBound没有关系,只会每次调用next触发下一个节点,也就是说和Handler的添加顺序有关.
那么如果没有在用户代码中重载exceptionCaught()
,它的父类会做哪些事?
1 2 3 4 5 @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); }
就是直接把异常往下传播
异常处理的最佳实践 项目中常用的异常处理方式:在每一条channel的最后给它添加一个终极的Exception处理器,如:
1 2 3 4 5 6 7 8 public class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof BusinessException){ System.out.println("BusinessException" ); } } }
在pipeline的最后添加ExceptionCaughtHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel (SocketChannel ch) { ch.pipeline().addLast(new InBoundHandlerA()); ch.pipeline().addLast(new InBoundHandlerB()); ch.pipeline().addLast(new InBoundHandlerC()); ch.pipeline().addLast(new OutBoundHandlerA()); ch.pipeline().addLast(new OutBoundHandlerB()); ch.pipeline().addLast(new OutBoundHandlerC()); ch.pipeline().addLast(new ExceptionCaughtHandler()); } });
此时的输出为以下,说明异常在最后被处理了
1 2 3 4 5 6 InBoundHandlerB.exceptionCaught() InBoundHandlerC.exceptionCaught() OutBoundHandlerA.exceptionCaught() OutBoundHandlerB.exceptionCaught() OutBoundHandlerC.exceptionCaught() BusinessException
总结
netty如何判断ChannelHandler类型的? 添加一个节点时,pipeline会通过一个instanceof来判断是inbound类型还是outbound类型,然后用布尔变量来标示
对于ChannelHandler的添加应该遵循什么样的顺序? inBound事件的传播顺序和inBoundHandler的添加顺序相同 outBound事件的传播顺序和outBoundHandler的添加顺序相反
用户手动触发事件传播,不同的触发方式有什么样的区别?
inbound事件,如
1 2 ctx.fireChannelRead(msg); ctx.channel().pipeline().fireChannelRead("hello world" );
outbound事件,如
1 2 ctx.write(msg, promise); ctx.channel().write("hello world" );
事件和异常的传播 · 农场主的黑科技.
标签:定时 实践 def lse invoke send policy lease row
原文地址:https://www.cnblogs.com/lijianming180/p/12366658.html