标签:rgs min contex 通过 对象 class 场景 storage nbsp
1.说明
针对需要恢复的应用场景,提供了HA的的机制
内部实现原理:基于checkpoint的
当程序被kill的时候,下次恢复的时候,会从checkpoint对用的文件中进行数据的恢复
2.注意点
SparkStreaming 的HA和updateStateByKey来记录历史数据的API不能一起使用
二:程序
1.程序
1 package com.stream.it 2 3 import kafka.serializer.StringDecoder 4 import org.apache.spark.storage.StorageLevel 5 import org.apache.spark.streaming.kafka.KafkaUtils 6 import org.apache.spark.streaming.{Seconds, StreamingContext} 7 import org.apache.spark.{SparkConf, SparkContext} 8 9 object HAKafkaWordcount { 10 def main(args: Array[String]): Unit = { 11 val conf=new SparkConf() 12 .setAppName("spark-streaming-wordcount") 13 .setMaster("local[*]") 14 val sc=SparkContext.getOrCreate(conf) 15 val checkpointDir = "hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/chkdir02" 16 17 18 /** 19 * 构造StreamingContext对象 20 * 21 * @return 22 */ 23 def createStreamingContextFunc(): StreamingContext = { 24 val ssc = new StreamingContext(sc, Seconds(5)) 25 ssc.checkpoint(checkpointDir) 26 val kafkaParams=Map("group.id"->"stream-sparking-0", 27 "zookeeper.connect"->"linux-hadoop01.ibeifeng.com:2181/kafka", 28 "auto.offset.reset"->"smallest" 29 ) 30 val topics=Map("beifeng"->1) 31 val dStream=KafkaUtils.createStream[String,String,StringDecoder,StringDecoder]( 32 ssc, //给定sparkStreaming的上下文 33 kafkaParams, //kafka的参数信息,通过kafka HightLevelComsumerApi连接 34 topics, //给定读取对应的topic的名称以及读取数据的线程数量 35 StorageLevel.MEMORY_AND_DISK_2 //数据接收器接收到kafka的数据后的保存级别 36 ).map(_._2) 37 38 39 val resultWordcount=dStream 40 .filter(line=>line.nonEmpty) 41 .flatMap(line=>line.split(" ").map((_,1))) 42 .reduceByKey(_+_) 43 resultWordcount.foreachRDD(rdd=>{ 44 rdd.foreachPartition(iter=>iter.foreach(println)) 45 }) 46 ssc 47 } 48 49 val ssc = StreamingContext.getOrCreate( 50 checkpointPath = checkpointDir, 51 creatingFunc = createStreamingContextFunc 52 ) 53 54 //启动 55 ssc.start() 56 //等到 57 ssc.awaitTermination() 58 } 59 }
2.注意点
HA第一次执行后,以后如果代码进行改动(创建StreamingContext的代码改动),不会得到反应(会直接从checkpoint中读取数据进行StreamingContext的恢复) ===> 解决SparkStreaming和Kafka集成的时候offset偏移量管理的问题
标签:rgs min contex 通过 对象 class 场景 storage nbsp
原文地址:https://www.cnblogs.com/juncaoit/p/9464277.html