MPI并行计算的基本介绍和使用

简介: MPI并行计算的基本介绍和使用

MPI


基本概念


MPI(Message Passin Interface 消息传递接口)是一种消息传递编程模型,最终目的是服务于进程间通信

MPI是一种标准或者规范的代表,不特指某一个对它具体的实现

MPI是一个库,不是一门语言


一般实现

层次 说明
MPI 通过ADI层提供的服务和平台无关的算法、数据结构实现MPI的标准接口
ADI 通过底层通信库提供的API实现,把实现后的接口封装成一类抽象设备,上一层基于不同的硬件通信平台,选择不同的抽象设备
底层通道API 通常由操作系统或者特定网络产品生产商提供


操作分类


MPI操作是由MPI库为建立和启用数据传输和/或同步而执行的一系列步骤

它包括四个阶段:初始化、开始、完成和释放

内部参数名称和概念


  • 序号:即进程的标识,是用来在一个进程组或者一个通信器中标识一个进程。是唯一的
  • 通信域:它描述了一组可以相互通信的进程以及他们之间的连接关系等信息。MPI所有通信必须在某个通信器中进行。
  • 消息:MPI程序中在进程间传递的数据。它由通信器、原地址、目的地之、消息标签和数据构成
  • 通信:通信是指在进程之间进行消息的收发、同步等操作
  • 缓冲区:在用户应用程序中定义的用于保存发送和接收数据的地址空间


基本语句


  • MPI_Init(int **argc, char ***argv)

完成MPI程序初始化工作,通过获取main函数的参数,让每一个MPI进程都能获取到main函数


  • MPI_Comm_rank(MPI_comm comm, int *rank)

用于获取调用进程在给定进程通信域中的进程标识号


  • MPI_Comm_size(MPI_comm comm, int *size)

调用返回给定的通信域中所包含的进程总数


  • MPI_Finalize(void)

清除全部MPI环境


windows下使用Microsoft MPI 示例

#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

#define N 10

int main(int argc, char **argv)
{
    int size, rank;
    int *send;
    int *recv;
    int i = 0;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    send = new int[size * N];
    recv = new int[N];

    if(rank == 0)
    {
        for(i = 0; i < size * N; i++)
        {
            send[i] = i;
        }
    }

    MPI_Scatter(send, N, MPI_INT, recv, N, MPI_INT, 0, MPI_COMM_WORLD);

    std::printf("-------------------------rank = %d\n", rank);
    std::printf("-------------------------size = %d\n", size);
    for(int i = 0; i < N; i++)
    {
        std::printf("recv buffer[%d] = %d \n", i, recv[i]);
    }
    std::printf("---------------------------------------\n");

    delete[] send;
    delete[] recv;

    MPI_Finalize();
    return 0;
}


运行

"C:\Program Files\Microsoft MPI\Bin\mpiexec.exe" -n 4 MPITest.exe


输出

-------------------------rank = 2
-------------------------size = 4
recv buffer[0] = 20 
recv buffer[1] = 21 
recv buffer[2] = 22 
recv buffer[3] = 23 
recv buffer[4] = 24 
recv buffer[5] = 25 
recv buffer[6] = 26 
recv buffer[7] = 27 
recv buffer[8] = 28 
recv buffer[9] = 29 
---------------------------------------
-------------------------rank = 3
-------------------------size = 4
recv buffer[0] = 30 
recv buffer[1] = 31 
recv buffer[2] = 32 
recv buffer[3] = 33 
recv buffer[4] = 34 
recv buffer[5] = 35 
recv buffer[6] = 36 
recv buffer[7] = 37 
recv buffer[8] = 38 
recv buffer[9] = 39 
---------------------------------------
-------------------------rank = 0
-------------------------size = 4
recv buffer[0] = 0 
recv buffer[1] = 1 
recv buffer[2] = 2 
recv buffer[3] = 3 
recv buffer[4] = 4 
recv buffer[5] = 5 
recv buffer[6] = 6 
recv buffer[7] = 7 
recv buffer[8] = 8 
recv buffer[9] = 9 
---------------------------------------
-------------------------rank = 1
-------------------------size = 4
recv buffer[0] = 10 
recv buffer[1] = 11 
recv buffer[2] = 12 
recv buffer[3] = 13 
recv buffer[4] = 14 
recv buffer[5] = 15 
recv buffer[6] = 16 
recv buffer[7] = 17 
recv buffer[8] = 18 
recv buffer[9] = 19 
---------------------------------------


