Item-based CF with Hadoop Streaming and Python
The input is a user-item matrix; each row is one user's interactions with the items:

| | item_1 | item_2 | … | item_n |
|---|---|---|---|---|
| user_1 | 0 | 1 | … | 1 |
| user_2 | 1 | 0 | … | 1 |
| … | … | … | … | … |
| user_m | 1 | 1 | … | 0 |
Below are the common formulas for computing the similarity between two vectors $x$ and $y$.
Cosine similarity:

$$\frac{\sum_{k=1}^{N} x_k y_k}{\sqrt{\sum_{k=1}^{N} x_k^2}\,\sqrt{\sum_{k=1}^{N} y_k^2}}$$

Pearson similarity:

$$\frac{\sum (x-\bar{x})(y-\bar{y})}{\sqrt{\sum (x-\bar{x})^2}\,\sqrt{\sum (y-\bar{y})^2}}$$

Euclidean distance:

$$\sqrt{\sum_{k=1}^{N}(x_k-y_k)^2}$$

Jaccard similarity:

$$\frac{|x \cap y|}{|x|+|y|-|x \cap y|}$$
For the log-likelihood ratio (LLR), the score is derived from a 2×2 contingency table of co-occurrence counts, where $k_{11}$ counts users who interacted with both items, $k_{12}$ those who interacted with item $i$ only, $k_{21}$ those with item $j$ only, and $k_{22}$ those with neither:

| | item $j$ | not item $j$ |
|---|---|---|
| item $i$ | $k_{11}$ | $k_{12}$ |
| not item $i$ | $k_{21}$ | $k_{22}$ |

$$N = k_{11}+k_{12}+k_{21}+k_{22}$$

$$\mathrm{loglikelihoodratio} = 2\times(\mathrm{rowEntropy}+\mathrm{columnEntropy}-\mathrm{matrixEntropy})$$

where rowEntropy, columnEntropy, and matrixEntropy are computed as follows:

$$\mathrm{rowEntropy} = -\frac{k_{11}+k_{12}}{N}\log_2\frac{k_{11}+k_{12}}{N}-\frac{k_{21}+k_{22}}{N}\log_2\frac{k_{21}+k_{22}}{N}$$

$$\mathrm{columnEntropy} = -\frac{k_{11}+k_{21}}{N}\log_2\frac{k_{11}+k_{21}}{N}-\frac{k_{12}+k_{22}}{N}\log_2\frac{k_{12}+k_{22}}{N}$$

$$\mathrm{matrixEntropy} = -\sum_{u,v\in\{1,2\}}\frac{k_{uv}}{N}\log_2\frac{k_{uv}}{N}$$
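A direct transcription of these formulas into Python (a standalone sketch; the helper names are mine):

```python
import math

def entropy(*counts):
    """Normalized base-2 entropy of a set of counts; 0*log(0) is treated as 0."""
    total = float(sum(counts))
    return -sum((c / total) * math.log(c / total, 2) for c in counts if c > 0)

def log_likelihood_ratio(k11, k12, k21, k22):
    """LLR similarity score from the 2x2 contingency table above."""
    row_entropy = entropy(k11 + k12, k21 + k22)
    column_entropy = entropy(k11 + k21, k12 + k22)
    matrix_entropy = entropy(k11, k12, k21, k22)
    return 2.0 * (row_entropy + column_entropy - matrix_entropy)
```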
The similarity between any two vectors $x$ and $y$ can be computed in three standard steps:

a) First preprocess each vector:

$$\hat{x}=\mathrm{preprocess}(x),\qquad \hat{y}=\mathrm{preprocess}(y)$$

b) Compute the norms of the preprocessed vectors:

$$n_i=\mathrm{norm}(\hat{x}),\qquad n_j=\mathrm{norm}(\hat{y})$$

c) From the results of a) and b), compute the similarity:

$$S_{ij}=\mathrm{similarity}(dot_{ij},\,n_i,\,n_j),\qquad \text{where } dot_{ij}=\hat{x}\cdot\hat{y}$$
Operator notes
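The preprocess, norm, and similarity operators are interchangeable hooks. A minimal sketch of how the three steps compose (the function below is my own illustration, not code from the post):

```python
def pairwise_similarity(x, y, preprocess, norm, similarity):
    """Three-step similarity: preprocess both vectors, take their norms,
    then combine the norms with the dot product of the preprocessed vectors."""
    x_hat, y_hat = preprocess(x), preprocess(y)
    n_i, n_j = norm(x_hat), norm(y_hat)
    dot_ij = sum(a * b for a, b in zip(x_hat, y_hat))
    return similarity(dot_ij, n_i, n_j)
```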
A worked example of the three-step similarity computation

Suppose we want the Jaccard similarity between two vectors $x$ and $y$.

a) Preprocess (binarize) the vectors:

$$\hat{x}=\mathrm{preprocess}(x)=\mathrm{bin}(x)=\begin{pmatrix}1\\0\\1\end{pmatrix},\qquad \hat{y}=\mathrm{bin}(y)=\begin{pmatrix}1\\1\\1\end{pmatrix}$$

b) Compute the norms:

$$n_i=\mathrm{norm}(\hat{x})=|\hat{x}|_1=2,\qquad n_j=\mathrm{norm}(\hat{y})=|\hat{y}|_1=3$$

c) Compute the Jaccard similarity, with $dot_{ij}=\hat{x}\cdot\hat{y}=2$:

$$\mathrm{jacc}(i,j)=\frac{dot_{ij}}{n_i+n_j-dot_{ij}}=\frac{2}{2+3-2}=\frac{2}{3}$$
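The same arithmetic can be checked in Python (the raw vectors here are invented for illustration; only their binarized forms are fixed by the example):

```python
binarize = lambda v: [1 if e != 0 else 0 for e in v]

x_hat = binarize([2, 0, 1])                     # (1, 0, 1)
y_hat = binarize([5, 1, 3])                     # (1, 1, 1)
n_i, n_j = sum(x_hat), sum(y_hat)               # 2, 3
dot = sum(a * b for a, b in zip(x_hat, y_hat))  # 2
print(dot / float(n_i + n_j - dot))             # 0.666... = 2/3
```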
Section 3.1 showed how to compute vector similarity in three standard steps. Different similarity measures plug different operators into those steps; the table below summarizes the operators for the common measures.
| Similarity measure | preprocess(x) | norm(x̂) | similarity(dotij, ni, nj) |
|---|---|---|---|
| Cosine | $x/\lVert x\rVert_2$ | - | $dot_{ij}$ |
| Pearson | $(x-\bar{x})/\lVert x-\bar{x}\rVert_2$ | - | $dot_{ij}$ |
| Euclidean distance | - | $\lVert x\rVert_2^2$ | $\sqrt{n_i+n_j-2\,dot_{ij}}$ |
| Jaccard | $\mathrm{bin}(x)$ | $\lVert\hat{x}\rVert_1$ | $dot_{ij}/(n_i+n_j-dot_{ij})$ |
| Manhattan distance | $\mathrm{bin}(x)$ | $\lVert\hat{x}\rVert_1$ | $n_i+n_j-2\,dot_{ij}$ |
| Log-likelihood ratio | $\mathrm{bin}(x)$ | $\lVert\hat{x}\rVert_1$ | $\mathrm{llr}(k_{11}{=}dot_{ij},\ k_{12}{=}n_i-dot_{ij},\ k_{21}{=}n_j-dot_{ij},\ k_{22}{=}N-n_i-n_j+dot_{ij})$ |
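Expressed in code, the table might look as follows. This is a sketch: only the logl and jacc method keys appear in the shell script later, the other keys and helper names are my own.

```python
import math

def identity(x):
    return list(x)

def binarize(x):
    return [1 if e != 0 else 0 for e in x]

def l2_normalize(x):
    n = math.sqrt(sum(e * e for e in x))
    return [e / n for e in x]

# method -> (preprocess, norm, similarity(dot_ij, n_i, n_j));
# None means the step is skipped ("-" in the table above).
OPERATORS = {
    'cosi': (l2_normalize, None, lambda dot, ni, nj: dot),
    'eucl': (identity, lambda v: sum(e * e for e in v),
             lambda dot, ni, nj: math.sqrt(ni + nj - 2 * dot)),
    'jacc': (binarize, sum, lambda dot, ni, nj: dot / float(ni + nj - dot)),
    'manh': (binarize, sum, lambda dot, ni, nj: ni + nj - 2 * dot),
    # 'logl' also uses (binarize, sum, ...) and feeds k11=dot, k12=ni-dot,
    # k21=nj-dot, k22=N-ni-nj+dot into log_likelihood_ratio() above.
}
```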
With the building blocks above, computing item-to-item similarity for the data format described in section 1 can be done with three MapReduce jobs: preprocessing, norm computation, and the similarity computation itself. Because Hadoop's Streaming interface exchanges data with external programs through standard input and output, any language that can read stdin and write stdout can be used to write MR jobs, which is why the jobs here are written in Python. Enough preamble; on to the implementation of each job.
The first MR job implements the preprocessing step a). The table below lists the mapper/reducer scripts used for each measure (a sketch of the binarization mapper follows the figures below):

| Similarity measure | Preprocessing job 1 | Preprocessing job 2 |
|---|---|---|
| Cosine | Compute each vector's 2-norm (Mapper: cfCosiPreMapper_1.py, Reducer: cfCosiPreReducer_1.py) | Normalize the data (Mapper: cfCosiPreMapper_2.py) |
| Pearson | Compute each vector's sum, sum of squares, and 2-norm (Mapper: cfPearPreMapper_1.py, Reducer: cfPearPreReducer_1.py) | - |
| Jaccard, Manhattan, Log-likelihood ratio | Binarize the vectors (Mapper: cfPreMapper.py) | - |
| Euclidean distance | - | - |
(Figure: preprocessing overview)
(Figure: cosine similarity, preprocessing job 1)
(Figure: cosine similarity, preprocessing job 2)
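The post does not include the Python sources, so here is a hypothetical reconstruction of what cfPreMapper.py (the binarization mapper for Jaccard, Manhattan, and LLR) could look like, assuming each input line is one user's vector in the itemid_weight#itemid_weight#... format produced by the Hive preprocessing shown later:

```python
#!/usr/bin/env python
# Hypothetical sketch of cfPreMapper.py: binarize each user's item vector.
import sys

for line in sys.stdin:
    fields = [f for f in line.strip().split('#') if f]
    out = []
    for field in fields:
        item_id, _weight = field.split('_')
        out.append(item_id + '_1')   # every nonzero weight becomes 1
    if out:
        print('#'.join(out))
```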
The second MR job implements the norm-computation step b):

| Similarity measure | Norm job |
|---|---|
| Euclidean distance | Compute each vector's squared length (Mapper: cfNormMapper.py, Reducer: cfNormReducer.py) |
| Jaccard, Manhattan, Log-likelihood ratio | Compute each vector's 1-norm (Mapper: cfNormMapper.py, Reducer: cfNormReducer.py) |
| Cosine, Pearson | - |
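Again as a hypothetical reconstruction (the real sources are not shown): cfNormMapper.py can emit one (item, value) pair per occurrence, and cfNormReducer.py sums them per item. With binarized input the sum is the item's 1-norm; for Euclidean distance the mapper would emit squared weights instead.

```python
#!/usr/bin/env python
# Hypothetical sketch of cfNormMapper.py: emit each item's weight so the
# reducer can accumulate per-item norms across all user vectors.
import sys

for line in sys.stdin:
    for field in line.strip().split('#'):
        if field:
            item_id, weight = field.split('_')
            print('%s\t%s' % (item_id, weight))
```

```python
#!/usr/bin/env python
# Hypothetical sketch of cfNormReducer.py: input arrives grouped and sorted
# by item id, so a running sum per key yields each item's norm.
import sys

current, total = None, 0.0
for line in sys.stdin:
    item_id, value = line.strip().split('\t')
    if item_id != current:
        if current is not None:
            print('%s\t%s' % (current, total))
        current, total = item_id, 0.0
    total += float(value)
if current is not None:
    print('%s\t%s' % (current, total))
```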
The code is split into two parts: the driver shell script cfItemSim.sh, and the Python mapper/reducer scripts it ships to the cluster via Hadoop Streaming. The third MR job, which computes the similarities themselves, uses the same pair of scripts (cfSimMapper.py, cfSimReducer.py) for every measure; only the method argument passed to the reducer changes.
The logic of cfItemSim.sh is walked through below in three stages: preprocessing, similarity computation, and post-processing.
#=======================================================#
#==================Data preprocessing===================#
#=======================================================#
if [ ${pre} = 'TRUE' ]
then
echo "Preprocess the data......"
#map each uuid1d to a consecutive 0-based integer id
hive -e "
set mapreduce.job.reduce.slowstart.completedmaps=0.9;
drop table ${IN_TABLE}_pin_id;
create table ${IN_TABLE}_pin_id
as
select uuid1d, row_number() over (order by uuid1d) - 1 as id
from (select distinct uuid1d from ${IN_TABLE}) t
"
#map each sku to a consecutive 0-based integer id
hive -e "
set mapreduce.job.reduce.slowstart.completedmaps=0.9;
drop table ${IN_TABLE}_sku_id;
create table ${IN_TABLE}_sku_id
as
select sku, row_number() over (order by sku) - 1 as id
from (select distinct sku from ${IN_TABLE}) t
"
#replace uuid1d/sku with their ids and pack each user's items into one
#line of the form skuid_weight#skuid_weight#...
hive -e "
set mapreduce.job.reduce.slowstart.completedmaps=0.9;
drop table ${IN_TABLE}_id;
create table ${IN_TABLE}_id
as
select concat_ws('#',collect_set(sku)) from
(select b.id id,concat_ws('_',cast(bb.id as string),cast(a.weight as string)) sku from
${IN_TABLE} a
join
${IN_TABLE}_pin_id b on (a.uuid1d=b.uuid1d)
join
${IN_TABLE}_sku_id bb on (a.sku=bb.sku)) c
group by c.id
"
fi
#data preparation before the item-based CF computation (N = number of users)
N=`hive -e "select max(id)+1 from ${IN_TABLE}_pin_id"`
lib=`echo ${IN_TABLE}_id | awk 'BEGIN{FS="."} {print $1}'`
tab=`echo ${IN_TABLE}_id | awk 'BEGIN{FS="."} {print $2}'`
input=/user/recsys/${lib}.db/${tab}
output=${tmp}/${method}_output
#=======================================================#
#=============Item similarity computation===============#
#=======================================================#
echo "Calculate the item to item similarity using cf......"
#Log-likelihood ratio
if [ $method = 'logl' ]
then
#preprocess: binarize the vectors
hadoop fs -rm -r ${tmp}/pre1
cd /data0/recsys/zhongchao/recsys_work/cf;
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -Dmapreduce.job.reduce.slowstart.completedmaps=0.9 -Dstream.non.zero.exit.is.failure=false -Dmapred.reduce.tasks=0 -Dmapred.map.child.java.opts=-Xmx2048m -Dmapred.reduce.child.java.opts=-Xmx2048m -input ${input} -output ${tmp}/pre1 -mapper "python ./cfPreMapper.py" -file "cfPreMapper.py"
#compute the per-item norms
hadoop fs -rm -r ${tmp}/pre2
cd /data0/recsys/zhongchao/recsys_work/cf;
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -Dmapreduce.job.reduce.slowstart.completedmaps=0.9 -Dstream.non.zero.exit.is.failure=false -Dmapred.reduce.tasks=20 -Dmapred.map.child.java.opts=-Xmx2048m -Dmapred.reduce.child.java.opts=-Xmx2048m -input ${tmp}/pre1 -output ${tmp}/pre2 -mapper "python ./cfNormMapper.py" -file "cfNormMapper.py" -reducer "python ./cfNormReducer.py ${method}" -file "cfNormReducer.py"
norm_file='norm_'`echo $RANDOM`
norm_sort_file='norm_sort'`echo $RANDOM`
echo $norm_file
echo $norm_sort_file
hadoop fs -getmerge ${tmp}/pre2 ${norm_file}
cat ${norm_file} | sort -k1 -n > ${norm_sort_file}
#cat hdfs_data.txt | python ./cfNormMapper.py | sort -k1 -n | python ./cfNormReducer.py > norm_sort.txt
#compute the similarities
hadoop fs -rm -r ${output};
cd /data0/recsys/zhongchao/recsys_work/cf;
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -Dmapreduce.job.reduce.slowstart.completedmaps=0.9 -Dstream.non.zero.exit.is.failure=false -Dmapred.reduce.tasks=80 -Dmapred.map.child.java.opts=-Xmx2048m -Dmapred.reduce.child.java.opts=-Xmx2048m -input ${tmp}/pre1 -output ${output} -mapper "python ./cfSimMapper.py" -file "cfSimMapper.py" -reducer "python ./cfSimReducer.py ${method} $N ${norm_sort_file}" -file "cfSimReducer.py" -file ${norm_sort_file}
[ $? -ne 0 ] && { echo "cfSimReducer failed";exit 2;}
\rm -r ${norm_file}
\rm -r ${norm_sort_file}
#cat pre1 | python ./cfSimMapper.py | sort -k1 -n | python ./cfSimReducer.py logl 12834737 norm_sort28075 | sort -k12 -n > logl.txt
fi
#Jaccard coefficient
if [ $method = 'jacc' ]
then
#preprocess: binarize the vectors
hadoop fs -rm -r ${tmp}/pre1
cd /data0/recsys/zhongchao/recsys_work/cf;
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -Dmapreduce.job.reduce.slowstart.completedmaps=0.9 -Dstream.non.zero.exit.is.failure=false -Dmapred.reduce.tasks=0 -Dmapred.map.child.java.opts=-Xmx2048m -Dmapred.reduce.child.java.opts=-Xmx2048m -input ${input} -output ${tmp}/pre1 -mapper "python ./cfPreMapper.py" -file "cfPreMapper.py"
#compute the per-item norms
hadoop fs -rm -r ${tmp}/pre2
cd /data0/recsys/zhongchao/recsys_work/cf;
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -Dmapreduce.job.reduce.slowstart.completedmaps=0.9 -Dstream.non.zero.exit.is.failure=false -Dmapred.reduce.tasks=20 -Dmapred.map.child.java.opts=-Xmx2048m -Dmapred.reduce.child.java.opts=-Xmx2048m -input ${tmp}/pre1 -output ${tmp}/pre2 -mapper "python ./cfNormMapper.py" -file "cfNormMapper.py" -reducer "python ./cfNormReducer.py ${method}" -file "cfNormReducer.py"
#cat hdfs_data.txt | python ./cfNormMapper.py | sort -k1 -n | python ./cfNormReducer.py > norm_sort.txt
#compute the similarities
norm_file='norm_'`echo $RANDOM`
norm_sort_file='norm_sort'`echo $RANDOM`
echo $norm_file
echo $norm_sort_file
hadoop fs -getmerge ${tmp}/pre2 ${norm_file}
cat ${norm_file} | sort -k1 -n > ${norm_sort_file}
hadoop fs -rm -r ${output};
cd /data0/recsys/zhongchao/recsys_work/cf;
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -Dmapreduce.job.reduce.slowstart.completedmaps=0.9 -Dstream.non.zero.exit.is.failure=false -Dmapred.reduce.tasks=80 -Dmapred.map.child.java.opts=-Xmx2048m -Dmapred.reduce.child.java.opts=-Xmx2048m -input ${tmp}/pre1 -output ${output} -mapper "python ./cfSimMapper.py" -file "cfSimMapper.py" -reducer "python ./cfSimReducer.py ${method} $N ${norm_sort_file}" -file "cfSimReducer.py" -file ${norm_sort_file}
[ $? -ne 0 ] && { echo "cfSimReducer failed";exit 2;}
\rm -r ${norm_file}
\rm -r ${norm_sort_file}
#cat pre1 | python ./cfSimMapper.py | sort -k1 -n | python ./cfSimReducer.py jacc 100 norm_sort3284 | sort -k12 -n > text.txt
fi
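cfSimMapper.py and cfSimReducer.py are shared by both branches, but their sources are not shown either. Following the pair-emission scheme of the referenced paper, a hypothetical sketch might look like this (the reducer's argument handling matches how the script invokes it: method, N, and the norms file):

```python
#!/usr/bin/env python
# Hypothetical sketch of cfSimMapper.py: for every user vector, emit each
# item pair with the product of the two weights (a dot-product contribution).
import sys

for line in sys.stdin:
    items = [f.split('_') for f in line.strip().split('#') if f]
    for a in range(len(items)):
        for b in range(a + 1, len(items)):
            i, wi = items[a]
            j, wj = items[b]
            lo, hi = sorted((int(i), int(j)))
            print('%d:%d\t%s' % (lo, hi, float(wi) * float(wj)))
```

```python
#!/usr/bin/env python
# Hypothetical sketch of cfSimReducer.py: sum the dot contributions per
# item pair, then combine them with the per-item norms.
import sys

method, N = sys.argv[1], int(sys.argv[2])
norms = {}
with open(sys.argv[3]) as f:
    for line in f:
        item_id, n = line.strip().split('\t')
        norms[item_id] = float(n)

def similarity(dot, ni, nj):
    if method == 'jacc':
        return dot / (ni + nj - dot)
    # 'logl' would build k11=dot, k12=ni-dot, k21=nj-dot, k22=N-ni-nj+dot
    # and call log_likelihood_ratio() from the earlier sketch.
    return dot

def flush(key, dot):
    if key is None:
        return
    i, j = key.split(':')
    print('%s\t%s\t%s' % (i, j, similarity(dot, norms[i], norms[j])))

current, dot = None, 0.0
for line in sys.stdin:
    key, value = line.strip().split('\t')
    if key != current:
        flush(current, dot)
        current, dot = key, 0.0
    dot += float(value)
flush(current, dot)
```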
#=======================================================#
#======Post-processing: top K into a Hive table=========#
#=======================================================#
hadoop fs -rm -r ${output}/_SUCCESS
#create an external temp table over the MR output
s="
drop table ${OUT_TABLE}_t1;
CREATE EXTERNAL TABLE ${OUT_TABLE}_t1(id1 string,id2 string,weight double)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '${output}'"
hive -e "$s"
[ $? -ne 0 ] && { echo "$s EXIT";exit 2;}
#keep only the K most similar items for each id1
s="
drop table ${OUT_TABLE}_t2;
create table ${OUT_TABLE}_t2
as
select id1,id2,weight from
(select id1,id2,weight,row_number() over (partition by id1 order by weight desc) rn
from ${OUT_TABLE}_t1) b
where b.rn<=${K}
"
hive -e "$s"
[ $? -ne 0 ] && { echo "$s EXIT";exit 2;}
#map the ids back to skus for the final result
s="
drop table ${OUT_TABLE};
create table ${OUT_TABLE}
as
select b.sku sku1,c.sku sku2,a.weight from
${OUT_TABLE}_t2 a
join
${IN_TABLE}_sku_id b on (a.id1=b.id)
join
${IN_TABLE}_sku_id c on (a.id2=c.id)
"
hive -e "$s"
[ $? -ne 0 ] && { echo "$s EXIT";exit 2;}
Reference: Sebastian Schelter, Christoph Boden, Volker Markl. "Scalable Similarity-Based Neighborhood Methods with MapReduce." RecSys 2012.
Original post: http://blog.csdn.net/zc02051126/article/details/47748617