BlockManager is the manager that runs on every node (the driver and each executor). It provides interfaces for putting and retrieving blocks, both locally and remotely, into the various stores (memory, disk, and off-heap).
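Before walking through the source, here is a minimal usage sketch that exercises the BlockManager indirectly through the public caching API (the app name, master URL, and storage level below are illustrative choices; the actual BlockManager calls happen inside Spark, not in user code):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("persist-example").setMaster("local[*]"))
    // persist() only records the desired storage level; the first action materializes the
    // partitions, and each executor's BlockManager stores them as RDD blocks
    val cached = sc.parallelize(1 to 1000).map(_ * 2).persist(StorageLevel.MEMORY_AND_DISK)
    cached.count() // blocks are computed and stored (memory first, falling back to disk)
    cached.count() // later actions read the cached blocks back through the BlockManager
    sc.stop()
  }
}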
private[spark] class BlockManager(
executorId: String,
rpcEnv: RpcEnv,
val master: BlockManagerMaster,
serializerManager: SerializerManager,
val conf: SparkConf,
memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
val blockTransferService: BlockTransferService,
securityManager: SecurityManager,
numUsableCores: Int)
extends BlockDataManager with BlockEvictionHandler with Logging {
// Whether the external shuffle service is enabled (spark.shuffle.service.enabled); it serves shuffle files outside the executor and can improve performance
private[spark] val externalShuffleServiceEnabled =
conf.getBoolean("spark.shuffle.service.enabled", false)
val diskBlockManager = {
// Delete local files on stop only on the driver, or on executors that do not use the external shuffle service
val deleteFilesOnStop =
!externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
new DiskBlockManager(conf, deleteFilesOnStop)
}
// Visible for testing
private[storage] val blockInfoManager = new BlockInfoManager
private val futureExecutionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
// Where block data is actually stored: memoryStore and diskStore
private[spark] val memoryStore =
new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
memoryManager.setMemoryStore(memoryStore)
// Depending on the memory manager, maxMemory may vary over time; this is the maximum on-heap storage memory, used for reporting
private val maxMemory = memoryManager.maxOnHeapStorageMemory
// Port used by the external shuffle service; when running on YARN this may be set through the Hadoop configuration
private val externalShuffleServicePort = {
val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
if (tmpPort == 0) {
// for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
// an open port. But we still need to tell our spark apps the right port to use. So
// only if the yarn config has the port set to 0, we prefer the value in the spark config
conf.get("spark.shuffle.service.port").toInt
} else {
tmpPort
}
}
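// Illustration (example values of my own, not required defaults): the two settings read above can
// be supplied programmatically when building the SparkConf, or in spark-defaults.conf. On YARN the
// port should match the one the NodeManager-side shuffle service listens on.
//   val conf = new SparkConf()
//     .set("spark.shuffle.service.enabled", "true") // enable the external shuffle service
//     .set("spark.shuffle.service.port", "7337")    // port of the external shuffle service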
var blockManagerId: BlockManagerId = _
// Address of the server that serves this executor's shuffle files
private[spark] var shuffleServerId: BlockManagerId = _
// Client used to read other executors' shuffle files: either the external shuffle service, or the standard BlockTransferService that connects directly to other executors
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
securityManager.isSaslEncryptionEnabled())
} else {
blockTransferService
}
// Maximum number of fetch failures before this block manager refreshes the block locations from the driver
private val maxFailuresBeforeLocationRefresh =
conf.getInt("spark.block.failures.beforeLocationRefresh", 5)
private val slaveEndpoint = rpcEnv.setupEndpoint(
"BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))
// Pending re-registration task, executed asynchronously
// Guarded by asyncReregisterLock
private var asyncReregisterTask: Future[Unit] = null
private val asyncReregisterLock = new Object
// Field related to peer block managers that are necessary for block replication
@volatile private var cachedPeers: Seq[BlockManagerId] = _
private val peerFetchLock = new Object
private var lastPeerFetchTime = 0L
/**
* Initialize the BlockManager with the given appId. This is not performed in the constructor,
* because the appId may not be known when the BlockManager is instantiated (in particular, the
* driver only learns it after registering with the TaskScheduler).
* This method initializes the BlockTransferService and the ShuffleClient, registers with the
* BlockManagerMaster, starts the BlockManagerWorker endpoint and, if an external shuffle service
* is configured, registers with it as well.
*/
def initialize(appId: String): Unit = {
blockTransferService.init(this)
shuffleClient.init(appId)
blockManagerId = BlockManagerId(
executorId, blockTransferService.hostName, blockTransferService.port)
shuffleServerId = if (externalShuffleServiceEnabled) {
logInfo(s"external shuffle service port = $externalShuffleServicePort")
BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
blockManagerId
}
master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
// Register Executors' configuration with the local shuffle service, if one exists.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
registerWithExternalShuffleServer()
}
}
private def registerWithExternalShuffleServer() {
logInfo("Registering executor with local external shuffle service.")
val shuffleConfig = new ExecutorShuffleInfo(
diskBlockManager.localDirs.map(_.toString),
diskBlockManager.subDirsPerLocalDir,
shuffleManager.getClass.getName)
val MAX_ATTEMPTS = 3
val SLEEP_TIME_SECS = 5
for (i <- 1 to MAX_ATTEMPTS) {
try {
// Synchronous and will throw an exception if we cannot connect.
shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
return
} catch {
case e: Exception if i < MAX_ATTEMPTS =>
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
Thread.sleep(SLEEP_TIME_SECS * 1000)
}
}
}
/**
* Report all blocks to the master. This is important, e.g., to recover block information after an
* executor crash.
* This method fails if the master replies false (indicating that the slave needs to re-register).
* The error condition will be detected again by the next heartbeat or block status report, which
* will then trigger re-registration and report all blocks once more.
*/
private def reportAllBlocks(): Unit = {
logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
for ((blockId, info) <- blockInfoManager.entries) {
// Get the current status of the block
val status = getCurrentBlockStatus(blockId, info)
// Report the updated status to the master
if (!tryToReportBlockStatus(blockId, info, status)) {
logError(s"Failed to report $blockId to master; giving up.")
return
}
}
}
/**
* Re-register with the master and report all blocks to it. This will be called by the heartbeat
* thread if our heartbeat to the master indicates that this block manager is not registered.
*/
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
reportAllBlocks()
}
/**
* Re-register with the master asynchronously. Only one pending re-registration task is kept at a
* time, guarded by asyncReregisterLock.
*/
private def asyncReregister(): Unit = {
asyncReregisterLock.synchronized {
if (asyncReregisterTask == null) {
asyncReregisterTask = Future[Unit] {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool
reregister()
asyncReregisterLock.synchronized {
asyncReregisterTask = null
}
}(futureExecutionContext)
}
}
}
/**
* For testing. Wait for any pending asynchronous re-registration; otherwise, do nothing.
*/
def waitForAsyncReregister(): Unit = {
val task = asyncReregisterTask
if (task != null) {
try {
Await.ready(task, Duration.Inf)
} catch {
case NonFatal(t) =>
throw new Exception("Error occurred while waiting for async. reregistration", t)
}
}
}
/**
* Interface for getting local block data. Throws an exception if the block cannot be found or
* cannot be read successfully.
*/
override def getBlockData(blockId: BlockId): ManagedBuffer = {
if (blockId.isShuffle) {
// For shuffle blocks, delegate to the shuffle manager's block resolver (IndexShuffleBlockResolver)
shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
} else {
getLocalBytes(blockId) match {
case Some(buffer) => new BlockManagerManagedBuffer(blockInfoManager, blockId, buffer)
case None => throw new BlockNotFoundException(blockId.toString)
}
}
}
/**
* Put the block data locally, using the given storage level.
*/
override def putBlockData(
blockId: BlockId,
data: ManagedBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Boolean = {
putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
}
/**
* Get the BlockStatus of the block identified by the given blockId, if it exists.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfoManager.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
BlockStatus(info.level, memSize = memSize, diskSize = diskSize)
}
}
/**
* Get the IDs of existing blocks that match the given filter. Note that this also queries the
* blocks held by the DiskBlockManager, which the block manager may not otherwise know about.
*/
def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
// The `toArray` is necessary here in order to force the list to be materialized so that we
// don't try to serialize a lazy iterator when responding to client requests.
(blockInfoManager.entries.map(_._1) ++ diskBlockManager.getAllBlocks())
.filter(filter)
.toArray
.toSeq
}
/**
* Report a block's storage status to the BlockManagerMaster. This sends an update message that
* reflects the block's current state, not the desired storage level recorded in its block info.
* For example, a block configured with MEMORY_AND_DISK may by now be stored only on disk.
* The droppedMemorySize parameter accounts for the case where the block has just been dropped
* from memory to disk, so that the master can adjust its memory accounting accordingly.
*/
private def reportBlockStatus(
blockId: BlockId,
info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Unit = {
val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
if (needReregister) {
logInfo(s"Got told to re-register updating block $blockId")
// Re-registering will report our new block for free.
asyncReregister()
}
logDebug(s"Told master about block $blockId")
}
/**
* Actually send an UpdateBlockInfo message to the BlockManagerMaster and wait for its reply.
* Returns true if the master successfully recorded the update, or false if the slave needs to
* re-register.
*/
private def tryToReportBlockStatus(
blockId: BlockId,
info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Boolean = {
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
val onDiskSize = status.diskSize
master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
} else {
true
}
}
/**
* Return the up-to-date storage status of the given block. For example, if the block has been
* dropped to disk, this returns its current storage level along with the actual in-memory and
* on-disk sizes.
*/
private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
info.synchronized {
info.level match {
case null =>
BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
val replication = if (inMem || onDisk) level.replication else 1
val storageLevel = StorageLevel(
useDisk = onDisk,
useMemory = inMem,
useOffHeap = level.useOffHeap,
deserialized = deserialized,
replication = replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
BlockStatus(storageLevel, memSize, diskSize)
}
}
}
/**
* Get the locations of a set of blocks from the master.
*/
private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq[BlockManagerId]] = {
val startTimeMs = System.currentTimeMillis
val locations = master.getLocations(blockIds).toArray
logDebug("Got multiple block location in %s".format(Utils.getUsedTimeMs(startTimeMs)))
locations
}
/**
* Cleanup code run in response to a failed local read.
* Must be called while holding a read lock on the block; this releases the lock and removes the
* missing block.
*/
private def handleLocalReadFailure(blockId: BlockId): Nothing = {
// Release the read lock
releaseLock(blockId)
// Remove the missing block
removeBlock(blockId)
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
}
/**
* Get a block from the local block manager as deserialized values.
*/
def getLocalValues(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting local block $blockId")
blockInfoManager.lockForReading(blockId) match {
case None =>
logDebug(s"Block $blockId was not found")
None
case Some(info) =>
val level = info.level
logDebug(s"Level for block $blockId is $level")
// The block is still in memory
if (level.useMemory && memoryStore.contains(blockId)) {
val iter: Iterator[Any] = if (level.deserialized) {
memoryStore.getValues(blockId).get
} else {
serializerManager.dataDeserializeStream(
blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
}
val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
} else if (level.useDisk && diskStore.contains(blockId)) { // the block has been dropped to disk
val iterToReturn: Iterator[Any] = {
val diskBytes = diskStore.getBytes(blockId)
if (level.deserialized) {
val diskValues = serializerManager.dataDeserializeStream(
blockId,
diskBytes.toInputStream(dispose = true))(info.classTag)
maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
} else {
val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
.map {_.toInputStream(dispose = false)}
.getOrElse { diskBytes.toInputStream(dispose = true) }
serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
}
}
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
} else {
// Not found in memory or on disk: treat this as a local read failure
handleLocalReadFailure(blockId)
}
}
}
/**
* Get a block's serialized bytes from the local block manager.
*/
def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
logDebug(s"Getting local block $blockId as bytes")
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not available. Currently
// downstream code will throw an exception.
Option(
new ChunkedByteBuffer(
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()))
} else {
blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
}
}
/**
* Get a block's serialized bytes from the local block manager.
*
* Must be called while holding a read lock on the block. The lock is kept on successful return
* and released if an exception is thrown.
*/
private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ChunkedByteBuffer = {
val level = info.level
logDebug(s"Level for block $blockId is $level")
// In order, try to read the serialized bytes from memory, then from disk, then fall back to serializing the in-memory objects; throw an exception if the block does not exist
if (level.deserialized) {
// Try to avoid expensive serialization by reading a pre-serialized copy from disk
if (level.useDisk && diskStore.contains(blockId)) {
diskStore.getBytes(blockId)
} else if (level.useMemory && memoryStore.contains(blockId)) {
// Not found on disk, so serialize an in-memory copy
serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get)
} else {
handleLocalReadFailure(blockId)
}
} else { // storage level is serialized
if (level.useMemory && memoryStore.contains(blockId)) {
memoryStore.getBytes(blockId).get
} else if (level.useDisk && diskStore.contains(blockId)) {
val diskBytes = diskStore.getBytes(blockId)
maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
} else {
handleLocalReadFailure(blockId)
}
}
}
/**
* Get a block from a remote block manager as deserialized values.
*/
private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
getRemoteBytes(blockId).map { data =>
val values =
serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
new BlockResult(values, DataReadMethod.Network, data.size)
}
}
/**
* Return a list of locations for the given block, preferring locations on the same host.
*/
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
val locs = Random.shuffle(master.getLocations(blockId))
val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
preferredLocs ++ otherLocs
}
/**
* Get a block from a remote block manager as serialized bytes.
*/
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")
var runningFailureCount = 0
var totalFailureCount = 0
val locations = getLocations(blockId)
val maxFetchFailures = locations.size
var locationIterator = locations.iterator
while (locationIterator.hasNext) {
val loc = locationIterator.next()
logDebug(s"Getting remote block $blockId from $loc")
val data = try {
blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
} catch {
case NonFatal(e) =>
runningFailureCount += 1
totalFailureCount += 1
if (totalFailureCount >= maxFetchFailures) {
// Give up trying anymore locations. Either we've tried all of the original locations,
// or we've refreshed the list of locations from the master, and have still
// hit failures after trying locations from the refreshed list.
throw new BlockFetchException(s"Failed to fetch block after" +
s" ${totalFailureCount} fetch failures. Most recent failure cause:", e)
}
logWarning(s"Failed to fetch remote block $blockId " +
s"from $loc (failed attempt $runningFailureCount)", e)
// If there is a large number of executors then locations list can contain a
// large number of stale entries causing a large number of retries that may
// take a significant amount of time. To get rid of these stale entries
// we refresh the block locations after a certain number of fetch failures
if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
locationIterator = getLocations(blockId).iterator
logDebug(s"Refreshed locations from the driver " +
s"after ${runningFailureCount} fetch failures.")
runningFailureCount = 0
}
// This location failed, so we retry fetch from a different one by returning null here
null
}
if (data != null) {
return Some(new ChunkedByteBuffer(data))
}
logDebug(s"The value of block $blockId is null")
}
logDebug(s"Block $blockId not found")
None
}
/**
* Get a block from the block manager (either local or remote).
* If the block is stored locally, a read lock on it is acquired; no lock is held when the block
* is fetched from a remote block manager.
*/
def get(blockId: BlockId): Option[BlockResult] = {
val local = getLocalValues(blockId)
if (local.isDefined) {
logInfo(s"Found block $blockId locally")
return local
}
val remote = getRemoteValues(blockId)
if (remote.isDefined) {
logInfo(s"Found block $blockId remotely")
return remote
}
None
}
/**
* Downgrades an exclusive write lock to a shared read lock.
*/
def downgradeLock(blockId: BlockId): Unit = {
blockInfoManager.downgradeLock(blockId)
}
/**
* Release a lock on the given block.
*/
def releaseLock(blockId: BlockId): Unit = {
blockInfoManager.unlock(blockId)
}
/**
* Register a task with the block manager so that its block locks can be tracked.
*/
def registerTask(taskAttemptId: Long): Unit = {
blockInfoManager.registerTask(taskAttemptId)
}
/**
* Release all locks for the given task.
*
* @return the blocks whose locks were released.
*/
def releaseAllLocksForTask(taskAttemptId: Long): Seq[BlockId] = {
blockInfoManager.releaseAllLocksForTask(taskAttemptId)
}
/**
* Retrieve the given block if it exists; otherwise call the provided makeIterator method to
* compute the block, persist it, and return its values.
*/
def getOrElseUpdate[T](
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[T],
makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
// Attempt to read the block from local or remote storage. If it's present, then we don't need
// to go through the local-get-or-put path.
get(blockId) match {
case Some(block) =>
return Left(block)
case _ =>
// Need to compute the block.
}
// Initially we hold no locks on this block.
doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
case None =>
// doPut() didn't hand work back to us, so the block already existed or was successfully
// stored. Therefore, we now hold a read lock on the block.
val blockResult = getLocalValues(blockId).getOrElse {
// Since we held a read lock between the doPut() and get() calls, the block should not
// have been evicted, so get() not returning the block indicates some internal error.
releaseLock(blockId)
throw new SparkException(s"get() failed for block $blockId even though we held a lock")
}
// We already hold a read lock on the block from the doPut() call and getLocalValues()
// acquires the lock again, so we need to call releaseLock() here so that the net number
// of lock acquisitions is 1 (since the caller will only call release() once).
releaseLock(blockId)
Left(blockResult)
case Some(iter) =>
// The put failed, likely because the data was too large to fit in memory and could not be
// dropped to disk. Therefore, we need to pass the input iterator back to the caller so
// that they can decide what to do with the values (e.g. process them without caching).
Right(iter)
}
}
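// To make the Left/Right contract above concrete, here is a hedged caller-side sketch. The helper
// name and signature are illustrative placeholders, not Spark's actual caching code (which is
// similar in spirit to what RDD.getOrCompute does):
//   def readCachedPartition[T: ClassTag](
//       bm: BlockManager,
//       blockId: BlockId,
//       level: StorageLevel,
//       compute: () => Iterator[T]): Iterator[T] = {
//     bm.getOrElseUpdate(blockId, level, implicitly[ClassTag[T]], compute) match {
//       case Left(blockResult) =>
//         // The block was found (or was stored successfully): read the cached values
//         blockResult.data.asInstanceOf[Iterator[T]]
//       case Right(iter) =>
//         // The put failed (e.g. too large for memory with no disk level): process the
//         // values from the returned iterator without caching them
//         iter
//     }
//   }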
/**
* @return true if the block was stored or false if an error occurred.
*/
def putIterator[T: ClassTag](
blockId: BlockId,
values: Iterator[T],
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
require(values != null, "Values is null")
doPutIterator(blockId, () => values, level, implicitly[ClassTag[T]], tellMaster) match {
case None =>
true
case Some(iter) =>
// Caller doesn't care about the iterator values, so we can close the iterator here
// to free resources earlier
iter.close()
false
}
}
/**
* Get a writer for the block that writes its data directly to the given file on disk.
*/
def getDiskWriter(
blockId: BlockId,
file: File,
serializerInstance: SerializerInstance,
bufferSize: Int,
writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = {
val compressStream: OutputStream => OutputStream =
serializerManager.wrapForCompression(blockId, _)
val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
new DiskBlockObjectWriter(file, serializerInstance, bufferSize, compressStream,
syncWrites, writeMetrics, blockId)
}
/**
* Put a new block of serialized bytes into the block manager.
*/
def putBytes[T: ClassTag](
blockId: BlockId,
bytes: ChunkedByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
require(bytes != null, "Bytes is null")
doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster)
}
/**
* Put the given bytes into one of the block stores according to the given storage level. If the
* block already exists, it will not be overwritten.
*/
private def doPutBytes[T](
blockId: BlockId,
bytes: ChunkedByteBuffer,
level: StorageLevel,
classTag: ClassTag[T],
tellMaster: Boolean = true,
keepReadLock: Boolean = false): Boolean = {
doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
val startTimeMs = System.currentTimeMillis
// Since we are storing bytes, initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = if (level.replication > 1) {
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool
replicate(blockId, bytes, level, classTag)
}(futureExecutionContext)
} else {
null
}
val size = bytes.size
if (level.useMemory) {
// Put it in memory first
val putSucceeded = if (level.deserialized) {
val values =
serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
memoryStore.putIteratorAsValues(blockId, values, classTag) match {
case Right(_) => true
case Left(iter) =>
// If putting deserialized values in memory failed, we will put the bytes directly to
// disk, so we don't need this iterator and can close it to free resources earlier.
iter.close()
false
}
} else {
memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
}
if (!putSucceeded && level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
diskStore.putBytes(blockId, bytes)
}
} else if (level.useDisk) {
diskStore.putBytes(blockId, bytes)
}
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// tell the master about it.
info.size = size
if (tellMaster) {
reportBlockStatus(blockId, info, putBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
}
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
// Wait for asynchronous replication to finish
try {
Await.ready(replicationFuture, Duration.Inf)
} catch {
case NonFatal(t) =>
throw new Exception("Error occurred while waiting for replication to finish", t)
}
}
if (blockWasSuccessfullyStored) {
None
} else {
Some(bytes)
}
}.isEmpty
}
/**
* Helper method used to abstract common code from [[doPutBytes()]] and [[doPutIterator()]].
*
* @param putBody a function which attempts the actual put() and returns None on success
* or Some on failure.
*/
private def doPut[T](
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[_],
tellMaster: Boolean,
keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
val putBlockInfo = {
val newInfo = new BlockInfo(level, classTag, tellMaster)
if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
newInfo
} else {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
if (!keepReadLock) {
// lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
releaseLock(blockId)
}
return None
}
}
val startTimeMs = System.currentTimeMillis
var blockWasSuccessfullyStored: Boolean = false
val result: Option[T] = try {
val res = putBody(putBlockInfo)
blockWasSuccessfullyStored = res.isEmpty
res
} finally {
if (blockWasSuccessfullyStored) {
if (keepReadLock) {
blockInfoManager.downgradeLock(blockId)
} else {
blockInfoManager.unlock(blockId)
}
} else {
blockInfoManager.removeBlock(blockId)
logWarning(s"Putting block $blockId failed")
}
}
if (level.replication > 1) {
logDebug("Putting block %s with replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
} else {
logDebug("Putting block %s without replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
}
result
}
/**
* Put the given block according to the given level in one of the block stores, replicating
* the values if necessary.
*
* If the block already exists, this method will not overwrite it.
*
* @param keepReadLock if true, this method will hold the read lock when it returns (even if the
* block already exists). If false, this method will hold no locks when it
* returns.
* @return None if the block was already present or if the put succeeded, or Some(iterator)
* if the put failed.
*/
private def doPutIterator[T](
blockId: BlockId,
iterator: () => Iterator[T],
level: StorageLevel,
classTag: ClassTag[T],
tellMaster: Boolean = true,
keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
val startTimeMs = System.currentTimeMillis
var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
// Size of the block in bytes
var size = 0L
if (level.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
if (level.deserialized) {
memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
case Right(s) =>
size = s
case Left(iter) =>
// Not enough space to unroll this block; drop to disk if applicable
if (level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
diskStore.put(blockId) { fileOutputStream =>
serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
}
size = diskStore.getSize(blockId)
} else {
iteratorFromFailedMemoryStorePut = Some(iter)
}
}
} else { // !level.deserialized
memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
case Right(s) =>
size = s
case Left(partiallySerializedValues) =>
// Not enough space to unroll this block; drop to disk if applicable
if (level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
diskStore.put(blockId) { fileOutputStream =>
partiallySerializedValues.finishWritingToStream(fileOutputStream)
}
size = diskStore.getSize(blockId)
} else {
iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
}
}
}
} else if (level.useDisk) {
diskStore.put(blockId) { fileOutputStream =>
serializerManager.dataSerializeStream(blockId, fileOutputStream, iterator())(classTag)
}
size = diskStore.getSize(blockId)
}
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// tell the master about it.
info.size = size
if (tellMaster) {
reportBlockStatus(blockId, info, putBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
val remoteStartTime = System.currentTimeMillis
val bytesToReplicate = doGetLocalBytes(blockId, info)
try {
replicate(blockId, bytesToReplicate, level, classTag)
} finally {
bytesToReplicate.dispose()
}
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
}
assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
iteratorFromFailedMemoryStorePut
}
}
/**
* Attempts to cache spilled bytes read from disk into the MemoryStore in order to speed up
* subsequent reads. This method requires the caller to hold a read lock on the block.
*
* @return a copy of the bytes from the memory store if the put succeeded, otherwise None.
* If this returns bytes from the memory store then the original disk store bytes will
* automatically be disposed and the caller should not continue to use them. Otherwise,
* if this returns None then the original disk store bytes will be unaffected.
*/
private def maybeCacheDiskBytesInMemory(
blockInfo: BlockInfo,
blockId: BlockId,
level: StorageLevel,
diskBytes: ChunkedByteBuffer): Option[ChunkedByteBuffer] = {
require(!level.deserialized)
if (level.useMemory) {
// Synchronize on blockInfo to guard against a race condition where two readers both try to
// put values read from disk into the MemoryStore.
blockInfo.synchronized {
if (memoryStore.contains(blockId)) {
diskBytes.dispose()
Some(memoryStore.getBytes(blockId).get)
} else {
val allocator = level.memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, level.memoryMode, () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we
// cannot put it into MemoryStore, copyForMemory should not be created. That's why
// this action is put into a `() => ChunkedByteBuffer` and created lazily.
diskBytes.copy(allocator)
})
if (putSucceeded) {
diskBytes.dispose()
Some(memoryStore.getBytes(blockId).get)
} else {
None
}
}
}
} else {
None
}
}
/**
* Attempts to cache spilled values read from disk into the MemoryStore in order to speed up
* subsequent reads. This method requires the caller to hold a read lock on the block.
*
* @return a copy of the iterator. The original iterator passed this method should no longer
* be used after this method returns.
*/
private def maybeCacheDiskValuesInMemory[T](
blockInfo: BlockInfo,
blockId: BlockId,
level: StorageLevel,
diskIterator: Iterator[T]): Iterator[T] = {
require(level.deserialized)
val classTag = blockInfo.classTag.asInstanceOf[ClassTag[T]]
if (level.useMemory) {
// Synchronize on blockInfo to guard against a race condition where two readers both try to
// put values read from disk into the MemoryStore.
blockInfo.synchronized {
if (memoryStore.contains(blockId)) {
// Note: if we had a means to discard the disk iterator, we would do that here.
memoryStore.getValues(blockId).get
} else {
memoryStore.putIteratorAsValues(blockId, diskIterator, classTag) match {
case Left(iter) =>
// The memory store put() failed, so it returned the iterator back to us:
iter
case Right(_) =>
// The put() succeeded, so we can read the values back:
memoryStore.getValues(blockId).get
}
}
}.asInstanceOf[Iterator[T]]
} else {
diskIterator
}
}
/**
* Get peer block managers in the system.
*/
private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
peerFetchLock.synchronized {
val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
if (cachedPeers == null || forceFetch || timeout) {
cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
lastPeerFetchTime = System.currentTimeMillis
logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
}
cachedPeers
}
}
/**
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
*/
private def replicate(
blockId: BlockId,
data: ChunkedByteBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersForReplication = new ArrayBuffer[BlockManagerId]
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
val tLevel = StorageLevel(
useDisk = level.useDisk,
useMemory = level.useMemory,
useOffHeap = level.useOffHeap,
deserialized = level.deserialized,
replication = 1)
val startTime = System.currentTimeMillis
val random = new Random(blockId.hashCode)
var replicationFailed = false
var failures = 0
var done = false
// Get cached list of peers
peersForReplication ++= getPeers(forceFetch = false)
// Get a random peer. Note that this selection of a peer is deterministic on the block id.
// So assuming the list of peers does not change and no replication failures,
// if there are multiple attempts in the same node to replicate the same block,
// the same set of peers will be selected.
def getRandomPeer(): Option[BlockManagerId] = {
// If replication had failed, then force update the cached list of peers and remove the peers
// that have been already used
if (replicationFailed) {
peersForReplication.clear()
peersForReplication ++= getPeers(forceFetch = true)
peersForReplication --= peersReplicatedTo
peersForReplication --= peersFailedToReplicateTo
}
if (!peersForReplication.isEmpty) {
Some(peersForReplication(random.nextInt(peersForReplication.size)))
} else {
None
}
}
// One by one choose a random peer and try uploading the block to it
// If replication fails (e.g., target peer is down), force the list of cached peers
// to be re-fetched from driver and then pick another random peer for replication. Also
// temporarily black list the peer for which replication failed.
//
// This selection of a peer and replication is continued in a loop until one of the
// following 3 conditions is fulfilled:
// (i) specified number of peers have been replicated to
// (ii) too many failures in replicating to peers
// (iii) no peer left to replicate to
//
while (!done) {
getRandomPeer() match {
case Some(peer) =>
try {
val onePeerStartTime = System.currentTimeMillis
logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
blockTransferService.uploadBlockSync(
peer.host,
peer.port,
peer.executorId,
blockId,
new NettyManagedBuffer(data.toNetty),
tLevel,
classTag)
logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
.format(System.currentTimeMillis - onePeerStartTime))
peersReplicatedTo += peer
peersForReplication -= peer
replicationFailed = false
if (peersReplicatedTo.size == numPeersToReplicateTo) {
done = true // specified number of peers have been replicated to
}
} catch {
case e: Exception =>
logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
failures += 1
replicationFailed = true
peersFailedToReplicateTo += peer
if (failures > maxReplicationFailures) { // too many failures in replicating to peers
done = true
}
}
case None => // no peer left to replicate to
done = true
}
}
val timeTakeMs = (System.currentTimeMillis - startTime)
logDebug(s"Replicating $blockId of ${data.size} bytes to " +
s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
}
}
/**
* Read a block consisting of a single object.
*/
def getSingle(blockId: BlockId): Option[Any] = {
get(blockId).map(_.data.next())
}
/**
* Write a block consisting of a single object.
*
* @return true if the block was stored or false if the block was already stored or an
* error occurred.
*/
def putSingle[T: ClassTag](
blockId: BlockId,
value: T,
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
putIterator(blockId, Iterator(value), level, tellMaster)
}
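// A hedged round-trip sketch of these single-object helpers (the block id and storage level are
// arbitrary examples; in practice these calls are made from Spark internals rather than user code):
//   val bm = SparkEnv.get.blockManager          // the per-node BlockManager instance
//   val id = TestBlockId("single-object-demo")  // TestBlockId is a simple BlockId used in tests
//   val stored = bm.putSingle(id, Seq(1, 2, 3), StorageLevel.MEMORY_ONLY) // true if stored
//   val value = bm.getSingle(id)                // Some(Seq(1, 2, 3)) if the put succeeded
//   bm.removeBlock(id)                          // clean up the block afterwards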
/**
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
* store reaches its limit and needs to free up space.
*
* If `data` is not put on disk, it won't be created.
*
* The caller of this method must hold a write lock on the block before calling this method.
* This method does not release the write lock.
*
* @return the block's new effective StorageLevel.
*/
private[storage] override def dropFromMemory[T: ClassTag](
blockId: BlockId,
data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
logInfo(s"Dropping block $blockId from memory")
val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
var blockIsUpdated = false
val level = info.level
// Drop to disk, if storage level requires
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo(s"Writing block $blockId to disk")
data() match {
case Left(elements) =>
diskStore.put(blockId) { fileOutputStream =>
serializerManager.dataSerializeStream(
blockId,
fileOutputStream,
elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
}
case Right(bytes) =>
diskStore.putBytes(blockId, bytes)
}
blockIsUpdated = true
}
// Actually drop from memory store
val droppedMemorySize =
if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val blockIsRemoved = memoryStore.remove(blockId)
if (blockIsRemoved) {
blockIsUpdated = true
} else {
logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
}
val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
reportBlockStatus(blockId, info, status, droppedMemorySize)
}
if (blockIsUpdated) {
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
}
}
status.storageLevel
}
/**
* Remove all blocks belonging to the given RDD.
*
* @return The number of blocks removed.
*/
def removeRdd(rddId: Int): Int = {
// TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
logInfo(s"Removing RDD $rddId")
val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId)
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
blocksToRemove.size
}
/**
* Remove all blocks belonging to the given broadcast.
*/
def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
logDebug(s"Removing broadcast $broadcastId")
val blocksToRemove = blockInfoManager.entries.map(_._1).collect {
case bid @ BroadcastBlockId(`broadcastId`, _) => bid
}
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
blocksToRemove.size
}
/**
* Remove a block from both memory and disk.
*/
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
logDebug(s"Removing block $blockId")
blockInfoManager.lockForWriting(blockId) match {
case None =>
// The block has already been removed; do nothing.
logWarning(s"Asked to remove block $blockId, which does not exist")
case Some(info) =>
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
if (!removedFromMemory && !removedFromDisk) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external block store")
}
blockInfoManager.removeBlock(blockId)
val removeBlockStatus = getCurrentBlockStatus(blockId, info)
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, info, removeBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus)
}
}
}
def stop(): Unit = {
blockTransferService.close()
if (shuffleClient ne blockTransferService) {
// Closing should be idempotent, but maybe not for the NioBlockTransferService.
shuffleClient.close()
}
diskBlockManager.stop()
rpcEnv.stop(slaveEndpoint)
blockInfoManager.clear()
memoryStore.clear()
futureExecutionContext.shutdownNow()
logInfo("BlockManager stopped")
}
}
private[spark] object BlockManager {
private val ID_GENERATOR = new IdGenerator
def blockIdsToHosts(
blockIds: Array[BlockId],
env: SparkEnv,
blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = {
// blockManagerMaster != null is used in tests
assert(env != null || blockManagerMaster != null)
val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
env.blockManager.getLocationBlockIds(blockIds)
} else {
blockManagerMaster.getLocations(blockIds)
}
val blockManagers = new HashMap[BlockId, Seq[String]]
for (i <- 0 until blockIds.length) {
blockManagers(blockIds(i)) = blockLocations(i).map(_.host)
}
blockManagers.toMap
}
}
Original post: https://www.cnblogs.com/liufei-yes/p/13387085.html