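Spark Streaming can read from many kinds of real-time input sources, including Kafka, Flume, and socket streams. Sources other than Kafka are outside our business scenarios and are not discussed here. This article focuses on our current use case and covers only how Spark Streaming reads data from Kafka. Spark Streaming officially provides two ways to read Kafka data: the Receiver-based approach and the Direct approach.
The two approaches differ considerably, and each has its own strengths and weaknesses. The following sections examine each in detail.
As noted above, the Receiver-based mode was the first Kafka consumption mode that Spark officially provided. It can lose data when the application fails, so Spark 1.2 introduced the configuration parameter spark.streaming.receiver.writeAheadLog.enable to mitigate that risk. The official documentation puts it this way: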
Under default configuration, this approach can lose data under failures (see receiver reliability). To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g. HDFS), so that all the data can be recovered on failure.
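The Receiver-based approach consumes Kafka data through Kafka's high-level consumer API. After a Spark Streaming job is submitted, the Spark cluster sets aside the specified number of Receivers, which are dedicated to reading Kafka data continuously and asynchronously; the read interval and the range of offsets read each time can be set through parameters. The data read is held in the Receiver under a StorageLevel chosen by the user, such as MEMORY_ONLY. When the driver triggers a batch job, the data in the Receivers is transferred to the remaining Executors for processing. After processing, the Receivers update the offsets in ZooKeeper accordingly. To guarantee at-least-once reading, set spark.streaming.receiver.writeAheadLog.enable to true. The Receiver execution flow is shown in the figure below:
Kafka's high-level consumer lets users concentrate on the data they read without tracking or maintaining consumer offsets, which reduces both effort and code and is comparatively simple. For that reason, when we first adopted Spark Streaming as a compute engine, this was the approach we chose. The code is as follows: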
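import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

/* Reads Kafka data through the Receiver-based API. */
def getKafkaInputStream(zookeeper: String,
                        topic: String,
                        groupId: String,
                        numReceivers: Int,
                        partition: Int,
                        ssc: StreamingContext): DStream[String] = {
  val kafkaParams = Map(
    ("zookeeper.connect", zookeeper),
    ("auto.offset.reset", "largest"),
    ("zookeeper.connection.timeout.ms", "30000"),
    ("fetch.message.max.bytes", (1024 * 1024 * 50).toString),
    ("group.id", groupId)
  )
  // Consumer threads per receiver; integer division can yield 0, so use at least 1.
  val topics = Map(topic -> math.max(1, partition / numReceivers))
  // One input stream per receiver; keep only the message value.
  val kafkaDstreams = (1 to numReceivers).map { _ =>
    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,
      kafkaParams,
      topics,
      StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
  }
  ssc.union(kafkaDstreams)
}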
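As the code above shows, getKafkaInputStream takes the parameters zookeeper, topic, groupId, numReceivers, partition, and ssc. In the code they are used as, respectively: the ZooKeeper connection string, the Kafka topic to read, the consumer group ID, the number of Receivers to create, the number of partitions of the topic, and the StreamingContext.
These parameters are mainly used to connect to Kafka and read its data. The steps, as the code carries them out, are: build the Kafka consumer parameters; give each Receiver a number of consumer threads equal to the partition count divided by the number of Receivers; create one input stream per Receiver with KafkaUtils.createStream; and union the streams into a single DStream.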
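The per-receiver thread count that getKafkaInputStream derives by dividing the partition count by the number of receivers is easy to get wrong when the division is not exact. The following is a minimal sketch of that arithmetic in isolation. The object and method names are hypothetical and are not part of Spark or of the original code.

```scala
// Hypothetical helper, for illustration only; not part of Spark or Kafka.
object ReceiverThreads {
  // Consumer threads each receiver requests for the topic.
  // Integer division truncates, so guard against a zero thread count.
  def threadsPerReceiver(partitions: Int, numReceivers: Int): Int = {
    require(partitions > 0 && numReceivers > 0, "both arguments must be positive")
    math.max(1, partitions / numReceivers)
  }

  // Partitions beyond the total thread count. They are still consumed,
  // but by a thread that already owns another partition.
  def extraPartitions(partitions: Int, numReceivers: Int): Int =
    math.max(0, partitions - threadsPerReceiver(partitions, numReceivers) * numReceivers)
}
```

With 12 partitions and 4 receivers each receiver gets 3 threads and nothing is left over. With 10 partitions and 4 receivers each gets 2 threads and 2 partitions have to share a thread, so choosing a receiver count that divides the partition count evenly keeps the load balanced.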
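The Receiver-based approach met some of our scenario requirements, and on top of it we abstracted a micro-batch model, an in-memory computing model, and so on. In our concrete scenarios we also made some optimizations to this approach:
This setup met our needs to a certain extent, for example for micro-batch processing and the in-memory computing model. At the same time, because of these two aspects and several other factors, it also led to a variety of problems:
To avoid these problems and reduce resource usage, we later switched to the Direct approach for reading Kafka data, which is described next.
Unlike the Receiver-based method, the Direct approach to consuming Kafka data was introduced in Spark 1.3. Compared with the Receiver-based method, it has the following advantages:
Simplified parallelism. There is no longer any need to create and union multiple input streams; Kafka topic partitions map one-to-one to RDD partitions. The official description: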
No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
Efficiency. To guarantee zero data loss, the Receiver-based approach requires spark.streaming.receiver.writeAheadLog.enable, which stores the data twice, wasting storage and hurting efficiency. The Direct approach does not have this problem.
Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.
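Exactly-once semantics. With the high-level API, the data is consumed by Spark Streaming while the offsets are stored in ZooKeeper. With the right configuration this gives at-least-once consumption, which means some data may be consumed more than once. The Direct approach instead tracks offsets in Spark Streaming's own checkpoints, which removes that inconsistency: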
The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).
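The quoted passage requires the output operation to be idempotent (or transactional) for end-to-end exactly-once results. As a rough illustration of the idempotent option, the sketch below keys every write by its Kafka coordinates, so replaying a batch after a failure overwrites rows instead of duplicating them. The Record and KeyedStore types are hypothetical stand-ins for a real external store with upsert semantics; nothing here is a Spark API.

```scala
import scala.collection.mutable

// Hypothetical types, for illustration only.
object IdempotentSink {
  // The Kafka coordinates travel with the payload.
  final case class Record(topic: String, partition: Int, offset: Long, value: String)

  // In-memory stand-in for an external store that supports upserts.
  final class KeyedStore {
    private val rows = mutable.Map.empty[(String, Int, Long), String]

    // Writing the same (topic, partition, offset) twice leaves a single row.
    def upsert(r: Record): Unit =
      rows.update((r.topic, r.partition, r.offset), r.value)

    def writeBatch(batch: Seq[Record]): Unit = batch.foreach(upsert)

    def size: Int = rows.size

    def get(topic: String, partition: Int, offset: Long): Option[String] =
      rows.get((topic, partition, offset))
  }
}
```

Calling writeBatch twice with the same records leaves the store unchanged after the second call, which is the property a replayed batch depends on.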
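The Direct approach reads data through Kafka's simple consumer API without going through ZooKeeper, so it no longer needs dedicated Receivers that read data continuously. When a batch job is triggered, the Executors read the data themselves and then take part in the computation alongside the other Executors. The driver decides which offsets to read and hands them to the checkpoints to maintain. When the next batch job is triggered, the Executors again read from Kafka and compute. Because the Direct approach reads data only when it is needed for computation rather than through a Receiver, its memory requirements are modest: only the memory needed for the batch computation has to be considered. In addition, when batch jobs pile up, unread data does not pile up in memory with them. The read flow is shown in the figure below: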
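To make the driver's role more concrete, here is a simplified sketch of how offset ranges for one batch could be planned: each partition reads from its current offset up to the latest available offset, capped by a per-batch limit. This is an illustrative model under stated assumptions, not Spark's actual implementation; OffsetPlanner, OffsetSpan, and maxPerPartition are hypothetical names. Spark exposes a comparable per-partition rate limit through spark.streaming.kafka.maxRatePerPartition.

```scala
// Hypothetical model of per-batch offset planning; not Spark's implementation.
object OffsetPlanner {
  // Half-open range [fromOffset, untilOffset) for one partition.
  final case class OffsetSpan(partition: Int, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  // current: next offset to read per partition.
  // latest: latest available offset per partition.
  // maxPerPartition: cap on messages taken from one partition in one batch.
  def plan(current: Map[Int, Long],
           latest: Map[Int, Long],
           maxPerPartition: Long): Seq[OffsetSpan] =
    current.toSeq.sortBy(_._1).map { case (p, from) =>
      val end = latest.getOrElse(p, from)
      // Never move backwards, never exceed the cap or the latest offset.
      val until = math.max(from, math.min(end, from + maxPerPartition))
      OffsetSpan(p, from, until)
    }

  // Offsets the following batch starts from.
  def advance(spans: Seq[OffsetSpan]): Map[Int, Long] =
    spans.map(s => s.partition -> s.untilOffset).toMap
}
```

Because each batch carries only a description of the ranges to read, a backlog of pending batches costs a few numbers per partition rather than buffered messages.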
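Spark Streaming provides several overloaded methods for reading Kafka data. This article looks at two of the Scala ones, both of which we use in our scenarios. Their code is as follows: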
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag,
    R: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
  val cleanedHandler = ssc.sc.clean(messageHandler)
  new DirectKafkaInputDStream[K, V, KD, VD, R](
    ssc, kafkaParams, fromOffsets, cleanedHandler)
}

def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String]
): InputDStream[(K, V)] = {
  val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
  val kc = new KafkaCluster(kafkaParams)
  val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
  new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}
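In practice we use the two methods together, along two broad lines:
Overall, these methods meet our needs. The specific strategy is not covered in this article and will be described in a later one. The code for reading Kafka data from largest or smallest is as follows: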
/**
  * Reads Kafka data, starting from the position selected by
  * auto.offset.reset in kafkaParams (the latest offsets by default).
  *
  * @param ssc         : StreamingContext
  * @param kafkaParams : Kafka parameters
  * @param topics      : Kafka topics
  * @return : the resulting stream of message values
  */
private def getDirectStream(ssc: StreamingContext,
                            kafkaParams: Map[String, String],
                            topics: Set[String]): DStream[String] = {
  val kafkaDStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc,
    kafkaParams,
    topics
  )
  // Drop the key and keep only the message value.
  kafkaDStreams.map(_._2)
}
The logic for restarting after a failure is as follows:
/**
  * If offsets already exist, starts reading from those offsets.
  *
  * @param ssc         : StreamingContext
  * @param kafkaParams : Kafka configuration parameters
  * @param fromOffsets : previously saved offsets
  * @return : the resulting stream of message values
  */
private def getDirectStreamWithOffsets(ssc: StreamingContext,
                                       kafkaParams: Map[String, String],
                                       fromOffsets: Map[TopicAndPartition, Long]): DStream[String] = {
  // Note: this try only covers errors raised while the stream is being created.
  val kfkData = try {
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc,
      kafkaParams,
      fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => mmd.message()
    )
  } catch { // The offsets are invalid: read from the latest offsets instead.
    case _: Exception =>
      val topics = fromOffsets.map { case (tap, _) =>
        tap.topic
      }.toSet
      getDirectStream(ssc, kafkaParams, topics)
  }
  kfkData
}
The fromOffsets argument in this code is loaded from external storage and has to be converted first. The conversion code is as follows:
val fromOffsets = offsets.map { consumerInfo =>
TopicAndPartition(consumerInfo.topic, consumerInfo.part) -> consumerInfo.until_offset
}.toMap
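This method reads Kafka data starting from the specified offsets. If reading fails with an exception, we treat the offsets as invalid, catch the exception, and read Kafka data from largest instead.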
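A related way to handle invalid offsets is to check each stored offset against the range the broker still retains before the stream is created, and to fall back to the latest offset only for the partitions that fail the check. The sketch below shows that per-partition variant. It is an alternative to the try/catch fallback above rather than the method described in this article, and OffsetRecovery, Bounds, and resolve are hypothetical names. How the earliest and latest offsets are fetched is left out.

```scala
// Hypothetical helper, for illustration only; not part of Spark or Kafka.
object OffsetRecovery {
  // Offsets the broker currently retains for one partition.
  final case class Bounds(earliest: Long, latest: Long)

  // Keep a stored offset only if it is still within the retained range;
  // otherwise (or when nothing is stored) start from the latest offset.
  def resolve(stored: Map[Int, Long], bounds: Map[Int, Bounds]): Map[Int, Long] =
    bounds.map { case (partition, b) =>
      val start = stored.get(partition) match {
        case Some(o) if o >= b.earliest && o <= b.latest => o
        case _ => b.latest
      }
      partition -> start
    }
}
```

The result can then be converted to a Map[TopicAndPartition, Long] in the same way as fromOffsets above. Partitions whose stored offsets are still valid resume where they left off, and only the invalid ones skip ahead.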
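In practice, the Direct approach has met our needs well. Compared with the Receiver-based approach, it has the following advantages:
Other advantages, such as simplified parallelism, efficiency, and exactly-once semantics, were listed earlier and are not repeated here. Despite these advantages, the Direct approach also has some shortcomings:
This article has described the two ways Spark Streaming reads data from Kafka: the Receiver-based approach and the Direct approach. Each has its strengths and weaknesses, but on balance the Direct approach suits more business scenarios and scales better. The choice between them depends on the team as well as on the business scenario: early in an application's life, when fast iteration matters most, the first approach is worth considering; for deeper, long-term use, we recommend the second.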
Original article: http://www.cnblogs.com/lixin1101/p/7306400.html