标签:union erro demo 创建 事件分发 简单 3.1 这一 执行
与云基础设施的集成。
所有主要的云提供商都在 对象存储 中提供持久的数据存储。这些不是经典的 “POSIX”
文件系统。为了在不出现任何故障的情况下存储数百字节的数据,对象存储用一个更简单的 “object-name => data”
模型替换了传统的文件系统目录树。为了支持远程访问,对象上的操作通常使用(缓慢的) HTTP REST
协议接口。
Spark
可以通过 Hadoop
中实现的文件系统连接器或基础设施供应商自己提供的连接器读写对象存储中的数据。这些连接器使对象存储看起来 “几乎” 像文件系统,其中包含目录和文件,以及诸如列表、删除和重命名之类的经典操作。
虽然存储看起来是文件系统,但是在它们的下面仍然是对象存储,而且这种区别是显著的。
它们不能作为集群文件系统(如HDFS )的直接替代,除非显式地声明了这一点。【这里应该只得是不能用 s3
来承载 Hadoop
的各种应用,显示声明 指的是 显示的配置 core-site.xml
中 fs.defaultFs
参数】
关键不同点:
- 在
List
操作和 对象数据访问中,对存储对象的更改可能不会立即可见- 模拟目录的方法可能会使使用它们的速度变慢
- 重命名操作可能非常缓慢,如果失败,将使存储处于未知状态
- 在文件中
seek
可能需要新的HTTP
调用,从而影响性能【 new HTTP 调用没太理解】
如何影响 Spark
:
- 与使用普通文件系统相比,读取和写入数据可能要慢得多
- 某些目录结构,在执行
query split calculation
时可能非常低效【可能是存储层查询分片计算的意思】- 后续查询可能无法立即看到工作的输出
Spark
通常在保存RDD
、DataFrame
或数据集时提交的基于重命名的算法可能既慢又不可靠
由于这些原因,使用对象存储作为查询的直接目的地,或者作为查询链中的中间存储,并不总是安全的。请参阅对象存储库及其连接器的文档,以确定哪些使用被认为是安全的。【后面补】
特别是,如果没有某种形式的一致性层,Amazon S3
就不能安全地作为使用基于重命名的提交器工作的直接目的地。【说的是 s3 guard
】
每个云连接器都有自己的一组配置参数,请再次参阅相关文档。
基于 rename
提交是安全的对象存储【一致性】,请使用 “FileOutputCommitter”
v2
算法来提高性能:
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
v2
算法比 “version 1”
算法在作业结束时进行的重命名要少。由于它仍然使用 ‘ rename() ‘
提交文件,所以当对象存储没有一致的 metadata/listings 时使用它是不安全的。
还可以设置提交者在清理临时文件时忽略失败;这降低了将暂态网络问题升级为作业失败的风险
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true
因为存储临时文件会产生费用;定期删除名为 “_temporary”
的目录以避免这种情况。【for
公有云】
Spark
流可以通过创建一个 “FileInputDStream”
来监控添加到对象存储中的文件,该文件通过调用 “StreamingContext.textFileStream()”
来监控存储中的路径。
new
文件的数量成比例,所以它可能成为一个缓慢的操作。需要设置窗口的大小来处理这个问题“rename()”
操作的 存储系统 进行监控,否则 checkpoint
可能很慢,而且可能不可靠Spark Streaming
是 core Spark API
的扩展,它支持对实时数据流进行可伸缩、高吞吐量、容错的流处理。数据可以从 Kafka
、Flume
、Kinesis
或 TCP
套接字等许多来源获取,也可以使用映射、reduce
、join
和 window
等高级函数表示的复杂算法进行处理。最后,可以将处理后的数据推送到文件系统、数据库和实时仪表板。事实上,你可以应用 Spark
的 机器学习和图形处理。
在内部,它的工作原理如下。Spark
流接收实时输入数据流,并将数据分成批,然后由 Spark
引擎处理这些数据,生成最终的结果流。
Spark
流提供了一个高级抽象,称为离散流或 DStream
,它表示连续的数据流。可以从输入数据创建 DStreams
来自 Kafka
、Flume
和 Kinesis
等源的流,或者通过对其他 dstream
应用高级操作。在内部,DStream
表示为 RDDs
序列。
本指南向您展示如何开始使用 DStreams
编写 Spark
流程序。您可以用 Scala
、Java
或 Python
(在 Spark 1.2
中引入)编写 Spark
流程序,所有这些都在本指南中介绍。您将在本指南中找到 tabs
,这些 tabs
允许您在不同语言的代码片段之间进行选择。
注意:
Python
中有一些api
不是不同就是不可用。在本指南中,您将发现标记Python API
突出显示了这些差异。
StreamingContext
实例默认状态 INITIALIZED
private var state: StreamingContextState = INITIALIZED
StreamingContext.startSite.set()
StreamingContext.ACTIVATION_LOCK.synchronized
a.检查 StreamingContext 是否已启动
b.输入参数检查,是否设置时间间隔等
c.在一个新线程中启动流调度器,以便在不影响当前线程的情况下重置线程本地属性(如 CallSite 和 Job Group)
ThreadUtils.runInNewThread("streaming-start") {
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
savedProperties.set(SerializationUtils.clone(sparkContext. localProperties.get()))
// 线程的执行函数 JobScheduler start 函数
scheduler.start()
}
d.设置状态为 ACTIVE
e.发送 StreamingListenerStreamingStarted 和当前时间消息到 listenerBus
f.将这个类输入给 StreamingContext 对象
shutdownHookRef = ShutdownHookManager.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())
了解:
Spark测量系统,由指定的instance创建,由source、sink组成,周期性地从source获取指标然后发送到sink,其中instance、source、sink的概念如下:
Instance:指定了谁在使用测量系统,在spark中有一些列如master、worker、executor、client driver这些角色,这些角色创建测量系统用于监控spark状态,目前在spark中已经实现的角色包括master、worker、executor、driver、applications
Source:指定了从哪里收集测量数据。在Spark测量系统中有两种来源:
- Spark内部指标,比如MasterSource、WorkerSource等,这些源将会收集Spark组件的内部状态
- 普通指标,比例JvmSource,通过配置文件进行配置
Sink:指定了往哪里输出测量数据
该类将作业安排在Spark上运行。它使用JobGenerator生成作业,并使用线程池运行它们
主要处理 3 个事件
private[scheduler] sealed trait JobSchedulerEvent
private[scheduler] case class JobStarted(job: Job, startTime: Long) extends JobSchedulerEvent
private[scheduler] case class JobCompleted(job: Job, completedTime: Long) extends JobSchedulerEvent
private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends JobSchedulerEvent
处理函数
private def processEvent(event: JobSchedulerEvent) {
try {
event match {
case JobStarted(job, startTime) => handleJobStart(job, startTime)
case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
case ErrorReported(m, e) => handleError(m, e)
}
} catch {
case e: Throwable =>
reportError("Error in job scheduler", e)
}
}
注册到 eventLoop 中
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
执行 eventLoop.start()
eventLoop 中,维护一个 eventQueue 用于缓存即时事件
- 当执行其
start
方法时,会多线程的执行EventLoop
中的run
方法- 在
EventLoop
类中可以看到,里面维持了一个LinkedBlockingDeque
类型的eventQueue
事件队列,接收到的事件都存在该队列中- 从
eventQueue
中取出事件,调用EventLoop
对象在JobScheduler
中被重写的onReceive
方法
监听的事件如下
protected override def doPostEvent(
listener: StreamingListener,
event: StreamingListenerEvent): Unit = {
event match {
case receiverStarted: StreamingListenerReceiverStarted =>
listener.onReceiverStarted(receiverStarted)
case receiverError: StreamingListenerReceiverError =>
listener.onReceiverError(receiverError)
case receiverStopped: StreamingListenerReceiverStopped =>
listener.onReceiverStopped(receiverStopped)
case batchSubmitted: StreamingListenerBatchSubmitted =>
listener.onBatchSubmitted(batchSubmitted)
case batchStarted: StreamingListenerBatchStarted =>
listener.onBatchStarted(batchStarted)
case batchCompleted: StreamingListenerBatchCompleted =>
listener.onBatchCompleted(batchCompleted)
case outputOperationStarted: StreamingListenerOutputOperationStarted =>
listener.onOutputOperationStarted(outputOperationStarted)
case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
listener.onOutputOperationCompleted(outputOperationCompleted)
case streamingStarted: StreamingListenerStreamingStarted =>
listener.onStreamingStarted(streamingStarted)
case _ =>
}
}
该类管理ReceiverInputDStreams的 receivers 的执行。
start 函数主要调用 launchReceivers()
def start(): Unit = synchronized {
if (isTrackerStarted) {
throw new SparkException("ReceiverTracker already started")
}
if (!receiverInputStreams.isEmpty) {
endpoint = ssc.env.rpcEnv.setupEndpoint(
"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
if (!skipReceiverLaunch) launchReceivers()
logInfo("ReceiverTracker started")
trackerState = Started
}
}
ReceiverTracker.start 方法的主要逻辑是调用了 ReceiverTracker.launchReceivers。这个方法处理receiverInputStreams 中的每一个receiver后,分发到worker节点,启动并运行。nis.getReceiver对不同的数据源有其具体实现
private def launchReceivers(): Unit = {
val receivers = receiverInputStreams.map { nis =>
val rcvr = nis.getReceiver() // 对不同的数据源有其具体实现
rcvr.setReceiverId(nis.id)
rcvr
}
// 在非local模式下,运行一段逻辑运算,确保所有的slaves都起来后再继续执行,避免了将receivers分配到同一节点上
runDummySparkJob()
logInfo("Starting " + receivers.length + " receivers")
// endpoint 是 RpcEndpointRef 类型,通过它将 receivers 分发到worker节点
endpoint.send(StartAllReceivers(receivers))
}
在 endpoint.send 方法被调用后,根据传入的对象类型,将进入 ReceiverTrackerEndpoint.receive 方法中,处理启动所有 Receivers 的事件。
override def receive : PartialFunction[Any , Unit] = {
// 处理StartAllReceivers事件
case StartAllReceivers(receivers) =>
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers , getExecutors)
for (receiver <- receivers) {
val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
receiverPreferredLocations (receiver.streamId) = receiver.preferredLocation
startReceiver(receiver, executors)
}
...
}
调用 ReceiverTracker.startReceiver
private def startReceiver (
receiver: Receiver[_],
scheduledLocations: Seq [TaskLocation]): Unit = {
...
// 取出每一个Receiver对象
val receiver = iterator.next()
assert(iterator.hasNext == false)
val supervisor = new ReceiverSupervisorImpl(receiver, SparkEnv.get , serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
...
}
在 ReceiverSupervisor.start 方法中,开始真正的启动 Receivers,对不同的DStream有具体的Receiver实现。
def start () {
onStart()
startReceiver()
}
def startReceiver (): Unit = synchronized {
try {
if (onReceiverStart()) {
logInfo("Starting receiver" )
receiverState = Started
// 调用Receiver.onStart方法开始接收数据。对不同的DStream有具体的Receiver实现
receiver.onStart()
logInfo("Called receiver onStart" )
} else {
// The driver refused us
stop( "Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
}
} catch {
case NonFatal(t) =>
stop("Error starting receiver " + streamId , Some(t))
}
}
JobGenerator的构造方法如下,使用到了前面提到的JobScheduler对象
class JobGenerator(jobScheduler: JobScheduler) extends Logging
进入 JobGenerator 类。可以看到其 start 方法与 JobScheduler 的 start 方法结构十分类似。在这里面也有一个EventLoop 类型的 eventLoop 对象,只不过这个对象传入的是 JobGeneratorEvent 类型的事件。
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
override protected def onReceive (event: JobGeneratorEvent): Unit = processEvent(event)
override protected def onError (e: Throwable ): Unit = {
jobScheduler.reportError("Error in job generator" , e)
}
}
eventLoop.start()
JobGenerator 中的 eventLoop 主要处理的是 Job 生成,metadata 以及 checkpoint 相关的事件
/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent
private[scheduler] case class DoCheckpoint(
time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent
private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent
在 JobGenerato r类中有一个 RecurringTimer 类型的 timer 对象,这个对象以设置的 batch duration 定时往eventLoop 中推送 GenerateJobs 事件,这样前面这个代码片段中的 processEvent 方法就可以处理这些事件了。
private val timer = new RecurringTimer(clock , ssc.graph.batchDuration.milliseconds ,
longTime => eventLoop.post(GenerateJobs(new Time(longTime))) , "JobGenerator")
eventLoop.post(GenerateJobs(new Time(longTime)))
jobScheduler.receiverTracker.allocateBlocksToBatch(time)
graph.generateJobs(time)
val jobs = this.synchronized {
outputStreams.flatMap { outputStream =>
val jobOption = outputStream.generateJob(time)
jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption
}
}
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {
// Compute the RDD if time is valid (e.g. correct time in a sliding window)
// of RDD generation, else generate nothing.
if (isTimeValid(time)) {
val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
compute(time)
}
}
...
}
? 查找自上次调用此方法以来已修改的文件,并从中创建union RDD。请注意,这将维护在上次对该方法调用的最新修改时间中处理的文件列表。这是因为FileStatus API返回的修改时间似乎只返回秒粒度的时间。而且新文件的修改时间可能与上一个方法调用中的最新修改时间相同,但在上一个调用中没有报告。
private def findNewFiles(currentTime: Long): Array[String] = {
...
val newFileFilter = new PathFilter {
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val directoryFilter = new PathFilter {
override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
}
val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
val newFiles = directories.flatMap(dir =>
fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
...
newFiles
} catch {
case e: Exception =>
logWarning("Error finding new files", e)
reset()
Array.empty
}
}
标签:union erro demo 创建 事件分发 简单 3.1 这一 执行
原文地址:https://www.cnblogs.com/zhance/p/10162827.html