0%

MPI_Allreduce的前世今生

摘要

本文介绍Allreduce操作在MPI中的算法设计和实现,归纳和整理其发展脉络,追溯到最新的研究进展,并分析MPI的典型实现MPICH库中是怎样实现Allreduce例程的。

Allreduce介绍

Allreduce操作是MPI中最常用的集合通信操作,与之相似的是Reduce操作,假设有P个进程,每个进程都持有一个含N个元素的向量,所有的P个进程将自己的向量发送给根进程,根进程收集这些向量计算规约的结果(求和、求最大最小值等等),Reduce操作结果保存在根进程,Allreduce则将根进程的结果再广播出去。简单的在应用程序中调用MPI_Allreduce就可以完成上述例程,函数定义如下:
程序可以表示为:

1
2
3
4
5
6
7
int MPI_Allreduce(
const void *sendbuf, //存放源数据
void *recvbuf, //存放规约结果
int count, //数据个数
MPI_Datatype datatype, //数据类型
MPI_Op op, //规约操作类型
MPI_Comm comm); //一组通信进程

MPI_Reduce和MPI_Allreduce例程的图解如下所示:

图片源网址和MPI_Allreduce的入门教程在这里:英文版中文版

MPI_Allreduce广泛用于各种并行与分布式应用程序:科学计算、大数据、分布式机器学习、分布式深度学习DNN等等,并且已有工作表明MPI_Allreduce是使用频率和运行时间最长的集合通信操作。

自从MPI标准在1994年提出以来,MPI_Allreduce的相关研究从上世纪90年代就已经有很多了,而本文从2005年的一篇综述论文出发,一直追溯到现在,总结MPI_Allreduce的算法激情燃烧的昨天、老骥伏枥的今天和仰望星空过后的明天。

经典数据结构与算法

0、评估模型:

2005:Optimization of Collective Communication Operations in MPICH是一篇经典的MPI集合通信论文,介绍了常用的集合通信操作和对应的算法:Allgather, Broadcast, All-to-all, Reduce-Scatter, Reduce和Allreduce,并进一步的讨论了MPI_Reduce和MPI_Allreduce的优化。
这篇文章采用的性能评估模型是,任意两个节点之间发送一条消息的时间可以用,其中表示延迟(或者说启动时间),与消息大小无关,而表示每个字节的传输时间,n表示传输消息的字节数,对于规约操作则用 表示每个字节执行规约操作的时间消耗。由于MPI基于分布式存储系统,采用LogP模型(L:latency, o:overhead, g:gap, P:processor),一条长度为n的消息传输时间可以计算为: ,其中

也就是传输延迟L加上两端处理器的处理开销2o,和n个字节的传输时间,注意overhead是任意一个消息的处理开销,gap是连续两个字节的传输时间间隔。将上述式子变形,可以得到: ,其中

这也就是本文性能评估模型的公式,许多文献和教材中也用到了这个公式。我们将称为延迟项(latency term),而称为带宽项(bandwidth term),并用这个公式来形式化的评估集合通信算法的性能。
下面深入讨论这篇论文介绍的集合通信算法。

1、Allgather操作及算法

Allgather操作的图解如下:

MPI_Allgather函数可以参考MPI_Allgather

Ring Algorithm

MPICH中最初实现Allgather操作使用的就是Ring算法。

每一个进程i发送本地数据给进程,并且接受来自的数据(环绕方式)。以后每一步,进程i都向进程发送上一步接收的来自号进程的数据。假设有p个进程,该算法总共需要p-1步来完成。用n表示收集的数据总量,每一步每个进程都发送的数据,因此算法的时间消耗可以计算为:

Recursive Doubling

递归加倍算法的流程如下图所示:

在第一步,彼此间距离为1的进程之间互相交换数据,数据量为;第二步,彼此间距离为2的进程之间交换进程自己以及上一步从邻居进程接受的数据,数据量为;在第三步,彼此间距离为4的进程之间交换进程自己以及前两步从其他进程接受的数据,数据量为,以此类推,所有进程会在步获得所有数据,执行时间为:

其中带宽项和环算法相同,这是因为:

这个等式的内在逻辑是任意一个进程总要接受来自其他p-1进程发送的总共数据量,也就是说带宽项是不能进一步减少的,但是延迟项可以通过优化算法来减少。递归加倍算法能很好的处理进程数量为2的整数幂的情况,但较难处理进程数量非2的幂次的情况。

