标签:
官话套话不想讲,介绍也不想打,都知道pyspark和KDD-99是啥吧?
不知道的话...点这里1
或者这里2
转载记得注明出处
http://blog.csdn.net/isinstance/article/details/51329766
pyspark本身是用Scala语言编写的,而Scala语言呢又是Java的变形状态,虽说spark也支持Python,但是还是没有Scala支持的好,对于pyspark的书也很少.
所以恰好前几天研究了一些,现在跟大家分享交流一下吧.
首先我是用替换后的kdd-99-10-precent文件,怎么替换文件,看这里 替换文件
然后结果大概有个70多M的样子,
我们先打开spark.
使用terminal进入spark的主目录
然后输入IPYTHON=1 ./bin/pyspark
对,没错,我用了IPython, IPython是啥?点这里IPython
然后将你替换后的文件放进spark主目录的新建文件夹1中
回到terminal中导入模块
from pyspark import SparkContext, SparkConf
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.feature import StandardScaler
from numpy import array
from math import sqrt
然后定义四个函数
def parse_interaction(line):
line_split = line.split(",")
clean_line_split = line_split[0:-1]
return (line_split[-1], array([float(x) for x in clean_line_split]))
def distance(a, b):
return sqrt(a.zip(b).map(lambda x: (x[0]-x[1])).map(lambda x: x*x).reduce(lambda a,b: a+b))
def dist_to_centroid(datum, clusters):
cluster = clusters.predict(datum)
centroid = clusters.centers[cluster]
return sqrt(sum([x**2 for x in (centroid - datum)]))
def clustering_score(data, k):
clusters = KMeans.train(data, k, maxIterations=10, runs=5, initializationMode="random")
result = (k, clusters, data.map(lambda datum: dist_to_centroid(datum, clusters)).mean())
print "Clustering score for k=%(k)d is %(score)f" % {"k": k, "score": result[2]}
return result
第一个函数parse_interaction(line),是将文件最后那个labels的小尾巴切下来的
第二个函数distance(a, b),一看也知道,是计算两个点之间的空间距离的
第三个函数dist_to_centroid(datum, clusters)是用来计算每个点到聚类中心的距离的
第四个函数clustering_score(data, k)就是所有函数中的中心函数了,它训练kmeans,然后调用第三个函数计算,最后返回一个result包含了这次聚类训练用的k值和中心点的坐标和所有点到中心点距离的平均值.
本文所有的工作都是围绕这个函数进行的.
函数定义完之后,开始前期的数据准备工作
定义一下
max_k = 30
data_file = "1/result"
解释一下,max_k是对k值选取测试时候将要使用测试k值的最大值,如果你计算资源丰富的话也可以选100,150之类的,这里暂定为30
然后data_file是替换文件的位置.刚刚说的在spark主目录下新建一个文件夹1,然后将替换后的文件放进去就行
raw_data = sc.textFile(data_file)
载入数据
labels = raw_data.map(lambda line: line.strip().split(",")[-1])
这一步就是将数据集中标签文本切出来
方便以后的比对工作
parsed_data = raw_data.map(parse_interaction)
parsed_data_values = parsed_data.values().cache()
这一步就是将数据进行切割,将不能计算的labels项切掉
然后将数据读入内寸中
然后数据中的数据矩阵看上去很稀疏,现在我们将其转化为密集矩阵
standardizer = StandardScaler(True, True)
standardizer_model = standardizer.fit(parsed_data_values)
standardized_data_values = standardizer_model.transform(parsed_data_values)
使用Spark的规整化函数将数据密集话
然后得到规整化后的数据standardized_data_values
然后就是将数据进行训练了
scores = map(lambda k: clustering_score(standardized_data_values, k), range(10, max_k+1, 10))
这一步里面的range函数中有三个参数10, max_k+1,10
还是那句话,如果你计算资源丰富的话,可以将最后那个10改为5或者直接删掉
那个10的意义是在10到max_k+1的范围内的步伐数
就是一步跳个10步的样子
第一次k取10,第二次就直接跳到20了.
然后输出一下
min_k = min(scores, key=lambda x: x[2])[0]
print "Best k value is %(best_k)d" % {"best_k": min_k}
best_k就是最好的k取值
还记得那个函数的返回值不
现在我们将其中训练好的model提取出来
best_model = min(scores, key=lambda x: x[2])[1]
这里提取出的最好模块,也就是最佳k值训练出来的model
最后就是聚类运算kmeans
cluster_assignments_sample = standardized_data_values.map(lambda datum: str(best_model.predict(datum))+","+",".join(map(str,datum)))
然后保存计算结果
cluster_assignments_sample.saveAsTextFile("standardized")
labels.saveAsTextFile("labels")
standardized目录下生成的文件
labels目录下生成的文件也和上面的名字一样,只是在的目录不同罢了
standardized目录下的文件中,每一个文件都和labels目录中的文件一一对应,每个文件中的每一行也是一一对应
下面我们将standardized中文件按照顺序改为
好吧,我知道名字很烂,但是简单...
哈哈
然后labels目录下的也是改名字
改名之后将所有除了_SUCCESSS的文件,拖入一个单独文件中
忽视那个gg开头的文件,那是运行后的输出文件
你也看到那个ll.py了...取名比较随意...
那就是合并文件用的
可以打开文件看一下
standardized目录下的文件截取一行
16,-0.0677916519216,-1.23798593805,1.59687725081,-3.03993409184,-0.00306168273249,-0.0262872999946,-0.00667341089394,-0.0477201370966,-0.00257146549756,-0.0441358669765,-0.00978217473084,-0.417191282147,-0.00567867840782,-0.0105519415435,-0.00467566599809,-0.00564000635973,-0.011232070172,-0.00991896489162,-0.0276317877068,0.0,0.0,-0.0372626246721,-0.404816973483,-1.14040006348,-0.464089282412,-0.463520002343,4.06939148225,4.05898474876,-1.91027154631,0.596281475651,-0.203632862202,0.347966485214,-1.66602170748,-1.71327236727,0.449337984497,-1.25061954309,-0.15862913227,-0.464417155895,-0.463201923033,4.08467150823,4.09571132604
是这样的,第一个数据16就是将这样一行数据进行预测和归类后的簇的名字
它属于第16簇
然后这就难办了
labels是保存另一个文件之中,而聚类的结果保存在另一个文件中
怎么合并
反正到这里我是用土方法,有可以在spark中就将数据合并并输出的同学请联系我
super_big_hero@sina.com
介绍一下我的土办法吧
这个github的传送门
土办法
然后源代码用的时候还是要改一改,把文件名改一下就好
然后就等着它把所有数据每一行每一行的对应合并到一起
要提取结果的还有一步...
把最后的结果文件,我的是gg系列拷到另一个文件中
然后执行下面这个代码
github
得到数据结果
最后就可以在terminal中看到输出结果了
最后我的输出是这样的
从最后那个count.py代码中大量的注释也可以看出,本人对于这个数据处理的过程还是左右摇摆的
到底怎么分离和记录不同的类型
最后还是用了按簇保存,每个簇保存在一个文件中
最后用了一下pandas的value_counts()函数输出结果
然后欢迎知道怎么直接在spark中合并数据的同学发邮件给我
super_big_hero@sina.com
原谅我不会取个好名字...
对代码有修改意见的可以直接上github fork一下,修改一下,然后push给我
标签:
原文地址:http://blog.csdn.net/isinstance/article/details/51329766