Program description: use the MapReduce framework to compute the TF-IDF of every word in a collection of English documents. The TF-IDF of a word in a document = the word's TF in that document × the word's IDF (reference: blog.csdn.net/ididcan/article/details/6657977).
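As a quick sanity check of the formula (not part of the MapReduce code below), the following standalone snippet computes TF-IDF for made-up counts; the numbers and variable names are illustrative only. It uses the same smoothed IDF as the Job2 reducer further down, idf = ln((N + 1) / df), where N is the total number of documents and df is the number of documents containing the word.

// Illustrative only: toy numbers, not taken from any real data set.
public class TfidfFormulaDemo {
    public static void main(String[] args) {
        int wordCountInDoc = 3;      // occurrences of the word in one document
        int totalWordsInDoc = 100;   // total words in that document
        int totalDocs = 10;          // N: total number of documents
        int docsWithWord = 4;        // df: documents containing the word

        double tf = (double) wordCountInDoc / totalWordsInDoc;           // 0.03
        double idf = Math.log((double) (totalDocs + 1) / docsWithWord);  // ln(11/4) ≈ 1.0116
        double tfidf = tf * idf;                                         // ≈ 0.0303

        System.out.printf("tf=%.4f idf=%.4f tfidf=%.4f%n", tf, idf, tfidf);
    }
}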
TF-IDF source code:
1) Write the Docword class, with the word, the corresponding file name, and the associated metric (IDF, TF-IDF, etc.) as its fields.
<span style="font-size:18px;">package tfidf; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class Docword implements WritableComparable<Docword> { String docname; String word; double index; static final String DELIMITER=","; public Docword() { super(); // TODO Auto-generated constructor stub } public String getDocname() { return docname; } public void setDocname(String docname) { this.docname = docname; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public double getIndex() { return index; } public void setIndex(double index) { this.index = index; } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeUTF(docname); out.writeUTF(word); out.writeDouble(index); } @Override public String toString() { return docname+DELIMITER+word; } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub docname=in.readUTF(); word=in.readUTF(); index=in.readDouble(); } @Override public int compareTo(Docword arg0) { // TODO Auto-generated method stub int result=0; result=(result!=0)?result:docname.compareTo(arg0.getDocname()); result=(result!=0)?result:word.compareTo(arg0.getWord()); return result; } }</span>
2) The TF-IDF job code, with inline comments, is as follows.
<span style="font-size:18px;">public class Tfidf extends Configured implements Tool { static final String DELIMITER=","; /* =======================Job1:Tfidf-tf======================= * 程序说明:计算文档中各单词TF。程序主要思路借鉴了MapReduce Tutorial的WordCount2.0代码, * - 利用Local Resource,将待删除的字符串(如逗号、句号等)读入内存,对split中每个record删除特定字符串。 * - 设置”全局“变量(Map类的属性),统计文档的单词总数。注意,文档可能由于超过split大小,需要被分为多个split处理。所以Map中”全局“变量仅统计该split中各文档单词数,实际数需要中reduce中按照filename进行合并。 * * 程序输入:HDFS某目录下所有Text英文文档。 * 程序输出:各文档中单词的TF,按照“filename, wordname, termfreq”格式输出。 * (Shuffle)排序规则:1)filename。2)wordname。 * (Shuffle)合并规则:1)filename。2)wordname。 */ public static class Tfmapper extends Mapper<LongWritable, Text, Docword, IntWritable> { Docword outputKey=new Docword(); IntWritable outputValue=new IntWritable(1);//拆分每个单词,按频次为1,在reduce中分文件名进行合并 HashSet<String> skipstr=new HashSet<String>(); //存储待删除的字符串 int termnumindoc=0;//每个split中各文档中单词总数。 @Override protected void cleanup(Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub FileSplit filesplit=(FileSplit)context.getInputSplit(); String filename=filesplit.getPath().getName(); outputKey.setDocname(filename); outputKey.setWord("!termnum");//!termnum标识该记录是文件总单词数,由于map中已经对记录删除”!“,所以不会重复。 context.write(outputKey, new IntWritable(termnumindoc));//按照”filename,!termnum,termnumindoc“格式输出。由于!的ASCII码中所有字母之前,按照Docword.compareTo(),经过shuffle后,!termnum记录会出现中该filename所有记录的第一位。 } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String str=value.toString().toLowerCase(); for(String skip:skipstr){ str=str.replaceAll(skip, "");//对每个redcord,删除特定字符串 } String []words=StringUtils.split(str, ' '); FileSplit filesplit=(FileSplit)context.getInputSplit();//获取InputSplit。每个InputSplit只属于一个InputFile,每个InputFile至少有一个InputSplit。 String filename=filesplit.getPath().getName();//利用FileSplit,提取输入文件名。 for(String word:words){ if(word.trim().isEmpty())//删除空行和空字符串。也可以放在待删除字符串,用正则表达式实现。 continue; if(word.charAt(0)<'a' || word.charAt(0)>'z')//删除非字母开头的单词。也可以放在待删除字符串,用正则表达式实现。 continue; termnumindoc++;//文档的单词数加1. 
outputKey.setDocname(filename);//文件名 outputKey.setWord(word);//单词名 context.write(outputKey, outputValue);//按照”filename,wordname,1“格式输出到本地文件系统,进行shuffle } } @Override protected void setup(Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String line=""; BufferedReader fs=new BufferedReader(new FileReader("skipstr"));//读入待剔除的字符串 while((line=fs.readLine())!=null){ skipstr.add(line);//字符串逐个加入内存中的HashSet } } } public static class Tfcombiner extends Reducer<Docword, IntWritable, Docword, IntWritable> { IntWritable outputValue=new IntWritable(); @Override protected void reduce(Docword key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub int sumall=0; for(IntWritable value:values){ sumall+=value.get(); } outputValue.set(sumall); context.write(key, outputValue); } } public static class Tfreducer extends Reducer<Docword, IntWritable, Docword, DoubleWritable> { DoubleWritable outputValue=new DoubleWritable(); int termnumindoc=0;//各文档中单词总数。由于文档可能超过split尺寸大小,被拆分在多个split被多个Map统计单词数。在Reduce中对各个Map统计的单词总数进行汇总。 @Override protected void reduce(Docword key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub int sumall=0; for(IntWritable value:values){ sumall+=value.get(); } if(key.getWord().equals("!termnum")){//单词名是!termnum,标记该记录是文件的单词总数。 termnumindoc=sumall; } else{//单词名不是!termnum,标记该记录是文件实际单词。 // 由于!的ASCII码中所有字母之前,按照Docword.compareTo(),经过shuffle后,!termnum记录会出现中该filename所有记录的第一位。 // 计算TF时,分母”文件中单词总数“termnumindoc已有数据。 outputValue.set((double)1*sumall/termnumindoc); context.write(key, outputValue);//按照”filename,wordname,termfreq“输出到HDFS,作为Job2的输入 } } } public static class Tfpartition extends Partitioner<Docword, IntWritable> { @Override public int getPartition(Docword key, IntWritable value, int numPartitions) { // TODO Auto-generated method stub return Math.abs((key.getDocname()).hashCode())%numPartitions; } } /* =======================Job2:Tfidf-idf======================= * 程序说明:计算各单词idf。程序读入Job1的输出数据,按照"wordname,filename"格式,对各个wordname进行合并,计算word的IDF。 * - 在run()中,利用HDFS API获取文件总数,并传递给Reduce。 * - 由于Job1的输出数据按照filename进行排序,顺序读入记录并比较前后两个记录的filename有没有改变。如果没有改变,则文件总数不变;如果有改变,则文件总数加1。 * * 程序输入:Job1的输出文件,格式为“filename, wordname, termfreq” * 程序输出:单词的Idf,格式为“wordname, idf”。 * (Shuffle)排序规则:wordname。 * (Shuffle)合并规则:wordname。 */ public static class Idfmapper extends Mapper<LongWritable, Text, Docword, IntWritable> { Docword outputKey=new Docword(); IntWritable outputValue=new IntWritable(1);//拆分单词,按频次为1,在reduce中分文件名进行合并 int filenum=0; String filename="";//文件名。 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String []words=StringUtils.split(value.toString(), ','); outputKey.setDocname(words[0]); outputKey.setWord(words[1]); context.write(outputKey, outputValue); } } public static class Idfcombiner extends Reducer<Docword, IntWritable, Docword, IntWritable> { IntWritable outputValue=new IntWritable(); @Override protected void reduce(Docword key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub int sumall=0; for(IntWritable value:values){ sumall+=value.get(); } outputValue.set(sumall); context.write(key, outputValue); } } public static class Idfreducer extends Reducer<Docword, IntWritable, Text, DoubleWritable> { DoubleWritable outputValue=new 
DoubleWritable(); Text outputKey=new Text(); int alldoc=0;//文件总数。 @Override protected void setup(Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub //将run()中传递的变量读入内存,获得文件总数。 alldoc=Integer.parseInt(context.getConfiguration().get("filesnum")); } @Override protected void reduce(Docword key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub //由于Job1的输出数据按照filename进行排序,顺序读入记录并比较前后两个记录的filename有没有改变。 // 如果没有改变,则文件总数不变;如果有改变,则文件总数加1。 int termdocnum=0; for(IntWritable value:values){ termdocnum+=value.get();//单词对应文件数加1。 } outputKey.set(key.getWord()); outputValue.set((double)Math.log((double)(alldoc+1)/termdocnum)); context.write(outputKey, outputValue);//输出idf计算结果,输出格式“wordname,idf” } } public static class Idfpartition extends Partitioner<Docword, IntWritable> { @Override public int getPartition(Docword key, IntWritable value, int numPartitions) { // TODO Auto-generated method stub return Math.abs((key.getWord().hashCode()))%numPartitions; } } public static class Idfsort extends WritableComparator { //在shuffle中,所有记录按照wordname进行排序,按照wordname进行合并。 @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub Docword lhs=(Docword)a; Docword rhs=(Docword)b; return lhs.getWord().compareTo(rhs.getWord()); } public Idfsort() { super(Docword.class,true); // TODO Auto-generated constructor stub } } /* =======================Job3:Tfidf-tfidf======================= * 程序说明:计算各单词tfidf。程序利用MultipleInputs分别读入Job1、Job2的输出数据,类似ReduceSide Join在Reduce中进行汇总计算。 * - MultipleInputs:配置Map,分别读入Job1、Job2的输出数据。其中,读入job2的输入数据后,设置filename为“!alldoc”。由于!的ASCII码小于所有字母,所以同一个word的job2记录在shuffle中排在job1记录前面。 * - Reduce:设置sortComparator,按照wordname(高优先级)、filename(低优先级)进行排序;设置groupComparator,按照wordname进行合并记录。 * * 程序输入:Job1的输出文件,格式为“filename, wordname, termfreq”;Job2的输出文件,格式为“wordname,idf”。 * 程序输出:单词的tfIdf,格式为“filename,wordname, idf”。 * (Shuffle)排序规则:1)wordname。2)filename。 * (Shuffle)合并规则:wordname。 */ public static class Tfidf_tfmapper extends Mapper<LongWritable, Text, Docword, Docword> { Docword outputKey=new Docword(); Docword outputValue=new Docword(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String []words=StringUtils.split(value.toString(), ','); outputKey.setWord(words[1]); outputKey.setDocname(words[0]); outputValue.setDocname(words[0]); outputValue.setWord(words[1]); outputValue.setIndex(Double.parseDouble(words[2])); context.write(outputKey, outputValue);//读入Job1的输出文件,格式为“filename, wordname, termfreq” } } public static class Tfidf_idfmapper extends Mapper<LongWritable, Text, Docword, Docword> { Docword outputValue=new Docword(); Docword outputKey=new Docword(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String []words=StringUtils.split(value.toString(), ','); outputValue.setDocname("!alldoc"); outputValue.setWord(words[0]); outputValue.setIndex(Double.parseDouble(words[1])); outputKey.setWord(words[0]); outputKey.setDocname("!alldoc"); context.write(outputKey, outputValue);//读入Job2的输出文件,格式为“wordname,idf”。 } } public static class Tfidfreducer extends Reducer<Docword, Docword, Text, DoubleWritable> { Text outputKey=new Text(); DoubleWritable outputValue=new DoubleWritable(); @Override protected void reduce(Docword key, Iterable<Docword> 
values, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub double termidf=0.0,termfq=0.0; for(Docword value:values){ //读入job2的输入数据后,设置filename为“!alldoc”。由于!的ASCII码小于所有字母,所以同一个word的job2记录在shuffle中排在job1记录前面。 if(value.getDocname().equals("!alldoc")){ termidf=value.getIndex(); }else{ termfq=value.getIndex(); outputKey.set(value.getDocname()+","+value.getWord()); outputValue.set(termidf*termfq); context.write(outputKey, outputValue); } } } } public static class Tfidfsort extends WritableComparator { @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub Docword lhs=(Docword)a; Docword rhs=(Docword)b; int result=0; result=(result!=0)?result:lhs.getWord().compareTo(rhs.getWord()); result=(result!=0)?result:lhs.getDocname().compareTo(rhs.getDocname()); return result;//按照wordname、filename进行排序。 } public Tfidfsort() { super(Docword.class,true); // TODO Auto-generated constructor stub } } public static class Tfidfpartition extends Partitioner<Docword, Docword> { @Override public int getPartition(Docword key, Docword value, int numPartitions) { // TODO Auto-generated method stub return Math.abs((key.getWord().hashCode()))%numPartitions; } } public static class Tfidfgroup extends WritableComparator { @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub Docword lhs=(Docword)a; Docword rhs=(Docword)b; return lhs.getWord().compareTo(rhs.getWord());//按照wordname进行合并。 } public Tfidfgroup() { super(Docword.class,true); // TODO Auto-generated constructor stub } } public int run(String []args) throws Exception{ Path in1=new Path("data/wordcount");//输入文件路径 Path out1=new Path("output/tfidf-tf");//Job1的tf结果输出路径 Path out2=new Path("output/tfidf-idf");//Job2的idf结果输出路径 Path out3=new Path("output/tfidf-tfidf");//Job3的tfidf结果输出路径 URI skipstr=new URI("data/skipstr");//Job1的待删除字符串路径 //============Job1配置============ Job job1=Job.getInstance(getConf(), "tfidf-tf"); Configuration conf1=job1.getConfiguration(); job1.setJarByClass(getClass()); FileInputFormat.setInputPaths(job1, in1); out1.getFileSystem(conf1).delete(out1, true); FileOutputFormat.setOutputPath(job1, out1); conf1.set(TextOutputFormat.SEPERATOR, DELIMITER); job1.setInputFormatClass(TextInputFormat.class); job1.setOutputFormatClass(TextOutputFormat.class); job1.setMapperClass(Tfmapper.class); job1.setMapOutputKeyClass(Docword.class); job1.setMapOutputValueClass(IntWritable.class); job1.setCombinerClass(Tfcombiner.class); job1.setReducerClass(Tfreducer.class); job1.setOutputKeyClass(Docword.class); job1.setOutputValueClass(DoubleWritable.class); job1.setPartitionerClass(Tfpartition.class); job1.addCacheFile(skipstr); job1.setNumReduceTasks(3); if(job1.waitForCompletion(true)==false) return 1; //============Job2配置============ Job job2=Job.getInstance(getConf(), "tfidf-idf"); Configuration conf2=job2.getConfiguration(); job2.setJarByClass(getClass()); FileInputFormat.setInputPaths(job2, out1); out2.getFileSystem(conf2).delete(out2, true); //利用HDFS API接口,获得输入文件总数,并通过变量filesum传递给Job2。 FileSystem hdfs=FileSystem.get(conf2); FileStatus p[]=hdfs.listStatus(in1); conf2.set("filesnum", Integer.toString(p.length)); FileOutputFormat.setOutputPath(job2, out2); conf2.set(TextOutputFormat.SEPERATOR, DELIMITER); job2.setInputFormatClass(TextInputFormat.class); job2.setOutputFormatClass(TextOutputFormat.class); job2.setSortComparatorClass(Idfsort.class); job2.setGroupingComparatorClass(Idfsort.class); 
job2.setMapperClass(Idfmapper.class); job2.setMapOutputKeyClass(Docword.class); job2.setMapOutputValueClass(IntWritable.class); job2.setCombinerClass(Idfcombiner.class); job2.setReducerClass(Idfreducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(DoubleWritable.class); job2.setNumReduceTasks(3); job2.setPartitionerClass(Idfpartition.class); if(job2.waitForCompletion(true)==false) return 1; //============Job3配置============ Job job3=Job.getInstance(getConf(), "tfidf-tfidf"); Configuration conf3=job3.getConfiguration(); job3.setJarByClass(getClass()); out3.getFileSystem(conf3).delete(out3, true); FileOutputFormat.setOutputPath(job3, out3); conf3.set(TextOutputFormat.SEPERATOR, DELIMITER); job3.setOutputFormatClass(TextOutputFormat.class); //利用MultipleInputs,配置Map分别读取Job1、Job2的输出文件 MultipleInputs.addInputPath(job3, out1, TextInputFormat.class, Tfidf_tfmapper.class); MultipleInputs.addInputPath(job3, out2, TextInputFormat.class, Tfidf_idfmapper.class); job3.setMapOutputKeyClass(Docword.class); job3.setMapOutputValueClass(Docword.class); job3.setReducerClass(Tfidfreducer.class); job3.setOutputKeyClass(Text.class); job3.setOutputValueClass(DoubleWritable.class); job3.setNumReduceTasks(3); job3.setSortComparatorClass(Tfidfsort.class); job3.setGroupingComparatorClass(Tfidfgroup.class); job3.setPartitionerClass(Tfidfpartition.class); return job3.waitForCompletion(true)?0:1; } public static void main(String []args) throws Exception{ int result=0; try{ result=ToolRunner.run(new Configuration(), new Tfidf(), args); }catch(Exception e){ e.printStackTrace(); } System.exit(result); } }</span>
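Note that Tfmapper treats every line of the "skipstr" cache file as a Java regular expression passed to String.replaceAll. As an illustration only (the patterns below are assumptions, not the actual contents of data/skipstr), the following standalone snippet mirrors that cleanup step: lowercase the record, strip the patterns, split on spaces, and drop tokens that are empty or do not start with a letter.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Standalone illustration of the Tfmapper cleanup; the skip patterns are made up for this example.
public class SkipstrDemo {
    public static void main(String[] args) {
        Set<String> skipstr = new HashSet<>(Arrays.asList("\\.", ",", ";", "\"", "'s"));

        String record = "Hadoop's TextInputFormat splits files into records, one per line.";
        String str = record.toLowerCase();
        for (String skip : skipstr) {
            str = str.replaceAll(skip, ""); // each skip entry is used as a regex, as in Tfmapper.map()
        }
        for (String word : str.split(" ")) {
            if (word.trim().isEmpty())
                continue; // skip empty tokens
            if (word.charAt(0) < 'a' || word.charAt(0) > 'z')
                continue; // skip tokens that do not start with a lowercase letter
            System.out.println(word);
        }
    }
}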
Original article: http://blog.csdn.net/xuefei2/article/details/51643677