Spark Shuffle (3): How an Executor Fetches Shuffle Data Files
Earlier posts in this series covered how Executors report the shuffle data files they generate to the Driver, and how an Executor learns where those shuffle files live. So how does an Executor actually fetch the shuffle data files in order to compute an action operator?
In a ResultTask, the Executor asks the Driver's MapOutputTracker for the shuffle block statuses of a given shuffleId and reorganizes them into a structure keyed by BlockManagerId, which makes it easy to tell whether a shuffle block is local or lives on a remote executor:
Seq[(BlockManagerId, Seq[(BlockId, Long)])]
    class BlockManagerId private (
        private var executorId_ : String,
        private var host_ : String,
        private var port_ : Int,
        private var topologyInfo_ : Option[String])
      extends Externalizable
Blocks whose BlockManagerId carries this executor's own id are treated as local (zero-sized blocks are skipped):

    for ((address, blockInfos) <- blocksByAddress) {
      totalBlocks += blockInfos.size
      if (address.executorId == blockManager.blockManagerId.executorId) {
        // Filter out zero-sized blocks
        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
        numBlocksToFetch += localBlocks.size
      }
    }
The total bytes in flight are capped by spark.reducer.maxSizeInFlight (48m by default), and the target size of a single request is a fifth of that, so that fetches can run against up to five remote nodes in parallel instead of blocking on one:

    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
    val iterator = blockInfos.iterator
    var curRequestSize = 0L
    var curBlocks = new ArrayBuffer[(BlockId, Long)]
    while (iterator.hasNext) {
      val (blockId, size) = iterator.next()
      // Skip empty blocks
      if (size > 0) {
        curBlocks += ((blockId, size))
        remoteBlocks += blockId
        numBlocksToFetch += 1
        curRequestSize += size
      } else if (size < 0) {
        throw new BlockException(blockId, "Negative block size " + size)
      }
      if (curRequestSize >= targetRequestSize) {
        // Add this FetchRequest
        remoteRequests += new FetchRequest(address, curBlocks)
        curBlocks = new ArrayBuffer[(BlockId, Long)]
        logDebug(s"Creating fetch request of $curRequestSize at $address")
        curRequestSize = 0
      }
    }
    // Add in the final request
    if (curBlocks.nonEmpty) {
      remoteRequests += new FetchRequest(address, curBlocks)
    }

The resulting FetchRequests are randomized before being placed in a queue. Every executor receives the block statuses for a given shuffleId in the same BlockManagerId order from the Driver, so without randomization many executors would very likely fire their first FetchRequests at the same executor at the same time, driving up its load. Randomizing the FetchRequests is therefore essential to balance fetch traffic across executors.
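A minimal, self-contained sketch of why the shuffling matters (FetchRequest is simplified here; Spark itself applies Utils.randomize to the remoteRequests buffer):

    import scala.collection.mutable.Queue
    import scala.util.Random

    // Simplified stand-in for Spark's FetchRequest: an address plus (blockId, size) pairs.
    case class FetchRequest(address: String, blocks: Seq[(String, Long)])

    // Every reducer receives the map statuses in the same order, so without a
    // shuffle all reducers would aim their first request at executor-0.
    val remoteRequests = Seq(
      FetchRequest("executor-0", Seq(("shuffle_0_0_2", 1024L))),
      FetchRequest("executor-1", Seq(("shuffle_0_1_2", 2048L))),
      FetchRequest("executor-2", Seq(("shuffle_0_2_2", 512L))))

    val fetchRequests = new Queue[FetchRequest]
    fetchRequests ++= Random.shuffle(remoteRequests) // Spark uses Utils.randomize here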
FetchRequests are not submitted in parallel. For a single Task, the Executor combines the fetched blocks one BlockId at a time, and a Task itself runs as a single thread, so there is no need for parallel submission. Only after a block has been consumed does the iterator check whether the next FetchRequest should be sent. Because each FetchRequest bundles multiple blocks, the submission rate is throttled to bound the bandwidth an Executor spends fetching shuffle data for many BlockIds.
The next FetchRequest may be submitted only while both of the following hold: the number of requests in flight stays within spark.reducer.maxReqsInFlight, and the bytes in flight stay within spark.reducer.maxSizeInFlight.
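A self-contained sketch of that throttling check, modeled on ShuffleBlockFetcherIterator.fetchUpToMaxBytes in Spark 2.x (the condition follows those sources; the surrounding class is simplified for illustration):

    import scala.collection.mutable.Queue

    // Simplified FetchRequest: where to fetch from and the total bytes requested.
    case class FetchRequest(address: String, size: Long)

    class FetchThrottle(maxBytesInFlight: Long, maxReqsInFlight: Int,
                        send: FetchRequest => Unit) {
      val fetchRequests = new Queue[FetchRequest]
      private var bytesInFlight = 0L
      private var reqsInFlight = 0

      // The first request is always allowed (bytesInFlight == 0), so progress
      // is made even when a single request exceeds maxBytesInFlight.
      def fetchUpToMaxBytes(): Unit = {
        while (fetchRequests.nonEmpty &&
            (bytesInFlight == 0 ||
              (reqsInFlight + 1 <= maxReqsInFlight &&
                bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
          val req = fetchRequests.dequeue()
          bytesInFlight += req.size
          reqsInFlight += 1
          send(req)
        }
      }

      // Called when all blocks of one request have been consumed.
      def onRequestComplete(req: FetchRequest): Unit = {
        bytesInFlight -= req.size
        reqsInFlight -= 1
        fetchUpToMaxBytes()
      }
    }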
- Executor A performs the fetchBlocks operation through the ExternalShuffleClient; if the io.maxRetries retry parameter is configured, a RetryingBlockFetcher capable of retrying failed fetches is started
- A TransportClient and a OneForOneBlockFetcher are initialized
- Inside OneForOneBlockFetcher, Executor A first sends an OpenBlocks request to Executor B, carrying the executor id, the app id, and the set of block ids
- Executor B receives the block ids and resolves each one through its BlockManager to the corresponding index and data files (looked up via the map id and reduce id), building a FileSegmentManagedBuffer for each
- Through the StreamManager (OneForOneStreamManager), registerStream generates a streamId and caches a StreamState (the ManagedBuffers plus the app id), returning the generated streamId; a sketch of this registration follows the list
- Executor B replies with a StreamHandle message containing the streamId and the number of chunks, where the chunk count is simply the number of blocks
- On receiving the StreamHandle, Executor A sends ChunkFetchRequest messages one by one, each carrying the streamId and a chunk index, to actually fetch the shuffle data from Executor B
- Executor B reads the streamId and chunk index from each ChunkFetchRequest, looks up the corresponding FileSegmentManagedBuffer in its cache, and replies with a ChunkFetchSuccess message containing the streamId and the FileSegmentManagedBuffer
- Steps 3-6 block inside the Task thread, whereas in step 7 the ChunkFetchRequests are sent one after another without blocking on their results; the results come back through a callback registered before the call:
    client.fetchChunk(streamHandle.streamId, i, chunkCallback);

    private class ChunkCallback implements ChunkReceivedCallback {
      @Override
      public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
        // On receipt of a chunk, pass it upwards as a block.
        listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
      }

      @Override
      public void onFailure(int chunkIndex, Throwable e) {
        // On receipt of a failure, fail every block from chunkIndex onwards.
        String[] remainingBlockIds =
          Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);
        failRemainingBlocks(remainingBlockIds, e);
      }
    }

- The listener here is the BlockFetchingListener injected earlier in fetchBlocks:
    new BlockFetchingListener {
      override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
        // Only add the buffer to results queue if the iterator is not zombie,
        // i.e. cleanup() has not been called yet.
        ShuffleBlockFetcherIterator.this.synchronized {
          if (!isZombie) {
            // Increment the ref count because we need to pass this to a different thread.
            // This needs to be released after use.
            buf.retain()
            remainingBlocks -= blockId
            results.put(new SuccessFetchResult(BlockId(blockId), address,
              sizeMap(blockId), buf, remainingBlocks.isEmpty))
            logDebug("remainingBlocks: " + remainingBlocks)
          }
        }
        logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
      }

      override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
        logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
        results.put(new FailureFetchResult(BlockId(blockId), address, e))
      }
    }

- On success, the result is wrapped in a SuccessFetchResult holding the blockId, the address, the data size, and the ManagedBuffer, and is put into the results queue
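As promised above, here is a small Scala sketch of the stream registration on Executor B's side; the real OneForOneStreamManager is Java code in spark-network-common, and StreamState is simplified here to the fields this post discusses:

    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.atomic.AtomicLong

    // Simplified StreamState: the app id plus one buffer per requested block.
    case class StreamState[Buf](appId: String, buffers: IndexedSeq[Buf])

    class SimpleStreamManager[Buf] {
      private val nextStreamId = new AtomicLong(0)
      private val streams = new ConcurrentHashMap[Long, StreamState[Buf]]()

      // Answering OpenBlocks: cache the block buffers and hand back the streamId
      // that the client will echo in every ChunkFetchRequest.
      def registerStream(appId: String, buffers: IndexedSeq[Buf]): Long = {
        val streamId = nextStreamId.getAndIncrement()
        streams.put(streamId, StreamState(appId, buffers))
        streamId
      }

      // Answering ChunkFetchRequest(streamId, chunkIndex): one chunk per block.
      def getChunk(streamId: Long, chunkIndex: Int): Buf =
        streams.get(streamId).buffers(chunkIndex)
    }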
    override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

hasNext checks whether the number of blocks processed has reached the number of blocks to fetch; each call to next() increments numBlocksProcessed by one, and a failed fetch surfaces directly as an exception.
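A self-contained model of the consuming side (names follow ShuffleBlockFetcherIterator, but the FetchResult types are simplified for illustration): the Task thread parks on the results queue until one of the callbacks above deposits the next result:

    import java.util.concurrent.LinkedBlockingQueue

    // Simplified stand-ins for Spark's FetchResult hierarchy.
    sealed trait FetchResult
    case class SuccessFetchResult(blockId: String, data: Array[Byte]) extends FetchResult
    case class FailureFetchResult(blockId: String, cause: Throwable) extends FetchResult

    class FetchResultIterator(numBlocksToFetch: Int,
                              results: LinkedBlockingQueue[FetchResult])
        extends Iterator[(String, Array[Byte])] {
      private var numBlocksProcessed = 0

      override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

      // Blocks until a fetch callback (or the local-block path) deposits the
      // next result; failures are rethrown to the Task.
      override def next(): (String, Array[Byte]) = {
        numBlocksProcessed += 1
        results.take() match {
          case SuccessFetchResult(blockId, data)  => (blockId, data)
          case FailureFetchResult(blockId, cause) => throw cause
        }
      }
    }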
On the wire, every request and response between the executors is a Message, and every Message knows how to encode itself:

    public interface Message extends Encodable {}
    public interface Encodable {
      /** Number of bytes of the encoded form of this object. */
      int encodedLength();

      /**
       * Serializes this object by writing into the given ByteBuf.
       * This method must write exactly encodedLength() bytes.
       */
      void encode(ByteBuf buf);
    }

The core serialization method encode takes a ByteBuf, which matches the NIO interfaces Netty exposes; note that this is Netty's ByteBuf, so the message layer is coupled to Netty.
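To make the contract concrete, here is a minimal Scala sketch shaped like Spark's StreamChunkId (the real class is Java; its layout is one long plus one int): encodedLength() must report exactly the bytes that encode() writes.

    import io.netty.buffer.ByteBuf

    case class StreamChunkId(streamId: Long, chunkIndex: Int) {
      // One long (8 bytes) for the streamId, one int (4 bytes) for the index.
      def encodedLength: Int = 8 + 4

      def encode(buf: ByteBuf): Unit = {
        buf.writeLong(streamId)
        buf.writeInt(chunkIndex)
      }
    }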
Spark provides its own implementation of Netty's MessageToMessageEncoder, whose abstract encode method is:

    protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out)
        throws Exception;
    public final class MessageEncoder extends MessageToMessageEncoder<Message> {

      private static final Logger logger = LoggerFactory.getLogger(MessageEncoder.class);

      /***
       * Encodes a Message by invoking its encode() method. For non-data messages, we will add one
       * ByteBuf to 'out' containing the total frame length, the message type, and the message itself.
       * In the case of a ChunkFetchSuccess, we will also add the ManagedBuffer corresponding to the
       * data to 'out', in order to enable zero-copy transfer.
       */
      @Override
      public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) throws Exception {
        Object body = null;
        long bodyLength = 0;
        boolean isBodyInFrame = false;

        // If the message has a body, take it out to enable zero-copy transfer for the payload.
        if (in.body() != null) {
          try {
            bodyLength = in.body().size();
            body = in.body().convertToNetty();
            isBodyInFrame = in.isBodyInFrame();
          } catch (Exception e) {
            in.body().release();
            if (in instanceof AbstractResponseMessage) {
              AbstractResponseMessage resp = (AbstractResponseMessage) in;
              // Re-encode this message as a failure response.
              String error = e.getMessage() != null ? e.getMessage() : "null";
              logger.error(String.format("Error processing %s for client %s",
                in, ctx.channel().remoteAddress()), e);
              encode(ctx, resp.createFailureResponse(error), out);
            } else {
              throw e;
            }
            return;
          }
        }

        Message.Type msgType = in.type();
        // All messages have the frame length, message type, and message itself. The frame length
        // may optionally include the length of the body data, depending on what message is being
        // sent.
        int headerLength = 8 + msgType.encodedLength() + in.encodedLength();
        long frameLength = headerLength + (isBodyInFrame ? bodyLength : 0);
        ByteBuf header = ctx.alloc().heapBuffer(headerLength);
        header.writeLong(frameLength);
        msgType.encode(header);
        in.encode(header);
        assert header.writableBytes() == 0;

        if (body != null) {
          // We transfer ownership of the reference on in.body() to MessageWithHeader.
          // This reference will be freed when MessageWithHeader.deallocate() is called.
          out.add(new MessageWithHeader(in.body(), header, body, bodyLength));
        } else {
          out.add(header);
        }
      }
    }
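To make the header layout concrete, here is a worked Scala sketch of the 21-byte header for a ChunkFetchSuccess-shaped message whose message part is a StreamChunkId; the type id value written is hypothetical, purely for illustration, and the body is assumed to be counted in the frame (isBodyInFrame):

    import io.netty.buffer.Unpooled

    val msgTypeLen = 1                          // Message.Type encodes as a single byte
    val messageLen = 8 + 4                      // StreamChunkId: streamId + chunkIndex
    val headerLength = 8 + msgTypeLen + messageLen
    val bodyLength = 1024 * 1024L               // the chunk data itself
    val frameLength = headerLength + bodyLength // body counted while it is in the frame

    val header = Unpooled.buffer(headerLength)
    header.writeLong(frameLength)               // every message starts with the frame length
    header.writeByte(3)                         // hypothetical type id, for illustration only
    header.writeLong(42L)                       // streamId
    header.writeInt(0)                          // chunkIndex
    assert(header.writableBytes() == 0)         // encode() must fill the header exactly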
The decoding side is the mirror image:

    public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf>

In its decode method, the incoming ByteBuf is decoded directly into a Message:
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
      Message.Type msgType = Message.Type.decode(in);
      Message decoded = decode(msgType, in);
      assert decoded.type() == msgType;
      logger.trace("Received message {}: {}", msgType, decoded);
      out.add(decoded);
    }

Each concrete Message type's own decode method is then invoked for the payload.
On Executor B, the chunk is answered with:

    respond(new ChunkFetchSuccess(req.streamChunkId, buf));

The ManagedBuffer inside buf here is a FileSegmentManagedBuffer, and in the encode function shown earlier:
    body = in.body().convertToNetty();
    public Object convertToNetty() throws IOException {
      if (conf.lazyFileDescriptor()) {
        return new DefaultFileRegion(file, offset, length);
      } else {
        FileChannel fileChannel = new FileInputStream(file).getChannel();
        return new DefaultFileRegion(fileChannel, offset, length);
      }
    }

This returns a DefaultFileRegion, Netty's zero-copy way of transferring files: a FileRegion is sent by calling transferTo, which copies the file region without passing through user space. Zero-copy itself is beyond the scope of this post.
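As a standalone sketch of what the FileRegion path amounts to (the file path below is made up for illustration), transferTo hands a byte range of the file to the target channel, backed by FileChannel.transferTo, i.e. sendfile-style zero copy:

    import java.io.{File, FileInputStream}
    import java.nio.channels.{Channels, WritableByteChannel}
    import io.netty.channel.DefaultFileRegion

    val file = new File("/tmp/shuffle_0_0_0.data") // hypothetical shuffle data file
    val region = new DefaultFileRegion(
      new FileInputStream(file).getChannel, 0, file.length)
    val sink: WritableByteChannel = Channels.newChannel(System.out)

    // transferTo may move fewer bytes than asked for, so loop until done.
    var transferred = 0L
    while (transferred < region.count()) {
      transferred += region.transferTo(sink, transferred)
    }
    region.release() // FileRegion is reference-counted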
FileRegion's transferTo is declared as:

    public abstract long transferTo(WritableByteChannel target, long position) throws IOException;

The catch, however, is that the encode method above emits a MessageWithHeader object, not the DefaultFileRegion itself:
    if (body != null) {
      // We transfer ownership of the reference on in.body() to MessageWithHeader.
      // This reference will be freed when MessageWithHeader.deallocate() is called.
      out.add(new MessageWithHeader(in.body(), header, body, bodyLength));
    }
    class MessageWithHeader extends AbstractReferenceCounted implements FileRegion

So MessageWithHeader is itself a FileRegion, and for Netty a FileRegion is ultimately sent through its transferTo:
    public long transferTo(final WritableByteChannel target, final long position) throws IOException {
      Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
      // Bytes written for header in this call.
      long writtenHeader = 0;
      if (header.readableBytes() > 0) {
        writtenHeader = copyByteBuf(header, target);
        totalBytesTransferred += writtenHeader;
        if (header.readableBytes() > 0) {
          return writtenHeader;
        }
      }

      // Bytes written for body in this call.
      long writtenBody = 0;
      if (body instanceof FileRegion) {
        writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);
      } else if (body instanceof ByteBuf) {
        writtenBody = copyByteBuf((ByteBuf) body, target);
      }
      totalBytesTransferred += writtenBody;

      return writtenHeader + writtenBody;
    }
The line

    writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);

transfers the file with zero copy, while a ByteBuf body is written directly into the writable channel.
Back on Executor A, ChunkFetchSuccess.decode wraps the received bytes:

    public static ChunkFetchSuccess decode(ByteBuf buf) {
      StreamChunkId streamChunkId = StreamChunkId.decode(buf);
      buf.retain();
      NettyManagedBuffer managedBuf = new NettyManagedBuffer(buf.duplicate());
      return new ChunkFetchSuccess(streamChunkId, managedBuf);
    }

buf.retain() bumps the reference count because the NettyManagedBuffer handed to the fetch callback outlives this decoder call, and buf.duplicate() creates a view that shares the underlying bytes without copying them.
Original article: http://blog.csdn.net/raintungli/article/details/71411684