并行正则采样排序之 MPI 实现

并行正则采样排序之 MPI 实现

算法介绍与原理

在并行计算中,流行一种比快速排序更高效的算法——并行正则采样排序算法。并行正则采样排序 PSRS(Parallel Sorting by Regular Sampling),是一种基于均匀划分(Uniform Partition)原理的负载均衡的并行排序算法。假定待排序的元素有 n 个,系统中有 p 个处理器,算法将

  1. 将 n 个元素均匀地分割成 p 段,每段含有 n/p 个元素
  2. 给每段元素指派一个处理器,每个处理器对各自管辖的这部分元素进行快速排序
  3. 从各段元素中抽取几个代表元素,再从它们产生出 p-1 个主元
  4. 找出这些主元与原来的各局部有序的元素之间的偏序关系,进而将各个局部有序段重新划分成 p 段
  5. 通过全局交换将各个段中的对应部分集合到一起,最后将集合到一起的元素采用多路归并方法进行排序

不得不说,整个算法步骤较多,过程复杂,令人一时难以理解,博主通过图片的形式再描述一遍算法,希望能帮助各位。

具体实现

首先,还是老规矩,先生成随机数组,然后使用单个处理器(这里使用 Proc#0)排序,得到最终答案,用于检验并行程序的正确性。在做好这些初始化工作后,并行正则采样排序的实现正式开始!:muscle:

分段快排

Proc#0 将所有数据分段发送给所有的处理器,每个处理器均匀地接收到部分数据,并调用 qsort() 函数进行快排。然而,均匀地分派给各处理器,就不得不考虑均分后的余数问题。当然,我们可以用 MPI_SendMPI_Recv 函数,外加一个循环,来实现数据分派。但这样做 1)编程复杂,容易出错。2)不易阅读,难以理解。这里我采用了一个比较厉害的函数:MPI_Scatterv

1
2
3
4
int MPI_Scatterv(const void *sendbuf, const int *sendcounts, const int *displs,
MPI_Datatype sendtype, void *recvbuf, int recvcount,
MPI_Datatype recvtype,
int root, MPI_Comm comm)

总的来说,就是将一个处理器中包含的内容,全都散播到各个处理器中。当然,具体到每个处理器应该获得多长的数据,以及获得哪部分的数据,就分别由 sendcountsdispls 这两个数组决定了。

回到本问题,我将除法计算中得到的余数,全部扔给了最后一个处理器,那么 sendcounts 的最后一个元素长度就需要加上余数,其余不变。于是便有如下实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 分段数据 移交各个处理器  group = 数组长度n / 处理器数p
int sendcounts[num_procs];
int displs[num_procs];
for(int i = 0; i < num_procs; i++) {
sendcounts[i] = group;
displs[i] = i*group;
}
sendcounts[num_procs-1] = group+mod;
MPI_Scatterv(array, sendcounts, displs, MPI_INT, a, group+mod, MPI_INT, 0, MPI_COMM_WORLD);

group_len = sendcounts[id_procs];
// 均匀划分 局部排序
qsort(a, group_len, sizeof(int), compare);

抽取代表

让每个处理器从已经排好序的数据中,抽取出第$ w, 2w, 3w, … (p-1)w 个代表元素,其中个代表元素,其中 w = n/p^2 $。每个处理器抽取好 p-1 个数据后,全部送到 Proc#0 处理器中,由它来完成对代表元素的排序。

很有意思哈,第一步要求 Proc#0 把数据发送给各个处理器,而第二步是要求 Proc#0 接收来自各处理器的数据。

简单介绍一下,MPI_Scatterv 的反操作 MPI_Gatherv

1
2
3
int MPI_Gatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, const int *recvcounts, const int *displs,
MPI_Datatype recvtype, int root, MPI_Comm comm)

其中的参数要求与 MPI_Scatter 相似,毕竟是镜像操作。

有了之前的编程经验,相信收集数据的编程就不难了,不过还是要注意一下,偏移量和长度等细节问题。细心的读者可能发现了,这边接收数据的长度都为常值,因此可以使用一个更简单的函数 MPI_Gather

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 正则采样
int recvcounts[num_procs];
int samples[num_procs * num_procs];
int s[num_procs];
for (i = 1; i < num_procs; i++)
s[i-1] = a[i * group / num_procs];
// 采到样本 收集到Proc#0
for(i = 0; i < num_procs; i++) {
recvcounts[i] = num_procs-1;
displs[i] = i * (num_procs-1);
}
MPI_Gatherv(s, num_procs-1, MPI_INT, samples, recvcounts, displs, MPI_INT, 0, MPI_COMM_WORLD);

// 采样排序
if (id_procs == 0)
qsort(samples, (num_procs-1)*num_procs, sizeof(int), compare);

主元划分

上小节中,Proc#0 处理器对代表元素完成排序后,需要从中抽取第$ p-1,2(p-1),…,(p-1)(p-1) $ 个主元,并将它们全部广播到所有处理器中。然后,每个处理器根据这 p-1 个主元,将自己的数据划分为 p 段。

将有序数组划分为 p 段,还是很简单的吧 :laughing:。只需要遍历一下有序数组,就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 进行主元划分
index = 0;
int pcounts[num_procs];
for(i = 0; i < num_procs; i++)
pcounts[i] = 0;
pivot[num_procs-1] = INT32_MAX;

for(i = 0; i < group_len; i++) {
if(a[i] <= pivot[index])
pcounts[index]++;
else {
i--;
index++;
}
}

全局交换

上小节中,每个处理器都将自己的数据划分为 p 段。为表述方便,记$ W^j_i $ 为 Proc#i 中的第 j 段。将所有处理器的$ W^j $ 都发送给 Proc#j 处理器,然后每个处理器再对接收到的数据使用归并排序。完成任务后,将所有有序数据发回给 Proc#0 ,让它完成答案校验。

