标签:hadoop高级编程 复合键 自定义数据类型
package reverseIndex;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
// FIX: was org.apache.hadoop.mapred.FileSplit (old API). This job uses the new
// mapreduce API, whose Context.getInputSplit() returns a new-API InputSplit, so
// the cast requires the new-API FileSplit.
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Inverted index: for each word, list the files that contain it.
 *
 * Example:
 *   xd is a good man          -> file1.txt
 *   good boy is xd            -> file2.txt
 *   xd like beautiful women   -> file3.txt
 * yields:
 *   xd   -> file1.txt; file2.txt; file3.txt
 *   good -> file1.txt; file2.txt
 *   ...
 *
 * The mapper emits ("word:filename", "1") so the combiner can sum term
 * frequencies per (word, file). The combiner then re-keys to
 * ("word", "filename:count") so HashPartitioner routes all postings for a
 * word to the same reducer, which concatenates them.
 *
 * @author XD
 */
public class inverseIndex {

    /** Mapper: tokenizes each line and emits ("word:filename", "1"). */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        private final Text keyInfo = new Text();   // composite key "word:filename"
        private final Text valueInfo = new Text(); // constant count "1"

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The input split identifies which file this record came from.
            FileSplit split = (FileSplit) context.getInputSplit();
            // FIX: use getName() instead of indexOf("file")+substring — the
            // original threw StringIndexOutOfBoundsException (substring(-1))
            // whenever the path did not contain "file".
            String fileName = split.getPath().getName();
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                keyInfo.set(itr.nextToken() + ":" + fileName);
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
        }
    }

    /**
     * Combiner: sums the counts for each "word:filename" key, then splits the
     * composite key and emits ("word", "filename:count").
     */
    public static class combiner extends Reducer<Text, Text, Text, Text> {

        private final Text info = new Text(); // new value "filename:count"

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (Text val : values) {
                sum += Integer.parseInt(val.toString());
            }
            // Re-key on the word alone so all postings for a word reach the
            // same reducer under the default HashPartitioner.
            String composite = key.toString();
            int sep = composite.indexOf(':');
            info.set(composite.substring(sep + 1) + ":" + sum);
            key.set(composite.substring(0, sep));
            context.write(key, info);
        }
    }

    /** Reducer: concatenates all "filename:count" postings for a word. */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        private final Text result = new Text(); // final posting list

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // FIX: StringBuilder instead of String += in a loop (was O(n^2)).
            StringBuilder list = new StringBuilder();
            for (Text val : values) {
                list.append(val.toString()).append(';'); // ';' separates files
            }
            result.set(list.toString());
            context.write(key, result);
        }
    }
}
package com.rpc.nefu;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * A custom serializable key type holding an (x, y) integer pair, usable as a
 * Hadoop key: Writable for serialization, WritableComparable for sort order.
 */
public class keyvalue implements WritableComparable<keyvalue> {

    public int x, y;

    /** No-arg constructor required by Hadoop's reflective instantiation. */
    public keyvalue() {
        this(0, 0);
    }

    public keyvalue(int x1, int y1) {
        this.x = x1;
        this.y = y1;
    }

    /** Deserializes the pair in the same field order that write() emits. */
    @Override
    public void readFields(DataInput in) throws IOException {
        x = in.readInt();
        y = in.readInt();
    }

    /** Serializes x then y. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(x);
        out.writeInt(y);
    }

    /** Squared distance from the origin (no sqrt; stays in int arithmetic). */
    public int distanceFromOrigin() {
        return x * x + y * y;
    }

    /**
     * FIX: the original declared equals(keyvalue), which only OVERLOADS and
     * never overrides Object.equals — hash-based collections and framework
     * code would fall back to identity comparison. This is the real override.
     */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof keyvalue)) { // also handles null
            return false;
        }
        keyvalue other = (keyvalue) o;
        return this.x == other.x && this.y == other.y;
    }

    /**
     * Consistent with equals. FIX: the original routed int fields through
     * Float.floatToIntBits, a lossy float conversion for large ints; a plain
     * int mix is correct and cheaper.
     */
    @Override
    public int hashCode() {
        return 31 * x + y;
    }

    @Override
    public String toString() {
        return x + "," + y;
    }

    /**
     * Orders by x only. NOTE(review): this is not consistent with equals —
     * pairs differing only in y compare as equal; confirm that is intended
     * before using this type in sorted collections.
     */
    @Override
    public int compareTo(keyvalue o) {
        // Integer.compare avoids the hand-rolled branch chain (and the
        // subtraction-overflow trap a naive x - o.x would introduce).
        return Integer.compare(x, o.x);
    }
}
Hadoop 高级程序设计(一)---复合键 自定义输入类型
标签:hadoop高级编程 复合键 自定义数据类型
原文地址:http://blog.csdn.net/xd_122/article/details/39550869