public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
public static int MATRIX_I =4;
public static int MATRIX_J =3;
public static int MATRIX_K =2;
public void map(LongWritable ikey, Text ivalue, Context context)
throws IOException, InterruptedException {
String filename = ((FileSplit) context.getInputSplit()).getPath().getName();
String line=ivalue.toString();
int row=Integer.parseInt( line.split( " ")[0]);
int col=Integer.parseInt( line.split( " ")[1]);
int num=Integer.parseInt( line.split( " ")[2]);
if (filename.compareTo("mat_a.txt" )==0){
for (int i=1;i<= MATRIX_K;i++){
String key="<" +row+ ":"+i+ ">" ;
String value="a" + " "+col+ " " +num;
context.write( new Text(key), new Text(value));
}
} else if (filename.compareTo( "mat_b.txt")==0){
for (int i=1;i<= MATRIX_I;i++){
String key="<" +i+ ":"+col+ ">" ;
String value="b" + " "+row+ " " +num;
context.write( new Text(key), new Text(value));
}
}
}
}
Reduce:
/**
 * Sums the products of matching "a" and "b" entries received for one key and
 * writes that sum as the value for the key.
 *
 * <p>Each incoming value is a space-separated triple {@code "tag index value"}.
 * Entries tagged {@code "a"} and {@code "b"} are collected into separate lists,
 * both lists are sorted, and every pair with equal {@code index} contributes
 * {@code a.value * b.value} to the sum. Values with any other tag are ignored.
 *
 * <p>NOTE(review): the merge assumes the natural ordering of {@code A} is
 * ascending by {@code index}; {@code A} is declared elsewhere, so confirm.
 *
 * @param _key    the key shared by all {@code values}; written back unchanged
 * @param values  the tagged entries for this key
 * @param context used to emit the result
 * @throws IOException          if writing the result fails
 * @throws InterruptedException if writing the result is interrupted
 */
public void reduce(Text _key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    ArrayList<A> list1 = new ArrayList<A>();
    ArrayList<A> list2 = new ArrayList<A>();

    // Diagnostic output on standard out, kept exactly as before.
    System.out.println(" ");
    System.out.println("key is " + _key.toString());
    System.out.println(" ");

    for (Text val : values) {
        String[] line = val.toString().split(" ");
        A a = new A(Integer.parseInt(line[1]), Integer.parseInt(line[2]));
        if (line[0].equals("a")) {
            System.out.println(" ");
            System.out.println("this is a");
            System.out.println(" ");
            list1.add(a);
        } else if (line[0].equals("b")) {
            System.out.println(" ");
            System.out.println("this is b");
            System.out.println(" ");
            list2.add(a);
        }
    }

    Collections.sort(list1);
    Collections.sort(list2);

    for (A a : list1) {
        System.out.println("list1 is " + a.index + " " + a.value);
    }
    System.out.println(" ");
    for (A a : list2) {
        System.out.println("list2 is " + a.index + " " + a.value);
    }

    // Two-pointer merge over the sorted lists. Both cursors are bounds-checked,
    // and only the cursor pointing at the smaller index advances on a mismatch,
    // so an unmatched entry on one side never causes a later match to be skipped.
    int sum = 0;
    int i = 0;
    int j = 0;
    while (i < list1.size() && j < list2.size()) {
        A left = list1.get(i);
        A right = list2.get(j);
        if (left.index == right.index) {
            sum = sum + left.value * right.value;
            i++;
            j++;
        } else if (left.index < right.index) {
            i++;
        } else {
            j++;
        }
    }

    System.out.println(" ");
    System.out.println("sum is " + sum);
    System.out.println(" ");
    context.write(_key, new Text(String.valueOf(sum)));
}
}