MPI并行计算的基本介绍和使用

简介: MPI并行计算的基本介绍和使用

MPI


基本概念


MPI(Message Passin Interface 消息传递接口)是一种消息传递编程模型,最终目的是服务于进程间通信

MPI是一种标准或者规范的代表,不特指某一个对它具体的实现

MPI是一个库,不是一门语言


一般实现

层次 说明
MPI 通过ADI层提供的服务和平台无关的算法、数据结构实现MPI的标准接口
ADI 通过底层通信库提供的API实现,把实现后的接口封装成一类抽象设备,上一层基于不同的硬件通信平台,选择不同的抽象设备
底层通道API 通常由操作系统或者特定网络产品生产商提供


操作分类


MPI操作是由MPI库为建立和启用数据传输和/或同步而执行的一系列步骤

它包括四个阶段:初始化、开始、完成和释放

内部参数名称和概念


  • 序号:即进程的标识,是用来在一个进程组或者一个通信器中标识一个进程。是唯一的
  • 通信域:它描述了一组可以相互通信的进程以及他们之间的连接关系等信息。MPI所有通信必须在某个通信器中进行。
  • 消息:MPI程序中在进程间传递的数据。它由通信器、原地址、目的地之、消息标签和数据构成
  • 通信:通信是指在进程之间进行消息的收发、同步等操作
  • 缓冲区:在用户应用程序中定义的用于保存发送和接收数据的地址空间


基本语句


  • MPI_Init(int **argc, char ***argv)

完成MPI程序初始化工作,通过获取main函数的参数,让每一个MPI进程都能获取到main函数


  • MPI_Comm_rank(MPI_comm comm, int *rank)

用于获取调用进程在给定进程通信域中的进程标识号


  • MPI_Comm_size(MPI_comm comm, int *size)

调用返回给定的通信域中所包含的进程总数


  • MPI_Finalize(void)

清除全部MPI环境


windows下使用Microsoft MPI 示例

#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

#define N 10

int main(int argc, char **argv)
{
    int size, rank;
    int *send;
    int *recv;
    int i = 0;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    send = new int[size * N];
    recv = new int[N];

    if(rank == 0)
    {
        for(i = 0; i < size * N; i++)
        {
            send[i] = i;
        }
    }

    MPI_Scatter(send, N, MPI_INT, recv, N, MPI_INT, 0, MPI_COMM_WORLD);

    std::printf("-------------------------rank = %d\n", rank);
    std::printf("-------------------------size = %d\n", size);
    for(int i = 0; i < N; i++)
    {
        std::printf("recv buffer[%d] = %d \n", i, recv[i]);
    }
    std::printf("---------------------------------------\n");

    delete[] send;
    delete[] recv;

    MPI_Finalize();
    return 0;
}


运行

"C:\Program Files\Microsoft MPI\Bin\mpiexec.exe" -n 4 MPITest.exe


输出

-------------------------rank = 2
-------------------------size = 4
recv buffer[0] = 20 
recv buffer[1] = 21 
recv buffer[2] = 22 
recv buffer[3] = 23 
recv buffer[4] = 24 
recv buffer[5] = 25 
recv buffer[6] = 26 
recv buffer[7] = 27 
recv buffer[8] = 28 
recv buffer[9] = 29 
---------------------------------------
-------------------------rank = 3
-------------------------size = 4
recv buffer[0] = 30 
recv buffer[1] = 31 
recv buffer[2] = 32 
recv buffer[3] = 33 
recv buffer[4] = 34 
recv buffer[5] = 35 
recv buffer[6] = 36 
recv buffer[7] = 37 
recv buffer[8] = 38 
recv buffer[9] = 39 
---------------------------------------
-------------------------rank = 0
-------------------------size = 4
recv buffer[0] = 0 
recv buffer[1] = 1 
recv buffer[2] = 2 
recv buffer[3] = 3 
recv buffer[4] = 4 
recv buffer[5] = 5 
recv buffer[6] = 6 
recv buffer[7] = 7 
recv buffer[8] = 8 
recv buffer[9] = 9 
---------------------------------------
-------------------------rank = 1
-------------------------size = 4
recv buffer[0] = 10 
recv buffer[1] = 11 
recv buffer[2] = 12 
recv buffer[3] = 13 
recv buffer[4] = 14 
recv buffer[5] = 15 
recv buffer[6] = 16 
recv buffer[7] = 17 
recv buffer[8] = 18 
recv buffer[9] = 19 
---------------------------------------


点对点通信


MPI通信的模式:


