
Spark Streaming 6: BlockGenerator and RateLimiter

Posted: 2015-02-05 20:03:14


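BlockGenerator and RateLimiter are actually quite simple, but they handle several important configuration properties (spark.streaming.blockInterval, spark.streaming.blockQueueSize, and spark.streaming.receiver.maxRate), so they are worth noting down.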
/**
 * Generates batches of objects received by a
 * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch as a block,
 * the other to push the blocks into the block manager.
 */
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf
  ) extends RateLimiter(conf) with Logging {

  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

  private val clock = new SystemClock()
  // How often (in milliseconds) the current buffer is cut into a block; defaults to 200 ms.
  private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
  private val blockIntervalTimer =
    new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
  // Capacity of the queue of blocks waiting to be pushed; defaults to 10 blocks.
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var stopped = false

  // ... (the rest of the class is not shown in this excerpt)
}
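The fields above suggest a two-stage pipeline: a timer swaps out the current buffer every blockInterval, and a bounded queue of size blockQueueSize holds finished blocks until the pushing thread takes them. The excerpt does not show updateCurrentBuffer or keepPushingBlocks, so what follows is only a minimal sketch of that pattern, not Spark's code. MiniBlockGenerator, addData, and pollBlock are hypothetical names, and the timer and pushing thread are replaced by plain method calls so the flow can be followed step by step.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for BlockGenerator; this is not the Spark class.
class MiniBlockGenerator(blockQueueSize: Int) {
  // A block is a numbered buffer here; Spark identifies blocks with StreamBlockId.
  case class Block(id: Long, buffer: ArrayBuffer[Any])

  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  private var currentBuffer = new ArrayBuffer[Any]
  private var nextBlockId = 0L

  /** Called by the receiver for every record it gets. */
  def addData(data: Any): Unit = synchronized {
    currentBuffer += data
  }

  /** What the interval timer would call: swap buffers and queue the old one. */
  def updateCurrentBuffer(): Unit = {
    val finished = synchronized {
      val old = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      old
    }
    if (finished.nonEmpty) {
      // put() blocks while the queue is full, so a slow consumer stalls the producer.
      blocksForPushing.put(Block(nextBlockId, finished))
      nextBlockId += 1
    }
  }

  /** What the pushing thread would call in a loop; None if nothing is queued. */
  def pollBlock(): Option[Block] = Option(blocksForPushing.poll())
}
```

In this sketch an empty buffer produces no block, and the bounded queue is the only back-pressure mechanism: once blockQueueSize blocks are waiting, the next updateCurrentBuffer call blocks until one is taken.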

/**
 * Provides waitToPush() method to limit the rate at which receivers consume data.
 *
 * waitToPush method will block the thread if too many messages have been pushed too quickly,
 * and only return when a new message has been pushed. It assumes that only one message is
 * pushed at a time.
 *
 * The Spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
 * per second that each receiver will accept.
 *
 * @param conf spark configuration
 */
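private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {

  private var lastSyncTime = System.nanoTime
  private var messagesWrittenSinceSync = 0L
  // Maximum messages per second per receiver; the default of 0 means no limit is configured.
  private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
  // 10 seconds, expressed in nanoseconds.
  private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)

  // ... (the rest of the class is not shown in this excerpt)
}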
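The excerpt stops before the body of waitToPush(), so the following is only a sketch of how a limiter built on these four fields could work; it should not be read as Spark's implementation. It compares the message rate observed since lastSyncTime with desiredRate, sleeps while the rate is too high, and restarts the measurement window once SYNC_INTERVAL has passed. MiniRateLimiter and the injectable nanoTime and sleepMillis parameters are hypothetical, added so the behaviour can be exercised without real waiting. Treating a non-positive rate as "no limit" is an assumption based on the default of 0.

```scala
import java.util.concurrent.TimeUnit.{NANOSECONDS, SECONDS}

// Hypothetical sketch, not Spark's RateLimiter.
class MiniRateLimiter(
    desiredRate: Int,
    nanoTime: () => Long = () => System.nanoTime(),
    sleepMillis: Long => Unit = (ms: Long) => Thread.sleep(ms)) {

  private var lastSyncTime = nanoTime()
  private var messagesWrittenSinceSync = 0L
  private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)

  /** Blocks until one more message may be pushed at the desired rate. */
  def waitToPush(): Unit = {
    if (desiredRate <= 0) return // assumption: non-positive means unlimited
    var admitted = false
    while (!admitted) {
      val now = nanoTime()
      val elapsedNanos = math.max(now - lastSyncTime, 1L)
      val rate = messagesWrittenSinceSync.toDouble * 1e9 / elapsedNanos
      if (rate < desiredRate) {
        // Below the limit: count this message and let the caller continue.
        messagesWrittenSinceSync += 1
        if (now > lastSyncTime + SYNC_INTERVAL) {
          // The window has expired; start measuring again from now.
          lastSyncTime = now
          messagesWrittenSinceSync = 1
        }
        admitted = true
      } else {
        // Too fast: sleep roughly until the observed rate falls to the limit.
        val targetMillis = messagesWrittenSinceSync * 1000 / desiredRate
        val elapsedMillis = elapsedNanos / 1000000
        sleepMillis(math.max(targetMillis - elapsedMillis, 1L))
      }
    }
  }
}
```

The two settings interact: with a maxRate of 1000 messages per second and the default blockInterval of 200 ms, each block would hold at most roughly 200 records.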

















Original article: http://www.cnblogs.com/zwCHAN/p/4275464.html
