码迷,mamicode.com
首页 > 其他好文 > 详细

基于pyspark的对KDD-99数据集的聚类分析实验

时间:2016-05-06 12:38:37      阅读:505      评论:0      收藏:0      [点我收藏+]

标签:

官话套话不想讲,介绍也不想打,都知道pyspark和KDD-99是啥吧?
不知道的话...点这里1
或者这里2

转载记得注明出处
http://blog.csdn.net/isinstance/article/details/51329766

pyspark本身是用Scala语言编写的,而Scala语言呢又是Java的变形状态,虽说spark也支持Python,但是还是没有Scala支持的好,对于pyspark的书也很少.

所以恰好前几天研究了一些,现在跟大家分享交流一下吧.
首先我是用替换后的kdd-99-10-precent文件,怎么替换文件,看这里 替换文件
然后结果大概有个70多M的样子,
我们先打开spark.

使用terminal进入spark的主目录
然后输入IPYTHON=1 ./bin/pyspark
对,没错,我用了IPython, IPython是啥?点这里IPython

然后将你替换后的文件放进spark主目录的新建文件夹1

回到terminal中导入模块

from pyspark import SparkContext, SparkConf
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.feature import StandardScaler
from numpy import array
from math import sqrt

然后定义四个函数

def parse_interaction(line):
    line_split = line.split(",")
    clean_line_split = line_split[0:-1]
    return (line_split[-1], array([float(x) for x in clean_line_split]))


def distance(a, b):
    return sqrt(a.zip(b).map(lambda x: (x[0]-x[1])).map(lambda x: x*x).reduce(lambda a,b: a+b))


def dist_to_centroid(datum, clusters):
    cluster = clusters.predict(datum)
    centroid = clusters.centers[cluster]
    return sqrt(sum([x**2 for x in (centroid - datum)]))


def clustering_score(data, k):
    clusters = KMeans.train(data, k, maxIterations=10, runs=5, initializationMode="random")
    result = (k, clusters, data.map(lambda datum: dist_to_centroid(datum, clusters)).mean())
    print "Clustering score for k=%(k)d is %(score)f" % {"k": k, "score": result[2]}
    return result

第一个函数parse_interaction(line),是将文件最后那个labels的小尾巴切下来的
第二个函数distance(a, b),一看也知道,是计算两个点之间的空间距离的
第三个函数dist_to_centroid(datum, clusters)是用来计算每个点到聚类中心的距离的
第四个函数clustering_score(data, k)就是所有函数中的中心函数了,它训练kmeans,然后调用第三个函数计算,最后返回一个result包含了这次聚类训练用的k值和中心点的坐标和所有点到中心点距离的平均值.
本文所有的工作都是围绕这个函数进行的.

函数定义完之后,开始前期的数据准备工作
定义一下

max_k = 30
data_file = "1/result"

解释一下,max_k是对k值选取测试时候将要使用测试k值的最大值,如果你计算资源丰富的话也可以选100,150之类的,这里暂定为30
然后data_file是替换文件的位置.刚刚说的在spark主目录下新建一个文件夹1,然后将替换后的文件放进去就行

raw_data = sc.textFile(data_file)

载入数据

labels = raw_data.map(lambda line: line.strip().split(",")[-1])

这一步就是将数据集中标签文本切出来
方便以后的比对工作

parsed_data = raw_data.map(parse_interaction)
parsed_data_values = parsed_data.values().cache()

这一步就是将数据进行切割,将不能计算的labels项切掉
然后将数据读入内寸中

然后数据中的数据矩阵看上去很稀疏,现在我们将其转化为密集矩阵

standardizer = StandardScaler(True, True)
standardizer_model = standardizer.fit(parsed_data_values)
standardized_data_values = standardizer_model.transform(parsed_data_values)

使用Spark的规整化函数将数据密集话
然后得到规整化后的数据standardized_data_values

然后就是将数据进行训练了

scores = map(lambda k: clustering_score(standardized_data_values, k), range(10, max_k+1, 10))