Bruck Algorithm

Bruck算法能够很好的处理进程数非2的幂次的情况,算法的执行步骤为步,算法图解如下所示

每个进程都有一片大小为n的缓存存放数据,在算法的开始,每个进程将本地数据拷贝到缓存的顶部。在第k步,进程i向目标进程发送本地的所有数据,并将接受的数据(来自进程)添加至本地数据的末尾,一共步。如果进程的数量不是2的幂,还需要额外的一步,每个进程向目标进程发送自己缓存头部的块数据(前面步骤都是本地全部数据,这里是头部的部分数据),并将接受的数据添加到本地缓存末尾。
现在,所有进程都已经获得了全部数据,但是数据并不是以正确的顺序排列在缓存中:进程i中的所有数据块都向上偏移了i块。因此简单的将所有数据块循环向下移动i块就能将数据块调整到正确的位置上。算法的时间开销为:

Allgather操作的算法选取策略是:

  • 当进程数量为2的幂并且发送短消息或者中等规模消息,采用Recursive doubling算法;
  • 当发送短消息以及进程数量非2的幂的情况下,采用Bruck算法;
  • 发送大消息,无论进程数量是多少,并且进程数量非2幂且发送中等规模消息,采用Ring算法。

2、Broadcast操作及算法

广播操作由根进程将根进程中的数据广播给所有进程,对应的是MPI_Bcast函数,可以参考MPI_Bcast

Bionomial Tree

MPICH中广播操作最初使用二项树算法。在第一步,根进程root向目标进程发送数据,进程以及根进程成为它们子树的根结点,继续递归执行算法。该算法一共执行步,在每一步所有进程发送的数据量均为n,因此算法的时间开销为:

Scatter + Allgather

这是一种组合算法,又叫Van de Geijn算法,将Scatter和Allgather两个操作组合成了Broadcast操作。Scatter(散播)操作与Broadcast操作的对比如下:

在该算法中,要广播的数据先分成若干份,散播到各个进程中,接着,散播的数据又收集到所有进程中,也就是再执行MPI_Allgather操作。其中Scatter操作使用二项树算法,时间消耗为:

时间消耗和Allgather递归加倍算法相同,仔细观察你会发现两者互为逆过程。而Allgather操作可以使用递归加倍算法或者环算法,总时间等于两者之和。
因此广播操作的二项树算法和Scatter+Allgather算法的时间消耗对比如下:

对比两个式子我们可以很容易得到:

  • 当消息两较小(即n较小)或者进程数量少时(小于8),我们使用二项树算法;
  • 当消息较大时或者进程数量较大时,我们采用Scatter + Allgather的组合算法。

3、Reduce-Scatter操作及其算法

Reduce-Scatter操作(多对多规约)是数据规约操作Reduce的一个变种,Reduce操作的结果保存在根进程中,而Reduce-Scatter将结果散发(Scatter)给所有进程。

二项树Reduce+线性Scatter

在MPICH中的老算法中,Reduce-Scatter操作先是将所有进程的数据通过二项树规约到0号进程,然后通过线性的散发操作将数据分发出去。二项树规约操作的时间为:

线性散发操作的时间为:

总的时间为:

Recursive Halving

递归减半算法和前面Allgather操作的递归加倍算法互为逆过程。

在第1步,进程分为2个子集,每一个进程都和与自己间隔的进程交换数据:每一个进程发送另一半集合所有进程都所需要的数据,并且接收自己所在进程集合都需要的数据,然后对收集到的数据进行规约操作。在第2步,每一个进程都和与自己间隔的进程交换数据。该过程如此递归进行下去,每一步通信数据也递归减半,进行步。算法的时间消耗为:

该算法能够正确执行的前提是规约操作是满足交换率的(commutative),满足交换律的规约操作使用频率更高,这是由于MPI定义的许多规约操作都是可交换的,例如MPI_SUM,MPI_MAX。如果进程的数量不是2的幂次,我们首先将进程的数量减少到2的幂次,具体做法是最开始的个偶数编号进程发送数据给最近的奇数编号进程,使得为2的幂。奇数编号进程对收集的数据执行规约操作,然后这些奇数号进程和其余的个进程(一共个)参与递归减半算法中计算自己的结果,最后,前x个奇数进程将结果返回给左邻居结点。这样算法的时间为:

Pairwise Exchange

