码迷,mamicode.com
首页 > 其他好文 > 详细

Spark学习笔记——泰坦尼克生还预测

时间:2017-05-26 16:38:01      阅读:274      评论:0      收藏:0      [点我收藏+]

标签:spark   保存   cas   vmm   coalesce   builder   1.0   titan   instance   

package kaggle

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionWithSGD, NaiveBayes, SVMWithSGD}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics


/**
  * Created by mi on 17-5-23.
  */


/**
  * Kaggle Titanic survival prediction with Spark MLlib logistic regression.
  *
  * Pipeline: load train.csv, build (sex, age, fare) features with min-max
  * scaling, train on an 80/20 split, print hold-out accuracy, then score
  * test.csv and save the predicted labels as a single text file.
  */
object Titanic {

  import org.apache.spark.sql.DataFrame

  private val TrainCsvPath = "/home/mi/下载/kaggle/Titanic/train.csv"
  private val TestCsvPath = "/home/mi/下载/kaggle/Titanic/test.csv"
  private val PredictionOutputPath = "file:///home/mi/下载/kaggle/Titanic/test_predict"

  // Column positions in train.csv (which contains the Survived label column).
  private val TrainLabelCol = 1
  private val TrainSexCol = 4
  private val TrainAgeCol = 5
  private val TrainFareCol = 9

  // Column positions in test.csv; the original code reads the same fields one
  // position to the left because that file has no label column.
  private val TestSexCol = 3
  private val TestAgeCol = 4
  private val TestFareCol = 8

  /**
    * Mean/min/max of one numeric column, used for imputation and min-max scaling.
    *
    * @param mean column mean, substituted for missing values
    * @param min  column minimum
    * @param max  column maximum
    */
  private final case class FeatureScale(mean: Double, min: Double, max: Double) {
    private val range: Double = max - min

    /**
      * Scales a raw cell with (x - min) / range; missing cells use the mean.
      * Returns 0.0 for a constant column instead of dividing by zero.
      */
    def normalize(raw: Any): Double = {
      val value = parseDouble(raw).getOrElse(mean)
      if (range == 0.0) 0.0 else (value - min) / range
    }
  }

  /**
    * Parses a CSV cell as a Double.
    *
    * @return None for null or blank cells; malformed numbers still throw
    *         NumberFormatException so bad data is not silently imputed
    */
  private def parseDouble(value: Any): Option[Double] =
    Option(value).map(_.toString.trim).filter(_.nonEmpty).map(_.toDouble)

  /** Encodes the sex column: "male" -> 0.0, "female" -> 1.0. */
  private def encodeSex(value: Any): Double = value match {
    case "male" => 0.0
    case "female" => 1.0
    case other => throw new IllegalArgumentException(s"Unexpected sex value: $other")
  }

  /** Reads a CSV file with a header row through the spark-csv data source. */
  private def loadCsv(sqlContext: SQLContext, path: String): DataFrame =
    sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load(path)

  /** Computes mean/min/max of the non-missing values of one column in a single pass. */
  private def featureScale(df: DataFrame, column: Int): FeatureScale = {
    val values = df.rdd.flatMap { row =>
      parseDouble(row(column)).map(v => Vectors.dense(v)).iterator
    }
    val summary = Statistics.colStats(values)
    FeatureScale(summary.mean(0), summary.min(0), summary.max(0))
  }

  /**
    * Entry point.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the app name "WordCount" looks copy-pasted; kept unchanged.
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      // Silence noisy framework logging.
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
      Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

      run(new SQLContext(sc))
    } finally {
      // Always release the context, even when the job fails.
      sc.stop()
    }
  }

  /** Trains, evaluates and applies the model using the given SQL context. */
  private def run(sqlContext: SQLContext): Unit = {
    val df = loadCsv(sqlContext, TrainCsvPath)

    // Scaling parameters are learned from the training data only.
    val ageScale = featureScale(df, TrainAgeCol)
    val fareScale = featureScale(df, TrainFareCol)

    // Build labeled feature vectors: (sex, scaled age, scaled fare).
    val labeledPoints = df.rdd.map { row =>
      val label = row(TrainLabelCol).toString.toInt
      val features = Vectors.dense(
        encodeSex(row(TrainSexCol)),
        ageScale.normalize(row(TrainAgeCol)),
        fareScale.normalize(row(TrainFareCol)))
      LabeledPoint(label, features)
    }

    // Split into training and hold-out sets.
    val Array(trainingData, testData) = labeledPoints.randomSplit(Array(0.8, 0.2))

    val lrModel = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingData)
    // Alternative model: SVMWithSGD.train(trainingData, 8)

    // Hold-out accuracy; sum yields a Double, so the division is not truncated.
    val correct = testData.map { point =>
      if (lrModel.predict(point.features) == point.label) 1 else 0
    }.sum
    val accuracy = correct / testData.count()

    // The evaluated model is logistic regression, so the message names it as such.
    println(s"逻辑回归模型正确率:$accuracy")

    // Score the Kaggle test file. Features must be scaled with the TRAINING
    // statistics so they are on the scale the model was fitted on.
    val testDf = loadCsv(sqlContext, TestCsvPath)
    val testFeatures = testDf.rdd.map { row =>
      Vectors.dense(
        encodeSex(row(TestSexCol)),
        ageScale.normalize(row(TestAgeCol)),
        fareScale.normalize(row(TestFareCol)))
    }

    val predictions = lrModel.predict(testFeatures).map(_.toInt)

    // Save the predictions as a single part file.
    predictions.coalesce(1).saveAsTextFile(PredictionOutputPath)
  }
}

 

Spark学习笔记——泰坦尼克生还预测

标签:spark   保存   cas   vmm   coalesce   builder   1.0   titan   instance   

原文地址:http://www.cnblogs.com/tonglin0325/p/6909157.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!