标签:统计 接下来 双子座 table 字符 creat foreach 类型 group by
最近看到有个例子很有意思,特地重现下。
这一步,我们使用Spark SQL按照星座对2000W数据进行分组统计, 看看哪个星座的人最喜欢开房。
当然, 使用纯Spark也可以完成我们的分析, 因为实际Spark SQL最终是利用Spark来完成的。
实际测试中发现这些数据并不是完全遵守一个schema, 有些数据的格式是不对的, 有些数据的数据项也是错误的。 在代码中我们要剔除那么干扰数据。
反正我们用这个数据测试者玩, 并没有严格的要求去整理哪些错误数据。
先看代码:
val sqlContext =
new
org.apache.spark.sql.SQLContext(sc)
import
sqlContext.createSchemaRDD
case
class
Customer(name: String, gender: String, ctfId: String, birthday: String, address: String)
val customer = sc.textFile(
"/mnt/share/2000W/*.csv"
).map(_.split(
","
)).filter(line => line.length >
7
).map(p => Customer(p(
0
), p(
5
), p(
4
), p(
6
), p(
7
))).distinct()
customer.registerTempTable(
"customer"
)
def toInt(s: String):Int = {
try
{
s.toInt
}
catch
{
case
e:Exception =>
9999
}
}
def myfun(birthday: String) : String = {
var rt =
"未知"
if
(birthday.length ==
8
) {
val md = toInt(birthday.substring(
4
))
if
(md >=
120
& md <=
219
)
rt =
"水瓶座"
else
if
(md >=
220
& md <=
320
)
rt =
"双鱼座"
else
if
(md >=
321
& md <=
420
)
rt =
"白羊座"
else
if
(md >=
421
& md <=
521
)
rt =
"金牛座"
else
if
(md >=
522
& md <=
621
)
rt =
"双子座"
else
if
(md >=
622
& md <=
722
)
rt =
"巨蟹座"
else
if
(md >=
723
& md <=
823
)
rt =
"狮子座"
else
if
(md >=
824
& md <=
923
)
rt =
"处女座"
else
if
(md >=
924
& md <=
1023
)
rt =
"天秤座"
else
if
(md >=
1024
& md <=
1122
)
rt =
"天蝎座"
else
if
(md >=
1123
& md <=
1222
)
rt =
"射手座"
else
if
((md >=
1223
& md <=
1231
) | (md >=
101
& md <=
119
))
rt =
"摩蝎座"
else
rt =
"未知"
}
rt
}
sqlContext.registerFunction(
"constellation"
, (x:String) => myfun(x))
var result = sqlContext.sql(
"SELECT constellation(birthday), count(constellation(birthday)) FROM customer group by constellation(birthday)"
)
result.collect().foreach(println)
为了使用spark sql,你需要引入 sqlContext.createSchemaRDD
. Spark sql一个核心对象就是SchemaRDD
。 上面的import
可以隐式的将一个RDD转换成SchemaRDD。
接着定义了Customer
类,用来映射每一行的数据, 我们只使用每一行很少的信息, 像地址,email等都没用到。
接下来从2000W文件夹中读取所有的csv文件, 创建一个RDD并注册表customer。
因为没有一个内建的函数可以将出生一起映射为星座, 所以我们需要定义一个映射函数myfun
, 并把它注册到SparkContext中。 这样我们就可以在sql语句中使用这个函数。 类似地,字符串的length函数当前也不支持, 你可以增加一个这样的函数。 因为有的日期不正确,所有特别增加了一个”未知”的星座。 错误数据可能有两种, 一是日期出错, 而是此行格式不对,将其它字段映射成了出生日期。 我们在分析的时候忽略它们好了。
然后执行一个分组的sql语句。这个sql语句查询结果类型为SchemaRDD, 也继承了RDD所有的操作。
最后将结果打印出来。
[双子座, 1406018 ] [双鱼座, 1509839 ] [摩蝎座, 2404812 ] [金牛座, 1406225 ] [水瓶座, 1635358 ] [巨蟹座, 1498077 ] [处女座, 1666009 ] [天秤座, 1896544 ] [白羊座, 1409838 ] [射手座, 1614915 ] [未知, 160483 ] [狮子座, 1613529 ] |
看起来魔蝎座的人最喜欢开房了, 明显比其它星座的人要多。
我们也可以分析一下开房的男女比例:
1
2
3
|
...... result = sqlContext.sql( "SELECT gender, count(gender) FROM customer where gender = ‘F‘ or gender = ‘M‘ group by gender" ) result.collect().foreach(println) |
结果显示男女开房的人数大约是2:1
1
2
|
[F, 6475461 ] [M, 12763926 ] |
标签:统计 接下来 双子座 table 字符 creat foreach 类型 group by
原文地址:http://www.cnblogs.com/hd-zg/p/6187927.html