成对交换算法适用于规约操作不满足交换律,其思想类似于Allgather操作递归加倍算法。在第1步,每一对邻居进程交换数据;第2步,彼此间距为2的进程交换数据;在第3步,彼此间距为4的进程交换数据,如此进行下去。然而它相较于Allgather操作,交换的数据更多。在第一步,进程交换除了自己所需要数据以外的所有数据(数据量),比方说0号进程把除了块0之外的块发送给1号进程,1号进程发送除块1之外的0、块发送给0号进程;第二步,进程交换除了自己和上一步通信进程所拥有数据以外的所有数据();第三步数据量为()。这样算法执行的时间为:

该算法适用于传输的消息量小于256B的情况。对长消息发送(满足交换律的操作是256KB,不满足交换律的操作是256B),我们使用执行p-1步的成对交换算法。在第i步,每一个进程向发送数据,接收来自进程的数据,并执行局部规约操作。交换的数据仅仅是用于散发结果的的数据量,也就是只需要发送每个进程需要的那一部分数据即可。算法执行需要的时间为:

Tips:

  • Commutative Operations(满足交换律的操作):MPI定义的数据归约操作包含sum、min、max、MinLoc、MaxLoc、(bitwise)OR、AND、XOR等等,其中有些是满足交换律的,有些不满足,
  • Associative Operation(满足结合律的操作):浮点加法和乘法满足交换律但不满足结合律,因为,例如
  • Recursive Havling适合于满足交换律的操作,Recursive Doubling只适用于满足结合律的操作。

MPI_Reduce_scatter策略:

  • 当操作满足交换律,消息采用递归减半算法,则采用(p-1)步的成对交换算法;
  • 操作不满足交换律,消息时采用步的成对交换算法,时采用(p-1)步的成对交换算法。

4、Reduce操作及其算法

Bionomial Tree

MPICH中老算法采用二项树算法,执行步,每步都交换一个进程的所有n字节数据并进行规约计算,算法的时间为:

Reduce_scatter+Gather组合算法

该算法将Reduce-scatter和Gather两个操作组合成Reduce操作,也叫Rabenseifner算法。回顾广播操作的Scatter+Allgather组合算法,成功将二项树算法的的带宽项减小到了的数量级,Reduce操作类似于广播的逆过程,因此也可以采用类似的思想,Reduce-scatter和Gather组合的Reduce算法也可以将二项树算法的的带宽项减小到了的数量级。算法的时间为Reduce-scatter(递归减半算法)和Gather(二项树算法)操作的总和,计算为:

策略:

  • 当消息量小()时,采用二项树算法;
  • 当消息量大()时,采用Reduce-scatter + Gather算法。

小结:看到这里也应该能摸索出一些规律,大消息发送时(n较大)我们要尽量较少带宽项,也就是减少前面的系数,延迟项(也叫启动时间)大一点无所谓;而小消息(n较小)发送时我们要尽量较少延迟项。这也就是Reduce和Broadcast操作选择不同算法时的核心思想。

Ring Algorithm

和下面将要介绍的Ring Allreduce类似,使用Reduce-scatter + Gather的方式,但是Reduce-scatter只发送一部分数据()给目标进程,且Gather阶段使用环算法。

5、Allreduce操作及其算法

Reduce+Broadcast

MPICH中老算法先将结果Reduce到根进程然后再将根进程的结果Broadcast到所有进程中。

Recursive Doubling

Allreduce的递归加倍算法和Allgather的递归加倍算法是非常相似的,只是每一步都伴随规约操作且交换的数据量也不同,每次进程间两两交换的数据量都是n。因此算法执行的时间为:

Reduce_scatter+Allgather

该算法也叫Rabenseifner算法。回顾Reduce操作我们采用了Reduce-scatter + Gather算法,这里我们在第二步将Gather换成了Allgather操作,采用Reduce-scatter + Allgather算法。算法的总开销为:

