标签:spark kafka
Simplified Parallelism: No need to create multiple input Kafka streams and union-ing them. With directStream
, Spark Streaming will create as many RDD partitions as there is Kafka partitions to consume, which will all read data from Kafka in parallel. So there is one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
简化并行:不再需要创建多个kafka的输入流,然后再将其合并。用directStream,spark
Streaming讲根据kafka要消费的分区来创建对应的RDD,所有的数据都是并行的葱kafka中读取。简单来讲就是RDD和kafka的分区做了一一对应。
Efficiency: Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminate the problem as there is no receiver, and hence no need for Write Ahead Logs.
高效:实现零数据丢失,在spark1.2中,需要讲数据存储在一个预写日志(Write Ahead Log),实际上是数据做了二次复制 - 一次通过kafka复制到spark,第二被预写到hdfs存储中,效率是很低下的。spark1.3提供的方法消除了该问题,因为没有接收器,因此没有必要预写日志。
Exactly-once semantics: The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper and offsets tracked only by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures.
消费一次原则(语义):在spark1.2之前的版本中,spark steaming通过kafka的高级消费API来读取数据,并且将偏移量保存在zookeeper中。 这是传统上从kafka消费数据的方式。虽然这种方法(结合Write Ahead Log日志)可以保证零数据丢失(即至少一次语义),但还是在发生故障的情况下有很少的几率会消费两次。这是由于zookeeper保存偏移量和spark steaming接收数据之间的不一致造成的。因此,在spark1.3中,我们使用简单的Kafak API,而不是使用zookeeper(高级api),偏移量由spark streaming通过checkpoints来保存,这消除了spark streaming 和zookeeper/kafka之间的不一致,所以即使发生失败,收到的每条记录由spark streaming 有且只能消费一次。
Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself .
需要注意的是这个方法的唯一缺点就是zookeeper不再保存kafka的偏移量,因此原来依赖zookeeper的监控工具将不能正确的显示消费进程。然而你可以手动的讲spark streaming的偏移量更新到zookeeper中。
/** | |
* :: Experimental :: | |
* Create an input stream that directly pulls messages from Kafka Brokers | |
* without using any receiver. This stream can guarantee that each message | |
* from Kafka is included in transformations exactly once (see points below). | |
* | |
* Points to note: | |
* - No receivers: This stream does not use any receiver. It directly queries Kafka | |
* - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked | |
* by the stream itself. For interoperability with Kafka monitoring tools that depend on | |
* Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. | |
* You can access the offsets used in each batch from the generated RDDs (see | |
* [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). | |
* - Failure Recovery: To recover from driver failures, you have to enable checkpointing | |
* in the [[StreamingContext]]. The information on consumed offset can be | |
* recovered from the checkpoint. See the programming guide for details (constraints, etc.). | |
* - End-to-end semantics: This stream ensures that every records is effectively received and | |
* transformed exactly once, but gives no guarantees on whether the transformed data are | |
* outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure | |
* that the output operation is idempotent, or use transactions to output records atomically. | |
* See the programming guide for more details. | |
* | |
* @param ssc StreamingContext object | |
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> | |
* configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" | |
* to be set with Kafka broker(s) (NOT zookeeper servers) specified in | |
* host1:port1,host2:port2 form. | |
* @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) | |
* starting point of the stream | |
* @param messageHandler Function for translating each message and metadata into the desired type | |
*/ | |
@Experimental | |
def createDirectStream[ | |
K: ClassTag, | |
V: ClassTag, | |
KD <: Decoder[K]: ClassTag, | |
VD <: Decoder[V]: ClassTag, | |
R: ClassTag] ( | |
ssc: StreamingContext, | |
kafkaParams: Map[String, String], | |
fromOffsets: Map[TopicAndPartition, Long], | |
messageHandler: MessageAndMetadata[K, V] => R | |
): InputDStream[R] = { | |
new DirectKafkaInputDStream[K, V, KD, VD, R]( | |
ssc, kafkaParams, fromOffsets, messageHandler) | |
} | |
/** | |
* :: Experimental :: | |
* Create an input stream that directly pulls messages from Kafka Brokers | |
* without using any receiver. This stream can guarantee that each message | |
* from Kafka is included in transformations exactly once (see points below). | |
* | |
* Points to note: | |
* - No receivers: This stream does not use any receiver. It directly queries Kafka | |
* - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked | |
* by the stream itself. For interoperability with Kafka monitoring tools that depend on | |
* Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. | |
* You can access the offsets used in each batch from the generated RDDs (see | |
* [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). | |
* - Failure Recovery: To recover from driver failures, you have to enable checkpointing | |
* in the [[StreamingContext]]. The information on consumed offset can be | |
* recovered from the checkpoint. See the programming guide for details (constraints, etc.). | |
* - End-to-end semantics: This stream ensures that every records is effectively received and | |
* transformed exactly once, but gives no guarantees on whether the transformed data are | |
* outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure | |
* that the output operation is idempotent, or use transactions to output records atomically. | |
* See the programming guide for more details. | |
* | |
* @param ssc StreamingContext object | |
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> | |
* configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" | |
* to be set with Kafka broker(s) (NOT zookeeper servers), specified in | |
* host1:port1,host2:port2 form. | |
* If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" | |
* to determine where the stream starts (defaults to "largest") | |
* @param topics Names of the topics to consume | |
*/ | |
@Experimental | |
def createDirectStream[ | |
K: ClassTag, | |
V: ClassTag, | |
KD <: Decoder[K]: ClassTag, | |
VD <: Decoder[V]: ClassTag] ( | |
ssc: StreamingContext, | |
kafkaParams: Map[String, String], | |
topics: Set[String] | |
): InputDStream[(K, V)] = { | |
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) | |
val kc = new KafkaCluster(kafkaParams) | |
val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase) | |
(for { | |
topicPartitions <- kc.getPartitions(topics).right | |
leaderOffsets <- (if (reset == Some("smallest")) { | |
kc.getEarliestLeaderOffsets(topicPartitions) | |
} else { | |
kc.getLatestLeaderOffsets(topicPartitions) | |
}).right | |
} yield { | |
val fromOffsets = leaderOffsets.map { case (tp, lo) => | |
(tp, lo.offset) | |
} | |
new DirectKafkaInputDStream[K, V, KD, VD, (K, V)]( | |
ssc, kafkaParams, fromOffsets, messageHandler) | |
}).fold( | |
errs => throw new SparkException(errs.mkString("\n")), | |
ok => ok | |
) | |
} |
标签:spark kafka
原文地址:http://weikuan.blog.51cto.com/9957075/1621794