标签:
//map类
package hadoop3;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class danbiaomap extends Mapper <LongWritable,Text,Text,Text>{
String childname=new String();
String parientname=new String();
String flag=new String();
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException
{
String [] str=value.toString().split("\t");
if (str[0].compareTo("child")!=0)
{ //left table
flag="1";
childname=str[0];
parientname=str[1];
context.write(new Text(parientname), new Text(flag+"+"+childname+"+"+parientname));
//right table
flag="2";
context.write(new Text(childname), new Text(flag+"+"+childname+"+"+parientname));
}
}
}
//reduce 类
package hadoop3;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class danbiaoreduce extends Reducer<Text,Text,Text,Text>{
private int num=0;
protected void reduce(Text key,Iterable<Text> value, Context context) throws IOException, InterruptedException
{
if (num==0)
{
context.write(new Text("grandchild"),new Text( "grandparient"));
num++;
}
Iterator <Text> itr=value.iterator();
int grandchildnum=0;
String [] grandchild=new String[100];
int grandparientnum=0;
String [] grandparient=new String[100];
while (itr.hasNext())
{
String [] record=itr.next().toString().split("\\+");
if (record[0].compareTo("1")==0)
{
grandchild[grandchildnum]=record[1];
grandchildnum++;
}
else if (record[0].compareTo("2")==0)
{
grandparient[grandparientnum]=record[2];
grandparientnum++;
}
else
{}
}
if(grandchildnum !=0 && grandparientnum !=0)
{
for (int i=0;i<grandparientnum;i++)
{
for (int j=0;j<grandchildnum;j++)
{
context.write(new Text(grandchild[i]), new Text(grandparient[j]));
}
}
}
}
}
//主类
package hadoop3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
public class danbiao extends Configured implements Tool{
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
ToolRunner.run(new danbiao(), args);
}
@Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
Configuration conf=getConf();
Job job=new Job();
job.setJarByClass(getClass());
FileSystem fs=FileSystem.get(conf);
fs.delete(new Path("/outfile1104"),true);
FileInputFormat.addInputPath(job,new Path("/luo/danbiao.txt"));
FileOutputFormat.setOutputPath(job, new Path("/outfile1104"));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(danbiaomap.class);
job.setReducerClass(danbiaoreduce.class);
job.waitForCompletion(true);
return 0;
}
}
标签:
原文地址:http://www.cnblogs.com/luo-mao/p/5872496.html