截至目前,上述Reduce操作的Reduce-scatter + Gather算法和Allgather操作的Reduce-scatter + Allgather算法,当进程的数量不是2的幂次的时候需要额外处理。移除个额外进程来将进程数量减少到最接近的2次幂。前2r个进程(0号到2r-1号)中,所有的偶数进程将输入向量的后半部分发送给右邻居(rank+1),所有的奇数进程将输入向量的前半部分发送给左邻居(rank-1)。随后偶数进程对前半部分向量进行规约操作,奇数进程对后半部分向量进行规约操作。奇数进程将规约结果发送给左邻居进程。该步骤结束后,前2r个进程的偶数编号进程都拥有了和右邻居进程进行规约的结果,而奇数编号进程不会参与算法的后续过程,这样我们就可以把进程的数量减少到2的幂次:最开始的r个偶数进程和最后面的p-2r个进程从编号,是2的幂次。然后这些进程再执行Reduce-scatter + Allgather算法。最后规约的结果还要发送给第一步就已经移除的r个进程,如果是Reduce操作,根进程在第一步中就被剔除掉了,那么在Reduce-scatter操作之后的第一步,该根进程和邻居进程就要互换位置,这样不会增加额外消耗。
下图展示了带有13个进程的Allreduce操作算法的例子。输入向量和规约结果被分成了8个部分(A,B,…,H),因为8是小于且最接近13的2的幂次,用来表示。前2r个(r=13-8=5,2r=10)偶数进程先执行两两Allreduce操作,然后前r个偶数进程(0、2、4、6、8号)和剩余的3(p-2r=3)个进程(10、11、12号)执行Reduce-scatter+Allgather,最后,前r个偶数进程(0、2、4、6、8号)将结果发送给前r个奇数进程(1、3、5、7、9号).

Bionary Block Algorithm

叫做二方块算法。该算法能够降低进程数量非2幂时Reduce-scatter+Allgather算法的负载不均衡问题。以下图为例,在初始阶段对进程划分为若干块,使每一个块内进程子集的数量为2的方幂。每个块内部执行Reduce-scatter操作。然后,从最小的块开始,被划分为若干段做为更高一块的输入,更高的一块对收集过来的数据执行规约操作,如块作为块的输入,两个块进行规约,注意块的4个数据拷贝与块规约,然后块做为块的输入再进行两个块的规约。小的进程块会造成负载不均衡。两个连续块的最大差异,会决定负载不均衡的程度。定义做为两个连续块数量(均为2的幂次)的最大差值,如,则,如果值很小,那么算法的性能会很好。
在算法的第二阶段,是Allgather操作。上一个更大的块必须向小块发送数据,如图。

Ring Algorithm

环算法,其实就是Reduce-scatter+Allgather算法的变形,在Reduce-scatter阶段是各个进程直接将一部分数据发送到其目的节点,并且Allgather操作使用环算法来执行。
借用OpenMPI中的例子来解释Ring Allreduce算法。
假设有5个进程,则进程的输入数据分成5份,先进行Computation stage(也就是Reduce-scatter)然后是Distribution phase(也就是Allgather的环算法)。

该算法的执行时间为:

Allreduce选择最佳算法

在上面介绍的Allreduce的5种算法中根据进程数量和消息大小来选择不同算法,这张图是展示不同进程数量和消息大小对应的最佳算法(对MPI_DOUBLE型的数据进行MPI_SUM求和操作)。havling+doubling就是Reduce-scatter+Allgather算法。

这个是消息大小为32KB时对MPI_DOUBLE型的数据进行MPI_SUM求和操作不同算法的带宽。

策略:

  • 对于短消息,使用Recursive Doubling算法;
  • 对于长消息,先进行Reduce-scatter(Recursive-halving算法),再进行Allgather(Recursive Doubling算法)。

后面的三种方法只是进一步优化,没有在MPICH里集成。

Allreduce算法的评估和对比

常用的经典Allreduce算法的消耗评估模型:

Allreduce Algorithm Cost Model Efficient Bandwidth
Reduce + Broadcast
Recursive Doubling
Reduce-Scatter + Allgather
Binary Block
Ring Algorithm

OpenMPI和MPICH中的Allreduce算法

1、OpenMPI-4.1.2的MPI_Allreduce实现

OpenMPI-4.1.2是最新版本的OpenMPI,算法的具体选择在ompi/mca/coll/tuned/coll_tuned_decision_fixed.c和ompi/mca/coll/tuned/coll_tuned_decision_dynamic.c文件里,用户可以指定规则以及选择使用的算法,并且OpenMPI使用了6种算法,分别是

1
2
3
4
5
6
7
8
Algorithms:
{1, "basic_linear"}: Reduce + Broadcast
{2, "nonoverlapping"}: Reduce +Broadcast
{3, "recursive_doubling"}: Recursive Doubling
{4, "ring"}: Ring(Segmented Messages) + Allgather(Ring)
{5, "segmented_ring"}: Segmented Ring
{6, "rabenseifner"}: Reduce-Scatter + Allgather
/* Currently, ring, segmented ring, and rabenseifner do not support non-commutative operations. */

