The full source of the pipeline follows, one class per block.

HadoopUtil.java:

```java
package com.util;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class HadoopUtil {

    private static final Logger log = LoggerFactory.getLogger(HadoopUtil.class);

    private HadoopUtil() {}

    // Prepare a map-only job.
    public static Job prepareJob(String jobName,
            String[] inputPath, String outputPath,
            Class<? extends InputFormat> inputFormat,
            Class<? extends Mapper> mapper,
            Class<? extends Writable> mapperKey,
            Class<? extends Writable> mapperValue,
            Class<? extends OutputFormat> outputFormat,
            Configuration conf) throws IOException {

        Job job = new Job(new Configuration(conf));
        job.setJobName(jobName);
        Configuration jobConf = job.getConfiguration();

        if (mapper.equals(Mapper.class)) {
            throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
        }
        job.setJarByClass(mapper);

        job.setInputFormatClass(inputFormat);

        StringBuilder inputPathsStringBuilder = new StringBuilder();
        for (String p : inputPath) {
            inputPathsStringBuilder.append(",").append(p);
        }
        inputPathsStringBuilder.deleteCharAt(0);
        jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());

        job.setMapperClass(mapper);
        job.setMapOutputKeyClass(mapperKey);
        job.setMapOutputValueClass(mapperValue);
        job.setOutputKeyClass(mapperKey);
        job.setOutputValueClass(mapperValue);
        jobConf.setBoolean("mapred.compress.map.output", true);
        job.setNumReduceTasks(0);

        job.setOutputFormatClass(outputFormat);
        jobConf.set("mapred.output.dir", outputPath);

        return job;
    }

    // Prepare a map-reduce job.
    public static Job prepareJob(String jobName,
            String[] inputPath, String outputPath,
            Class<? extends InputFormat> inputFormat,
            Class<? extends Mapper> mapper,
            Class<? extends Writable> mapperKey,
            Class<? extends Writable> mapperValue,
            Class<? extends Reducer> reducer,
            Class<? extends Writable> reducerKey,
            Class<? extends Writable> reducerValue,
            Class<? extends OutputFormat> outputFormat,
            Configuration conf) throws IOException {

        Job job = new Job(new Configuration(conf));
        job.setJobName(jobName);
        Configuration jobConf = job.getConfiguration();

        if (reducer.equals(Reducer.class)) {
            if (mapper.equals(Mapper.class)) {
                throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
            }
            job.setJarByClass(mapper);
        } else {
            job.setJarByClass(reducer);
        }

        job.setInputFormatClass(inputFormat);

        StringBuilder inputPathsStringBuilder = new StringBuilder();
        for (String p : inputPath) {
            inputPathsStringBuilder.append(",").append(p);
        }
        inputPathsStringBuilder.deleteCharAt(0);
        jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());

        job.setMapperClass(mapper);
        if (mapperKey != null) {
            job.setMapOutputKeyClass(mapperKey);
        }
        if (mapperValue != null) {
            job.setMapOutputValueClass(mapperValue);
        }
        jobConf.setBoolean("mapred.compress.map.output", true);

        job.setReducerClass(reducer);
        job.setOutputKeyClass(reducerKey);
        job.setOutputValueClass(reducerValue);

        job.setOutputFormatClass(outputFormat);
        jobConf.set("mapred.output.dir", outputPath);

        return job;
    }

    // Prepare a map-combine-reduce job.
    public static Job prepareJob(String jobName,
            String[] inputPath, String outputPath,
            Class<? extends InputFormat> inputFormat,
            Class<? extends Mapper> mapper,
            Class<? extends Writable> mapperKey,
            Class<? extends Writable> mapperValue,
            Class<? extends Reducer> combiner,
            Class<? extends Reducer> reducer,
            Class<? extends Writable> reducerKey,
            Class<? extends Writable> reducerValue,
            Class<? extends OutputFormat> outputFormat,
            Configuration conf) throws IOException {

        Job job = new Job(new Configuration(conf));
        job.setJobName(jobName);
        Configuration jobConf = job.getConfiguration();

        if (reducer.equals(Reducer.class)) {
            if (mapper.equals(Mapper.class)) {
                throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
            }
            job.setJarByClass(mapper);
        } else {
            job.setJarByClass(reducer);
        }

        job.setInputFormatClass(inputFormat);

        StringBuilder inputPathsStringBuilder = new StringBuilder();
        for (String p : inputPath) {
            inputPathsStringBuilder.append(",").append(p);
        }
        inputPathsStringBuilder.deleteCharAt(0);
        jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());

        job.setMapperClass(mapper);
        if (mapperKey != null) {
            job.setMapOutputKeyClass(mapperKey);
        }
        if (mapperValue != null) {
            job.setMapOutputValueClass(mapperValue);
        }
        jobConf.setBoolean("mapred.compress.map.output", true);

        job.setCombinerClass(combiner);
        job.setReducerClass(reducer);
        job.setOutputKeyClass(reducerKey);
        job.setOutputValueClass(reducerValue);

        job.setOutputFormatClass(outputFormat);
        jobConf.set("mapred.output.dir", outputPath);

        return job;
    }

    public static String getCustomJobName(String className, JobContext job,
            Class<? extends Mapper> mapper, Class<? extends Reducer> reducer) {
        StringBuilder name = new StringBuilder(100);
        String customJobName = job.getJobName();
        if (customJobName == null || customJobName.trim().isEmpty()) {
            name.append(className);
        } else {
            name.append(customJobName);
        }
        name.append('-').append(mapper.getSimpleName());
        name.append('-').append(reducer.getSimpleName());
        return name.toString();
    }

    public static void delete(Configuration conf, Iterable<Path> paths) throws IOException {
        if (conf == null) {
            conf = new Configuration();
        }
        for (Path path : paths) {
            FileSystem fs = path.getFileSystem(conf);
            if (fs.exists(path)) {
                log.info("Deleting {}", path);
                fs.delete(path, true);
            }
        }
    }

    public static void delete(Configuration conf, Path... paths) throws IOException {
        delete(conf, Arrays.asList(paths));
    }

    public static long countRecords(Path path, Configuration conf) throws IOException {
        long count = 0;
        Iterator<?> iterator = new SequenceFileValueIterator<Writable>(path, true, conf);
        while (iterator.hasNext()) {
            iterator.next();
            count++;
        }
        return count;
    }

    public static long countRecords(Path path, PathType pt, PathFilter filter, Configuration conf) throws IOException {
        long count = 0;
        Iterator<?> iterator = new SequenceFileDirValueIterator<Writable>(path, pt, filter, null, true, conf);
        while (iterator.hasNext()) {
            iterator.next();
            count++;
        }
        return count;
    }
}
```
SourceDataToItemPrefsMapper.java:

```java
package com.mapper;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;

/**
 * Mapper input format:  userID: itemID1 itemID2 itemID3 ...
 * Mapper output format: <userID, itemID>
 * @author 曾昭正
 */
public class SourceDataToItemPrefsMapper extends Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {
    // private static final Logger logger = LoggerFactory.getLogger(SourceDataToItemPrefsMapper.class);
    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");
    private String line = null;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        line = value.toString();
        if (line == null) return;
        // logger.info("line:" + line);
        Matcher matcher = NUMBERS.matcher(line);
        matcher.find(); // the first match is the userID
        VarLongWritable userID = new VarLongWritable(Long.parseLong(matcher.group())); // VarLongWritable is Mahout's own Writable type
        VarLongWritable itemID = new VarLongWritable();
        while (matcher.find()) {
            itemID.set(Long.parseLong(matcher.group()));
            // logger.info(userID + " " + itemID);
            context.write(userID, itemID);
        }
    }
}
```

SourceDataToUserVectorReducer.java:

```java
package com.reducer;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reducer input:  <userID, Iterable<itemID>>
 * Reducer output: <userID, VectorWritable<index=itemID, value=pref> ...>
 * @author 曾昭正
 */
public class SourceDataToUserVectorReducer extends Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {
    private static final Logger logger = LoggerFactory.getLogger(SourceDataToUserVectorReducer.class);

    @Override
    protected void reduce(VarLongWritable userID, Iterable<VarLongWritable> itemPrefs, Context context)
            throws IOException, InterruptedException {
        /*
         * DenseVector is backed by a double array and stores every element of the vector, so it suits dense vectors.
         * RandomAccessSparseVector is backed by an int-to-double HashMap; it stores only the non-zero entries and
         * supports random access. SequentialAccessSparseVector is backed by parallel int/double arrays; it also stores
         * only the non-zero entries but supports only sequential access. Pick the implementation that matches the
         * algorithm's access pattern: DenseVector or RandomAccessSparseVector for heavy random access,
         * SequentialAccessSparseVector when access is mostly sequential. With the vector types covered, the remaining
         * question is how to model the existing data as vectors ("vectorization") so that Mahout's algorithms can
         * process it efficiently.
         */
        Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (VarLongWritable itemPref : itemPrefs) {
            // RandomAccessSparseVector.set(index, value): preferences are boolean here, so every value defaults to 1.0f
            userVector.set((int) itemPref.get(), 1.0f);
        }
        logger.info(userID + " " + new VectorWritable(userVector));
        context.write(userID, new VectorWritable(userVector));
    }
}
```

UserVectorToCooccurrenceMapper.java:

```java
package com.mapper;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

/**
 * Mapper input:  <userID, VectorWritable<index=itemID, value=pref> ...>
 * Mapper output: <itemID, itemID> (pairs of co-occurring item IDs)
 * @author 曾昭正
 */
public class UserVectorToCooccurrenceMapper extends Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable> {

    @Override
    protected void map(VarLongWritable userID, VectorWritable userVector, Context context)
            throws IOException, InterruptedException {
        Iterator<Vector.Element> it = userVector.get().nonZeroes().iterator(); // iterate over the non-zero elements only
        while (it.hasNext()) {
            int index1 = it.next().index();
            Iterator<Vector.Element> it2 = userVector.get().nonZeroes().iterator();
            while (it2.hasNext()) {
                int index2 = it2.next().index();
                context.write(new IntWritable(index1), new IntWritable(index2));
            }
        }
    }
}
```
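To make the data flow of the first step concrete, here is a small stand-alone sketch (not part of the original post) that only needs mahout-math on the classpath, no Hadoop cluster: it parses a made-up input line the same way SourceDataToItemPrefsMapper does and then builds the same kind of RandomAccessSparseVector that SourceDataToUserVectorReducer emits. The sample line "1: 101 102 103" and the 1.0 preference value are illustrative assumptions.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class UserVectorSketch {
    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

    public static void main(String[] args) {
        String line = "1: 101 102 103"; // made-up record: userID 1 touched items 101, 102, 103
        Matcher matcher = NUMBERS.matcher(line);
        matcher.find();
        long userID = Long.parseLong(matcher.group());

        // Same modelling as the reducer: item IDs become vector indices, boolean preferences become 1.0.
        Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        while (matcher.find()) {
            userVector.set(Integer.parseInt(matcher.group()), 1.0);
        }
        // Expected: userID=1, vector holds 1.0 at indices 101, 102 and 103.
        System.out.println("userID=" + userID + " vector=" + userVector);
    }
}
```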
UserVectorToCoocurrenceReducer.java:

```java
package com.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reducer input:  <itemID, Iterable<itemIDs>>
 * Reducer output: <mainItemID, Vector<coocItemID, coocCount> ...>
 * @author 曾昭正
 */
public class UserVectorToCoocurrenceReducer extends Reducer<IntWritable, IntWritable, IntWritable, VectorOrPrefWritable> {
    private static final Logger logger = LoggerFactory.getLogger(UserVectorToCoocurrenceReducer.class);

    @Override
    protected void reduce(IntWritable mainItemID, Iterable<IntWritable> coocItemIDs, Context context)
            throws IOException, InterruptedException {
        Vector coocItemIDVectorRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (IntWritable coocItem : coocItemIDs) {
            int itemCoocTime = coocItem.get(); // despite the name, this is the co-occurring item ID, used as the vector index
            coocItemIDVectorRow.set(itemCoocTime, coocItemIDVectorRow.get(itemCoocTime) + 1.0); // accumulate the co-occurrence count
        }
        logger.info(mainItemID + " " + new VectorOrPrefWritable(coocItemIDVectorRow));
        // emit the complete co-occurrence row for mainItemID
        context.write(mainItemID, new VectorOrPrefWritable(coocItemIDVectorRow));
    }
}
```

UserVecotrSplitMapper.java:

```java
package com.mapper;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Splits the user vectors so that they can later be joined with the item co-occurrence vectors.
 * Mapper input:  <userID, Vector<itemIDIndex, preferenceValue> ...>
 * Mapper output: <itemID, <userID, preferenceValue>> ...
 * @author 曾昭正
 */
public class UserVecotrSplitMapper extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {
    private static final Logger logger = LoggerFactory.getLogger(UserVecotrSplitMapper.class);

    @Override
    protected void map(VarLongWritable userIDWritable, VectorWritable value, Context context)
            throws IOException, InterruptedException {
        IntWritable itemIDIndex = new IntWritable();
        long userID = userIDWritable.get();
        Vector userVector = value.get();
        Iterator<Element> it = userVector.nonZeroes().iterator(); // only the non-zero entries of the user vector
        while (it.hasNext()) {
            Element e = it.next();
            int itemID = e.index();
            itemIDIndex.set(itemID);
            float preferenceValuce = (float) e.get();
            logger.info(itemIDIndex + " " + new VectorOrPrefWritable(userID, preferenceValuce));
            context.write(itemIDIndex, new VectorOrPrefWritable(userID, preferenceValuce));
        }
    }
}
```

CombineUserVectorAndCoocMatrixMapper.java:

```java
package com.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

/**
 * Merges the co-occurrence matrix with the split user vectors so that partial recommendation vectors can be computed.
 * This mapper has no real processing logic; it simply passes the records through unchanged.
 * Note: the job's input consists of the two output directories produced by the co-occurrence job and the
 * user-vector-split job.
 * Mapper input:  <itemID, <userID, preferenceValue>> or <itemID, Vector<coocItemID, coocCount>>
 * Mapper output: <itemID, <userID, preferenceValue> / Vector<coocItemID, coocCount>>
 * @author 曾昭正
 */
public class CombineUserVectorAndCoocMatrixMapper extends Mapper<IntWritable, VectorOrPrefWritable, IntWritable, VectorOrPrefWritable> {

    @Override
    protected void map(IntWritable itemID, VectorOrPrefWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(itemID, value);
    }
}
```
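The co-occurrence mapper/reducer pair above simply counts, for each item, how often every other item appears in the same user vector. Here is a small local sketch of that counting logic (not part of the original post; only mahout-math is needed, and the two sample user item sets are made up):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class CooccurrenceSketch {
    public static void main(String[] args) {
        // Two made-up user item sets.
        List<int[]> userItemSets = Arrays.asList(new int[] {101, 102, 103}, new int[] {101, 103});

        // Co-occurrence row for item 101, built the same way UserVectorToCoocurrenceReducer does:
        // index = co-occurring item ID, value = number of user vectors in which the pair occurs.
        Vector coocRowFor101 = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (int[] items : userItemSets) {
            boolean contains101 = false;
            for (int item : items) {
                if (item == 101) { contains101 = true; }
            }
            if (!contains101) { continue; }
            for (int coocItem : items) {
                coocRowFor101.set(coocItem, coocRowFor101.get(coocItem) + 1.0);
            }
        }
        // Expected: {101:2, 102:1, 103:2} (the diagonal counts the item with itself).
        System.out.println(coocRowFor101);
    }
}
```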
CombineUserVectorAndCoocMatrixReducer.java:

```java
package com.reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merges the co-occurrence matrix with the split user vectors so that partial recommendation vectors can be computed.
 * @author 曾昭正
 */
public class CombineUserVectorAndCoocMatrixReducer extends Reducer<IntWritable, VectorOrPrefWritable, IntWritable, VectorAndPrefsWritable> {
    private static final Logger logger = LoggerFactory.getLogger(CombineUserVectorAndCoocMatrixReducer.class);

    @Override
    protected void reduce(IntWritable itemID, Iterable<VectorOrPrefWritable> values, Context context)
            throws IOException, InterruptedException {
        VectorAndPrefsWritable vectorAndPrefsWritable = new VectorAndPrefsWritable();
        List<Long> userIDs = new ArrayList<Long>();
        List<Float> preferenceValues = new ArrayList<Float>();
        Vector coocVector = null;
        Vector coocVectorTemp = null;
        Iterator<VectorOrPrefWritable> it = values.iterator();
        while (it.hasNext()) {
            VectorOrPrefWritable e = it.next();
            coocVectorTemp = e.getVector();
            if (coocVectorTemp == null) {
                userIDs.add(e.getUserID());
                preferenceValues.add(e.getValue());
            } else {
                coocVector = coocVectorTemp;
            }
        }
        if (coocVector != null) {
            // Note: after the co-occurrence job's reduce step has aggregated, each reduce group here holds exactly one
            // co-occurrence vector (one row/column of the co-occurrence matrix; row and column are the same here).
            vectorAndPrefsWritable.set(coocVector, userIDs, preferenceValues);
            logger.info(itemID + " " + vectorAndPrefsWritable);
            context.write(itemID, vectorAndPrefsWritable);
        }
    }
}
```

CaclPartialRecomUserVectorMapper.java:

```java
package com.mapper;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Computes the partial recommendation vectors for each user.
 * @author 曾昭正
 */
public class CaclPartialRecomUserVectorMapper extends Mapper<IntWritable, VectorAndPrefsWritable, VarLongWritable, VectorWritable> {
    private static final Logger logger = LoggerFactory.getLogger(CaclPartialRecomUserVectorMapper.class);

    @Override
    protected void map(IntWritable itemID, VectorAndPrefsWritable values, Context context)
            throws IOException, InterruptedException {
        Vector coocVectorColumn = values.getVector();
        List<Long> userIDs = values.getUserIDs();
        List<Float> preferenceValues = values.getValues();
        for (int i = 0; i < userIDs.size(); i++) {
            long userID = userIDs.get(i);
            float preferenceValue = preferenceValues.get(i);
            logger.info("userID:" + userID);
            logger.info("preferenceValue:" + preferenceValue);
            // Multiply this item's co-occurrence column by the user's preference value to obtain
            // the user's partial recommendation scores.
            Vector preferenceParScores = coocVectorColumn.times(preferenceValue);
            context.write(new VarLongWritable(userID), new VectorWritable(preferenceParScores));
        }
    }
}
```
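The partial-score step above is just a scalar-vector multiplication: for every (user, preference) attached to an item, the item's co-occurrence column is scaled by the preference value, and the resulting partial vectors are later summed per user. A minimal illustration with made-up numbers (not part of the original post; mahout-math only):

```java
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class PartialScoreSketch {
    public static void main(String[] args) {
        // Made-up co-occurrence column for one item.
        Vector coocColumn = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        coocColumn.set(101, 2.0);
        coocColumn.set(102, 1.0);
        coocColumn.set(103, 2.0);

        // One user's preference for this item (boolean model -> 1.0f in this pipeline).
        float preferenceValue = 1.0f;

        // Same call as CaclPartialRecomUserVectorMapper: partial scores = column * preference.
        Vector partialScores = coocColumn.times(preferenceValue);

        // The combiner/reducer later sums these partial vectors per user with plus().
        Vector summed = partialScores.plus(partialScores); // stand-in for a second partial vector
        System.out.println(partialScores);
        System.out.println(summed);
    }
}
```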
ParRecomUserVectorCombiner.java:

```java
package com.reducer;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merges the partial recommendation vectors by summing, per userID, the scores of the co-occurrence vectors.
 * (Note: as a combiner this only merges the output of a single map task, so the result is still partial.)
 * @author 曾昭正
 */
public class ParRecomUserVectorCombiner extends Reducer<VarLongWritable, VectorWritable, VarLongWritable, VectorWritable> {
    private static final Logger logger = LoggerFactory.getLogger(ParRecomUserVectorCombiner.class);

    @Override
    protected void reduce(VarLongWritable userID, Iterable<VectorWritable> coocVectorColunms, Context context)
            throws IOException, InterruptedException {
        Vector vectorColunms = null;
        for (VectorWritable coocVectorColunm : coocVectorColunms) {
            vectorColunms = vectorColunms == null ? coocVectorColunm.get() : vectorColunms.plus(coocVectorColunm.get());
        }
        logger.info(userID + " " + new VectorWritable(vectorColunms));
        context.write(userID, new VectorWritable(vectorColunms));
    }
}
```

MergeAndGenerateRecommendReducer.java:

```java
package com.reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merges all scored partial vectors and generates the final per-user recommendations.
 * @author 曾昭正
 */
public class MergeAndGenerateRecommendReducer extends Reducer<VarLongWritable, VectorWritable, VarLongWritable, RecommendedItemsWritable> {
    private static final Logger logger = LoggerFactory.getLogger(MergeAndGenerateRecommendReducer.class);
    private int recommendationsPerUser;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        recommendationsPerUser = context.getConfiguration().getInt("recomandItems.recommendationsPerUser", 5);
    }

    @Override
    protected void reduce(VarLongWritable userID, Iterable<VectorWritable> cooVectorColumn, Context context)
            throws IOException, InterruptedException {
        // Sum up the partial score vectors.
        Vector recommdVector = null;
        for (VectorWritable vector : cooVectorColumn) {
            recommdVector = recommdVector == null ? vector.get() : recommdVector.plus(vector.get());
        }
        // Sort the recommendation vector and keep the top-N items per userID (5 by default); the queue orders
        // items by their score. Note: the head of the PriorityQueue is always the smallest element, and the
        // capacity is recommendationsPerUser + 1 to leave temporary room when a larger new element is added.
        Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(
                recommendationsPerUser + 1, ByValueRecommendedItemComparator.getInstance());
        Iterator<Element> it = recommdVector.nonZeroes().iterator();
        while (it.hasNext()) {
            Element e = it.next();
            int itemID = e.index();
            float preValue = (float) e.get();
            if (topItems.size() < recommendationsPerUser) {
                // The queue is not full yet: just add the item and its score.
                topItems.add(new GenericRecommendedItem(itemID, preValue));
            } else if (preValue > topItems.peek().getValue()) {
                // The current item's score beats the smallest score in the queue:
                // add it and evict the head (the smallest element).
                topItems.add(new GenericRecommendedItem(itemID, preValue));
                topItems.poll();
            }
        }
        // Copy the queued items into a list and sort them by score.
        List<RecommendedItem> recommdations = new ArrayList<RecommendedItem>(topItems.size());
        recommdations.addAll(topItems);
        Collections.sort(recommdations, ByValueRecommendedItemComparator.getInstance());
        // Emit the recommendation list.
        logger.info(userID + " " + new RecommendedItemsWritable(recommdations));
        context.write(userID, new RecommendedItemsWritable(recommdations));
    }
}
```
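The final reducer keeps only the top-N scored items per user by maintaining a small bounded priority queue. The idiom is independent of Hadoop/Mahout, so here is a plain-Java sketch with made-up scores (not part of the original post; a simple score comparator stands in for Mahout's ByValueRecommendedItemComparator):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

public class TopNSketch {
    public static void main(String[] args) {
        int topN = 2;
        // Made-up (itemID, score) pairs, each encoded as double[]{itemID, score}.
        double[][] scored = { {101, 0.5}, {102, 2.0}, {103, 1.5}, {104, 0.1} };

        // Min-heap on the score: the head is always the smallest of the currently kept items.
        Queue<double[]> topItems = new PriorityQueue<double[]>(topN + 1, new Comparator<double[]>() {
            @Override
            public int compare(double[] a, double[] b) {
                return Double.compare(a[1], b[1]);
            }
        });
        for (double[] item : scored) {
            if (topItems.size() < topN) {
                topItems.add(item);
            } else if (item[1] > topItems.peek()[1]) {
                topItems.add(item);
                topItems.poll(); // evict the smallest
            }
        }

        // Sort the survivors by descending score for output.
        List<double[]> result = new ArrayList<double[]>(topItems);
        Collections.sort(result, new Comparator<double[]>() {
            @Override
            public int compare(double[] a, double[] b) {
                return Double.compare(b[1], a[1]);
            }
        });
        for (double[] item : result) {
            System.out.println("item " + (long) item[0] + " -> " + item[1]); // expected: 102 then 103
        }
    }
}
```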
PackageRecomendJob.java:

```java
package com.mapreduceMain;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

import com.mapper.CaclPartialRecomUserVectorMapper;
import com.mapper.CombineUserVectorAndCoocMatrixMapper;
import com.mapper.UserVecotrSplitMapper;
import com.mapper.UserVectorToCooccurrenceMapper;
import com.mapper.SourceDataToItemPrefsMapper;
import com.reducer.CombineUserVectorAndCoocMatrixReducer;
import com.reducer.MergeAndGenerateRecommendReducer;
import com.reducer.ParRecomUserVectorCombiner;
import com.reducer.UserVectorToCoocurrenceReducer;
import com.reducer.SourceDataToUserVectorReducer;
import com.util.HadoopUtil;

/**
 * Wires the individual job components together into the complete recommendation pipeline.
 * @author 曾昭正
 */
public class PackageRecomendJob extends Configured implements Tool {

    String[] dataSourceInputPath = {"/user/hadoop/z.zeng/distruteItemCF/dataSourceInput"};
    String[] uesrVectorOutput = {"/user/hadoop/z.zeng/distruteItemCF/uesrVectorOutput/"};
    String[] userVectorSpliltOutPut = {"/user/hadoop/z.zeng/distruteItemCF/userVectorSpliltOutPut"};
    String[] cooccurrenceMatrixOuptPath = {"/user/hadoop/z.zeng/distruteItemCF/CooccurrenceMatrixOuptPath"};
    String[] combineUserVectorAndCoocMatrixOutPutPath = {"/user/hadoop/z.zeng/distruteItemCF/combineUserVectorAndCoocMatrixOutPutPath"};
    String[] caclPartialRecomUserVectorOutPutPath = {"/user/hadoop/z.zeng/distruteItemCF/CaclPartialRecomUserVectorOutPutPath"};

    // Delete the intermediate output directories if they already exist.
    protected void setup(Configuration configuration) throws IOException, InterruptedException {
        FileSystem hdfs = FileSystem.get(URI.create("hdfs://cluster-master"), configuration);
        Path p1 = new Path(uesrVectorOutput[0]);
        Path p2 = new Path(userVectorSpliltOutPut[0]);
        Path p3 = new Path(cooccurrenceMatrixOuptPath[0]);
        Path p4 = new Path(combineUserVectorAndCoocMatrixOutPutPath[0]);
        Path p5 = new Path(caclPartialRecomUserVectorOutPutPath[0]);
        if (hdfs.exists(p1)) {
            hdfs.delete(p1, true);
        }
        if (hdfs.exists(p2)) {
            hdfs.delete(p2, true);
        }
        if (hdfs.exists(p3)) {
            hdfs.delete(p3, true);
        }
        if (hdfs.exists(p4)) {
            hdfs.delete(p4, true);
        }
        if (hdfs.exists(p5)) {
            hdfs.delete(p5, true);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // get the configuration object
        setup(conf);
        // DistributedCache.addArchiveToClassPath(new Path("/user/hadoop/z.zeng/distruteItemCF/lib"), conf);

        // Job 1: build the user vectors from the raw input.
        Job wikipediaToItemPrefsJob = HadoopUtil.prepareJob(
                "WikipediaToItemPrefsJob",
                dataSourceInputPath,
                uesrVectorOutput[0],
                TextInputFormat.class,
                SourceDataToItemPrefsMapper.class,
                VarLongWritable.class,
                VarLongWritable.class,
                SourceDataToUserVectorReducer.class,
                VarLongWritable.class,
                VectorWritable.class,
                SequenceFileOutputFormat.class,
                conf);

        // Job 2: build the co-occurrence vectors.
        Job userVectorToCooccurrenceJob = HadoopUtil.prepareJob(
                "UserVectorToCooccurrenceJob",
                uesrVectorOutput,
                cooccurrenceMatrixOuptPath[0],
                SequenceFileInputFormat.class,
                UserVectorToCooccurrenceMapper.class,
                IntWritable.class,
                IntWritable.class,
                UserVectorToCoocurrenceReducer.class,
                IntWritable.class,
                VectorOrPrefWritable.class,
                SequenceFileOutputFormat.class,
                conf);

        // Job 3: split the user vectors.
        Job userVecotrSplitJob = HadoopUtil.prepareJob(
                "userVecotrSplitJob",
                uesrVectorOutput,
                userVectorSpliltOutPut[0],
                SequenceFileInputFormat.class,
                UserVecotrSplitMapper.class,
                IntWritable.class,
                VectorOrPrefWritable.class,
                SequenceFileOutputFormat.class,
                conf);

        // Job 4: merge the co-occurrence vectors with the split user vectors.
        // Note: the outputs of the split job and the co-occurrence job together form this job's input.
        String[] combineUserVectorAndCoocMatrixIutPutPath = {cooccurrenceMatrixOuptPath[0], userVectorSpliltOutPut[0]};
        Job combineUserVectorAndCoocMatrixJob = HadoopUtil.prepareJob(
                "combineUserVectorAndCoocMatrixJob",
                combineUserVectorAndCoocMatrixIutPutPath,
                combineUserVectorAndCoocMatrixOutPutPath[0],
                SequenceFileInputFormat.class,
                CombineUserVectorAndCoocMatrixMapper.class,
                IntWritable.class,
                VectorOrPrefWritable.class,
                CombineUserVectorAndCoocMatrixReducer.class,
                IntWritable.class,
                VectorAndPrefsWritable.class,
                SequenceFileOutputFormat.class,
                conf);

        // Job 5: compute the user recommendation vectors.
        Job caclPartialRecomUserVectorJob = HadoopUtil.prepareJob(
                "caclPartialRecomUserVectorJob",
                combineUserVectorAndCoocMatrixOutPutPath,
                caclPartialRecomUserVectorOutPutPath[0],
                SequenceFileInputFormat.class,
                CaclPartialRecomUserVectorMapper.class,
                VarLongWritable.class,
                VectorWritable.class,
                ParRecomUserVectorCombiner.class, // use a combiner on the map side to reduce network I/O
                MergeAndGenerateRecommendReducer.class,
                VarLongWritable.class,
                RecommendedItemsWritable.class,
                TextOutputFormat.class,
                conf);

        // Chain the jobs together.
        if (wikipediaToItemPrefsJob.waitForCompletion(true)) {
            if (userVectorToCooccurrenceJob.waitForCompletion(true)) {
                if (userVecotrSplitJob.waitForCompletion(true)) {
                    if (combineUserVectorAndCoocMatrixJob.waitForCompletion(true)) {
                        // By Hadoop convention an exit code of 0 means success.
                        return caclPartialRecomUserVectorJob.waitForCompletion(true) ? 0 : 1;
                    } else {
                        throw new Exception("The job that merges the co-occurrence vectors and the split user vectors failed!");
                    }
                } else {
                    throw new Exception("The user-vector split job failed!");
                }
            } else {
                throw new Exception("The co-occurrence job failed!");
            }
        } else {
            throw new Exception("The user-vector job failed!");
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        try {
            int returnCode = ToolRunner.run(new PackageRecomendJob(), args);
            System.exit(returnCode);
        } catch (Exception e) {
            e.printStackTrace(); // don't swallow the failure silently
        }
    }
}
```
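One small design note on the driver: the hand-written setup() method duplicates what HadoopUtil.delete() already provides. A sketch of how the cleanup could lean on the utility instead (not the author's code; it assumes fs.defaultFS in the configuration already points at the target cluster, e.g. hdfs://cluster-master):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import com.util.HadoopUtil;

public class CleanOutputDirsSketch {
    // Deletes the given output directories if they exist, reusing HadoopUtil.delete()
    // instead of the hand-rolled exists()/delete() calls in PackageRecomendJob.setup().
    public static void clean(Configuration conf, String... outputDirs) throws IOException {
        Path[] paths = new Path[outputDirs.length];
        for (int i = 0; i < outputDirs.length; i++) {
            paths[i] = new Path(outputDirs[i]);
        }
        HadoopUtil.delete(conf, paths);
    }
}
```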
(Repost) Implementing the Co-occurrence Matrix of an Item-Based (ItemBase) Recommendation Algorithm with MapReduce (Part 1)
Original article: http://www.cnblogs.com/baixl/p/4165156.html