让我们再补充一个 MPI 的函数 MPI_Alltoallv :

1
2
3
4
int MPI_Alltoallv(const void *sendbuf, const int *sendcounts,
const int *sdispls, MPI_Datatype sendtype, void *recvbuf,
const int *recvcounts, const int *rdispls, MPI_Datatype recvtype,
MPI_Comm comm)

该函数的功能是,让每个处理器发送一个消息给所有处理器(包括自己),这些消息都在它的发送缓冲中以处理器标识符号的顺序有序地存放。如下图所示,每个处理器都从所有处理器接收到一个消息,并以标号的顺序存放在缓冲区中。

回到本次实现,不难发现,对$ W^j_i $ 的操作,就是全局交换,与 MPI_Alltoall 函数所描述的功能完全一致。那么紧接着的一个问题是,如何编码呢?

首先,明确一个坏消息:$ W^j_i $ 的长度都是不定的,且只有本地的处理器知道它的长度。这意味着,在直接使用 MPI_Alltoallv 函数来完成全局交换之前,所有处理器还需要全局交换每个数据段的长度:scream: 。否则,我无法赋值 MPI_Alltoallv 函数中的各个数组值。好在,这部分代码很简单,就当做正式作战之前的一次热身吧。

1
2
3
// 发送各个段的长度
int rcounts[num_procs];
MPI_Alltoall(pcounts, 1, MPI_INT, rcounts, 1, MPI_INT, MPI_COMM_WORLD);

现在,每个处理器都已经知道了自己要发多长的消息,也知道了自己要收多长的消息。还需要知道,这些消息在哪,也就是它们的偏移量是什么。其实也简单,毕竟一个紧挨着一个,只要根据长度累加起来就可以。

1
2
3
4
5
6
7
8
9
10
11
// 计算位移
int rdispls[num_procs];
sdispls[0] = 0; rdispls[0] = 0;
for(i = 1;i < num_procs;i++) {
sdispls[i] = sdispls[i-1] + pcounts[i-1];
rdispls[i] = rdispls[i-1] + rcounts[i-1];
}
// 计算总长度
int totalcounts = 0;
for(i = 0; i < num_procs; i++)
totalcounts += rcounts[i];

然后,就是创造一个接收缓冲区,并调用函数,归并排序!

1
2
3
4
5
6
int *b = (int *)malloc(totalcounts*sizeof(int));
// 每个处理器发送数据给其他所有处理器,且每个处理发送的数据长度都不同
// 故有长度数组和位移数组
MPI_Alltoallv(a, pcounts, sdispls, MPI_INT, b, rcounts, rdispls, MPI_INT, MPI_COMM_WORLD);
// 归并排序
merge_sort(b, 0, totalcounts-1);

收集数组

现在,排序任务已经完成,只剩接收工作了。

回想一下,MPI 是如何收集数据的?使用 MPI_Gather 函数!注意到,Proc#0 不清楚各处理器中到底有多少有序元素。所以说,还是和上一小节一样,需要先收集各处理器中数组的长度。正所谓,兵马未动粮草先行。

1
MPI_Gather(&totalcounts, 1, MPI_INT, rcounts, 1, MPI_INT, 0, MPI_COMM_WORLD);

然后,根据得到的长度,计算偏移量,再使用 MPI_Gatherv 函数(这里为方便,复用了一些之前定义的数组)

1
2
3
4
rdispls[0] = 0;
for(i = 1; i < num_procs; i++)
rdispls[i] = rdispls[i-1] + rcounts[i-1];
MPI_Gatherv(b, totalcounts, MPI_INT, result, rcounts, rdispls, MPI_INT, 0, MPI_COMM_WORLD);

于是,现在所有的有序数组都存放在 result 数组中了 :smile:。

复杂度分析

完成了 PSRS 排序的实现后,我简单地讨论一下 PSRS 算法的复杂度。在第一部分的快速排序中,时间复杂度为$ O(k \log k),k=n/p 。然后,各处理器选择p1个代表元素,代价为。然后,各处理器选择 p-1 个代表元素,代价为 O(p) 。再由 `Proc#0` 对所有送来的代表元素进行排序,然后选出主元,这里若使用快速排序,代价为 O(p^2 \log p^2) ,而若使用归并排序,则所需代价为,而若使用归并排序,则所需代价为 O(p^2) 。每个处理器接收到主元后,再对有序数组进行划分,代价为。每个处理器接收到主元后,再对有序数组进行划分,代价为 O(k+p) ,最后,各个处理器全局交换,并行归并排序,代价为,最后,各个处理器全局交换,并行归并排序,代价为 O(k+\log p) $。考虑到实际应用中,需要排序的数据长度 n 一定远远多于现有的处理器 p,此时可以视 p 为一个小常数,那么 PSRS 排序算法的时间复杂度,就可以简化为 $ O(k\log k+k) $。

从消息复杂度的角度看,Proc#0 收集代表元素,并播送主元的复杂度为 $ O(p^2+p) $。在全局交换部分的消息复杂度与具体算法实现相关,但其最大值不会超过 O(n)O(n)

总结

PSRS 排序过程复杂,编写 MPI 容易出错。在实现时,建议尽量地借助群集通信的相关函数,可以有效地减少代码量,减少错误,增强代码的可读性。同时,借助这次对 PSRS 排序的实现,提高对这些群集通信函数的熟练度,因此作为一个练手项目,是一个不错的选择。

博主本人的实现见PSRS排序


并行正则采样排序之 MPI 实现
https://dingfen.github.io/2021/01/23/2021-1-23-psrs_sort/
作者
Bill Ding
发布于
2021年1月23日
更新于
2024年4月12日
许可协议