这一步里面的range函数中有三个参数10, max_k+1,10
还是那句话,如果你计算资源丰富的话,可以将最后那个10改为5或者直接删掉
那个10的意义是在10到max_k+1的范围内的步伐数
就是一步跳个10步的样子
第一次k取10,第二次就直接跳到20了.

然后输出一下

min_k = min(scores, key=lambda x: x[2])[0]
print "Best k value is %(best_k)d" % {"best_k": min_k}

best_k就是最好的k取值

还记得那个函数的返回值不
现在我们将其中训练好的model提取出来

best_model = min(scores, key=lambda x: x[2])[1]

这里提取出的最好模块,也就是最佳k值训练出来的model

最后就是聚类运算kmeans

cluster_assignments_sample = standardized_data_values.map(lambda datum: str(best_model.predict(datum))+","+",".join(map(str,datum)))

然后保存计算结果

cluster_assignments_sample.saveAsTextFile("standardized")
labels.saveAsTextFile("labels")

standardized目录下生成的文件
技术分享

labels目录下生成的文件也和上面的名字一样,只是在的目录不同罢了

standardized目录下的文件中,每一个文件都和labels目录中的文件一一对应,每个文件中的每一行也是一一对应
下面我们将standardized中文件按照顺序改为
技术分享
好吧,我知道名字很烂,但是简单...
哈哈
然后labels目录下的也是改名字
技术分享
改名之后将所有除了_SUCCESSS的文件,拖入一个单独文件中
技术分享
忽视那个gg开头的文件,那是运行后的输出文件
你也看到那个ll.py了...取名比较随意...
那就是合并文件用的

可以打开文件看一下
standardized目录下的文件截取一行

16,-0.0677916519216,-1.23798593805,1.59687725081,-3.03993409184,-0.00306168273249,-0.0262872999946,-0.00667341089394,-0.0477201370966,-0.00257146549756,-0.0441358669765,-0.00978217473084,-0.417191282147,-0.00567867840782,-0.0105519415435,-0.00467566599809,-0.00564000635973,-0.011232070172,-0.00991896489162,-0.0276317877068,0.0,0.0,-0.0372626246721,-0.404816973483,-1.14040006348,-0.464089282412,-0.463520002343,4.06939148225,4.05898474876,-1.91027154631,0.596281475651,-0.203632862202,0.347966485214,-1.66602170748,-1.71327236727,0.449337984497,-1.25061954309,-0.15862913227,-0.464417155895,-0.463201923033,4.08467150823,4.09571132604

是这样的,第一个数据16就是将这样一行数据进行预测和归类后的簇的名字
它属于第16簇
然后这就难办了
labels是保存另一个文件之中,而聚类的结果保存在另一个文件中
怎么合并
反正到这里我是用土方法,有可以在spark中就将数据合并并输出的同学请联系我
super_big_hero@sina.com
介绍一下我的土办法吧
这个github的传送门
土办法

然后源代码用的时候还是要改一改,把文件名改一下就好
然后就等着它把所有数据每一行每一行的对应合并到一起
要提取结果的还有一步...
把最后的结果文件,我的是gg系列拷到另一个文件中
然后执行下面这个代码

github
得到数据结果
最后就可以在terminal中看到输出结果了

最后我的输出是这样的
技术分享
从最后那个count.py代码中大量的注释也可以看出,本人对于这个数据处理的过程还是左右摇摆的
到底怎么分离和记录不同的类型
最后还是用了按簇保存,每个簇保存在一个文件中
最后用了一下pandas的value_counts()函数输出结果

然后欢迎知道怎么直接在spark中合并数据的同学发邮件给我
super_big_hero@sina.com
原谅我不会取个好名字...
对代码有修改意见的可以直接上github fork一下,修改一下,然后push给我

基于pyspark的对KDD-99数据集的聚类分析实验

标签:

原文地址:http://blog.csdn.net/isinstance/article/details/51329766

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!