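1) Check whether the TimeStampedHashMap[BlockId, BlockInfo] contains a block that can be dropped; if so, read its StorageLevel from the BlockInfo.
2) If the StorageLevel allows disk storage and DiskStore does not already hold the block's file, call DiskStore.putArray or putBytes to write the block to disk.
3) Remove the block from MemoryStore.
4) Get the block's status with getCurrentBlockStatus; if the block's tellMaster is true, call reportBlockStatus to report the status to BlockManagerMasterEndpoint.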
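5) Remove the BlockId from blockInfo (in the Spark 1.x source this happens only when the storage level does not use disk, that is, when the block is completely gone from this node), and return the block's status.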
def dropFromMemory(
    blockId: BlockId,
    data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
  logInfo(s"Dropping block $blockId from memory")
  val info = blockInfo.get(blockId).orNull
  // ... (the rest of the method is omitted in this excerpt)
}
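The five steps above can be sketched with in-memory maps standing in for the stores. This is only a toy model: ToyBlockManager, Level, Status, Info, and the reported buffer are hypothetical names invented for illustration, not Spark's API, and locking and error handling are left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's types (assumptions, not the real API).
final case class Level(useMemory: Boolean, useDisk: Boolean)
final case class Status(level: Level, memSize: Long, diskSize: Long)
final case class Info(level: Level, tellMaster: Boolean)

class ToyBlockManager {
  val blockInfo = mutable.Map.empty[String, Info]
  val memoryStore = mutable.Map.empty[String, Array[Byte]]
  val diskStore = mutable.Map.empty[String, Array[Byte]]
  // Records what would have been reported to the master endpoint.
  val reported = mutable.ArrayBuffer.empty[(String, Status)]

  // Derives the status from where the block currently lives.
  private def currentStatus(id: String): Status =
    Status(
      Level(memoryStore.contains(id), diskStore.contains(id)),
      memoryStore.get(id).map(_.length.toLong).getOrElse(0L),
      diskStore.get(id).map(_.length.toLong).getOrElse(0L))

  def dropFromMemory(id: String): Option[Status] =
    blockInfo.get(id).map { info =>                          // step 1: is the block known?
      if (info.level.useDisk && !diskStore.contains(id)) {   // step 2: spill to disk if allowed
        memoryStore.get(id).foreach(bytes => diskStore(id) = bytes)
      }
      memoryStore.remove(id)                                 // step 3: free the memory copy
      val status = currentStatus(id)
      if (info.tellMaster) reported += ((id, status))        // step 4: report the new status
      if (!info.level.useDisk) blockInfo.remove(id)          // step 5: forget blocks that are gone
      status
    }
}
```

With a block whose level allows both memory and disk, dropping it moves the bytes to the disk map, reports the new status, and keeps the blockInfo entry; a memory-only block would be forgotten entirely.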
reportBlockStatus reports a block's status to BlockManagerMasterEndpoint and, if necessary, re-registers the BlockManager. It works as follows:
1) Call tryToReportBlockStatus, which calls BlockManagerMaster's updateBlockInfo method to send an UpdateBlockInfo message to BlockManagerMasterEndpoint, updating the block's memory size, disk size, storage level, and related information.
2) If the BlockManager has not yet registered with BlockManagerMasterEndpoint, call asyncReregister. asyncReregister calls reregister, which in turn calls BlockManagerMaster's registerBlockManager method and then reportAllBlocks; reportAllBlocks itself also calls tryToReportBlockStatus.
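The report-then-re-register flow might be sketched as below. ToyMaster and ToyManager are hypothetical stand-ins written for this illustration, not Spark classes; the sketch also assumes the master answers an update from an unknown sender with false, and it re-registers synchronously where the real code does so asynchronously.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the master endpoint (an assumption, not Spark's API).
class ToyMaster {
  val registered = mutable.Set.empty[String]
  val blocks = mutable.Map.empty[(String, String), Long] // (managerId, blockId) -> size

  def registerBlockManager(managerId: String): Unit = registered += managerId

  // Returns false when the sender is unknown, which tells it to re-register.
  def updateBlockInfo(managerId: String, blockId: String, size: Long): Boolean =
    if (registered.contains(managerId)) {
      blocks((managerId, blockId)) = size
      true
    } else {
      false
    }
}

class ToyManager(id: String, master: ToyMaster) {
  val localBlocks = mutable.Map.empty[String, Long]

  // Step 1: send the block's current size to the master.
  private def tryToReport(blockId: String): Boolean =
    master.updateBlockInfo(id, blockId, localBlocks.getOrElse(blockId, 0L))

  // Step 2: register again, then report every local block (the reportAllBlocks role).
  private def reregister(): Unit = {
    master.registerBlockManager(id)
    localBlocks.keys.foreach(blockId => tryToReport(blockId))
  }

  def reportBlockStatus(blockId: String): Unit =
    if (!tryToReport(blockId)) reregister()
}
```

Reporting a single block from an unregistered manager therefore ends with the manager registered and all of its local blocks known to the master, not just the one that triggered the report.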
putSingle writes a block consisting of a single object to the storage system. It wraps the value in an Iterator and calls putIterator, which in turn calls doPut.
def putSingle(
    blockId: BlockId,
    value: Any,
    level: StorageLevel,
    tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
  putIterator(blockId, Iterator(value), level, tellMaster)
}
def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  require(values != null, "Values is null")
  doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
}
putBytes writes a block made up of serialized bytes to the storage system; it also delegates to doPut.
def putBytes(
    blockId: BlockId,
    bytes: ByteBuffer,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  require(bytes != null, "Bytes is null")
  doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
}
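doPut works as follows:
1) Obtain putBlockInfo. If blockInfo already holds a BlockInfo for this block, use the cached one; otherwise use the newly created BlockInfo. If the cached block is already ready, doPut logs a warning and returns without adding the block again.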
// Return value
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val putBlockInfo = {
  val tinfo = new BlockInfo(level, tellMaster)
  // Do atomically !
  val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
  if (oldBlockOpt.isDefined) {
    if (oldBlockOpt.get.waitForReady()) {
      logWarning(s"Block $blockId already exists on this machine; not re-adding it")
      return updatedBlocks
    }
    // TODO: So the block info exists - but previous attempt to load it (?) failed.
    // What do we do now ? Retry on it ?
    oldBlockOpt.get
  } else {
    tinfo
  }
}
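2) Determine putLevel, the storage level actually used for the block, and choose the BlockStore from it: MemoryStore takes priority, followed by the off-heap ExternalBlockStore and then DiskStore. Depending on how data is wrapped, call the matching BlockStore method: putIterator, putArray, or putBytes.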
// The level we actually use to put the block
val putLevel = effectiveStorageLevel.getOrElse(level)
// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = data match {
  case b: ByteBufferValues if putLevel.replication > 1 =>
    // Duplicate doesn't copy the bytes, but just creates a wrapper
    val bufferView = b.buffer.duplicate()
    Future {
      // This is a blocking action and should run in futureExecutionContext which is a cached
      // thread pool
      replicate(blockId, bufferView, putLevel)
    }(futureExecutionContext)
  case _ => null
}
putBlockInfo.synchronized {
  logTrace("Put for block %s took %s to get into synchronized block"
    .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  var marked = false
  try {
    // returnValues - Whether to return the values put
    // blockStore - The type of storage to put these values into
    val (returnValues, blockStore: BlockStore) = {
      if (putLevel.useMemory) {
        // Put it in memory first, even if it also has useDisk set to true;
        // We will drop it to disk later if the memory store can't hold it.
        (true, memoryStore)
      } else if (putLevel.useOffHeap) {
        // Use external block store
        (false, externalBlockStore)
      } else if (putLevel.useDisk) {
        // Don't get back the bytes from put unless we replicate them
        (putLevel.replication > 1, diskStore)
      } else {
        assert(putLevel == StorageLevel.NONE)
        throw new BlockException(
          blockId, s"Attempted to put block $blockId without specifying storage level!")
      }
    }
    // Actually put the values
    val result = data match {
      case IteratorValues(iterator) =>
        blockStore.putIterator(blockId, iterator, putLevel, returnValues)
      case ArrayValues(array) =>
        blockStore.putArray(blockId, array, putLevel, returnValues)
      case ByteBufferValues(bytes) =>
        bytes.rewind()
        blockStore.putBytes(blockId, bytes, putLevel)
    }
    size = result.size
    result.data match {
      case Left(newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
      case Right(newBytes) => bytesAfterPut = newBytes
      case _ =>
    }
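3) After the write completes, add any blocks that the write caused to be dropped from memory to updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)], and use getCurrentBlockStatus to get the status of the block just written. Mark putBlockInfo as ready so that other threads can read the block, call reportBlockStatus (when tellMaster is true) to send the block's information to BlockManagerMasterEndpoint, and finally add the block's ID and status to updatedBlocks.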
    // Keep track of which blocks are dropped from memory
    if (putLevel.useMemory) {
      result.droppedBlocks.foreach { updatedBlocks += _ }
    }
    val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
    if (putBlockStatus.storageLevel != StorageLevel.NONE) {
      // Now that the block is in either the memory, externalBlockStore, or disk store,
      // let other threads read it, and tell the master about it.
      marked = true
      putBlockInfo.markReady(size)
      if (tellMaster) {
        reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
      }
      updatedBlocks += ((blockId, putBlockStatus))
    }
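The handshake between markReady in the writer and waitForReady in other threads could look roughly like the sketch below. ToyBlockInfo is a hypothetical simplification built on plain monitor wait/notifyAll, not Spark's BlockInfo; the markFailure method and the field names are included only to show how a failed write would release waiters.

```scala
// Hypothetical simplification of the ready handshake (an assumption, not Spark's BlockInfo).
class ToyBlockInfo {
  private var pending = true   // the write has not finished yet
  private var failed = false   // the write finished unsuccessfully
  private var sizeInBytes = -1L

  def size: Long = synchronized { sizeInBytes }

  // Called by the writer once the block is safely stored.
  def markReady(bytes: Long): Unit = synchronized {
    sizeInBytes = bytes
    pending = false
    notifyAll()
  }

  // Called by the writer when the put did not succeed.
  def markFailure(): Unit = synchronized {
    failed = true
    pending = false
    notifyAll()
  }

  // Blocks until the writer calls markReady or markFailure; true means the block is readable.
  def waitForReady(): Boolean = synchronized {
    while (pending) wait()
    !failed
  }
}
```

A reader that calls waitForReady before the write finishes simply parks until the writer signals, which is why doPut can treat an existing, ready BlockInfo as "block already present" in step 1.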
4) If putLevel.replication is greater than 1, the block's data is replicated to other nodes for fault tolerance.
if (putLevel.replication > 1) {
}
Original article: http://www.cnblogs.com/yumanman/p/7636441.html