Tags:
package hadoop.MachineLearning.Bayes.Pro;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver job: counts the total number of words per class label.
 * The per-class totals are consumed later when computing the prior
 * probabilities of the Naive Bayes classifier.
 */
public class PriorProbability {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String input = "hdfs://10.107.8.110:9000/Bayes/Bayes_input/";
        String output = "hdfs://10.107.8.110:9000/Bayes/Bayes_output/Pro/";

        // Job-name typo fixed: was "ProirProbability".
        Job job = Job.getInstance(conf, "PriorProbability");
        job.setJarByClass(hadoop.MachineLearning.Bayes.Pro.PriorProbability.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output are DIRECTORIES, not files.
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        if (!job.waitForCompletion(true)) {
            return;
        }
    }
}


package hadoop.MachineLearning.Bayes.Pro;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Emits (classLabel, wordCountOfThisLine) for every input line.
 * Each line appears to have the form "label:w1 w2 w3 ..." — after splitting
 * on ':' or ' ', element 0 is the class label and the rest are words.
 */
public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable ikey, Text ivalue, Context context)
            throws IOException, InterruptedException {
        String[] line = ivalue.toString().split(":| ");
        // Everything after the leading label counts as a word.
        int size = line.length - 1;
        context.write(new Text(line[0]), new Text(String.valueOf(size)));
    }
}


package hadoop.MachineLearning.Bayes.Pro;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/** Sums the per-line word counts for each class label. */
public class MyReducer extends Reducer<Text, Text, Text, IntWritable> {

