$Cannon$算法是并行矩阵乘法的经典算法,将多个处理器排列成二维网格,采用二维块划分的方法将矩阵分给不同的处理器计算各自的局部结果,以此来加速计算。在本文中,为方便起见,示例程序中的矩阵均为$n$阶方阵,处理器的数量为2的幂次,确保每个矩阵得到的局部矩阵的元素个数相同。
一、二维矩阵串行乘法 两个$n$维方阵的乘法$A \cdot B = C$的串行计算公式为: $$ C_{ij} = \sum_{k=0}^{n-1} A_{ik} \cdot B_{kj},. $$ 程序可以表示为:
1 2 3 4 5 6 7 void Multiply (double * A, double * B, double * C, int n) { for (int i = 0 ; i < n; i++) for (int j = 0 ; j < n; j++) for (int k = 0 ; k < n; k++) C[i * n + j] += A[i * n + k] * B[j + k * n]; }
程序将二维矩阵用一维矩阵线性展开,用一维数组来模拟二维数组。
二、Cannon算法 并行化二维矩阵乘法的一种思想是二维块划分方法,将$p,$ 个进程($p,$为完全平方数)排列成$\sqrt[]{p}\times\sqrt[]{p},$二维网格,然后将矩阵$A、B、C,$都分成$\sqrt[]{p}\times\sqrt[]{p},$ 块,均匀分布在网格上,矩阵第$(i,j),$个子块分别记为$A_{ij},$、$B_{ij},$、$C_{ij},$,存储在二维进程网格上坐标为$(i,j),$的进程$P_{ij},$上。计算$C_{ij},$时要将$A_{ik},$(第$i,$行进程上的所有$A,$的子块)和$B_{kj},$(第$j,$列进程上的所有$B,$的子块)都收集到进程$P_{ij},$上,再计算$C_{ij},$,公式可以表达为: $$C_{ij} = \sum_{k=0}^{\sqrt[]{p}-1} A_{ik} \cdot B_{kj}$$ 如下图所示:
然而每一个进程都重复收集$A_{ik},$和$B_{kj},$会造成空间的大量浪费,时间效率也比较低,于是有了更高效的$Canon,$算法,其表达式为: $$C_{ij} = \sum_{k=0}^{\sqrt[]{p}-1} A_{i,(i+j+k)%\sqrt[]{p}} \cdot B_{(i+j+k)%\sqrt[]{p},j}$$
$Canon,$算法基本思想是,每一个进程只存储$A,$、$B,$、$C,$矩阵的一个子块,本地相对应的$A,$、$B,$子块相乘后将结果累加到本地$C,$子块上,然后再与其他进程交换$A,$、$B,$子块,继续相乘将结果累加到本地$C,$子块上。但是要保证对应的$A,$、$B,$子块相乘,不能错位,我们注意到,在最开始,$P_{ij},$上的$A,$为所在行的第$j,$个,$B,$为所在列的第$i,$个,$A,$和$B,$子块并不对应,因此将一行上的$A,$子块循环向左移动$i,$格,一列上的$B,$子块循环向上移动$j,$格,这样$A,$和$B,$子块就能对应了,以后每执行一次计算,每行所有$A,$子块循环向左移动1格,每列上的$B,$子块循环向上移动1格,$A,$、$B,$子块相乘的结果累加在本地的$C,$子块上。
算法的个步骤表示如下:
1、第一次重排列 $k=0$时,$A_{i,(i+j)%\sqrt[]{p}}$并不处于$P_{ij},$上,需要对齐,于是$A_{i,(i+j)%\sqrt[]{p}}$传送到$P_{ij},$上,具体的实现方式是,二维网格上每一行的进程都将$A,$子块循环向左移位,第$i,$行的所有进程将$A,$子块循环向左移位$i,$个单位;同时$B_{(i+j)%\sqrt[]{p},j}$并不处于$P_{ij},$上,$B_{(i+j)%\sqrt[]{p},j}$传送到$P_{ij},$上,第$j,$列的所有进程将$B,$子块循环向上移位$j,$个单位,如下图所示: 得到的第一次重排列后的矩阵排列为: 每个进程得到初次重排列后的$A,$、$B,$子块后,将$A,$、$B,$子块相乘的结果累加在本地的$C,$子块上。
2、后续重排列 以后每进行一次计算,每行进程的$A,$子块都循环向左移动一个单位,每列进程的$B,$子块都循环的向上移动一个单位,如下图所示,$A,$、$B,$子块相乘的结果累加在本地的$C,$子块上,该步骤重复$\sqrt[]{p}-1,$次。 最后进程$P_{ij},$上能够得到本地的结果$C_{ij},$。
三、程序实现 1、创建二维进程拓扑结构 创建进程二维拓扑结构的函数为:
1 2 3 dims[0 ] = dims[1 ] = sqrt (comm_size); periods[0 ] = periods[1 ] = 1 ; MPI_Cart_create(MPI_COMM_WORLD, 2 , dims, periods, 1 , &comm_2d);
$comm_size,$为进程的总数量,$dims[2],$数组表示二维拓扑结构中每一维的大小,$period[2],$全部设置成1,表示拓扑结构的第$i,$维有环绕接。这样我们得到了新的进程通讯器$comm_2d,$。由于每一个进程都会被分配一个进程号以及进程坐标,从进程号获取进程坐标的函数如下:
1 MPI_Cart_coords(comm_2d, myrank, 2 , mycoords);
$myrank,$是进程序号,$mycoords,$是大小为2的一维数组。
2、输入输出矩阵 输入输出矩阵均为$8\times8,$矩阵,$A,$、$B,$矩阵均为正交矩阵,且$B=A^{T},$,$A,$矩阵为: 计算结果应该可以得到一个单位矩阵。
3、主程序 每个进程保存的本地矩阵子块分别为$local_A,$、$local_B,$、$local_C,$,方便起见,进程的数量设为1、4、16、64这4种情况中的一种。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 #include <stdio.h> #include <stdlib.h> #include <iostream> #include <math.h> #include <windows.h> #include <mpi.h> #define pi acos(-1) int myrank, comm_size, srcrank, dstrank;int dims[2 ], periods[2 ], mycoords[2 ], coords[2 ];int n = 8 , local_n;MPI_Comm comm_2d; void Multiply (double * A, double * B, double * C, int n) ;void GenerateData (MPI_Comm comm_2d, double * local_A, double * local_B) ;void Cannon (MPI_Comm comm_2d, double * local_A, double * local_B, double * local_C) ;void GatherResult (MPI_Comm comm_2d, double * local_C) ;int main () { double * local_A, * local_B, * local_C; MPI_Init(NULL , NULL ); MPI_Comm_size(MPI_COMM_WORLD, &comm_size); MPI_Comm_rank(MPI_COMM_WORLD, &myrank); if (myrank == 0 ) printf ("Number of Process: %d\n" , comm_size); double start = MPI_Wtime(); dims[0 ] = dims[1 ] = sqrt (comm_size); periods[0 ] = periods[1 ] = 1 ; MPI_Cart_create(MPI_COMM_WORLD, 2 , dims, periods, 1 , &comm_2d); MPI_Comm_rank(comm_2d, &myrank); MPI_Cart_coords(comm_2d, myrank, 2 , mycoords); local_n = n / dims[0 ]; local_A = (double *)malloc (sizeof (double ) * local_n * local_n); local_B = (double *)malloc (sizeof (double ) * local_n * local_n); local_C = (double *)malloc (sizeof (double ) * local_n * local_n); memset (local_A, 0 , sizeof (double ) * local_n * local_n); memset (local_B, 0 , sizeof (double ) * local_n * local_n); memset (local_C, 0 , sizeof (double ) * local_n * local_n); GenerateData(comm_2d, local_A, local_B); Cannon(comm_2d, local_A, local_B, local_C); GatherResult(comm_2d, local_C); double end = MPI_Wtime(); if (myrank == 0 ) printf ("Running time: %.2f seconds...\n" , end - start); MPI_Comm_free(&comm_2d); MPI_Finalize(); return 0 ; }
4、Cannon算法主函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 void Cannon (MPI_Comm comm_2d, double * local_A, double * local_B, double * local_C) { int uprank, downrank, leftrank, rightrank; MPI_Status status; coords[0 ] = mycoords[0 ]; coords[1 ] = (mycoords[1 ] - 1 ) % dims[1 ]; MPI_Cart_rank(comm_2d, coords, &leftrank); coords[1 ] = (mycoords[1 ] + 1 ) % dims[1 ]; MPI_Cart_rank(comm_2d, coords, &rightrank); coords[0 ] = (mycoords[0 ] - 1 ) % dims[0 ]; coords[1 ] = mycoords[1 ]; MPI_Cart_rank(comm_2d, coords, &uprank); coords[0 ] = (mycoords[0 ] + 1 ) % dims[0 ]; MPI_Cart_rank(comm_2d, coords, &downrank); coords[0 ] = mycoords[0 ]; coords[1 ] = (mycoords[1 ] - mycoords[0 ]) % dims[1 ]; MPI_Cart_rank(comm_2d, coords, &dstrank); coords[1 ] = (mycoords[1 ] + mycoords[0 ]) % dims[1 ]; MPI_Cart_rank(comm_2d, coords, &srcrank); if (myrank != dstrank) { MPI_Sendrecv_replace(local_A, local_n * local_n, MPI_DOUBLE, dstrank, 0 , srcrank, 0 , comm_2d, &status); } coords[1 ] = mycoords[1 ]; coords[0 ] = (mycoords[0 ] - mycoords[1 ]) % dims[0 ]; MPI_Cart_rank(comm_2d, coords, &dstrank); coords[0 ] = (mycoords[0 ] + mycoords[1 ]) % dims[0 ]; MPI_Cart_rank(comm_2d, coords, &srcrank); if (myrank != dstrank) { MPI_Sendrecv_replace(local_B, local_n * local_n, MPI_DOUBLE, dstrank, 0 , srcrank, 0 , comm_2d, &status); } Multiply(local_A, local_B, local_C, local_n); for (int time = 0 ; time < dims[0 ] - 1 ; time++) { MPI_Sendrecv_replace(local_A, local_n * local_n, MPI_DOUBLE, leftrank, 0 , rightrank, 0 , comm_2d, &status); MPI_Sendrecv_replace(local_B, local_n * local_n, MPI_DOUBLE, uprank, 0 , downrank, 0 , comm_2d, &status); Multiply(local_A, local_B, local_C, local_n); } } void Multiply (double * A, double * B, double * C, int n) { for (int i = 0 ; i < n; i++) for (int j = 0 ; j < n; j++) for (int k = 0 ; k < n; k++) { C[i * n + j] += A[i * n + k] * B[j + k * n]; Sleep(10 ); } }
其中$MPI_Cart_rank,$用于将进程坐标转换为进程号,$MPI_Sendrecv_replace,$函数可以视为$MPI_Send$以及$MPI_Recv,$函数的组合,用于在一个绕接环中,每一个进程向目标进程$dstrank$发送数据,并接受来自$srcrank$源进程的数据,并且在收发数据中所有进程使用的都是同一个缓存。使用该函数可以实现$A$、$B$子块的循环移位。
5、生成数据并分发到各进程的函数 0号进程将$A$、$B$矩阵的数据放入$bufferA$、$bufferB$中再发送给对应进程的$local_A$、$local_B$中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 void GenerateData (MPI_Comm comm_2d, double * local_A, double * local_B) { if (mycoords[0 ] == 0 && mycoords[1 ] == 0 ) { double A[8 * 8 ] = { cos (pi / 6 ), -sin (pi / 6 ), 0 , 0 , 0 , 0 , 0 , 0 , sin (pi / 6 ), cos (pi / 6 ), 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , cos (pi / 4 ), -sin (pi / 4 ), 0 , 0 , 0 , 0 , 0 , 0 , sin (pi / 4 ), cos (pi / 4 ), 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , cos (pi / 6 ), -sin (pi / 6 ), 0 , 0 , 0 , 0 , 0 , 0 , sin (pi / 6 ), cos (pi / 6 ), 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , cos (pi / 4 ), -sin (pi / 4 ), 0 , 0 , 0 , 0 , 0 , 0 , sin (pi / 4 ), cos (pi / 4 ), }; double * B = (double *)malloc (sizeof (double ) * n * n); memset (B, 0 , sizeof (double ) * n * n); for (int i = 0 ; i < n; i++) for (int j = 0 ; j < n; j++) B[j * n + i] = A[i * n + j]; for (int i = 0 ; i < local_n; i++) for (int j = 0 ; j < local_n; j++) { local_A[i * local_n + j] = A[i * n + j]; local_B[i * local_n + j] = B[i * n + j]; } for (int row = 0 ; row < dims[0 ]; row++) for (int col = 0 ; col < dims[1 ]; col++) { if (row == 0 && col == 0 ) continue ; int offset = row * dims[1 ] * local_n * local_n + col * local_n; double * bufferA = (double *)malloc (sizeof (double ) * local_n * local_n); double * bufferB = (double *)malloc (sizeof (double ) * local_n * local_n); for (int i = 0 ; i < local_n; i++) for (int j = 0 ; j < local_n; j++) { bufferA[i * local_n + j] = A[offset + i * n + j]; bufferB[i * local_n + j] = B[offset + i * n + j]; } coords[0 ] = row; coords[1 ] = col; MPI_Cart_rank(comm_2d, coords, &dstrank); MPI_Send(bufferA, local_n * local_n, MPI_DOUBLE, dstrank, 0 , comm_2d); MPI_Send(bufferB, local_n * local_n, MPI_DOUBLE, dstrank, 1 , comm_2d); free (bufferA); free (bufferB); } free (B); } else { MPI_Recv(local_A, local_n * local_n, MPI_DOUBLE, 0 , 0 , comm_2d, MPI_STATUS_IGNORE); MPI_Recv(local_B, local_n * local_n, MPI_DOUBLE, 0 , 1 , comm_2d, MPI_STATUS_IGNORE); } }
6、收集结果到0号进程的函数 所有进程将计算结果(本地$C$子块的数据)放入$bufferC$中发送给0号进程,0号进程收集$bufferC$中的数据放入$C$矩阵的对应位置中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 void GatherResult (MPI_Comm comm_2d, double * local_C) { int otherrank; MPI_Status status; if (coords[0 ] == 0 && coords[1 ] == 0 ) { double * C = (double *)malloc (sizeof (double ) * n * n); memset (C, 0 , sizeof (double ) * n * n); double * bufferC = (double *)malloc (sizeof (double ) * local_n * local_n); memset (bufferC, 0 , sizeof (double ) * local_n * local_n); for (int row = 0 ; row < dims[0 ]; row++) { for (int col = 0 ; col < dims[1 ]; col++) { if (row == 0 && col == 0 ) { for (int i = 0 ; i < local_n; i++) for (int j = 0 ; j < local_n; j++) C[i * n + j] = local_C[i * local_n + j]; continue ; } coords[0 ] = row; coords[1 ] = col; MPI_Cart_rank(comm_2d, coords, &otherrank); MPI_Recv(bufferC, local_n * local_n, MPI_DOUBLE, otherrank, 0 , comm_2d, &status); int offset = row * dims[1 ] * local_n * local_n + col * local_n; for (int i = 0 ; i < local_n; i++) for (int j = 0 ; j < local_n; j++) C[offset + i * n + j] = bufferC[i * local_n + j]; } } free (bufferC); for (int i = 0 ; i < n; i++) { for (int j = 0 ; j < n; j++) printf ("%.2f " , C[i * n + j]); printf ("\n" ); } } else { MPI_Send(local_C, local_n * local_n, MPI_DOUBLE, 0 , 0 , comm_2d); } }
四、实现结果 程序在$VS2019$上运行,可以看到随着进程数量的增加,$0$号进程的运行时间明显减少(显示器上显示的执行时间是$0$号进程的执行时间)。但是当进程增加到原来的$n$倍,$0$号进程的运行时间并不为原来的$\frac{1}{n}$,这一方面是因为$0$号进程需要与更多的进程点对点发送$A$、$B$矩阵的数据,另一个重要原因是我的电脑为$Intel$ $8$核$CPU$,最多只能有$8$个进程同时运行,因此会有$64-8=56$个进程会在等待队列和就绪队列上等待被$CPU$调度,影响总程序运行时间。但是多个进程确实明显加速了矩阵乘法。