package com.util;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class HadoopUtil {

    private static final Logger log = LoggerFactory.getLogger(HadoopUtil.class);

    private HadoopUtil() {}

    public static Job prepareJob(String jobName,
                                 String[] inputPath,
                                 String outputPath,
                                 Class<? extends InputFormat> inputFormat,
                                 Class<? extends Mapper> mapper,
                                 Class<? extends Writable> mapperKey,
                                 Class<? extends Writable> mapperValue,
                                 Class<? extends OutputFormat> outputFormat,
                                 Configuration conf) throws IOException {

        Job job = new Job(new Configuration(conf));
        job.setJobName(jobName);
        Configuration jobConf = job.getConfiguration();

        if (mapper.equals(Mapper.class)) {
            throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
        }
        job.setJarByClass(mapper);

        job.setInputFormatClass(inputFormat);

        StringBuilder inputPathsStringBuilder = new StringBuilder();
        for (String p : inputPath) {
            inputPathsStringBuilder.append(",").append(p);
        }
        inputPathsStringBuilder.deleteCharAt(0);
        jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());

        job.setMapperClass(mapper);
        job.setMapOutputKeyClass(mapperKey);
        job.setMapOutputValueClass(mapperValue);
        job.setOutputKeyClass(mapperKey);
        job.setOutputValueClass(mapperValue);
        jobConf.setBoolean("mapred.compress.map.output", true);
        job.setNumReduceTasks(0);

        job.setOutputFormatClass(outputFormat);
        jobConf.set("mapred.output.dir", outputPath);

        return job;
    }
    public static Job prepareJob(String jobName,
                                 String[] inputPath,
                                 String outputPath,
                                 Class<? extends InputFormat> inputFormat,
                                 Class<? extends Mapper> mapper,
                                 Class<? extends Writable> mapperKey,
                                 Class<? extends Writable> mapperValue,
                                 Class<? extends Reducer> reducer,
                                 Class<? extends Writable> reducerKey,
                                 Class<? extends Writable> reducerValue,
                                 Class<? extends OutputFormat> outputFormat,
                                 Configuration conf) throws IOException {

        Job job = new Job(new Configuration(conf));
        job.setJobName(jobName);
        Configuration jobConf = job.getConfiguration();

        if (reducer.equals(Reducer.class)) {
            if (mapper.equals(Mapper.class)) {
                throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
            }
            job.setJarByClass(mapper);
        } else {
            job.setJarByClass(reducer);
        }

        job.setInputFormatClass(inputFormat);

        StringBuilder inputPathsStringBuilder = new StringBuilder();
        for (String p : inputPath) {
            inputPathsStringBuilder.append(",").append(p);
        }
        inputPathsStringBuilder.deleteCharAt(0);
        jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());

        job.setMapperClass(mapper);
        if (mapperKey != null) {
            job.setMapOutputKeyClass(mapperKey);
        }
        if (mapperValue != null) {
            job.setMapOutputValueClass(mapperValue);
        }
        jobConf.setBoolean("mapred.compress.map.output", true);

        job.setReducerClass(reducer);
        job.setOutputKeyClass(reducerKey);
        job.setOutputValueClass(reducerValue);

        job.setOutputFormatClass(outputFormat);
        jobConf.set("mapred.output.dir", outputPath);

        return job;
    }
    public static Job prepareJob(String jobName,
                                 String[] inputPath,
                                 String outputPath,
                                 Class<? extends InputFormat> inputFormat,
                                 Class<? extends Mapper> mapper,
                                 Class<? extends Writable> mapperKey,
                                 Class<? extends Writable> mapperValue,
                                 Class<? extends Reducer> combiner,
                                 Class<? extends Reducer> reducer,
                                 Class<? extends Writable> reducerKey,
                                 Class<? extends Writable> reducerValue,
                                 Class<? extends OutputFormat> outputFormat,
                                 Configuration conf) throws IOException {

        Job job = new Job(new Configuration(conf));
        job.setJobName(jobName);
        Configuration jobConf = job.getConfiguration();

        if (reducer.equals(Reducer.class)) {
            if (mapper.equals(Mapper.class)) {
                throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
            }
            job.setJarByClass(mapper);
        } else {
            job.setJarByClass(reducer);
        }

        job.setInputFormatClass(inputFormat);

        StringBuilder inputPathsStringBuilder = new StringBuilder();
        for (String p : inputPath) {
            inputPathsStringBuilder.append(",").append(p);
        }
        inputPathsStringBuilder.deleteCharAt(0);
        jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());

        job.setMapperClass(mapper);
        if (mapperKey != null) {
            job.setMapOutputKeyClass(mapperKey);
        }
        if (mapperValue != null) {
            job.setMapOutputValueClass(mapperValue);
        }
        jobConf.setBoolean("mapred.compress.map.output", true);

        job.setCombinerClass(combiner);
        job.setReducerClass(reducer);
        job.setOutputKeyClass(reducerKey);
        job.setOutputValueClass(reducerValue);

        job.setOutputFormatClass(outputFormat);
        jobConf.set("mapred.output.dir", outputPath);

        return job;
    }
    public static String getCustomJobName(String className, JobContext job,
                                          Class<? extends Mapper> mapper,
                                          Class<? extends Reducer> reducer) {
        StringBuilder name = new StringBuilder(100);
        String customJobName = job.getJobName();
        if (customJobName == null || customJobName.trim().isEmpty()) {
            name.append(className);
        } else {
            name.append(customJobName);
        }
        name.append('-').append(mapper.getSimpleName());
        name.append('-').append(reducer.getSimpleName());
        return name.toString();
    }
    public static void delete(Configuration conf, Iterable<Path> paths) throws IOException {
        if (conf == null) {
            conf = new Configuration();
        }
        for (Path path : paths) {
            FileSystem fs = path.getFileSystem(conf);
            if (fs.exists(path)) {
                log.info("Deleting {}", path);
                fs.delete(path, true);
            }
        }
    }

    public static void delete(Configuration conf, Path... paths) throws IOException {
        delete(conf, Arrays.asList(paths));
    }

    public static long countRecords(Path path, Configuration conf) throws IOException {
        long count = 0;
        Iterator<?> iterator = new SequenceFileValueIterator<Writable>(path, true, conf);
        while (iterator.hasNext()) {
            iterator.next();
            count++;
        }
        return count;
    }

    public static long countRecords(Path path, PathType pt, PathFilter filter, Configuration conf) throws IOException {
        long count = 0;
        Iterator<?> iterator = new SequenceFileDirValueIterator<Writable>(path, pt, filter, null, true, conf);
        while (iterator.hasNext()) {
            iterator.next();
            count++;
        }
        return count;
    }
}
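For reference, here is a minimal sketch (not from the original post) of how the map-only prepareJob overload above might be called, reusing a mapper defined later in this post; the paths are hypothetical placeholders and the usual imports are omitted:

Configuration conf = new Configuration();
Job mapOnlyJob = HadoopUtil.prepareJob(
        "sourceDataToItemPrefs-mapOnly",          // job name
        new String[]{"/user/hadoop/demo/input"},  // hypothetical input directory
        "/user/hadoop/demo/output",               // hypothetical output directory
        TextInputFormat.class,
        SourceDataToItemPrefsMapper.class,        // defined below
        VarLongWritable.class,
        VarLongWritable.class,
        SequenceFileOutputFormat.class,
        conf);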
package com.mapper;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;
/**
 * Mapper input format: userID:itemID1 itemID2 itemID3...
 * Mapper output format: <userID, itemID>
 * @author 曾昭正
 */
public class SourceDataToItemPrefsMapper extends Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {
    //private static final Logger logger = LoggerFactory.getLogger(SourceDataToItemPrefsMapper.class);
    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");
    private String line = null;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        line = value.toString();
        if (line == null) return;
        // logger.info("line:" + line);
        Matcher matcher = NUMBERS.matcher(line);
        matcher.find(); // the first match is the userID
        VarLongWritable userID = new VarLongWritable(Long.parseLong(matcher.group())); // VarLongWritable is Mahout's own Writable for variable-length longs
        VarLongWritable itemID = new VarLongWritable();
        while (matcher.find()) {
            itemID.set(Long.parseLong(matcher.group()));
            // logger.info(userID + " " + itemID);
            context.write(userID, itemID);
        }
    }
}
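To make the parsing concrete, a small standalone sketch (with an assumed sample line, not from the original post) of what the NUMBERS regex extracts from one input record:

Pattern numbers = Pattern.compile("(\\d+)");
Matcher m = numbers.matcher("123:101 102 103"); // hypothetical input line
m.find();
long userID = Long.parseLong(m.group());        // 123
while (m.find()) {
    long itemID = Long.parseLong(m.group());    // 101, then 102, then 103
    // the mapper would emit (123,101), (123,102), (123,103)
}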
package com.reducer;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Reducer input: <userID, Iterable<itemID>>
 * Reducer output: <userID, VectorWritable<index=itemID, value=pref>...>
 * @author 曾昭正
 */
public class SourceDataToUserVectorReducer extends Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {
    private static final Logger logger = LoggerFactory.getLogger(SourceDataToUserVectorReducer.class);

    @Override
    protected void reduce(VarLongWritable userID, Iterable<VarLongWritable> itemPrefs, Context context)
            throws IOException, InterruptedException {
        /**
         * DenseVector is backed by a double array and stores every dimension of the vector,
         * so it suits dense data.
         * RandomAccessSparseVector is backed by a HashMap from int keys to double values; it only
         * stores the non-zero entries and supports random access.
         * SequentialAccessVector is implemented as parallel int and double arrays; it also only
         * stores non-zero entries, but only supports sequential access.
         * Choose the implementation that matches your algorithm's access pattern: use DenseVector or
         * RandomAccessSparseVector when you need a lot of random access, and SequentialAccessVector
         * when access is mostly sequential.
         * With the vector implementations covered, the next step is to model the raw data as vectors
         * ("vectorize" the data) so Mahout's algorithms can work on it efficiently.
         */
        Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (VarLongWritable itemPref : itemPrefs) {
            userVector.set((int) itemPref.get(), 1.0f); // RandomAccessSparseVector.set(index, value); the preference is boolean, so every preferred item defaults to 1.0f
        }
        logger.info(userID + " " + new VectorWritable(userVector));
        context.write(userID, new VectorWritable(userVector));
    }
}
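As a quick illustration of the RandomAccessSparseVector used above (a sketch with made-up item IDs, not part of the original code):

Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
userVector.set(101, 1.0); // the user preferred item 101
userVector.set(103, 1.0); // the user preferred item 103
// Only the two non-zero entries are stored; userVector.get(102) returns 0.0.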
package com.mapper;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
/**
 * Mapper input: <userID, VectorWritable<index=itemID, value=pref>...>
 * Mapper output: <itemID, itemID> (pairs of co-occurring item IDs)
 * @author 曾昭正
 */
public class UserVectorToCooccurrenceMapper extends Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable> {
    @Override
    protected void map(VarLongWritable userID, VectorWritable userVector, Context context)
            throws IOException, InterruptedException {
        Iterator<Vector.Element> it = userVector.get().nonZeroes().iterator(); // iterate over the non-zero elements only
        while (it.hasNext()) {
            int index1 = it.next().index();
            Iterator<Vector.Element> it2 = userVector.get().nonZeroes().iterator();
            while (it2.hasNext()) {
                int index2 = it2.next().index();
                context.write(new IntWritable(index1), new IntWritable(index2));
            }
        }
    }
}
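A small worked example (with assumed item IDs) of what the nested iteration above emits for a single user vector:

// Suppose a user vector has non-zero entries at item indices 101 and 103.
int[] items = {101, 103};
for (int index1 : items) {
    for (int index2 : items) {
        System.out.println(index1 + "\t" + index2);
        // emits (101,101), (101,103), (103,101), (103,103);
        // the reducer then counts how often each pair occurs across all users
    }
}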
package com.reducer;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Reducer input: <itemID, Iterable<itemIDs>>
 * Reducer output: <mainItemID, Vector<coocItemID, coocCount (number of co-occurrences)>...>
 * @author 曾昭正
 */
public class UserVectorToCoocurrenceReducer extends Reducer<IntWritable, IntWritable, IntWritable, VectorOrPrefWritable> {
    private static final Logger logger = LoggerFactory.getLogger(UserVectorToCoocurrenceReducer.class);

    @Override
    protected void reduce(IntWritable mainItemID, Iterable<IntWritable> coocItemIDs, Context context)
            throws IOException, InterruptedException {
        Vector coocItemIDVectorRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (IntWritable coocItem : coocItemIDs) {
            int coocItemID = coocItem.get();
            coocItemIDVectorRow.set(coocItemID, coocItemIDVectorRow.get(coocItemID) + 1.0); // accumulate the co-occurrence count
        }
        logger.info(mainItemID + " " + new VectorOrPrefWritable(coocItemIDVectorRow));
        context.write(mainItemID, new VectorOrPrefWritable(coocItemIDVectorRow)); // write the complete co-occurrence row for mainItemID
    }
}
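The counting logic boils down to the following sketch (hypothetical grouped values for one mainItemID):

Vector row = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
for (int coocItemID : new int[]{101, 103, 101}) {   // values grouped under one mainItemID
    row.set(coocItemID, row.get(coocItemID) + 1.0);
}
// row is now {101: 2.0, 103: 1.0}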
package com.mapper;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Splits the user vectors so they can later be joined with the item co-occurrence vectors.
 * Mapper input: <userID, Vector<itemIDIndex, preferenceValue>...>
 * Mapper output: <itemID, Vector<userID, preferenceValue>...>
 * @author 曾昭正
 */
public class UserVecotrSplitMapper extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {
    private static final Logger logger = LoggerFactory.getLogger(UserVecotrSplitMapper.class);

    @Override
    protected void map(VarLongWritable userIDWritable, VectorWritable value, Context context)
            throws IOException, InterruptedException {
        IntWritable itemIDIndex = new IntWritable();
        long userID = userIDWritable.get();
        Vector userVector = value.get();
        Iterator<Element> it = userVector.nonZeroes().iterator(); // take only the non-zero entries of the user vector
        while (it.hasNext()) {
            Element e = it.next();
            int itemID = e.index();
            itemIDIndex.set(itemID);
            float preferenceValue = (float) e.get();
            logger.info(itemIDIndex + " " + new VectorOrPrefWritable(userID, preferenceValue));
            context.write(itemIDIndex, new VectorOrPrefWritable(userID, preferenceValue));
        }
    }
}
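For example (assumed IDs, not from the original post), a user vector {101: 1.0, 103: 1.0} for user 123 is re-keyed by item:

// The loop above would emit (101, <123,1.0>) and (103, <123,1.0>), i.e. per item:
IntWritable key = new IntWritable(101);
VectorOrPrefWritable pref = new VectorOrPrefWritable(123L, 1.0f); // and likewise for item 103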
package com.mapper;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
/**
 * Joins the co-occurrence matrix with the split user vectors so the partial recommendation vectors can be computed.
 * This mapper has no real processing logic; it simply passes its input through unchanged.
 * Note: its input consists of the two output directories produced by the co-occurrence job and the user-vector-split job.
 * Mapper input: <itemID, Vector<userID, preferenceValue>> or <itemID, Vector<coocItemID, coocCount>>
 * Mapper output: <itemID, Vector<userID, preferenceValue> / Vector<coocItemID, coocCount>>
 * @author 曾昭正
 */
public class CombineUserVectorAndCoocMatrixMapper extends Mapper<IntWritable, VectorOrPrefWritable, IntWritable, VectorOrPrefWritable> {
    @Override
    protected void map(IntWritable itemID, VectorOrPrefWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(itemID, value);
    }
}
package com.reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Joins the co-occurrence matrix with the split user vectors so the partial recommendation vectors can be computed.
 * @author 曾昭正
 */
public class CombineUserVectorAndCoocMatrixReducer extends Reducer<IntWritable, VectorOrPrefWritable, IntWritable, VectorAndPrefsWritable> {
    private static final Logger logger = LoggerFactory.getLogger(CombineUserVectorAndCoocMatrixReducer.class);

    @Override
    protected void reduce(IntWritable itemID, Iterable<VectorOrPrefWritable> values, Context context)
            throws IOException, InterruptedException {
        VectorAndPrefsWritable vectorAndPrefsWritable = new VectorAndPrefsWritable();
        List<Long> userIDs = new ArrayList<Long>();
        List<Float> preferenceValues = new ArrayList<Float>();
        Vector coocVector = null;
        Vector coocVectorTemp = null;
        Iterator<VectorOrPrefWritable> it = values.iterator();
        while (it.hasNext()) {
            VectorOrPrefWritable e = it.next();
            coocVectorTemp = e.getVector();
            if (coocVectorTemp == null) {
                userIDs.add(e.getUserID());
                preferenceValues.add(e.getValue());
            } else {
                coocVector = coocVectorTemp;
            }
        }
        if (coocVector != null) {
            // Note: after the co-occurrence matrix has been aggregated in its reduce step, each reduce group
            // here contains exactly one co-occurrence vector (one row/column of the matrix; the matrix is
            // symmetric, so row and column are the same).
            vectorAndPrefsWritable.set(coocVector, userIDs, preferenceValues);
            logger.info(itemID + " " + vectorAndPrefsWritable);
            context.write(itemID, vectorAndPrefsWritable);
        }
    }
}
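One reduce group might look like this (a hypothetical sketch of the merge, using made-up IDs and java.util.Arrays for brevity):

// Group for itemID 101: one co-occurrence vector plus two user preferences.
Vector cooc = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
cooc.set(101, 2.0);
cooc.set(103, 1.0);
List<Long> userIDs = Arrays.asList(123L, 456L);
List<Float> prefs = Arrays.asList(1.0f, 1.0f);
VectorAndPrefsWritable merged = new VectorAndPrefsWritable();
merged.set(cooc, userIDs, prefs); // this is what gets written for itemID 101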
package com.mapper;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Computes the partial recommendation vectors for each user.
 * @author 曾昭正
 */
public class CaclPartialRecomUserVectorMapper extends Mapper<IntWritable, VectorAndPrefsWritable, VarLongWritable, VectorWritable> {
    private static final Logger logger = LoggerFactory.getLogger(CaclPartialRecomUserVectorMapper.class);

    @Override
    protected void map(IntWritable itemID, VectorAndPrefsWritable values, Context context)
            throws IOException, InterruptedException {
        Vector coocVectorColumn = values.getVector();
        List<Long> userIDs = values.getUserIDs();
        List<Float> preferenceValues = values.getValues();
        for (int i = 0; i < userIDs.size(); i++) {
            long userID = userIDs.get(i);
            float preferenceValue = preferenceValues.get(i);
            logger.info("userID:" + userID);
            logger.info("preferenceValue:" + preferenceValue);
            // Multiply the co-occurrence column by this user's preference value to get the
            // user's partial recommendation scores.
            Vector preferenceParScores = coocVectorColumn.times(preferenceValue);
            context.write(new VarLongWritable(userID), new VectorWritable(preferenceParScores));
        }
    }
}
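A numeric sketch of the partial-score computation above (the values are made up):

Vector coocColumn = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
coocColumn.set(101, 2.0); // item 101 co-occurred twice with the current itemID
coocColumn.set(103, 1.0);
float preferenceValue = 1.0f;
Vector partialScores = coocColumn.times(preferenceValue); // {101: 2.0, 103: 1.0}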
package com.reducer;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Merges the partial user recommendation vectors by summing the co-occurrence-based scores for each userID.
 * (Note: as a combiner it only merges the output of a single map task, so the result is still partial.)
 * @author 曾昭正
 */
public class ParRecomUserVectorCombiner extends Reducer<VarLongWritable, VectorWritable, VarLongWritable, VectorWritable> {
    private static final Logger logger = LoggerFactory.getLogger(ParRecomUserVectorCombiner.class);

    @Override
    protected void reduce(VarLongWritable userID, Iterable<VectorWritable> coocVectorColumns, Context context)
            throws IOException, InterruptedException {
        Vector vectorColumns = null;
        for (VectorWritable coocVectorColumn : coocVectorColumns) {
            vectorColumns = vectorColumns == null ? coocVectorColumn.get() : vectorColumns.plus(coocVectorColumn.get());
        }
        logger.info(userID + " " + new VectorWritable(vectorColumns));
        context.write(userID, new VectorWritable(vectorColumns));
    }
}
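The merging step reduces to summing sparse vectors, for example (assumed values, using the Mahout math classes already shown above):

Vector a = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
a.set(101, 2.0);
Vector b = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
b.set(101, 1.0);
b.set(103, 1.0);
Vector sum = a.plus(b); // {101: 3.0, 103: 1.0}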
package com.reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Merges all the scored co-occurrence vectors and generates the final recommendations per user.
 * @author 曾昭正
 */
public class MergeAndGenerateRecommendReducer extends Reducer<VarLongWritable, VectorWritable, VarLongWritable, RecommendedItemsWritable> {
    private static final Logger logger = LoggerFactory.getLogger(MergeAndGenerateRecommendReducer.class);
    private int recommendationsPerUser;

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        recommendationsPerUser = context.getConfiguration().getInt("recomandItems.recommendationsPerUser", 5);
    }

    @Override
    protected void reduce(VarLongWritable userID, Iterable<VectorWritable> cooVectorColumn, Context context)
            throws IOException, InterruptedException {
        // Sum and merge the scores.
        Vector recommdVector = null;
        for (VectorWritable vector : cooVectorColumn) {
            recommdVector = recommdVector == null ? vector.get() : recommdVector.plus(vector.get());
        }
        // Sort the recommendation vector and keep the top-M items per userID (5 by default);
        // the queue orders items by their score.
        // Note: the head of a PriorityQueue is always its smallest element, and the capacity is one larger
        // than recommendationsPerUser to leave temporary room when inserting a larger new element.
        Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1, ByValueRecommendedItemComparator.getInstance());
        Iterator<Element> it = recommdVector.nonZeroes().iterator();
        while (it.hasNext()) {
            Element e = it.next();
            int itemID = e.index();
            float preValue = (float) e.get();
            if (topItems.size() < recommendationsPerUser) {
                // While the queue holds fewer items than the recommendation count, just add the item and its score.
                topItems.add(new GenericRecommendedItem(itemID, preValue));
            } else if (preValue > topItems.peek().getValue()) {
                // If the current item's score is greater than the smallest score in the queue,
                // add the item and then evict the head (the minimum element).
                topItems.add(new GenericRecommendedItem(itemID, preValue));
                topItems.poll(); // pop the head element (the minimum)
            }
        }
        // Re-sort the queued elements.
        List<RecommendedItem> recommdations = new ArrayList<RecommendedItem>(topItems.size());
        recommdations.addAll(topItems); // copy everything in the queue into the list to be sorted
        Collections.sort(recommdations, ByValueRecommendedItemComparator.getInstance()); // sort by score
        // Emit the recommendation vector.
        logger.info(userID + " " + new RecommendedItemsWritable(recommdations));
        context.write(userID, new RecommendedItemsWritable(recommdations));
    }
}
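The bounded top-N pattern used above can be seen in isolation in this sketch (plain scores instead of RecommendedItems, with N assumed to be 2):

Queue<Double> top = new PriorityQueue<Double>(3);        // head is always the smallest score
for (double score : new double[]{4.0, 1.0, 5.0, 3.0}) {
    if (top.size() < 2) {
        top.add(score);
    } else if (score > top.peek()) {
        top.add(score);                                  // temporarily grows to 3 elements
        top.poll();                                      // evict the current minimum
    }
}
// top now contains 4.0 and 5.0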
package com.mapreduceMain;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;
import com.mapper.CaclPartialRecomUserVectorMapper;
import com.mapper.CombineUserVectorAndCoocMatrixMapper;
import com.mapper.UserVecotrSplitMapper;
import com.mapper.UserVectorToCooccurrenceMapper;
import com.mapper.SourceDataToItemPrefsMapper;
import com.reducer.CombineUserVectorAndCoocMatrixReducer;
import com.reducer.MergeAndGenerateRecommendReducer;
import com.reducer.ParRecomUserVectorCombiner;
import com.reducer.UserVectorToCoocurrenceReducer;
import com.reducer.SourceDataToUserVectorReducer;
import com.util.HadoopUtil;
/**
 * Wires the individual job components together to complete the recommendation pipeline.
 * @author 曾昭正
 */
public class PackageRecomendJob extends Configured implements Tool {
    String[] dataSourceInputPath = {"/user/hadoop/z.zeng/distruteItemCF/dataSourceInput"};
    String[] uesrVectorOutput = {"/user/hadoop/z.zeng/distruteItemCF/uesrVectorOutput/"};
    String[] userVectorSpliltOutPut = {"/user/hadoop/z.zeng/distruteItemCF/userVectorSpliltOutPut"};
    String[] cooccurrenceMatrixOuptPath = {"/user/hadoop/z.zeng/distruteItemCF/CooccurrenceMatrixOuptPath"};
    String[] combineUserVectorAndCoocMatrixOutPutPath = {"/user/hadoop/z.zeng/distruteItemCF/combineUserVectorAndCoocMatrixOutPutPath"};
    String[] caclPartialRecomUserVectorOutPutPath = {"/user/hadoop/z.zeng/distruteItemCF/CaclPartialRecomUserVectorOutPutPath"};

    protected void setup(Configuration configuration)
            throws IOException, InterruptedException {
        FileSystem hdfs = FileSystem.get(URI.create("hdfs://cluster-master"), configuration);
        Path p1 = new Path(uesrVectorOutput[0]);
        Path p2 = new Path(userVectorSpliltOutPut[0]);
        Path p3 = new Path(cooccurrenceMatrixOuptPath[0]);
        Path p4 = new Path(combineUserVectorAndCoocMatrixOutPutPath[0]);
        Path p5 = new Path(caclPartialRecomUserVectorOutPutPath[0]);
        if (hdfs.exists(p1)) {
            hdfs.delete(p1, true);
        }
        if (hdfs.exists(p2)) {
            hdfs.delete(p2, true);
        }
        if (hdfs.exists(p3)) {
            hdfs.delete(p3, true);
        }
        if (hdfs.exists(p4)) {
            hdfs.delete(p4, true);
        }
        if (hdfs.exists(p5)) {
            hdfs.delete(p5, true);
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // get the configuration object
        setup(conf);
        // DistributedCache.addArchiveToClassPath(new Path("/user/hadoop/z.zeng/distruteItemCF/lib"), conf);
        // Configure the job that computes the user vectors.
        Job wikipediaToItemPrefsJob = HadoopUtil.prepareJob(
                "WikipediaToItemPrefsJob",
                dataSourceInputPath,
                uesrVectorOutput[0],
                TextInputFormat.class,
                SourceDataToItemPrefsMapper.class,
                VarLongWritable.class,
                VarLongWritable.class,
                SourceDataToUserVectorReducer.class,
                VarLongWritable.class,
                VectorWritable.class,
                SequenceFileOutputFormat.class,
                conf);
        // Configure the job that computes the co-occurrence vectors.
        Job userVectorToCooccurrenceJob = HadoopUtil.prepareJob(
                "UserVectorToCooccurrenceJob",
                uesrVectorOutput,
                cooccurrenceMatrixOuptPath[0],
                SequenceFileInputFormat.class,
                UserVectorToCooccurrenceMapper.class,
                IntWritable.class,
                IntWritable.class,
                UserVectorToCoocurrenceReducer.class,
                IntWritable.class,
                VectorOrPrefWritable.class,
                SequenceFileOutputFormat.class,
                conf);
        // Configure the job that splits the user vectors.
        Job userVecotrSplitJob = HadoopUtil.prepareJob(
                "userVecotrSplitJob",
                uesrVectorOutput,
                userVectorSpliltOutPut[0],
                SequenceFileInputFormat.class,
                UserVecotrSplitMapper.class,
                IntWritable.class,
                VectorOrPrefWritable.class,
                SequenceFileOutputFormat.class,
                conf);
        // Job that joins the co-occurrence vectors with the split user vectors.
        // Note that the outputs of the user-vector-split job and the co-occurrence job must both be passed in as input.
        String[] combineUserVectorAndCoocMatrixIutPutPath = {cooccurrenceMatrixOuptPath[0], userVectorSpliltOutPut[0]};
        Job combineUserVectorAndCoocMatrixJob = HadoopUtil.prepareJob(
                "combineUserVectorAndCoocMatrixJob",
                combineUserVectorAndCoocMatrixIutPutPath,
                combineUserVectorAndCoocMatrixOutPutPath[0],
                SequenceFileInputFormat.class,
                CombineUserVectorAndCoocMatrixMapper.class,
                IntWritable.class,
                VectorOrPrefWritable.class,
                CombineUserVectorAndCoocMatrixReducer.class,
                IntWritable.class,
                VectorAndPrefsWritable.class,
                SequenceFileOutputFormat.class,
                conf);
        // Compute the user recommendation vectors.
        Job caclPartialRecomUserVectorJob = HadoopUtil.prepareJob(
                "caclPartialRecomUserVectorJob",
                combineUserVectorAndCoocMatrixOutPutPath,
                caclPartialRecomUserVectorOutPutPath[0],
                SequenceFileInputFormat.class,
                CaclPartialRecomUserVectorMapper.class,
                VarLongWritable.class,
                VectorWritable.class,
                ParRecomUserVectorCombiner.class, // set a combiner on the map side to reduce network I/O
                MergeAndGenerateRecommendReducer.class,
                VarLongWritable.class,
                RecommendedItemsWritable.class,
                TextOutputFormat.class,
                conf);
        // Chain the jobs together.
        if (wikipediaToItemPrefsJob.waitForCompletion(true)) {
            if (userVectorToCooccurrenceJob.waitForCompletion(true)) {
                if (userVecotrSplitJob.waitForCompletion(true)) {
                    if (combineUserVectorAndCoocMatrixJob.waitForCompletion(true)) {
                        // 0 = success, non-zero = failure (shell exit-code convention)
                        return caclPartialRecomUserVectorJob.waitForCompletion(true) ? 0 : 1;
                    } else {
                        throw new Exception("The job that joins the co-occurrence vectors with the split user vectors failed!");
                    }
                } else {
                    throw new Exception("The job that splits the user vectors failed!");
                }
            } else {
                throw new Exception("The job that computes the co-occurrence vectors failed!");
            }
        } else {
            throw new Exception("The job that computes the user vectors failed!");
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        try {
            int returnCode = ToolRunner.run(new PackageRecomendJob(), args);
            System.exit(returnCode);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
(Repost) Implementing the co-occurrence matrix of an item-based (ItemBase) recommendation algorithm with MapReduce (Part 1)
Original article: http://www.cnblogs.com/baixl/p/4165156.html