那么如果我要找“MapReduce is simple”,符合条件的就是file1和file2。基本的思想就是先把MapReduce、is、simple这几个词一个个在索引上查找,例如
我们通过在主类中的配置实例写参数conf.set(key,value),这里的key、value都是String。要记住一点,这个语句一定要在Job.getInstance(conf)之前,否则都实例化了一个job了还怎么配置呢。接着在map或者reduce中通过context.getConfiguration().get(key)取回这些参数。
/**
 * Driver for the multi-word lookup job.
 *
 * <p>Command line (after Hadoop's generic options are stripped):
 * {@code <input dir> <output dir> [word ...]}. The words are handed to the
 * tasks through the job configuration: {@code "argsnum"} holds the total
 * number of remaining arguments and {@code "args<i>"} (i starting at 2) holds
 * the word found at position i of those arguments.
 */
public class Find {
    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @param args raw command-line arguments, generic Hadoop options included
     * @throws Exception if option parsing, job setup or job execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] pathargs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (pathargs.length < 2) {
            System.err.println("Usage: Find <input dir> <output dir> [word ...] (got "
                    + pathargs.length + " argument(s))");
            System.exit(2);
        }

        // These values must be stored before Job.getInstance(conf): the job works on its
        // own copy of the configuration, so later conf.set calls would not reach the tasks.
        conf.set("argsnum", Integer.toString(pathargs.length));
        for (int i = 2; i < pathargs.length; i++) {
            conf.set("args" + i, pathargs[i]);
            System.out.println(pathargs[i]);
        }

        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(org.apache.hadoop.examples10.Find.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output are directories, not single files.
        FileInputFormat.setInputPaths(job, new Path(pathargs[0]));
        FileOutputFormat.setOutputPath(job, new Path(pathargs[1]));

        // Report a failed job through the exit status instead of returning normally.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}
public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
//String[] content={"MapReduce","is","simple"};
public void map(LongWritable ikey, Text ivalue, Context context)
throws IOException, InterruptedException {
Configuration conf=context.getConfiguration();
int argsnum=Integer.parseInt(conf.get( "argsnum"));
//int argsnum=conf.get(" argsnum");
int i=0;
ArrayList<String> content= new ArrayList<String>();
for(i=2;i<argsnum;i++ ){
//System.out.println(conf.get("args"+i));
content.add(conf.get( "args"+i));
}
String line=ivalue.toString();
String key=line.split( " " )[0];
String value=line.split( " " )[1];
StringTokenizer st= new StringTokenizer(value,";" );
for(i=0;i<content.size();i++){
if(content.get(i).compareTo(key)==0){
ArrayList<String> filelist=new ArrayList<String>();
while(st.hasMoreTokens()){
String file=st.nextToken();
file=file.split( ":")[0];
filelist.add(file);
}
for(int j=0;j<filelist.size();j++){
context.write( new Text(filelist.get(j)),new Text(key));
}
}
}
}
}
public class MyReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text _key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Configuration conf=context.getConfiguration();
int argsnum=Integer.parseInt(conf.get( "argsnum"));
// process values
int sum=0;
String filename= new String();
for(int i =2;i<argsnum; i++ ){
//System.out.println(conf.get("args"+i));
filename+=(conf.get( "args"+i));
filename+= " ";
}
for (Text val : values) {
sum++;
}
if(sum>=argsnum-2){
context.write( new Text(filename),_key);
}
}
}