Tags: MST, MapReduce implementation
Given the input graph below, compute its minimum spanning tree (i.e. the minimum total edge weight):
The corresponding input file is stored on HDFS in tab-separated form: each line describes one edge as the edge weight, the source node, and the destination node, separated by tabs.
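A purely hypothetical example of this layout (not the original input data) might look like:

1	A	B
3	B	C
4	A	C

where the first column is the edge weight and the remaining two columns are the edge's endpoints.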
The code is as follows:
package MST;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class MST {

    // Paths are hard-coded; change them as needed.
    static final String INPUT_PATH = "hdfs://localhost:9000/input1/MST";
    static final String OUTPUT_PATH = "hdfs://localhost:9000/outputMst";

    static enum MSTCounters {
        totalWeight // counter that accumulates the total weight of the MST
    }

    // Job driver.
    public static void main(String[] args) throws IOException, URISyntaxException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, MST.class.getSimpleName());

        // Delete the output directory if it already exists.
        final FileSystem filesystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outPath = new Path(OUTPUT_PATH);
        if (filesystem.exists(outPath)) {
            filesystem.delete(outPath, true);
        }

        // Input path.
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setJarByClass(MST.class);

        // Map output types and mapper class.
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setMapperClass(MSTMapper.class);

        // 1.3 Partition: a single reduce task, so all edges are globally sorted by weight.
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        // 1.4 Sort
        // 1.5 Combine
        // 2.1 Shuffle (copy over the network)
        // 2.2 Reducer class.
        job.setReducerClass(MSTReducer.class);
        // 2.3 Output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Output path.
        FileOutputFormat.setOutputPath(job, outPath);

        // Run the job.
        job.waitForCompletion(true);

        // Read the counter value and print it to the terminal.
        Counters jobCounters = job.getCounters();
        long totalWeight = jobCounters.findCounter(MSTCounters.totalWeight).getValue();
        System.out.println("the total weight of the MST is " + totalWeight);
    }

    /**
     * The MST algorithm used in this experiment is Kruskal's algorithm; for a
     * detailed description see blog.csdn.net/xd_122/article/details/40684223.
     *
     * The mapper reads one edge per line ("weight \t src \t dest") and emits
     * (weight, "src:dest") so that the framework sorts edges by weight.
     */
    static class MSTMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Text srcDestPair = new Text();
            String[] inputTokens = value.toString().split("\t");
            String weight = inputTokens[0];
            int wt = Integer.parseInt(weight);
            srcDestPair.set(inputTokens[1] + ":" + inputTokens[2]);
            context.write(new IntWritable(wt), srcDestPair);
        }
    }

    static class MSTReducer extends Reducer<IntWritable, Text, Text, Text> {
        // For each node, the set of nodes already connected to it (its current tree).
        Map<String, Set<String>> node_AssociatedSet = new HashMap<String, Set<String>>();

        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text outputKey = new Text(key.toString());
            for (Text val : values) {
                // Decide whether the two endpoints already belong to the same tree.
                boolean ignoreSameEdgeSet1 = false;
                boolean ignoreSameEdgeSet2 = false;
                boolean ignoreSameEdgeSet3 = false;
                Set<String> nodeSet = new HashSet<String>();
                String[] srcDest = val.toString().split(":");
                String src = srcDest[0];
                String dest = srcDest[1];
                ignoreSameEdgeSet1 = isSameSet(src, dest);
                nodeSet.add(src);
                nodeSet.add(dest);
                ignoreSameEdgeSet2 = unionSet(nodeSet, src, dest);
                ignoreSameEdgeSet3 = unionSet(nodeSet, dest, src);
                // Keep the edge only if it does not form a cycle.
                if (!ignoreSameEdgeSet1 && !ignoreSameEdgeSet2 && !ignoreSameEdgeSet3) {
                    long weight_value = Long.parseLong(outputKey.toString());
                    // Accumulate the edge weight into the totalWeight counter.
                    context.getCounter(MSTCounters.totalWeight).increment(weight_value);
                    context.write(outputKey, val);
                }
            }
        }

        // Merge nodeSet into the sets associated with src's tree; returns true if
        // dest is already reachable from src's set, i.e. the edge should be ignored.
        private boolean unionSet(Set<String> nodeSet, String src, String dest) {
            boolean ignoreEdge = false;
            if (!node_AssociatedSet.containsKey(src)) {
                node_AssociatedSet.put(src, nodeSet);
            } else {
                Set<String> associatedSet = node_AssociatedSet.get(src);
                Set<String> nodeset = new HashSet<String>();
                nodeset.addAll(associatedSet);
                Iterator<String> nodeItr = nodeset.iterator();
                Iterator<String> duplicateCheckItr = nodeset.iterator();
                while (duplicateCheckItr.hasNext()) {
                    String n = duplicateCheckItr.next();
                    if (node_AssociatedSet.get(n).contains(dest)) {
                        ignoreEdge = true;
                    }
                }
                while (nodeItr.hasNext()) {
                    String nextNode = nodeItr.next();
                    if (!node_AssociatedSet.containsKey(nextNode)) {
                        node_AssociatedSet.put(nextNode, nodeSet);
                    }
                    node_AssociatedSet.get(nextNode).addAll(nodeSet);
                }
            }
            return ignoreEdge;
        }

        // Returns true if src and dest are already recorded in the same set.
        private boolean isSameSet(String src, String dest) {
            boolean ignoreEdge = false;
            for (Map.Entry<String, Set<String>> node_AssociatedSetValue : node_AssociatedSet.entrySet()) {
                Set<String> nodesInSameSet = node_AssociatedSetValue.getValue();
                if (nodesInSameSet.contains(src) && nodesInSameSet.contains(dest)) {
                    ignoreEdge = true;
                }
            }
            return ignoreEdge;
        }
    }
}
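Two points explain why this works. First, the map output key is the edge weight and the job runs a single reduce task, so the shuffle's sort-by-key delivers edges to the reducer in ascending weight order, which is exactly the ordering Kruskal's algorithm relies on. Second, the reducer's node_AssociatedSet bookkeeping plays the role of a union-find (disjoint-set) structure: an edge is kept only if its endpoints are not already in the same tree. As a minimal illustrative sketch (not part of the original program), the same cycle check can be written with a conventional union-find over string node ids:

import java.util.HashMap;
import java.util.Map;

// Minimal union-find over string node ids, sketching the cycle check that
// the reducer performs with node_AssociatedSet.
class UnionFind {
    private final Map<String, String> parent = new HashMap<String, String>();

    // Find the root of x's tree, registering x lazily and compressing paths.
    private String find(String x) {
        if (!parent.containsKey(x)) {
            parent.put(x, x);
        }
        String root = parent.get(x);
        if (!root.equals(x)) {
            root = find(root);
            parent.put(x, root);
        }
        return root;
    }

    // Returns true and merges the two trees if src and dest were in different
    // trees; returns false if the edge would close a cycle and must be skipped.
    boolean union(String src, String dest) {
        String rootSrc = find(src);
        String rootDest = find(dest);
        if (rootSrc.equals(rootDest)) {
            return false;
        }
        parent.put(rootSrc, rootDest);
        return true;
    }
}

With such a structure, the reducer loop would accept an edge exactly when union(src, dest) returns true, replacing the three ignoreSameEdgeSet flags with a single test.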
The job writes the selected edges and their weights to the HDFS output directory. The total weight of the MST printed to the terminal is: 14.
For a more detailed explanation, see: http://hadooptutorial.wikispaces.com/Sorting+feature+of+MapReduce
Original article: http://blog.csdn.net/xd_122/article/details/41923447