点对点通信


MPI通信的模式:


发送操作:

  • MPI_SEND 标准通信
  • MPI_BSEND 缓存通信
  • MPI_SSEND 同步通信
  • MPI_RSEND 就绪通信


接收操作:

  • MPI_RECV


标准通信

MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm);
MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, status)


其中: buf 数据地址 count 数据个数 datatype 数据类型 source 原进程号 dest 目的进程号 tag进程标识 comm 通信域 status 状态


任意源和任意标识: MPI_ANY_SOURCE(标识任何继承发送的消息都可以接收) MPI_ANY_TAG(标识任何tag都可以接收)


  • MPI_ABORT(comm, errorcode) comm 退出进程所在的通信域 errorcode 返回到所嵌环境的错误码
#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

#define N 10

int main(int argc, char **argv)
{
    int size, rank;
    int flag, rval, i;
    int buffer_1, recv_1;
    int buffer_2, recv_2;
    MPI_Status status, status1, status2, status3, status4;
    int src = 0;
    int dest = 1;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if(size != 2)
    {
        printf("**这个程序使用了不是两个进程 %d**\n", size);
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    if(rank == src)
    {
        buffer_1 = 200;
        buffer_2 = 20000;
        printf("standard MPI_Send\n");
        MPI_Send(&buffer_1, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
        MPI_Send(&buffer_2, 1, MPI_INT, dest, 2, MPI_COMM_WORLD);
        printf("MPI_Send %d data, tag = 1\n", buffer_1);
        printf("MPI_Send %d data, tag = 2\n", buffer_2);
    }
    else if(rank == dest)
    {
        MPI_Recv(&recv_1, 1, MPI_INT, src, 1, MPI_COMM_WORLD, &status3);
        MPI_Recv(&recv_2, 1, MPI_INT, src, 2, MPI_COMM_WORLD, &status4);
        printf("MPI_Recv %d data, tag = 1\n", recv_1);
        printf("MPI_Recv %d data, tag = 2\n", recv_2);
    }

    MPI_Finalize();
    return 0;
}

输出

standard MPI_Send
MPI_Send 200 data, tag = 1
MPI_Send 20000 data, tag = 2
MPI_Recv 200 data, tag = 1
MPI_Recv 20000 data, tag = 2


缓存通信


并行程序员对标准通信模式不满意,希望能够对通信缓冲区进行直接控制。


并行程序员需要对通信缓冲区进行申请、使用和释放,通信缓冲区的合理与正确使用需要设计人员自己保证。

#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

#define N 10

int main(int argc, char **argv)
{
    int rank;

    double recv;
    double *tmpbuffer;
    int size = 1;
    int bsize;
    double data = 100.00;

    MPI_Status status;
    int src = 0;
    int dest = 1;
    MPI_Init(&argc, &argv);
//    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if(rank == 0)
    {
        printf("MPI_BSend\n");
        MPI_Pack_size(size, MPI_DOUBLE, MPI_COMM_WORLD, &bsize);
        tmpbuffer = (double *) malloc(bsize + MPI_BSEND_OVERHEAD);
        MPI_Buffer_attach(tmpbuffer, bsize + MPI_BSEND_OVERHEAD);
        printf("BSend data\n");
        MPI_Bsend(&data, 1, MPI_DOUBLE, 1, 2000, MPI_COMM_WORLD);
        MPI_Buffer_detach(&tmpbuffer, &bsize);
    }
    else if(rank == 1)
    {
        MPI_Recv(&recv, 1, MPI_DOUBLE, 0, 2000, MPI_COMM_WORLD, &status);
        printf("MPI_Recv %f data\n", recv);
    }

    MPI_Finalize();
    return 0;
}


输出

MPI_Recv 100.000000 data
MPI_BSend
BSend data


同步通信

#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

#define N 10

int main(int argc, char **argv)
{
    int size, rank;
    int act_size = 0;
    int flag, rval, i;
    int buffer, recv;
    int sbuffer, srecv;
    int ssbuffer, ssrecv;
    int buffer1, recv1;
    MPI_Status status, status1, status2, status3, status4;
    int count1, count2;
    int src = 0;
    int dest = 1;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if(size != 2)
    {
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    if(rank == src)
    {
        buffer = 100;
        buffer1 = 10000;
        ssbuffer = 20000;
        sbuffer = 200;
        MPI_Send(&ssbuffer, 1, MPI_INT, dest, 3, MPI_COMM_WORLD);
        MPI_Send(&sbuffer, 1, MPI_INT, dest, 4, MPI_COMM_WORLD);
        MPI_Ssend(&buffer, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
        printf("MPI_Ssend %d data tag = 1\n", buffer);
        MPI_Ssend(&buffer1, 1, MPI_INT, dest, 2, MPI_COMM_WORLD);
        printf("MPI_Ssend %d data tag = 2\n", buffer1);
    }
    else if(rank == dest)
    {
        MPI_Recv(&srecv, 1, MPI_INT, src, 3, MPI_COMM_WORLD, &status3);
        MPI_Recv(&ssrecv, 1, MPI_INT, src, 4, MPI_COMM_WORLD, &status4);

        MPI_Recv(&recv, 1, MPI_INT, src, 1, MPI_COMM_WORLD, &status1);
        printf("MPI_Recv %d data tag = 1\n", recv);
        MPI_Recv(&recv1, 1, MPI_INT, src, 2, MPI_COMM_WORLD, &status2);
        printf("MPI_Recv %d data tag = 2\n", recv1);
    }


    MPI_Finalize();
    return 0;
}


输出

MPI_Ssend 100 data tag = 1
MPI_Ssend 10000 data tag = 2
MPI_Recv 100 data tag = 1
MPI_Recv 10000 data tag = 2


就绪通信


就绪通信的特殊之处就在于它要求接收操作先于发送操作而被启动。因此,在一个正确的程序中,一个就绪发送能被一个标准发送替代,它对程序的语义没有影响,而对程序的性能有影响。

#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

#define N 10

int main(int argc, char **argv)
{
    int size, rank;
    int next, prev;
    int tag, count;
    double buffer, recv;
    MPI_Status status;
    MPI_Request  request;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    tag = 1;
    next = rank + 1;
    if(next >= size)  next = 0;
    if(size != 2)
    {
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    
    if(rank == 0)
    {
        printf("Rsend Tets\n");
        buffer = 6666.0f;
        MPI_Recv(MPI_BOTTOM, 0, MPI_INT, next, tag, MPI_COMM_WORLD, &status);
        printf("process %d post ready", rank);
        MPI_Rsend(&buffer, 1, MPI_DOUBLE, next, tag, MPI_COMM_WORLD);
    }
    else
    {
        printf("process %d recive call", rank);
        MPI_Irecv(&recv, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &request);
        MPI_Send(MPI_BOTTOM, 0, MPI_INT, next, tag, MPI_COMM_WORLD);
        MPI_Wait(&request, &status);
        printf("ready MPI_recv = %f\n", recv);
        printf("process %d receive rsend message form &d\n", rank, status.MPI_SOURCE);
    }

    MPI_Finalize();
    return 0;
}


输出

Rsend Tets
process 0 post ready
process 1 recive callready MPI_recv = 6666.000000
process 1 receive rsend message form &d


MPI组通信


点对点通信是一个发送方和一个接收方2个进程

组通信则是一对多,多对一,多对多的进程,进程数量不确定

组通信一般实现了三个功能:


  • 通信:通信功能主要完成组内数据的传输 - 广播,收集,散发,组收集,全互换
  • 同步:同步功能实现组内所有进程在特定的地点在执行进度上取得一致,同步功能是许多应用中必须提供的功能,组同喜还提供专门的调用以完成各个进程之间的同步,从而协调各个进程的进度和步伐
  • 计算:计算功能稍微复杂一点,要对给定的数据完成一定的操作


广播 - 一对多


广播函数接口:MPI_Bcast(buffer, count, datatype, root, comm)


收集


MPI_Gather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm)


MPI_GatherV(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm)


散发


MPI_Scatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm)


MPI_SCATTERV(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcounts, recvtype, root, comm)


组收集


MPI_ALLGATHER(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm)


归约(reduce)


MPI_REDUCE 将组内每个进程输入缓冲区中的数据按给定的操作op进行运算,并将其结果返回到序列号为root的进程的输出缓冲区中。


MPI中已经定义好了一些操作,他们是为函数MPI_REDUCE和一些其他的相关函数,如MPI_ALLREDUCE, MPI_REDUCE_SCATTER和MPI_SCAN而定义的,这些操作用来设定相应的op


名字 含义
MPI_MAX 最大值
MPI_MIN 最小值
MPI_SUM 求和
MPI_PROD 求积
MPI_LAND 逻辑与
MPI_BAND 按位与
MPI_LOR 逻辑或
MPI_BOR 按位或
MPI_LXOR 逻辑异或
MPI_BXOR 按位异或
MPI_MAXLOC 最大值且相应位置
MPI_MINLOC 最小值且相应位置


归约并散发


MPI_REDUCE_SCATTER 操作可以认为是MPI对每个归约操作的变形,它将结果分散到主内的所有进程中取,而不是仅仅归约到root进程中。


求PI

#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <mpi.h>

#define N 10

double f(double x)
{
    return (4.0 / (1.0 + x*x));
}

int main(int argc, char **argv)
{
    int done = 0, n, myid, numprocs, i;
    double PI25DT = 3.141592653589793238462643;
    double mypi, pi, h, sum, x;
    double startwtime = 0.0, endwtime;
    int namelen;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    if(myid == 0)
    {
        n = 100;
    }
    startwtime = MPI_Wtime();
    MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);
    h = 1.0/(double)n;
    sum = 0.0;
    for(i = myid + 1; i <= n; i+= numprocs)
    {
        x = h * ((double)i - 0.5);
        sum += f(x);
    }
    mypi = h * sum;
    MPI_Reduce(&mypi, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
    if(myid == 0)
    {
        printf("pi is approximately %.16f, Error is %.16f\n", pi, fabs(pi - PI25DT));
        endwtime = MPI_Wtime();
        printf("time = %f", endwtime - startwtime);
        fflush(stdout);
    }
    MPI_Finalize();
    return 0;
}
目录
相关文章
|
存储 缓存 Linux
Linux内核学习(九):linux内核的特殊文件系统-debugfs、ftrace、sys
Linux内核学习(九):linux内核的特殊文件系统-debugfs、ftrace、sys
684 0
|
3月前
|
并行计算 算法 Linux
毅硕HPC | 一文详解HPC环境中的MPI并行计算
MPI主要用于分布式内存系统,适合跨多个服务器节点的大规模并行任务。MPI 不仅仅是一种编程接口,它是连接算法与硬件之间的桥梁,是实现“算得更快、看得更远”的关键技术支撑。
290 0
毅硕HPC | 一文详解HPC环境中的MPI并行计算
|
存储 NoSQL 固态存储
阿里云服务器云盘选择参考,ESSD Entry云盘和Entry云盘区别
在我们选择阿里云服务器系统盘和数据盘的时候,有部分云服务器同时支持ESSD Entry云盘和ESSD云盘,对于部分初次接触阿里云服务器的用户来说,可能并不是很清楚他们之间的区别,因此不知道选择哪种更好更能满足自己场景的需求,本文为大家介绍一下阿里云服务器ESSD Entry云盘和ESSD云盘的区别及选择参考。
|
存储 网络协议 大数据
一文读懂RDMA: Remote Direct Memory Access(远程直接内存访问)
该文档详细介绍了RDMA(远程直接内存访问)技术的基本原理、主要特点及其编程接口。RDMA通过硬件直接在应用程序间搬移数据,绕过操作系统协议栈,显著提升网络通信效率,尤其适用于高性能计算和大数据处理等场景。文档还提供了RDMA编程接口的概述及示例代码,帮助开发者更好地理解和应用这一技术。
|
分布式计算 并行计算 Hadoop
【云计算与大数据计算】分布式处理CPU多核、MPI并行计算、Hadoop、Spark的简介(超详细)
【云计算与大数据计算】分布式处理CPU多核、MPI并行计算、Hadoop、Spark的简介(超详细)
1088 0
|
机器学习/深度学习 自动驾驶 算法
ONNX 在自动驾驶汽车中的应用案例
【8月更文第27天】随着自动驾驶技术的快速发展,高效的模型部署和跨平台的支持变得尤为重要。Open Neural Network Exchange (ONNX) 作为一种开放的模型格式,可以促进不同深度学习框架之间的模型转换,同时支持多种硬件平台上的高效执行。本文将探讨 ONNX 在自动驾驶系统中的应用,特别是如何在感知、决策和控制等核心环节中发挥作用。
751 3
|
JSON Nacos 开发工具
微服务通过nacos实现动态路由
微服务通过nacos实现动态路由
518 7
|
JavaScript 前端开发 数据格式
Cesium案例解析(四)——3DModels模型加载
Cesium案例解析(四)——3DModels模型加载
841 0
|
自然语言处理 算法 Serverless
详尽分享贝叶斯算法的基本原理和算法实现
详尽分享贝叶斯算法的基本原理和算法实现
493 0