
Spark MLlib NaiveBayes Classifier

Date: 2015-04-29 13:41:27

Tags: spark, mllib, naivebayes

1.1 The Naive Bayes Formula

Bayes' theorem:

  P(B|A) = P(A|B) P(B) / P(A)

where A is an event and B is a class; P(B|A) is the probability that an item belongs to class B given event A.

The formal definition of naive Bayes classification is:

      1. Let x = {a1, a2, ..., am} be an item to classify, where each a is a feature of x.

      2. Let C = {y1, y2, ..., yn} be the set of classes.

      3. Compute P(y1|x), P(y2|x), ..., P(yn|x).

      4. If P(yk|x) = max{P(y1|x), P(y2|x), ..., P(yn|x)}, then x belongs to class yk.

      The key question is how to compute the conditional probabilities in step 3:

      1. Collect a set of items whose classes are already known; this set is the training set.

      2. From it, estimate the conditional probability of each feature under each class, i.e. P(a1|y1), P(a2|y1), ..., P(am|y1); ...; P(a1|yn), P(a2|yn), ..., P(am|yn).

      3. If the features are conditionally independent given the class, Bayes' theorem gives:

        P(yi|x) = P(x|yi) P(yi) / P(x)

      Since the denominator P(x) is the same constant for every class, it suffices to maximize the numerator. And because the features are conditionally independent:

        P(x|yi) P(yi) = P(a1|yi) P(a2|yi) ... P(am|yi) P(yi) = P(yi) ∏ P(aj|yi),  j = 1..m
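The four steps above can be sketched in a few lines of plain Python. This is a toy illustration, not MLlib code; the priors and per-feature conditional probabilities below are made-up numbers for a binary-feature example:

```python
# Toy naive Bayes classifier following the four steps above.
# P(y) and P(a_j|y) are illustrative, hand-picked numbers.
priors = {"spam": 0.4, "ham": 0.6}
conditionals = {
    "spam": [0.8, 0.3],  # P(a_1|spam), P(a_2|spam)
    "ham":  [0.2, 0.7],  # P(a_1|ham),  P(a_2|ham)
}

def classify(x):
    """x is a list of binary features; returns the class maximizing
    P(y) * prod_j P(a_j|y), the numerator of Bayes' theorem."""
    scores = {}
    for y, prior in priors.items():
        score = prior
        for j, a in enumerate(x):
            p = conditionals[y][j]
            score *= p if a else (1.0 - p)
        scores[y] = score
    return max(scores, key=scores.get)

print(classify([1, 0]))  # -> spam (0.4*0.8*0.7 = 0.224 beats 0.6*0.2*0.3 = 0.036)
```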

1.2 NaiveBayesModel Source Code Walkthrough

1. NaiveBayesModel has three main fields (lambda is a parameter of the NaiveBayes trainer):

1) labels: the class labels

scala> labels
res56: Array[Double] = Array(2.0, 0.0, 1.0)

2) pi: the log prior probability of each label

scala> pi
res57: Array[Double] = Array(-1.1631508098056809, -0.9808292530117262, -1.1631508098056809)

3) theta: the log conditional probability of each feature under each class

scala> theta
res58: Array[Array[Double]] = Array(Array(-2.032921526044943, -1.8658674413817768, -0.33647223662121295), Array(-0.2451224580329847, -2.179982770901713, -2.26002547857525), Array(-1.9676501356917193, -0.28410425110389714, -2.2300144001592104))

4) lambda: the smoothing parameter
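At prediction time, the model scores a feature vector v with pi(i) + theta(i) · v for each class i and returns labels(argmax). A plain-Python sketch of that scoring, using the three arrays printed above (the real MLlib predict does the same computation with Breeze vectors):

```python
# Arrays copied from the spark-shell output above.
labels = [2.0, 0.0, 1.0]
pi = [-1.1631508098056809, -0.9808292530117262, -1.1631508098056809]
theta = [
    [-2.032921526044943, -1.8658674413817768, -0.33647223662121295],
    [-0.2451224580329847, -2.179982770901713, -2.26002547857525],
    [-1.9676501356917193, -0.28410425110389714, -2.2300144001592104],
]

def predict(v):
    """Score each class by pi[i] + theta[i] . v (log-space), return its label."""
    scores = [p + sum(t * x for t, x in zip(row, v)) for p, row in zip(pi, theta)]
    return labels[scores.index(max(scores))]

print(predict([0.0, 0.0, 1.0]))  # a sample dominated by feature 3 -> 0.0? no: label 2.0
```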

 

2. The NaiveBayes.run code

/**
 * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
 *
 * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
 */
def run(data: RDD[LabeledPoint]) = {
  // requireNonnegativeValues: checks a sample's feature vector; every feature value must be nonnegative.
  val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
    val values = v match {
      case SparseVector(size, indices, values) =>
        values
      case DenseVector(values) =>
        values
    }
    if (!values.forall(_ >= 0.0)) {
      throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
    }
  }

  // Aggregates term frequencies per label.
  // TODO: Calling combineByKey and collect creates two stages, we can implement something
  // TODO: similar to reduceByKeyLocally to save one stage.
  // aggregated: aggregates all samples, keyed by label, summing the features of each label.
  // createCombiner: turns a sample's value V into a combiner C, (v: Vector) -> (c: (Long, BDV[Double]))
  // mergeValue: merges the next sample's value into an existing C, (c: (Long, BDV[Double]), v: Vector) -> (c: (Long, BDV[Double]))
  // mergeCombiners: merges the multiple Cs produced for the same key, (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) -> (c: (Long, BDV[Double]))
  val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])](
    createCombiner = (v: Vector) => {
      requireNonnegativeValues(v)
      (1L, v.toBreeze.toDenseVector)
    },
    mergeValue = (c: (Long, BDV[Double]), v: Vector) => {
      requireNonnegativeValues(v)
      (c._1 + 1L, c._2 += v.toBreeze)
    },
    mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) =>
      (c1._1 + c2._1, c1._2 += c2._2)
  ).collect()
  val numLabels = aggregated.length
  var numDocuments = 0L
  aggregated.foreach { case (_, (n, _)) =>
    numDocuments += n
  }
  val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
  val labels = new Array[Double](numLabels)
  val pi = new Array[Double](numLabels)
  val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
  val piLogDenom = math.log(numDocuments + numLabels * lambda)
  var i = 0
  // labels: stores the class labels
  // pi: the log prior probability of each class
  // theta: the log conditional probability of each feature under the class
  aggregated.foreach { case (label, (n, sumTermFreqs)) =>
    labels(i) = label
    val thetaLogDenom = math.log(brzSum(sumTermFreqs) + numFeatures * lambda)
    pi(i) = math.log(n + lambda) - piLogDenom
    var j = 0
    while (j < numFeatures) {
      theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
      j += 1
    }
    i += 1
  }
  // return the model
  new NaiveBayesModel(labels, pi, theta)
}
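The aggregation and log-space formulas in run can be reproduced in a few lines of plain Python. This is a sketch of the same math without Spark; train is a hypothetical helper name, and data is an in-memory list rather than an RDD:

```python
import math
from collections import defaultdict

def train(data, lmbda=1.0):
    """data: list of (label, feature_list) pairs. Returns (labels, pi, theta)
    using the same log-space formulas as NaiveBayes.run above."""
    # Aggregate per label: sample count n and elementwise feature sums
    # (the role combineByKey plays in the Spark code).
    counts = defaultdict(lambda: [0, None])
    for label, feats in data:
        if any(f < 0 for f in feats):
            raise ValueError("Naive Bayes requires nonnegative feature values")
        entry = counts[label]
        entry[0] += 1
        entry[1] = list(feats) if entry[1] is None else [a + b for a, b in zip(entry[1], feats)]
    num_labels = len(counts)
    num_docs = sum(n for n, _ in counts.values())
    pi_log_denom = math.log(num_docs + num_labels * lmbda)
    labels, pi, theta = [], [], []
    for label, (n, sums) in counts.items():
        labels.append(label)
        pi.append(math.log(n + lmbda) - pi_log_denom)          # log prior
        theta_log_denom = math.log(sum(sums) + len(sums) * lmbda)
        theta.append([math.log(s + lmbda) - theta_log_denom for s in sums])
    return labels, pi, theta
```

For example, on three samples [(0.0, [1, 0]), (0.0, [2, 0]), (1.0, [0, 1])] with lambda = 1, the prior for class 0.0 is log((2 + 1) / (3 + 2·1)) = log(3/5).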

1.3 NaiveBayesModel Example

1. Data

The data format is: label,feature1 feature2 feature3

0,1 0 0

0,2 0 0

0,1 0 0.1

0,2 0 0.2

0,1 0.1 0

0,2 0.2 0

1,0 1 0.1

1,0 2 0.2

1,0.1 1 0

1,0.2 2 0

1,0 1 0

1,0 2 0

2,0.1 0 1

2,0.2 0 2

2,0 0.1 1

2,0 0.2 2

2,0 0 1

2,0 0 2
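Each line above can be split on the comma and then on spaces, which is what the Scala example does with line.split. A minimal Python equivalent of that parsing, for reference:

```python
def parse(line):
    """Split 'label,f1 f2 f3' into (label, [f1, f2, f3])."""
    label, feats = line.split(",")
    return float(label), [float(v) for v in feats.split()]

print(parse("2,0.1 0 1"))  # -> (2.0, [0.1, 0.0, 1.0])
```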

2. Code

// path to the data
val data_path = "user/tmp/sample_naive_bayes_data.txt"
// read the data and convert it to LabeledPoint
val examples = sc.textFile(data_path).map { line =>
  val items = line.split(',')
  val label = items(0).toDouble
  val value = items(1).split(' ').map(f => f.toDouble)
  LabeledPoint(label, Vectors.dense(value))
}
examples.cache()
// split the samples: 80% for training, 20% for testing
val splits = examples.randomSplit(Array(0.8, 0.2))
val training = splits(0)
val test = splits(1)
val numTraining = training.count()
val numTest = test.count()
println(s"numTraining = $numTraining, numTest = $numTest.")
// train on the samples to build the classification model
val model = new NaiveBayes().setLambda(1.0).run(training)
// use the model to predict the test data and compute the accuracy on it
val prediction = model.predict(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))
val accuracy = predictionAndLabel.filter(x => x._1 == x._2).count().toDouble / numTest
println(s"Test accuracy = $accuracy.")

 

Original article: http://blog.csdn.net/sunbow0/article/details/45364299