MPI
Basic Concepts
MPI (Message Passing Interface) is a message-passing programming model whose ultimate purpose is inter-process communication.
MPI is a standard (a specification); it does not refer to any particular implementation of that standard.
MPI is a library, not a programming language.
Typical Implementation Layers
| Layer | Description |
| --- | --- |
| MPI | Implements the standard MPI interface using the services provided by the ADI layer plus platform-independent algorithms and data structures |
| ADI | Implemented on top of the API of an underlying communication library; the implemented interfaces are wrapped into abstract devices, and the layer above selects a different abstract device for each hardware communication platform |
| Low-level channel API | Usually provided by the operating system or by the vendor of a specific network product |
Classification of Operations
An MPI operation is the sequence of steps performed by the MPI library to set up and enable data transfer and/or synchronization.
It consists of four stages: initialization, starting, completion, and freeing.
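These four stages are most visible with MPI's persistent communication requests, where each stage corresponds to a separate call. A minimal sketch (run with two processes; the value 42 and tag 0 are arbitrary choices for illustration):

```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char **argv)
{
    int rank, value = 0;
    MPI_Request request;
    MPI_Status status;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        value = 42;
        // Initialization: describe the transfer without starting it.
        MPI_Send_init(&value, 1, MPI_INT, 1, 0, MPI_COMM_WORLD, &request);
        // Starting: actually begin the data transfer.
        MPI_Start(&request);
        // Completion: block until the transfer has finished.
        MPI_Wait(&request, &status);
        // Freeing: release the resources held by the request.
        MPI_Request_free(&request);
    } else if (rank == 1) {
        MPI_Recv_init(&value, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &request);
        MPI_Start(&request);
        MPI_Wait(&request, &status);
        MPI_Request_free(&request);
        printf("rank 1 received %d\n", value);
    }

    MPI_Finalize();
    return 0;
}
```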
Key Terms and Concepts
- Rank: the identifier of a process, used to distinguish a process within a process group or a communicator. It is unique.
- Communicator: describes a group of processes that may communicate with one another, together with the connections between them. All MPI communication must take place within some communicator.
- Message: the data passed between processes in an MPI program. It consists of a communicator, a source address, a destination address, a message tag, and the data itself.
- Communication: the sending, receiving, and synchronization of messages between processes.
- Buffer: address space defined in the user application to hold the data being sent or received.
Basic Functions
- MPI_Init(int *argc, char ***argv)
Initializes the MPI environment; by taking the addresses of main's arguments, it makes the command-line arguments available to every MPI process.
- MPI_Comm_rank(MPI_Comm comm, int *rank)
Returns the rank that identifies the calling process within the given communicator.
- MPI_Comm_size(MPI_Comm comm, int *size)
Returns the total number of processes contained in the given communicator.
- MPI_Finalize(void)
Tears down the entire MPI environment.
Example Using Microsoft MPI on Windows
```cpp
#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

#define N 10

int main(int argc, char **argv)
{
    int size, rank;
    int *send;
    int *recv;
    int i = 0;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    send = new int[size * N];
    recv = new int[N];

    // Only the root process fills the send buffer.
    if (rank == 0) {
        for (i = 0; i < size * N; i++) {
            send[i] = i;
        }
    }

    // Scatter N integers from the root to each process (including the root).
    MPI_Scatter(send, N, MPI_INT, recv, N, MPI_INT, 0, MPI_COMM_WORLD);

    std::printf("-------------------------rank = %d\n", rank);
    std::printf("-------------------------size = %d\n", size);
    for (int i = 0; i < N; i++) {
        std::printf("recv buffer[%d] = %d\n", i, recv[i]);
    }
    std::printf("---------------------------------------\n");

    delete[] send;
    delete[] recv;
    MPI_Finalize();
    return 0;
}
```
Run
"C:\Program Files\Microsoft MPI\Bin\mpiexec.exe" -n 4 MPITest.exe
Output
```
-------------------------rank = 2
-------------------------size = 4
recv buffer[0] = 20
recv buffer[1] = 21
recv buffer[2] = 22
recv buffer[3] = 23
recv buffer[4] = 24
recv buffer[5] = 25
recv buffer[6] = 26
recv buffer[7] = 27
recv buffer[8] = 28
recv buffer[9] = 29
---------------------------------------
-------------------------rank = 3
-------------------------size = 4
recv buffer[0] = 30
recv buffer[1] = 31
recv buffer[2] = 32
recv buffer[3] = 33
recv buffer[4] = 34
recv buffer[5] = 35
recv buffer[6] = 36
recv buffer[7] = 37
recv buffer[8] = 38
recv buffer[9] = 39
---------------------------------------
-------------------------rank = 0
-------------------------size = 4
recv buffer[0] = 0
recv buffer[1] = 1
recv buffer[2] = 2
recv buffer[3] = 3
recv buffer[4] = 4
recv buffer[5] = 5
recv buffer[6] = 6
recv buffer[7] = 7
recv buffer[8] = 8
recv buffer[9] = 9
---------------------------------------
-------------------------rank = 1
-------------------------size = 4
recv buffer[0] = 10
recv buffer[1] = 11
recv buffer[2] = 12
recv buffer[3] = 13
recv buffer[4] = 14
recv buffer[5] = 15
recv buffer[6] = 16
recv buffer[7] = 17
recv buffer[8] = 18
recv buffer[9] = 19
---------------------------------------
```
Point-to-Point Communication
MPI communication modes:
Send operations:
- MPI_SEND standard mode
- MPI_BSEND buffered mode
- MPI_SSEND synchronous mode
- MPI_RSEND ready mode
Receive operations:
- MPI_RECV
Standard Mode
```cpp
MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm);
MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status);
```
where:
- buf: address of the data buffer
- count: number of data elements
- datatype: data type
- source: rank of the source process
- dest: rank of the destination process
- tag: message tag
- comm: communicator
- status: receive status
Wildcard source and tag: MPI_ANY_SOURCE (a message sent by any process may be received) and MPI_ANY_TAG (a message with any tag may be received).
- MPI_Abort(comm, errorcode): comm is the communicator whose processes are to be aborted; errorcode is the error code returned to the invoking environment.
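A minimal sketch tying these together (the value 123 and tag 99 are arbitrary): the receiver accepts any source and any tag, then reads the actual ones back from the status object, and MPI_Abort guards the required process count.

```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char **argv)
{
    int size, rank, value;
    MPI_Status status;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (size < 2) {
        // Abort every process in the communicator with error code 1.
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    if (rank == 1) {
        value = 123;
        MPI_Send(&value, 1, MPI_INT, 0, 99, MPI_COMM_WORLD);
    } else if (rank == 0) {
        // Accept a message from any sender with any tag ...
        MPI_Recv(&value, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        // ... then recover who actually sent it, and with which tag.
        printf("got %d from rank %d, tag %d\n", value, status.MPI_SOURCE, status.MPI_TAG);
    }

    MPI_Finalize();
    return 0;
}
```

The complete standard-mode example: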
```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char **argv)
{
    int size, rank;
    int buffer_1, recv_1;
    int buffer_2, recv_2;
    MPI_Status status3, status4;
    int src = 0;
    int dest = 1;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // This example requires exactly two processes.
    if (size != 2) {
        printf("**This program needs exactly two processes, but was run with %d**\n", size);
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    if (rank == src) {
        buffer_1 = 200;
        buffer_2 = 20000;
        printf("standard MPI_Send\n");
        MPI_Send(&buffer_1, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
        MPI_Send(&buffer_2, 1, MPI_INT, dest, 2, MPI_COMM_WORLD);
        printf("MPI_Send %d data, tag = 1\n", buffer_1);
        printf("MPI_Send %d data, tag = 2\n", buffer_2);
    } else if (rank == dest) {
        MPI_Recv(&recv_1, 1, MPI_INT, src, 1, MPI_COMM_WORLD, &status3);
        MPI_Recv(&recv_2, 1, MPI_INT, src, 2, MPI_COMM_WORLD, &status4);
        printf("MPI_Recv %d data, tag = 1\n", recv_1);
        printf("MPI_Recv %d data, tag = 2\n", recv_2);
    }

    MPI_Finalize();
    return 0;
}
```
Output
```
standard MPI_Send
MPI_Send 200 data, tag = 1
MPI_Send 20000 data, tag = 2
MPI_Recv 200 data, tag = 1
MPI_Recv 20000 data, tag = 2
```
Buffered Mode
Buffered mode exists for programmers who are not satisfied with the standard mode and want direct control over the communication buffer.
The programmer must allocate, attach, and release the buffer; its correct and sensible use is the programmer's own responsibility.
```cpp
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

int main(int argc, char **argv)
{
    int rank;
    double recv;
    double *tmpbuffer;
    int size = 1;           // number of elements to pack
    int bsize;
    double data = 100.00;
    MPI_Status status;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        printf("MPI_BSend\n");
        // Compute the space needed for one double plus the Bsend overhead,
        // attach the buffer, send, then detach the buffer again.
        MPI_Pack_size(size, MPI_DOUBLE, MPI_COMM_WORLD, &bsize);
        tmpbuffer = (double *) malloc(bsize + MPI_BSEND_OVERHEAD);
        MPI_Buffer_attach(tmpbuffer, bsize + MPI_BSEND_OVERHEAD);
        printf("BSend data\n");
        MPI_Bsend(&data, 1, MPI_DOUBLE, 1, 2000, MPI_COMM_WORLD);
        MPI_Buffer_detach(&tmpbuffer, &bsize);
    } else if (rank == 1) {
        MPI_Recv(&recv, 1, MPI_DOUBLE, 0, 2000, MPI_COMM_WORLD, &status);
        printf("MPI_Recv %f data\n", recv);
    }

    MPI_Finalize();
    return 0;
}
```
Output
```
MPI_Recv 100.000000 data
MPI_BSend
BSend data
```
Synchronous Mode
```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char **argv)
{
    int size, rank;
    int buffer, recv;
    int sbuffer, srecv;
    int ssbuffer, ssrecv;
    int buffer1, recv1;
    MPI_Status status1, status2, status3, status4;
    int src = 0;
    int dest = 1;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // This example requires exactly two processes.
    if (size != 2) {
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    if (rank == src) {
        buffer = 100;
        buffer1 = 10000;
        ssbuffer = 20000;
        sbuffer = 200;
        // Two standard sends first, then two synchronous sends:
        // MPI_Ssend does not complete until the matching receive has started.
        MPI_Send(&ssbuffer, 1, MPI_INT, dest, 3, MPI_COMM_WORLD);
        MPI_Send(&sbuffer, 1, MPI_INT, dest, 4, MPI_COMM_WORLD);
        MPI_Ssend(&buffer, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
        printf("MPI_Ssend %d data tag = 1\n", buffer);
        MPI_Ssend(&buffer1, 1, MPI_INT, dest, 2, MPI_COMM_WORLD);
        printf("MPI_Ssend %d data tag = 2\n", buffer1);
    } else if (rank == dest) {
        MPI_Recv(&srecv, 1, MPI_INT, src, 3, MPI_COMM_WORLD, &status3);
        MPI_Recv(&ssrecv, 1, MPI_INT, src, 4, MPI_COMM_WORLD, &status4);
        MPI_Recv(&recv, 1, MPI_INT, src, 1, MPI_COMM_WORLD, &status1);
        printf("MPI_Recv %d data tag = 1\n", recv);
        MPI_Recv(&recv1, 1, MPI_INT, src, 2, MPI_COMM_WORLD, &status2);
        printf("MPI_Recv %d data tag = 2\n", recv1);
    }

    MPI_Finalize();
    return 0;
}
```
Output
```
MPI_Ssend 100 data tag = 1
MPI_Ssend 10000 data tag = 2
MPI_Recv 100 data tag = 1
MPI_Recv 10000 data tag = 2
```
Ready Mode
The distinguishing feature of ready mode is that the receive operation must be started before the send. Consequently, in a correct program a ready send can be replaced by a standard send without affecting the program's semantics, though it may affect its performance.
```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char **argv)
{
    int size, rank;
    int next, tag;
    double buffer, recv;
    MPI_Status status;
    MPI_Request request;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    tag = 1;
    next = rank + 1;
    if (next >= size) next = 0;

    // This example requires exactly two processes.
    if (size != 2) {
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    if (rank == 0) {
        printf("Rsend Test\n");
        buffer = 6666.0;
        // Wait for the zero-byte "ready" message: it guarantees that the
        // receiver has already posted its receive before we call MPI_Rsend.
        MPI_Recv(MPI_BOTTOM, 0, MPI_INT, next, tag, MPI_COMM_WORLD, &status);
        printf("process %d post ready\n", rank);
        MPI_Rsend(&buffer, 1, MPI_DOUBLE, next, tag, MPI_COMM_WORLD);
    } else {
        printf("process %d receive call\n", rank);
        // Post the receive first, then notify the sender that it is ready.
        MPI_Irecv(&recv, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &request);
        MPI_Send(MPI_BOTTOM, 0, MPI_INT, next, tag, MPI_COMM_WORLD);
        MPI_Wait(&request, &status);
        printf("ready MPI_recv = %f\n", recv);
        printf("process %d receive rsend message from %d\n", rank, status.MPI_SOURCE);
    }

    MPI_Finalize();
    return 0;
}
```
Output
```
Rsend Test
process 0 post ready
process 1 receive call
ready MPI_recv = 6666.000000
process 1 receive rsend message from 0
```
MPI Collective Communication
Point-to-point communication involves exactly two processes, one sender and one receiver.
Collective communication involves one-to-many, many-to-one, and many-to-many patterns, with no fixed number of processes.
Collective communication generally provides three capabilities:
- Communication: transferring data within the group - broadcast, gather, scatter, allgather, all-to-all
- Synchronization: bringing all processes in the group to an agreed point in their execution. Many applications require this, and collective communication provides a dedicated call for it, coordinating the progress and pace of the processes (see the MPI_Barrier sketch after this list)
- Computation: slightly more involved; a given operation is applied to the given data
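The dedicated synchronization call is MPI_Barrier: no process returns from it until every process in the communicator has entered it. A minimal sketch:

```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char **argv)
{
    int rank;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    printf("rank %d before the barrier\n", rank);

    // Block until every process in MPI_COMM_WORLD reaches this point.
    MPI_Barrier(MPI_COMM_WORLD);

    printf("rank %d after the barrier\n", rank);

    MPI_Finalize();
    return 0;
}
```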
Broadcast - One to Many
Broadcast interface: MPI_Bcast(buffer, count, datatype, root, comm)
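A minimal sketch of its use (the value 3.14 is arbitrary): rank 0 fills the buffer, and after the call every process holds the same data.

```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char **argv)
{
    int rank;
    double value = 0.0;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        value = 3.14;   // only the root has the data initially
    }

    // After the broadcast, every rank's value is 3.14.
    MPI_Bcast(&value, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
    printf("rank %d has value %f\n", rank, value);

    MPI_Finalize();
    return 0;
}
```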
Gather
MPI_Gather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm)
MPI_Gatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm)
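A minimal MPI_Gather sketch (each rank contributes the arbitrary value rank * 10): every process sends one int and the root receives them in rank order. MPI_Gatherv additionally lets each rank contribute a different count, placed at a given displacement.

```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char **argv)
{
    int size, rank;
    int mine;
    int *all = NULL;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    mine = rank * 10;   // each process contributes one value

    // The receive buffer only needs to exist on the root.
    if (rank == 0) {
        all = new int[size];
    }

    // Gather one int from every process into "all" on rank 0, in rank order.
    MPI_Gather(&mine, 1, MPI_INT, all, 1, MPI_INT, 0, MPI_COMM_WORLD);

    if (rank == 0) {
        for (int i = 0; i < size; i++) {
            printf("all[%d] = %d\n", i, all[i]);
        }
        delete[] all;
    }

    MPI_Finalize();
    return 0;
}
```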
Scatter
MPI_Scatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm)
MPI_SCATTERV(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcounts, recvtype, root, comm)
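A minimal MPI_Scatterv sketch (the block sizes are arbitrary): unlike the MPI_Scatter example earlier on this page, each rank here receives a different number of elements, with sendcounts and displs describing each rank's block in the root's buffer.

```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char **argv)
{
    int size, rank;
    int *sendbuf = NULL;
    int *sendcounts = NULL;
    int *displs = NULL;
    int recvbuf[64];         // large enough for this sketch (size <= 64)

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    int mycount = rank + 1;  // rank i receives i + 1 elements

    // Only the root needs the send buffer and the layout arrays.
    if (rank == 0) {
        int total = size * (size + 1) / 2;
        sendbuf = new int[total];
        sendcounts = new int[size];
        displs = new int[size];
        int offset = 0;
        for (int i = 0; i < size; i++) {
            sendcounts[i] = i + 1;   // how many elements rank i gets
            displs[i] = offset;      // where rank i's block starts
            offset += i + 1;
        }
        for (int i = 0; i < total; i++) {
            sendbuf[i] = i;
        }
    }

    MPI_Scatterv(sendbuf, sendcounts, displs, MPI_INT,
                 recvbuf, mycount, MPI_INT, 0, MPI_COMM_WORLD);

    for (int i = 0; i < mycount; i++) {
        printf("rank %d recvbuf[%d] = %d\n", rank, i, recvbuf[i]);
    }

    if (rank == 0) {
        delete[] sendbuf;
        delete[] sendcounts;
        delete[] displs;
    }

    MPI_Finalize();
    return 0;
}
```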
Allgather
MPI_ALLGATHER(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm)
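A minimal MPI_Allgather sketch: it behaves like a gather whose result ends up on every process, not just the root.

```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char **argv)
{
    int size, rank;
    int mine;
    int *all;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    mine = rank + 1;
    all = new int[size];    // every rank needs the full receive buffer

    // Every process contributes one int and receives everyone's int.
    MPI_Allgather(&mine, 1, MPI_INT, all, 1, MPI_INT, MPI_COMM_WORLD);

    printf("rank %d sees:", rank);
    for (int i = 0; i < size; i++) {
        printf(" %d", all[i]);
    }
    printf("\n");

    delete[] all;
    MPI_Finalize();
    return 0;
}
```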
Reduction (reduce)
MPI_REDUCE combines the data in the input buffer of each process in the group using the given operation op and returns the result to the output buffer of the process with rank root.
MPI predefines a set of operations for MPI_REDUCE and related functions such as MPI_ALLREDUCE, MPI_REDUCE_SCATTER, and MPI_SCAN; they are passed as the op argument (a sketch using two of them follows the table below).
| Name | Meaning |
| --- | --- |
| MPI_MAX | maximum |
| MPI_MIN | minimum |
| MPI_SUM | sum |
| MPI_PROD | product |
| MPI_LAND | logical AND |
| MPI_BAND | bitwise AND |
| MPI_LOR | logical OR |
| MPI_BOR | bitwise OR |
| MPI_LXOR | logical XOR |
| MPI_BXOR | bitwise XOR |
| MPI_MAXLOC | maximum value and its location |
| MPI_MINLOC | minimum value and its location |
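A minimal sketch using two of the predefined operations above with MPI_Allreduce, which works like MPI_REDUCE except that every process receives the result:

```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char **argv)
{
    int rank;
    int my_value, total, maximum;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    my_value = rank + 1;

    // Sum of all ranks' values, with the result available on every process.
    MPI_Allreduce(&my_value, &total, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
    // Maximum of all ranks' values.
    MPI_Allreduce(&my_value, &maximum, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD);

    printf("rank %d: sum = %d, max = %d\n", rank, total, maximum);

    MPI_Finalize();
    return 0;
}
```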
Reduce-Scatter
The MPI_REDUCE_SCATTER operation can be seen as a variant of the reduction operations: it scatters the result across all processes in the group instead of accumulating it only at the root process.
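A minimal MPI_Reduce_scatter sketch (the input values are arbitrary): each process supplies one element per rank, the elements are summed position-wise across processes, and rank i then receives the i-th sum.

```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char **argv)
{
    int size, rank;
    int *sendbuf;
    int *recvcounts;
    int myresult;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Each process contributes one element destined for every rank.
    sendbuf = new int[size];
    recvcounts = new int[size];
    for (int i = 0; i < size; i++) {
        sendbuf[i] = rank + i;  // arbitrary per-destination data
        recvcounts[i] = 1;      // each rank receives one summed element
    }

    // Element-wise MPI_SUM across all processes, then scatter:
    // rank i gets the sum of sendbuf[i] over all ranks.
    MPI_Reduce_scatter(sendbuf, &myresult, recvcounts, MPI_INT, MPI_SUM, MPI_COMM_WORLD);

    printf("rank %d result = %d\n", rank, myresult);

    delete[] sendbuf;
    delete[] recvcounts;
    MPI_Finalize();
    return 0;
}
```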
Computing PI
```cpp
#include <cstdio>
#include <cmath>
#include <mpi.h>

// Integrand: the integral of 4/(1+x^2) over [0,1] equals pi.
double f(double x)
{
    return (4.0 / (1.0 + x * x));
}

int main(int argc, char **argv)
{
    int n, myid, numprocs, i;
    double PI25DT = 3.141592653589793238462643;
    double mypi, pi, h, sum, x;
    double startwtime = 0.0, endwtime;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    if (myid == 0) {
        n = 100;    // number of intervals
    }

    startwtime = MPI_Wtime();

    // Tell every process how many intervals to use.
    MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);

    // Midpoint rule: each process sums every numprocs-th interval.
    h = 1.0 / (double) n;
    sum = 0.0;
    for (i = myid + 1; i <= n; i += numprocs) {
        x = h * ((double) i - 0.5);
        sum += f(x);
    }
    mypi = h * sum;

    // Combine the partial sums at rank 0.
    MPI_Reduce(&mypi, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

    if (myid == 0) {
        printf("pi is approximately %.16f, Error is %.16f\n", pi, fabs(pi - PI25DT));
        endwtime = MPI_Wtime();
        printf("time = %f\n", endwtime - startwtime);
        fflush(stdout);
    }

    MPI_Finalize();
    return 0;
}
```