标签:host adl channel ali stat expand oaf tty object
服务端netty的channelHandler有这么多
/**
 * Builds the channel pipeline used by a {@link NettyServer}.
 *
 * <p>Handlers are appended in a fixed order: frame prepender, frame decoder,
 * CRC32 handler, compress handler, provider decoder, provider encoder and
 * finally the server handler bound to the owning server.
 */
public class NettyServerPipelineFactory implements ChannelPipelineFactory {

    /** Server instance handed to the terminal {@code NettyServerHandler}. */
    private NettyServer server;

    // NOTE(review): a server-side factory obtains its config via createClientConfig();
    // confirm this is intentional.
    private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig();

    /**
     * @param server the server that the last handler in the pipeline reports to
     */
    public NettyServerPipelineFactory(NettyServer server) {
        this.server = server;
    }

    /**
     * Creates a fresh pipeline and registers every handler by name.
     *
     * @return the populated pipeline
     */
    public ChannelPipeline getPipeline() {
        ChannelPipeline chain = pipeline();

        // Framing
        chain.addLast("framePrepender", new FramePrepender());
        chain.addLast("frameDecoder", new FrameDecoder());

        // Integrity check and compression, both driven by the shared codec config
        chain.addLast("crc32Handler", new Crc32Handler(codecConfig));
        chain.addLast("compressHandler", new CompressHandler(codecConfig));

        // Request decoding / response encoding
        chain.addLast("providerDecoder", new ProviderDecoder());
        chain.addLast("providerEncoder", new ProviderEncoder());

        // Terminal handler that receives the decoded event
        chain.addLast("serverHandler", new NettyServerHandler(server));

        return chain;
    }
}
这其中与业务关系最密切的是这三个:FrameDecoder、ProviderDecoder、NettyServerHandler。
//magic os.write(CodecConstants.MAGIC); // 0x39 0x3A //serialize os.writeByte(msg.getSerialize());//序列化类型 这里注意 消息头部分就3个字节 和上面可是不一样的 //bodyLength os.writeInt(Integer.MAX_VALUE);//消息体长度
我们先分析 FrameDecoder,入口方法是 FrameDecoder#decode:
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { Object message = null; if (buffer.readableBytes() <= 2) { return message; } byte[] headMsgs = new byte[2]; buffer.getBytes(buffer.readerIndex(), headMsgs); if ((0x39 == headMsgs[0] && 0x3A == headMsgs[1])) { //old protocol message = doDecode(buffer); } else if ((byte) 0xAB == headMsgs[0] && (byte) 0xBA == headMsgs[1]) { //new protocol message = _doDecode(buffer); } else { throw new IllegalArgumentException("Decode invalid message head:" + headMsgs[0] + " " + headMsgs[1] + ", " + "message:" + buffer); } return message; }
这里分析下 非thrift协议,也就是
if ((0x39 == headMsgs[0] && 0x3A == headMsgs[1])) { //old protocol message = doDecode(buffer);
protected Object doDecode(ChannelBuffer buffer) throws Exception { CodecEvent codecEvent = null; if (buffer.readableBytes() <= CodecConstants.FRONT_LENGTH) { HEAD_LENGTH + BODY_FIELD_LENGTH 3 + 4 return codecEvent; } int totalLength = (int) buffer.getUnsignedInt( buffer.readerIndex() + CodecConstants.HEAD_LENGTH);//从开始位置数3个字节,读出来四个字节,该值就是消息体字节数 int frameLength = totalLength + CodecConstants.FRONT_LENGTH;//消息体长度 + 7个字节的头长度 就是总长度 if (buffer.readableBytes() >= frameLength) { ChannelBuffer frame = buffer.slice(buffer.readerIndex(), frameLength); buffer.readerIndex(buffer.readerIndex() + frameLength); codecEvent = new CodecEvent(frame, false); codecEvent.setReceiveTime(System.currentTimeMillis());//从bytebuffer中切出来 frameLength 字节数,构成一个CodecEvent } return codecEvent; }
经过 FrameDecoder 的解码之后,handler 链中传递的对象就不再是原生的 ChannelBuffer,而是 CodecEvent。
接下来分析 ProviderDecoder。ProviderDecoder 继承自 AbstractDecoder,其主要逻辑也在 AbstractDecoder 中。
AbstractDecoder # decode
/**
 * Decodes the payload carried by a {@code CodecEvent} and stores the result
 * back on the event.
 *
 * @param ctx     handler context, forwarded to the protocol-specific decoder
 * @param channel channel the event arrived on
 * @param msg     upstream message; anything that is not a {@code CodecEvent} is dropped
 * @return the same event (with its invocation set when the event is valid),
 *         or {@code null} if {@code msg} is not a {@code CodecEvent}
 */
public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
    // instanceof is false for null, so this also covers a null message.
    if (!(msg instanceof CodecEvent)) {
        return null;
    }

    CodecEvent event = (CodecEvent) msg;
    if (!event.isValid()) {
        // Invalid events are passed along untouched.
        return event;
    }

    // Pick the decoder by protocol flavour; both paths store the result the same way.
    Object decoded;
    if (event.isUnified()) {
        decoded = _doDecode(ctx, channel, event);
    } else {
        decoded = doDecode(ctx, channel, event);
    }
    event.setInvocation((InvocationSerializable) decoded);

    return event;
}
还是看非thrift协议的解码
protected Object doDecode(ChannelHandlerContext ctx, Channel channel, CodecEvent codecEvent) throws IOException { Object msg = null; ChannelBuffer buffer = codecEvent.getBuffer(); //head buffer.skipBytes(CodecConstants.MEGIC_FIELD_LENGTH);//跳过前两个字节 byte serialize = buffer.readByte();//读一个字节的序列化类型 Long sequence = null; try { //body length int totalLength = buffer.readInt();//消息体长度 int frameLength = totalLength + CodecConstants.FRONT_LENGTH;//帧长度 //body int bodyLength = (totalLength - CodecConstants.TAIL_LENGTH);//消息体里包含了11个字节的尾巴。其中8个字节long类型的序列号,后三个字节固定是 30,29,28 ChannelBuffer frame = extractFrame(buffer, buffer.readerIndex(), bodyLength);//这是切出来纯的请求体 buffer.readerIndex(buffer.readerIndex() + bodyLength); //tail sequence = buffer.readLong(); buffer.skipBytes(CodecConstants.EXPAND_FIELD_LENGTH); //deserialize ChannelBufferInputStream is = new ChannelBufferInputStream(frame); msg = deserialize(serialize, is);//对纯的请求体做反序列化 //after doAfter(channel, msg, serialize, frameLength, codecEvent.getReceiveTime()); } catch (Throwable e) { SerializationException se = new SerializationException(e); try { if (sequence != null) { doFailResponse(ctx, channel, ProviderUtils.createThrowableResponse(sequence.longValue(), serialize, se)); } logger.error("Deserialize failed. host:" + ((InetSocketAddress) channel.getRemoteAddress()).getAddress().getHostAddress() + "\n" + e.getMessage(), se); } catch (Throwable t) { logger.error("[doDecode] doFailResponse failed.", t); } } return msg; }
ProviderDecoder # deserialize
/**
 * Deserializes a request from the stream using the serializer registered for
 * the given type byte.
 *
 * @param serializerType serialization type read from the frame header
 * @param is             stream positioned at the start of the request body
 * @return the object produced by the serializer's {@code deserializeRequest}
 */
public Object deserialize(byte serializerType, InputStream is) {
    return SerializerFactory.getSerializer(serializerType).deserializeRequest(is);
}
没啥好说的了,就是反序列化了
注意,此时CodecEvent中已经有了反序列化好的原始请求对象了
message = doDecode(ctx, channel, codecEvent);
codecEvent.setInvocation((InvocationSerializable) message);
剩下的就是请求到达 NettyServerHandler。这部分逻辑其实在 Pigeon源码分析(四) -- 服务端接收请求过程 - MaXianZhe - 博客园 (cnblogs.com)分析过了
标签:host adl channel ali stat expand oaf tty object
原文地址:https://www.cnblogs.com/juniorMa/p/14859716.html