默认使用/coll_tuned_decision_fixed.c里的规则(固定算法选择规则),具体的选择方法如下(原代码是100多行的else-if,贼暴力):


除了默认的规则之外,用户还可以指定参数来选择对应的算法。
函数选择逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//动态算法选择规则
int ompi_coll_tuned_allreduce_intra_dec_dynamic()
{
...
//如果指定了filebased rules(暂不知道这是啥);
if (tuned_module->com_rules[ALLREDUCE])
{
...
int algorithm = ompi_coll_tuned_get_target_method_params();
if(algorithm) return ompi_coll_tuned_allreduce_intra_do_this(..., algorithm, ...)
{
...
switch (algorithm) {
case (0): return ompi_coll_tuned_allreduce_intra_dec_fixed();
case (1): return ompi_coll_base_allreduce_intra_basic_linear();
case (2): return ompi_coll_base_allreduce_intra_nonoverlapping();
case (3): return ompi_coll_base_allreduce_intra_recursivedoubling();
case (4): return ompi_coll_base_allreduce_intra_ring();
case (5): return ompi_coll_base_allreduce_intra_ring_segmented();
case (6): return ompi_coll_base_allreduce_intra_redscat_allgather();
}
...
}
...
}
//如果用户指定了算法;
if (tuned_module->user_forced[ALLREDUCE].algorithm)
{
...
return ompi_coll_tuned_allreduce_intra_do_this(..., algorithm, ...);
}
//若用户没指定算法,则使用固定规则
else return ompi_coll_tuned_allreduce_intra_dec_fixed(...)
{
...
int ompi_coll_tuned_allreduce_intra_dec_fixed (...)
{
//100多行的if-else, 根据进程数量和消息量确定algorithm(从1~6选一个值)
return ompi_coll_tuned_allreduce_intra_do_this (..., algorithm, ...);
};
}
...
}

2、MPICH-4.0.2的MPI_Allreduce实现

MPI应用程序在调用MPI_Allreduce时执行的主要算法、主要函数以及选择逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/*
Algorithm:
Intra Communication:
Recursive Doubling;
Reduce-scatter + Allgather
Nb(Nonblocking Allreduce + Wait)
Smp(Local Reduce + Bcast)
Inter Communication:
Reduce-exchange + Bcast
Nb(Nonblocking Allreduce + Wait)
*/
int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
{
...
MPIR_Allreduce(sendbuf, recvbuf, count, datatype, op, comm_ptr, &errflag)
{
...
MPIR_Allreduce_impl(sendbuf, recvbuf, count, datatype, op, comm_ptr, &errflag)
{
if(/*intra communicator*/)
{
switch(intra_alrogithm)
{
case intra_recursive_doubling:
MPIR_Allreduce_intra_recursive_doubling(...);
break;
case intra_reduce_scatter_allgather:
MPIR_Allreduce_intra_reduce_scatter_allgather(...);
break;
case nb:
MPIR_Allreduce_allcomm_nb(...);
break;
case intra_smp:
MPIR_Allreduce_intra_smp(...);
break;
case auto:
MPIR_Allreduce_allcomm_auto(...)
{
MPII_Csel_container_s *cnt = MPIR_Csel_search(comm_ptr->csel_comm, coll_sig);
switch (cnt->id)
{
case intra_recursive_doubling:
MPIR_Allreduce_intra_recursive_doubling(...);
break;
case intra_reduce_scatter_allgather:
MPIR_Allreduce_intra_reduce_scatter_allgather(...);
break;
case intra_smp:
MPIR_Allreduce_intra_smp(...);
break;
case inter_reduce_exchange_bcast:
MPIR_Allreduce_inter_reduce_exchange_bcast(...);
break;
case nb:
MPIR_Allreduce_allcomm_nb(...)
{
MPIR_Iallreduce(...); /*Nonblocking Allreduce*/
}
break;
}
}
}
}
else /*inter communicator*/
{
switch(inter_algorithm)
{
case inter_reduce_exchange_bcast:
MPIR_Allreduce_inter_reduce_exchange_bcast(...);
break;
case nb:
MPIR_Allreduce_allcomm_nb(...);
break;
case auto:
MPIR_Allreduce_allcomm_auto(...);
break;
}
}
}
...
}
...
}