发送操作:

  • MPI_SEND 标准通信
  • MPI_BSEND 缓存通信
  • MPI_SSEND 同步通信
  • MPI_RSEND 就绪通信


接收操作:

  • MPI_RECV


标准通信

MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm);
MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, status)


其中: buf 数据地址 count 数据个数 datatype 数据类型 source 原进程号 dest 目的进程号 tag进程标识 comm 通信域 status 状态


任意源和任意标识: MPI_ANY_SOURCE(标识任何继承发送的消息都可以接收) MPI_ANY_TAG(标识任何tag都可以接收)


  • MPI_ABORT(comm, errorcode) comm 退出进程所在的通信域 errorcode 返回到所嵌环境的错误码
#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

#define N 10

int main(int argc, char **argv)
{
    int size, rank;
    int flag, rval, i;
    int buffer_1, recv_1;
    int buffer_2, recv_2;
    MPI_Status status, status1, status2, status3, status4;
    int src = 0;
    int dest = 1;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if(size != 2)
    {
        printf("**这个程序使用了不是两个进程 %d**\n", size);
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    if(rank == src)
    {
        buffer_1 = 200;
        buffer_2 = 20000;
        printf("standard MPI_Send\n");
        MPI_Send(&buffer_1, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
        MPI_Send(&buffer_2, 1, MPI_INT, dest, 2, MPI_COMM_WORLD);
        printf("MPI_Send %d data, tag = 1\n", buffer_1);
        printf("MPI_Send %d data, tag = 2\n", buffer_2);
    }
    else if(rank == dest)
    {
        MPI_Recv(&recv_1, 1, MPI_INT, src, 1, MPI_COMM_WORLD, &status3);
        MPI_Recv(&recv_2, 1, MPI_INT, src, 2, MPI_COMM_WORLD, &status4);
        printf("MPI_Recv %d data, tag = 1\n", recv_1);
        printf("MPI_Recv %d data, tag = 2\n", recv_2);
    }

    MPI_Finalize();
    return 0;
}

输出

standard MPI_Send
MPI_Send 200 data, tag = 1
MPI_Send 20000 data, tag = 2
MPI_Recv 200 data, tag = 1
MPI_Recv 20000 data, tag = 2


缓存通信


并行程序员对标准通信模式不满意,希望能够对通信缓冲区进行直接控制。


并行程序员需要对通信缓冲区进行申请、使用和释放,通信缓冲区的合理与正确使用需要设计人员自己保证。

#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

#define N 10

int main(int argc, char **argv)
{
    int rank;

    double recv;
    double *tmpbuffer;
    int size = 1;
    int bsize;
    double data = 100.00;

    MPI_Status status;
    int src = 0;
    int dest = 1;
    MPI_Init(&argc, &argv);
//    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if(rank == 0)
    {
        printf("MPI_BSend\n");
        MPI_Pack_size(size, MPI_DOUBLE, MPI_COMM_WORLD, &bsize);
        tmpbuffer = (double *) malloc(bsize + MPI_BSEND_OVERHEAD);
        MPI_Buffer_attach(tmpbuffer, bsize + MPI_BSEND_OVERHEAD);
        printf("BSend data\n");
        MPI_Bsend(&data, 1, MPI_DOUBLE, 1, 2000, MPI_COMM_WORLD);
        MPI_Buffer_detach(&tmpbuffer, &bsize);
    }
    else if(rank == 1)
    {
        MPI_Recv(&recv, 1, MPI_DOUBLE, 0, 2000, MPI_COMM_WORLD, &status);
        printf("MPI_Recv %f data\n", recv);
    }

    MPI_Finalize();
    return 0;
}


输出

MPI_Recv 100.000000 data
MPI_BSend
BSend data


同步通信

#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

#define N 10

int main(int argc, char **argv)
{
    int size, rank;
    int act_size = 0;
    int flag, rval, i;
    int buffer, recv;
    int sbuffer, srecv;
    int ssbuffer, ssrecv;
    int buffer1, recv1;
    MPI_Status status, status1, status2, status3, status4;
    int count1, count2;
    int src = 0;
    int dest = 1;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if(size != 2)
    {
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    if(rank == src)
    {
        buffer = 100;
        buffer1 = 10000;
        ssbuffer = 20000;
        sbuffer = 200;
        MPI_Send(&ssbuffer, 1, MPI_INT, dest, 3, MPI_COMM_WORLD);
        MPI_Send(&sbuffer, 1, MPI_INT, dest, 4, MPI_COMM_WORLD);
        MPI_Ssend(&buffer, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
        printf("MPI_Ssend %d data tag = 1\n", buffer);
        MPI_Ssend(&buffer1, 1, MPI_INT, dest, 2, MPI_COMM_WORLD);
        printf("MPI_Ssend %d data tag = 2\n", buffer1);
    }
    else if(rank == dest)
    {
        MPI_Recv(&srecv, 1, MPI_INT, src, 3, MPI_COMM_WORLD, &status3);
        MPI_Recv(&ssrecv, 1, MPI_INT, src, 4, MPI_COMM_WORLD, &status4);

        MPI_Recv(&recv, 1, MPI_INT, src, 1, MPI_COMM_WORLD, &status1);
        printf("MPI_Recv %d data tag = 1\n", recv);
        MPI_Recv(&recv1, 1, MPI_INT, src, 2, MPI_COMM_WORLD, &status2);
        printf("MPI_Recv %d data tag = 2\n", recv1);
    }


    MPI_Finalize();
    return 0;
}


输出

MPI_Ssend 100 data tag = 1
MPI_Ssend 10000 data tag = 2
MPI_Recv 100 data tag = 1
MPI_Recv 10000 data tag = 2


就绪通信


就绪通信的特殊之处就在于它要求接收操作先于发送操作而被启动。因此,在一个正确的程序中,一个就绪发送能被一个标准发送替代,它对程序的语义没有影响,而对程序的性能有影响。

#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

#define N 10

int main(int argc, char **argv)
{
    int size, rank;
    int next, prev;
    int tag, count;
    double buffer, recv;
    MPI_Status status;
    MPI_Request  request;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    tag = 1;
    next = rank + 1;
    if(next >= size)  next = 0;
    if(size != 2)
    {
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    
    if(rank == 0)
    {
        printf("Rsend Tets\n");
        buffer = 6666.0f;
        MPI_Recv(MPI_BOTTOM, 0, MPI_INT, next, tag, MPI_COMM_WORLD, &status);
        printf("process %d post ready", rank);
        MPI_Rsend(&buffer, 1, MPI_DOUBLE, next, tag, MPI_COMM_WORLD);
    }
    else
    {
        printf("process %d recive call", rank);
        MPI_Irecv(&recv, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &request);
        MPI_Send(MPI_BOTTOM, 0, MPI_INT, next, tag, MPI_COMM_WORLD);
        MPI_Wait(&request, &status);
        printf("ready MPI_recv = %f\n", recv);
        printf("process %d receive rsend message form &d\n", rank, status.MPI_SOURCE);
    }

    MPI_Finalize();
    return 0;
}


输出

Rsend Tets
process 0 post ready
process 1 recive callready MPI_recv = 6666.000000
process 1 receive rsend message form &d


MPI组通信


点对点通信是一个发送方和一个接收方2个进程

组通信则是一对多,多对一,多对多的进程,进程数量不确定

组通信一般实现了三个功能:


  • 通信:通信功能主要完成组内数据的传输 - 广播,收集,散发,组收集,全互换
  • 同步:同步功能实现组内所有进程在特定的地点在执行进度上取得一致,同步功能是许多应用中必须提供的功能,组同喜还提供专门的调用以完成各个进程之间的同步,从而协调各个进程的进度和步伐
  • 计算:计算功能稍微复杂一点,要对给定的数据完成一定的操作


广播 - 一对多


广播函数接口:MPI_Bcast(buffer, count, datatype, root, comm)


收集


MPI_Gather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm)


MPI_GatherV(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm)


散发


MPI_Scatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm)


MPI_SCATTERV(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcounts, recvtype, root, comm)


组收集


MPI_ALLGATHER(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm)


归约(reduce)


MPI_REDUCE 将组内每个进程输入缓冲区中的数据按给定的操作op进行运算,并将其结果返回到序列号为root的进程的输出缓冲区中。


MPI中已经定义好了一些操作,他们是为函数MPI_REDUCE和一些其他的相关函数,如MPI_ALLREDUCE, MPI_REDUCE_SCATTER和MPI_SCAN而定义的,这些操作用来设定相应的op


名字 含义
MPI_MAX 最大值
MPI_MIN 最小值
MPI_SUM 求和
MPI_PROD 求积
MPI_LAND 逻辑与
MPI_BAND 按位与
MPI_LOR 逻辑或
MPI_BOR 按位或
MPI_LXOR 逻辑异或
MPI_BXOR 按位异或
MPI_MAXLOC 最大值且相应位置
MPI_MINLOC 最小值且相应位置


归约并散发


MPI_REDUCE_SCATTER 操作可以认为是MPI对每个归约操作的变形,它将结果分散到主内的所有进程中取,而不是仅仅归约到root进程中。


求PI

#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

#define N 10

double f(double x)
{
    return (4.0 / (1.0 + x*x));
}

int main(int argc, char **argv)
{
    int done = 0, n, myid, numprocs, i;
    double PI25DT = 3.141592653589793238462643;
    double mypi, pi, h, sum, x;
    double startwtime = 0.0, endwtime;
    int namelen;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    if(myid == 0)
    {
        n = 100;
    }
    startwtime = MPI_Wtime();
    MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);
    h = 1.0/(double)n;
    sum = 0.0;
    for(i = myid + 1; i <= n; i+= numprocs)
    {
        x = h * ((double)i - 0.5);
        sum += f(x);
    }
    mypi = h * sum;
    MPI_Reduce(&mypi, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
    if(myid == 0)
    {
        printf("pi is approximately %.16f, Error is %.16f\n", pi, fabs(pi - PI25DT));
        endwtime = MPI_Wtime();
        printf("time = %f", endwtime - startwtime);
        fflush(stdout);
    }
    MPI_Finalize();
    return 0;
}
目录
相关文章
|
7月前
|
分布式计算 并行计算 数据处理
NumPy的并行与分布式计算实践
【4月更文挑战第17天】本文探讨了如何使用NumPy进行并行和分布式计算以提升效率。介绍了利用`numexpr`加速多核CPU计算,设置`NUMPY_NUM_THREADS`环境变量实现多线程,并通过Dask和PySpark进行分布式计算。Dask允许无缝集成NumPy,而PySpark则将NumPy数组转换为RDD进行并行处理。这些方法对处理大规模数据至关重要。
|
机器学习/深度学习 并行计算 算法
PyTorch并行与分布式(一)概述
PyTorch并行与分布式(一)概述
207 0
|
分布式计算 资源调度 并行计算
并行计算框架MapReduce编程模型
思想:分而治之 map:对每一部分进行处理 reduce :汇总map结果 map是MapReduce最核心的。 mapreduce编程模型 一种分布式计算模型,解决海量数据计算问题 MapReduce把整个并行计算的过程抽象到两个函数,map和reduce函数。
1520 0
|
并行计算 C语言 C++
《并行计算的编程模型》一1.2 MPI基础
本节书摘来华章计算机《并行计算的编程模型》一书中的第1章 ,第1.2节, [(美)帕万·巴拉吉(Pavan Balaji)编著;张云泉等译,更多章节内容可以访问云栖社区“华章计算机”公众号查看。
2181 0
|
并行计算
《并行计算的编程模型》一1.10 MPI开发心得
本节书摘来华章计算机《并行计算的编程模型》一书中的第1章 ,第1.10节, [(美)帕万·巴拉吉(Pavan Balaji)编著;张云泉等译,更多章节内容可以访问云栖社区“华章计算机”公众号查看。
1899 0
|
并行计算
《并行计算的编程模型》一1.8 并行I/O
本节书摘来华章计算机《并行计算的编程模型》一书中的第1章 ,第1.8节, [(美)帕万·巴拉吉(Pavan Balaji)编著;张云泉等译,更多章节内容可以访问云栖社区“华章计算机”公众号查看。
2003 0
|
并行计算 安全
《并行计算的编程模型》一1.9 其他特性
本节书摘来华章计算机《并行计算的编程模型》一书中的第1章 ,第1.9节, [(美)帕万·巴拉吉(Pavan Balaji)编著;张云泉等译,更多章节内容可以访问云栖社区“华章计算机”公众号查看。
1475 0
|
并行计算 程序员 API
《并行计算的编程模型》一2.2 GASNet概述
本节书摘来华章计算机《并行计算的编程模型》一书中的第2章 ,第2.2节, [(美)帕万·巴拉吉(Pavan Balaji)编著;张云泉等译,更多章节内容可以访问云栖社区“华章计算机”公众号查看。
1268 0
|
并行计算
《并行计算的编程模型》一3.5.2 RMA函数使用
本节书摘来华章计算机《并行计算的编程模型》一书中的第3章 ,第3.5.2节, [(美)帕万·巴拉吉(Pavan Balaji)编著;张云泉等译,更多章节内容可以访问云栖社区“华章计算机”公众号查看。
1233 0
|
并行计算 索引
《并行计算的编程模型》一2.3.1 开始和结束
本节书摘来华章计算机《并行计算的编程模型》一书中的第2章 ,第2.3.1节, [(美)帕万·巴拉吉(Pavan Balaji)编著;张云泉等译,更多章节内容可以访问云栖社区“华章计算机”公众号查看。
1100 0