The example below writes two Map documents to Elasticsearch from Spark and then reads them back through EsSpark:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

def main(args: Array[String]): Unit = {
  // Configure Spark and point elasticsearch-hadoop at the ES cluster
  val sparkConf = new SparkConf().setAppName("DecisionTree1").setMaster("local[2]")
  sparkConf.set("es.index.auto.create", "true")
  sparkConf.set("es.nodes", "10.3.162.202")
  sparkConf.set("es.port", "9200")
  val sc = new SparkContext(sparkConf)
  //write2Es(sc)
  read4Es(sc)
}

// Write two Map documents to the index/type "spark/docs"
def write2Es(sc: SparkContext): Unit = {
  val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
  val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
  val rdd = sc.makeRDD(Seq(numbers, airports))
  EsSpark.saveToEs(rdd, "spark/docs")
  println("--------------------End-----------------")
}

// Read "spark/docs" back as an RDD of (documentId, fieldMap) pairs
def read4Es(sc: SparkContext): Unit = {
  val rdd = EsSpark.esRDD(sc, "spark/docs")
  rdd.foreach(line => {
    val key = line._1   // document _id
    val value = line._2 // document source as a map of fields
    println("------------------key:" + key)
    for (tmp <- value) {
      val key1 = tmp._1
      val value1 = tmp._2
      println("------------------key1:" + key1)
      println("------------------value1:" + value1)
    }
  })
}
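As a related sketch (not from the original post): EsSpark.esRDD also has an overload that takes a query, so only matching documents are pulled into the RDD. The method name readWithQuery and the URI query "?q=one:1" below are illustrative.

// Minimal sketch, using the same imports and SparkContext configuration as in main() above.
// "spark/docs" and "?q=one:1" are illustrative values.
def readWithQuery(sc: SparkContext): Unit = {
  val matched = EsSpark.esRDD(sc, "spark/docs", "?q=one:1") // (resource, query) overload
  matched.collect().foreach { case (id, doc) => println(id + " -> " + doc) }
}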
The example depends on the jar: elasticsearch-spark_2.10-2.1.0.jar
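If the project is built with sbt, the dependency can be declared roughly as below; the group/artifact coordinates are inferred from the jar name and should be checked against the Elasticsearch Hadoop release actually in use.

// build.sbt sketch -- coordinates inferred from elasticsearch-spark_2.10-2.1.0.jar
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.0"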
Original article: http://www.cnblogs.com/qq27271609/p/4689976.html