    @Override
    public void reduce(Text _key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (Text val : values) {
            sum += Integer.parseInt(val.toString());
        }
        context.write(_key, new IntWritable(sum));
    }
}
package hadoop.MachineLearning.Bayes.Count; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Count {//计算文档中的单词种类数目 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Count"); String input="hdfs://10.107.8.110:9000/Bayes/Bayes_input"; String output="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Count"; job.setJarByClass(hadoop.MachineLearning.Bayes.Count.Count.class); // TODO: specify a mapper job.setMapperClass(MyMapper.class); // TODO: specify a reducer job.setCombinerClass(MyCombiner.class); job.setReducerClass(MyReducer.class); // TODO: specify output types job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // TODO: specify input and output DIRECTORIES (not files) FileInputFormat.setInputPaths(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); if (!job.waitForCompletion(true)) return; } } package hadoop.MachineLearning.Bayes.Count; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException { String[] line=ivalue.toString().split(":| "); String key="1"; System.out.println(" "); System.out.println(" "); System.out.println(" "); for(int i=1;i<line.length;i++){ System.out.println(line[i]); context.write(new Text(key),new Text(line[i]));//以相同的key进行输出,使得能最后输出到一个reduce中 } } } package hadoop.MachineLearning.Bayes.Count; import java.io.IOException; import java.util.HashSet; import java.util.Iterator; import 
java.util.Set; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyCombiner extends Reducer<Text, Text, Text, Text> {//先在本地的节点上利用set删去重复的单词 public void reduce(Text _key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // process values Set set=new HashSet(); for (Text val : values) { set.add(val.toString()); } for(Iterator it=set.iterator();it.hasNext();){ context.write(new Text("1"),new Text(it.next().toString())); } } } package hadoop.MachineLearning.Bayes.Count; import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyReducer extends Reducer<Text, Text, Text, Text> {//通过combiner后,再利用set对单词进行去重,最后得到种类数 public void reduce(Text _key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // process values Set set=new HashSet(); for (Text val : values) { set.add(val.toString()); } context.write(new Text("num is "),new Text(String.valueOf(set.size()))); } }
package hadoop.MachineLearning.Bayes.Cond; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class CondiPro {//用于求条件概率 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String input="hdfs://10.107.8.110:9000/Bayes/Bayes_input"; String output="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Con"; String proPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Pro";//这是之前求各个类别下单词数目的输出 String countPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Count";//这是之前求的单词种类数 conf.set("propath",proPath); conf.set("countPath",countPath); Job job = Job.getInstance(conf, "ConditionPro"); job.setJarByClass(hadoop.MachineLearning.Bayes.Cond.CondiPro.class); // TODO: specify a mapper job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // TODO: specify a reducer job.setReducerClass(MyReducer.class); // TODO: specify output types job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // TODO: specify input and output DIRECTORIES (not files) FileInputFormat.setInputPaths(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); if (!job.waitForCompletion(true)) return; } } package hadoop.MachineLearning.Bayes.Cond; import java.io.IOException; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException { String[] 
line=ivalue.toString().split(":| "); for(int i=1;i<line.length;i++){ String key=line[0]+":"+line[i]; context.write(new Text(key),new IntWritable(1)); } } } package hadoop.MachineLearning.Bayes.Cond; import java.io.IOException; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> { public Map<String,Integer> map; public int count=0; public void setup(Context context) throws IOException{ Configuration conf=context.getConfiguration(); String proPath=conf.get("propath"); String countPath=conf.get("countPath");// map=Utils.getMapFormHDFS(proPath);//获得各个类别下的单词数 count=Utils.getCountFromHDFS(countPath);//获得单词种类数 } public void reduce(Text _key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // process values int sum=0; for (IntWritable val : values) { sum+=val.get(); } int type=Integer.parseInt(_key.toString().split(":")[0]); double probability=0.0; for(Map.Entry<String,Integer> entry:map.entrySet()){ if(type==Integer.parseInt(entry.getKey())){ probability=(sum+1)*1.0/(entry.getValue()+count);//条件概率的计算 } } context.write(_key,new DoubleWritable(probability)); } } package hadoop.MachineLearning.Bayes.Cond; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.util.LineReader; public class Utils { /** * @param args * @throws IOException */ public static Map<String,Integer> getMapFormHDFS(String input) throws IOException{ Configuration conf=new Configuration(); Path path=new Path(input); FileSystem 
fs=path.getFileSystem(conf); FileStatus[] stats=fs.listStatus(path); Map<String,Integer> map=new HashMap(); for(int i=0;i<stats.length;i++){ if(stats[i].isFile()){ FSDataInputStream infs=fs.open(stats[i].getPath()); LineReader reader=new LineReader(infs,conf); Text line=new Text(); while(reader.readLine(line)>0){ String[] temp=line.toString().split(" "); //System.out.println(temp.length); map.put(temp[0],Integer.parseInt(temp[1])); } reader.close(); } } return map; } public static Map<String,Double> getMapFormHDFS(String input,boolean j) throws IOException{ Configuration conf=new Configuration(); Path path=new Path(input); FileSystem fs=path.getFileSystem(conf); FileStatus[] stats=fs.listStatus(path); Map<String,Double> map=new HashMap(); for(int i=0;i<stats.length;i++){ if(stats[i].isFile()){ FSDataInputStream infs=fs.open(stats[i].getPath()); LineReader reader=new LineReader(infs,conf); Text line=new Text(); while(reader.readLine(line)>0){ String[] temp=line.toString().split(" "); //System.out.println(temp.length); map.put(temp[0],Double.parseDouble(temp[1])); } reader.close(); } } return map; } public static int getCountFromHDFS(String input) throws IOException{ Configuration conf=new Configuration(); Path path=new Path(input); FileSystem fs=path.getFileSystem(conf); FileStatus[] stats=fs.listStatus(path); int count=0; for(int i=0;i<stats.length;i++){ if(stats[i].isFile()){ FSDataInputStream infs=fs.open(stats[i].getPath()); LineReader reader=new LineReader(infs,conf); Text line=new Text(); while(reader.readLine(line)>0){ String[] temp=line.toString().split(" "); //System.out.println(temp.length); count=Integer.parseInt(temp[1]); } reader.close(); } } return count; } public static void main(String[] args) throws IOException { // TODO Auto-generated method stub String proPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Pro"; String countPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Count/"; Map<String,Integer> map=Utils.getMapFormHDFS(proPath); 
for(Map.Entry<String,Integer> entry:map.entrySet()){ System.out.println(entry.getKey()+"->"+entry.getValue()); } int count=Utils.getCountFromHDFS(countPath); System.out.println("count is "+count); } }
package hadoop.MachineLearning.Bayes.Predict; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Predict { public static void main(String[] args) throws Exception {//预测 Configuration conf = new Configuration(); String input="hdfs://10.107.8.110:9000/Bayes/Predict_input"; String output="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Predict"; String condiProPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Con"; String proPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Pro"; String countPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Count"; conf.set("condiProPath",condiProPath); conf.set("proPath",proPath); conf.set("countPath",countPath); Job job = Job.getInstance(conf, "Predict"); job.setJarByClass(hadoop.MachineLearning.Bayes.Predict.Predict.class); // TODO: specify a mapper job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // TODO: specify a reducer job.setReducerClass(MyReducer.class); // TODO: specify output types job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); // TODO: specify input and output DIRECTORIES (not files) FileInputFormat.setInputPaths(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); if (!job.waitForCompletion(true)) return; } } package hadoop.MachineLearning.Bayes.Predict; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper<LongWritable, Text, Text, Text> { public Map<String,Integer> map=new HashMap(); public void 
setup(Context context) throws IOException{ Configuration conf=context.getConfiguration(); String proPath=conf.get("proPath"); map=Utils.getMapFormHDFS(proPath); } public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException { for(Map.Entry<String,Integer> entry:map.entrySet()){ context.write(new Text(entry.getKey()),ivalue);//对每一行数据,打上所有类别,方便后续的求条件概率 } } } package hadoop.MachineLearning.Bayes.Predict; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyReducer extends Reducer<Text, Text, Text, DoubleWritable> { public Map<String,Double> mapDouble=new HashMap();//存放条件概率 public Map<String,Integer> mapInteger=new HashMap();//存放各个类别下的单词数 public Map<String,Double> noFind=new HashMap();//用于那些单词没有出现在某个类别中的 public Map<String,Double> prePro=new HashMap();//求的后的先验概率 public void setup(Context context) throws IOException{ Configuration conf=context.getConfiguration(); String condiProPath=conf.get("condiProPath"); String proPath=conf.get("proPath"); String countPath=conf.get("countPath"); mapDouble=Utils.getMapFormHDFS(condiProPath,true); mapInteger=Utils.getMapFormHDFS(proPath); int count=Utils.getCountFromHDFS(countPath); for(Map.Entry<String,Integer> entry:mapInteger.entrySet()){ double pro=0.0; noFind.put(entry.getKey(),(1.0/(count+entry.getValue()))); } int sum=0; for(Map.Entry<String,Integer> entry:mapInteger.entrySet()){ sum+=entry.getValue(); } for(Map.Entry<String,Integer> entry:mapInteger.entrySet()){ prePro.put(entry.getKey(),(entry.getValue()*1.0/sum)); } } public void reduce(Text _key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // process values String type=_key.toString(); double pro=1.0; for (Text val : values) { String[] words=val.toString().split(" "); for(int 
i=0;i<words.length;i++){ String condi=type+":"+words[i]; if(mapDouble.get(condi)!=null){//如果该单词出现在该类别中,说明有条件概率 pro=pro*mapDouble.get(condi); }else{//如果该单词不在该类别中,就采用默认的条件概率 pro=pro*noFind.get(type); } } } pro=pro*prePro.get(type); context.write(new Text(type),new DoubleWritable(pro)); } } package hadoop.MachineLearning.Bayes.Predict; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.util.LineReader; public class Utils { /** * @param args * @throws IOException */ public static Map<String,Integer> getMapFormHDFS(String input) throws IOException{ Configuration conf=new Configuration(); Path path=new Path(input); FileSystem fs=path.getFileSystem(conf); FileStatus[] stats=fs.listStatus(path); Map<String,Integer> map=new HashMap(); for(int i=0;i<stats.length;i++){ if(stats[i].isFile()){ FSDataInputStream infs=fs.open(stats[i].getPath()); LineReader reader=new LineReader(infs,conf); Text line=new Text(); while(reader.readLine(line)>0){ String[] temp=line.toString().split(" "); //System.out.println(temp.length); map.put(temp[0],Integer.parseInt(temp[1])); } reader.close(); } } return map; } public static Map<String,Double> getMapFormHDFS(String input,boolean j) throws IOException{ Configuration conf=new Configuration(); Path path=new Path(input); FileSystem fs=path.getFileSystem(conf); FileStatus[] stats=fs.listStatus(path); Map<String,Double> map=new HashMap(); for(int i=0;i<stats.length;i++){ if(stats[i].isFile()){ FSDataInputStream infs=fs.open(stats[i].getPath()); LineReader reader=new LineReader(infs,conf); Text line=new Text(); while(reader.readLine(line)>0){ String[] temp=line.toString().split(" "); //System.out.println(temp.length); map.put(temp[0],Double.parseDouble(temp[1])); } 
reader.close(); } } return map; } public static int getCountFromHDFS(String input) throws IOException{ Configuration conf=new Configuration(); Path path=new Path(input); FileSystem fs=path.getFileSystem(conf); FileStatus[] stats=fs.listStatus(path); int count=0; for(int i=0;i<stats.length;i++){ if(stats[i].isFile()){ FSDataInputStream infs=fs.open(stats[i].getPath()); LineReader reader=new LineReader(infs,conf); Text line=new Text(); while(reader.readLine(line)>0){ String[] temp=line.toString().split(" "); //System.out.println(temp.length); count=Integer.parseInt(temp[1]); } reader.close(); } } return count; } public static void main(String[] args) throws IOException { // TODO Auto-generated method stub String proPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Pro"; String countPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Count/"; Map<String,Integer> map=Utils.getMapFormHDFS(proPath); for(Map.Entry<String,Integer> entry:map.entrySet()){ System.out.println(entry.getKey()+"->"+entry.getValue()); } int count=Utils.getCountFromHDFS(countPath); System.out.println("count is "+count); } }
Tags:
Original source: http://www.cnblogs.com/sunrye/p/4553732.html