对于矩阵A[mn]*B[nl]=C[ml]。这里可以并行起来的就是每个Cij,对于Cij而言,他是由A的第i行和B的第j列相乘得到。由于大的矩阵中经常是稀疏矩阵,所以一般用行列值表示
例如对于A:
1 2 3
4 5 0
7 8 9
10 11 12
他的行列值表为:
1 1 1
1 2 2
1 3 3
2 1 4
2 2 5
3 1 7
3 2 8
3 3 9
4 1 10
4 2 11
4 3 12
1 2 2
1 3 3
2 1 4
2 2 5
3 1 7
3 2 8
3 3 9
4 1 10
4 2 11
4 3 12
对于B:
10 15
0 2
11 9
行列值:
1 1 10
1 2 15
2 2 2
3 1 11
3 2 9
1 2 15
2 2 2
3 1 11
3 2 9
我们以计算C11为例,这里需要用到A的行列值中行为1的数据和B行列值中列为1的数据。我们的想法是每个Reduce计算一个Cij,并且map输出的key为Cij的i,j,value为A的第i行,和B的第j列。那么整体的框架就是在map中每次读入一行数据,如果是从mat_a.txt中读入的就是A,如果是从mat_b.txt中读入的那就是B的一个数据,同时我们发现对于A11他会在计算C11和C12的时候都被用到,其实是A的第一行数据的时候会在计算C第一行数据的时候被用到,那么我们需要对每个Aij数据进行多次分发,分发的次数为B的列数,例如A11需要分发<1,1>,<1,2>规律就是A的<row,i>i为B的列数,同理可得对于B11来说会在求C11,C21,C31,C41的时候被使用,所以每个C的数据需要被分发4次,规律为<j,col>j为A的行数。这样通过map之后就可以获得每个Cij所需要的数据了。
例如对于C1,2经过map后的数据为
a 1 1
a 2 2
a 3 3
b 1 15
b 2 2
b 3 9
接着为了减少空间,在reduce中,创建了一个class,用与存放下标和数据。
class A implements Comparable<A>{
public int index;
public int value;
A( int index, int value){
this .index =index;
this .value =value;
}
public int compareTo(A a){
if (index <a. index) return -1;
else return 1;
}
}
并且创建两个实例list1和list2,1用于放A的数据,2用于放B的数据。经过排序后(一开始没有排序,导致出错),就遍历这两个,由于对于0值这里没有存放数据,所以在相乘的条件是两者的index相同。自己在写的时候在这个地方出现了一个逻辑上的错误,就是在遍历两个list,并让他们在index相等的时候相乘。
int j=0;
for (int i=0;i<list1.size();i++){
if (list1.get(i).index ==list2.get(j). index){
sum=sum+list1.get(i). value *list2.get(j).value ;
j++;
}if (list1.get(i). index>list2.get(j). index ){
j++;
}
}
这是一开始写的,发现一直报下标出错,后来发现原来是第二个if的问题,例如此时的list1是
1 1
2 2
3 3
list2 :
1 10
3 2
在第一个if判断后,j++了结果后进行了下一个if。而应该是else if。一个疏忽大意。
具体代码:
Map
public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
public static int MATRIX_I =4;
public static int MATRIX_J =3;
public static int MATRIX_K =2;
public void map(LongWritable ikey, Text ivalue, Context context)
throws IOException, InterruptedException {
String filename = ((FileSplit) context.getInputSplit()).getPath().getName();
String line=ivalue.toString();
int row=Integer.parseInt( line.split( " ")[0]);
int col=Integer.parseInt( line.split( " ")[1]);
int num=Integer.parseInt( line.split( " ")[2]);
if (filename.compareTo("mat_a.txt" )==0){
for (int i=1;i<= MATRIX_K;i++){
String key="<" +row+ ":"+i+ ">" ;
String value="a" + " "+col+ " " +num;
context.write( new Text(key), new Text(value));
}
} else if (filename.compareTo( "mat_b.txt")==0){
for (int i=1;i<= MATRIX_I;i++){
String key="<" +i+ ":"+col+ ">" ;
String value="b" + " "+row+ " " +num;
context.write( new Text(key), new Text(value));
}
}
}
}
Reduce:
public void reduce(Text _key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// process values
int n=0;
ArrayList<A> list1= new ArrayList<A>();
ArrayList<A> list2= new ArrayList<A>();
System. out .println(" " );
System. out .println("key is " +_key.toString());
System. out .println(" " );
for (Text val : values) {
String[] line=val.toString().split(" " );
/*
System.out.println(" ");
System.out.println("line is "+line[0]+" "+line[1]+" "+line[2]);
System.out.println(" ");
*/
A a=new A(Integer.parseInt(line[1]),Integer. parseInt(line[2]));
if (line[0].compareTo("a" )==0){
System. out .println(" " );
System. out .println("this is a" );
System. out .println(" " );
list1.add(a);
} else if (line[0].compareTo( "b")==0){
System. out .println(" " );
System. out .println("this is b" );
System. out .println(" " );
list2.add(a);
}
}
int sum=0;
int j=0;
Collections. sort(list1);
Collections. sort(list2);
for (A a:list1){
System. out .println("list1 is " +a. index+ " " +a.value );
}
System. out .println(" " );
for (A a:list2){
System. out .println("list2 is " +a. index+ " " +a.value );
}
for (int i=0;i<list1.size();i++){
if (list1.get(i).index ==list2.get(j). index){
sum=sum+list1.get(i). value *list2.get(j).value ;
j++;
} else if (list1.get(i). index>list2.get(j). index ){
j++;
}
}
System. out .println(" " );
System. out .println("sum is " +sum);
System. out .println(" " );
context.write(_key, new Text(String.valueOf(sum)));
}
}