码迷,mamicode.com
首页 > 其他好文 > 详细

使用Spark下的corr计算皮尔森相似度Pearson时,报错Can only zip RDDs with same number of elements in each partition....

时间:2017-08-07 11:54:15      阅读:650      评论:0      收藏:0      [点我收藏+]

标签:...   new   apach   imp   stat   object   number   tin   案例   

package com.huawei.bigdata.spark.examples

import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by wulei on 2017/8/3.
  */
object PointCorrPredict {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("PointCorrPredict")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext.sql("use vio_offsite")
    //360111010002,360102029001
    val dataFrame = sqlContext.sql("select kk_id,direct,day,hour,cnt,speed from kk_hour_scale").orderBy("day","hour")
    val newDataFrame = dataFrame.filter("kk_id = ‘3601110100‘and direct = ‘02‘")
                      .orderBy(dataFrame("day").desc,dataFrame("hour").desc).select(dataFrame.col("cnt").cast(DoubleType)).limit(100)
      .rdd.map(row=>row.getAs[Double]("cnt"))
    /*val dd =  newDataFrame.collect().take(3)
   dd.foreach(println)*/
    val destinationDataFrame = sqlContext.sql("select origin_kakou,destination_kakou from kk_relation ")
    val newDestinationDataFrame = destinationDataFrame.filter("origin_kakou = ‘360111010002‘").select("destination_kakou").collect()
    for (i <- 0 until newDestinationDataFrame.length){
      println(newDestinationDataFrame(i))
      println(newDestinationDataFrame(i).toString().substring(1,11))
      println(newDestinationDataFrame(i).toString().substring(11,13))
      val tmpDataFrame = dataFrame.filter("kk_id = ‘"+ newDestinationDataFrame(i).toString().substring(1,11)
                         +"‘ and direct = ‘"+newDestinationDataFrame(i).toString().substring(11,13)+"‘")
                        .orderBy(dataFrame("day").desc,dataFrame("hour").desc).select(dataFrame.col("cnt").cast(DoubleType)).limit(100)
        .rdd.map(row=>row.getAs[Double]("cnt"))
      //tmpDataFrame.foreach(row => println(row))
      var correlationPearson: Double = Statistics.corr(newDataFrame,tmpDataFrame)//计算不同数据之间的相关系数:皮尔逊
      println("\ncorrelationPearson:" + correlationPearson) //打印结果
    }
    println("11111")


  sc.stop()
  }
}

实现代码如上,因为Statistics.corr(RDD[Double],RDD[Double]),所以SparkSQL读取后的数据生成的dataFrame必须转换,第一步是转换成RDD[Row],Row就相当于sql查询出来的一条数据,这里也转换过多次才成功,最后百度得到可以先.cast(DoubleType)的形式。问题自己接触的少,要先看本质,然后看API,然后看案例就快了。

很明显可以从问题的描述上看是组之间的元素个数对应不上,但我已经被Row=>Double转晕了头,没有静心思考琢磨,没有专注仔细的自我对话,导致自己盲目的修改代码,还依然从转换问题上改变,后来转念一想才醒悟,以此警戒自己。limit

使用Spark下的corr计算皮尔森相似度Pearson时,报错Can only zip RDDs with same number of elements in each partition....

标签:...   new   apach   imp   stat   object   number   tin   案例   

原文地址:http://www.cnblogs.com/1023linlin/p/7297940.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!