Found a workaround that gives good inter and intra-node performance. HPC-X MPI implementation does not know how to do p2p comm with pinned arrays (should be 80 GiB/s, measured 10 GiB/s) and internode comm is super slow without pinned arrays (should be 40 GiB/s, measured < 1 GiB/s). Made a proof of concept communicator that pins arrays that are send or received from another node.

This commit is contained in:
jpekkila
2020-04-05 20:15:32 +03:00
parent 88e53dfa21
commit cc9d3f1b9c
3 changed files with 258 additions and 5 deletions

View File

@@ -6,4 +6,4 @@ find_package(CUDAToolkit)
add_executable(bwtest main.c)
add_compile_options(-O3)
target_link_libraries(bwtest MPI::MPI_C OpenMP::OpenMP_C CUDA::cudart)
target_link_libraries(bwtest MPI::MPI_C OpenMP::OpenMP_C CUDA::cudart_static)

View File

@@ -1,4 +1,5 @@
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
@@ -18,6 +19,8 @@
- Need to use cudaMalloc for intranode comm for P2P to trigger with MPI
- For internode one should use pinned memory (RDMA is staged through pinned, gives full
network speed iff pinned)
- Both the sending and receiving arrays must be pinned to see performance improvement
in internode comm
*/
static uint8_t*
@@ -36,6 +39,20 @@ freeHost(uint8_t* arr)
static uint8_t*
allocDevice(const size_t bytes)
{
uint8_t* arr;
// Standard (20 GiB/s internode, 85 GiB/s intranode)
const cudaError_t retval = cudaMalloc((void**)&arr, bytes);
// Unified mem (5 GiB/s internode, 6 GiB/s intranode)
// const cudaError_t retval = cudaMallocManaged((void**)&arr, bytes, cudaMemAttachGlobal);
// Pinned (40 GiB/s internode, 10 GiB/s intranode)
// const cudaError_t retval = cudaMallocHost((void**)&arr, bytes);
assert(retval == cudaSuccess);
return arr;
}
static uint8_t*
allocDevicePinned(const size_t bytes)
{
uint8_t* arr;
// Standard (20 GiB/s internode, 85 GiB/s intranode)
@@ -107,6 +124,107 @@ sendrecv_twoway(uint8_t* src, uint8_t* dst)
MPI_COMM_WORLD, &status);
}
static void
sendrecv_nonblocking_multiple(uint8_t* src, uint8_t* dst)
{
int pid, nprocs;
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
MPI_Request recv_requests[nprocs], send_requests[nprocs];
for (int i = 1; i < nprocs; ++i) {
int nfront = (pid + i) % nprocs;
int nback = (((pid - i) % nprocs) + nprocs) % nprocs;
MPI_Irecv(dst, BLOCK_SIZE, MPI_BYTE, nback, pid, MPI_COMM_WORLD, &recv_requests[i]);
MPI_Isend(src, BLOCK_SIZE, MPI_BYTE, nfront, nfront, MPI_COMM_WORLD, &send_requests[i]);
}
for (int i = 1; i < nprocs; ++i) {
MPI_Status status;
MPI_Wait(&recv_requests[i], &status);
MPI_Wait(&send_requests[i], &status);
}
}
static void
sendrecv_nonblocking_multiple_parallel(uint8_t* src, uint8_t* dst)
{
int pid, nprocs;
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
MPI_Request recv_requests[nprocs], send_requests[nprocs];
for (int i = 1; i < nprocs; ++i) {
int nfront = (pid + i) % nprocs;
MPI_Isend(src, BLOCK_SIZE, MPI_BYTE, nfront, nfront, MPI_COMM_WORLD, &send_requests[i]);
}
static bool error_shown = false;
if (!pid && !error_shown) {
fprintf(stderr, "\tWARNING: make sure you init MPI_Init_thread for OpenMP support (no "
"supported on puhti atm "
"2020-04-05\n");
error_shown = true;
}
#pragma omp parallel for
for (int i = 1; i < nprocs; ++i) {
int nback = (((pid - i) % nprocs) + nprocs) % nprocs;
MPI_Status status;
MPI_Recv(dst, BLOCK_SIZE, MPI_BYTE, nback, pid, MPI_COMM_WORLD, &status);
}
for (int i = 1; i < nprocs; ++i) {
MPI_Status status;
MPI_Wait(&send_requests[i], &status);
}
}
static void
sendrecv_nonblocking_multiple_rt_pinning(uint8_t* src, uint8_t* dst)
{
int pid, nprocs;
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
static uint8_t* src_pinned = NULL;
static uint8_t* dst_pinned = NULL;
if (!src_pinned)
src_pinned = allocDevicePinned(BLOCK_SIZE); // Note: Never freed
if (!dst_pinned)
dst_pinned = allocDevicePinned(BLOCK_SIZE); // Note: Never freed
int devices_per_node = -1;
cudaGetDeviceCount(&devices_per_node);
const int node_id = pid / devices_per_node;
MPI_Request recv_requests[nprocs], send_requests[nprocs];
for (int i = 1; i < nprocs; ++i) {
int nfront = (pid + i) % nprocs;
int nback = (((pid - i) % nprocs) + nprocs) % nprocs;
if (nback / devices_per_node != pid / devices_per_node) // Not on the same node
MPI_Irecv(dst_pinned, BLOCK_SIZE, MPI_BYTE, nback, pid, MPI_COMM_WORLD,
&recv_requests[i]);
else
MPI_Irecv(dst, BLOCK_SIZE, MPI_BYTE, nback, pid, MPI_COMM_WORLD, &recv_requests[i]);
if (nfront / devices_per_node != pid / devices_per_node) // Not on the same node
MPI_Isend(src_pinned, BLOCK_SIZE, MPI_BYTE, nfront, nfront, MPI_COMM_WORLD,
&send_requests[i]);
else
MPI_Isend(src, BLOCK_SIZE, MPI_BYTE, nfront, nfront, MPI_COMM_WORLD, &send_requests[i]);
}
for (int i = 1; i < nprocs; ++i) {
MPI_Status status;
MPI_Wait(&recv_requests[i], &status);
MPI_Wait(&send_requests[i], &status);
}
}
#define PRINT \
if (!pid) \
printf
@@ -115,7 +233,7 @@ static void
measurebw(const char* msg, const size_t bytes, void (*sendrecv)(uint8_t*, uint8_t*), uint8_t* src,
uint8_t* dst)
{
const size_t num_samples = 10;
const size_t num_samples = 100;
int pid, nprocs;
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
@@ -145,23 +263,36 @@ measurebw(const char* msg, const size_t bytes, void (*sendrecv)(uint8_t*, uint8_
MPI_Barrier(MPI_COMM_WORLD);
const long double time_elapsed = timer_diff_nsec(t) / 1e9l; // seconds
PRINT("%Lg GiB/s\n", num_samples * bytes / time_elapsed / (1024 * 1024 * 1024));
PRINT("\tTransfer time: %Lg ms\n", time_elapsed * 1000);
PRINT("\tTransfer time: %Lg ms\n", time_elapsed * 1000 / num_samples);
MPI_Barrier(MPI_COMM_WORLD);
}
int
main(void)
{
MPI_Init(NULL, NULL);
// int provided;
// MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided);
// assert(provided >= MPI_THREAD_MULTIPLE);
// Disable stdout buffering
setbuf(stdout, NULL);
MPI_Init(NULL, NULL);
int pid, nprocs;
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
assert(nprocs >= 2); // Require at least one neighbor
MPI_Barrier(MPI_COMM_WORLD);
if (!pid) {
printf("Do we have threads? The following should not be ordered (unless very lucky)\n");
#pragma omp parallel for
for (int i = 0; i < 10; ++i)
printf("%d, ", i);
printf("\n");
}
MPI_Barrier(MPI_COMM_WORLD);
int devices_per_node = -1;
cudaGetDeviceCount(&devices_per_node);
const int device_id = pid % devices_per_node;
@@ -182,10 +313,15 @@ main(void)
2 * BLOCK_SIZE, sendrecv_nonblocking, src, dst);
measurebw("Bidirectional bandwidth, twoway (Host)", //
2 * BLOCK_SIZE, sendrecv_twoway, src, dst);
measurebw("Bidirectional bandwidth, async multiple (Host)", //
2 * BLOCK_SIZE, sendrecv_nonblocking_multiple, src, dst);
measurebw("Bidirectional bandwidth, async multiple parallel (Host)", //
2 * BLOCK_SIZE, sendrecv_nonblocking_multiple_parallel, src, dst);
freeHost(src);
freeHost(dst);
}
PRINT("\n------------------------\n");
{
uint8_t* src = allocDevice(BLOCK_SIZE);
@@ -197,10 +333,47 @@ main(void)
2 * BLOCK_SIZE, sendrecv_nonblocking, src, dst);
measurebw("Bidirectional bandwidth, twoway (Device)", //
2 * BLOCK_SIZE, sendrecv_twoway, src, dst);
measurebw("Bidirectional bandwidth, async multiple (Device)", //
2 * BLOCK_SIZE, sendrecv_nonblocking_multiple, src, dst);
measurebw("Bidirectional bandwidth, async multiple parallel (Device)", //
2 * BLOCK_SIZE, sendrecv_nonblocking_multiple_parallel, src, dst);
measurebw("Bidirectional bandwidth, async multiple (Device, rt pinning)", //
2 * BLOCK_SIZE, sendrecv_nonblocking_multiple_rt_pinning, src, dst);
freeDevice(src);
freeDevice(dst);
}
PRINT("\n------------------------\n");
{
uint8_t* src = allocDevicePinned(BLOCK_SIZE);
uint8_t* dst = allocDevicePinned(BLOCK_SIZE);
measurebw("Unidirectional bandwidth, blocking (Device, pinned)", //
2 * BLOCK_SIZE, sendrecv_blocking, src, dst);
measurebw("Bidirectional bandwidth, async (Device, pinned)", //
2 * BLOCK_SIZE, sendrecv_nonblocking, src, dst);
measurebw("Bidirectional bandwidth, twoway (Device, pinned)", //
2 * BLOCK_SIZE, sendrecv_twoway, src, dst);
measurebw("Bidirectional bandwidth, async multiple (Device, pinned)", //
2 * BLOCK_SIZE, sendrecv_nonblocking_multiple, src, dst);
freeDevice(src);
freeDevice(dst);
}
PRINT("\n------------------------\n");
/*
{ // Final run for easy identification with the profiler
uint8_t* src = allocDevice(BLOCK_SIZE);
uint8_t* dst = allocDevice(BLOCK_SIZE);
measurebw("Bidirectional bandwidth, async multiple (Device, rt pinning)", //
2 * BLOCK_SIZE, sendrecv_nonblocking_multiple_rt_pinning, src, dst);
freeDevice(src);
freeDevice(dst);
}
*/
MPI_Finalize();
return EXIT_SUCCESS;

View File

@@ -0,0 +1,80 @@
/*
Copyright (C) 2014-2020, Johannes Pekkila, Miikka Vaisala.
This file is part of Astaroth.
Astaroth is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Astaroth is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Astaroth. If not, see <http://www.gnu.org/licenses/>.
*/
/**
Running: mpirun -np <num processes> <executable>
*/
#include "astaroth.h"
#include "astaroth_utils.h"
#include <mpi.h>
int
main(void)
{
MPI_Init(NULL, NULL);
int nprocs, pid;
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
// CPU alloc
AcMeshInfo info;
acLoadConfig(AC_DEFAULT_CONFIG, &info);
info.real_params[AC_inv_dsx] = AcReal(1.0) / info.real_params[AC_dsx];
info.real_params[AC_inv_dsy] = AcReal(1.0) / info.real_params[AC_dsy];
info.real_params[AC_inv_dsz] = AcReal(1.0) / info.real_params[AC_dsz];
info.real_params[AC_cs2_sound] = info.real_params[AC_cs_sound] * info.real_params[AC_cs_sound];
AcMesh model, candidate;
if (pid == 0) {
acMeshCreate(info, &model);
acMeshCreate(info, &candidate);
acMeshRandomize(&model);
acMeshRandomize(&candidate);
}
// GPU alloc & compute
Grid grid;
acGridCreateMPI(info, &grid);
acGridLoadMeshMPI(grid, STREAM_DEFAULT, model);
acGridSynchronizeStreamMPI(grid, STREAM_ALL);
acGridIntegrateMPI(grid, FLT_EPSILON);
acGridSynchronizeStreamMPI(grid, STREAM_ALL);
acGridSynchronizeMeshMPI(grid, STREAM_DEFAULT);
acGridSynchronizeStreamMPI(grid, STREAM_ALL);
acGridStoreMeshMPI(grid, STREAM_DEFAULT, &candidate);
acGridSynchronizeStreamMPI(grid, STREAM_ALL);
acGridDestroyMPI(grid);
// Verify
if (pid == 0) {
acModelIntegrateStep(model, FLT_EPSILON);
acMeshApplyPeriodicBounds(&model);
acVerifyMesh(model, candidate);
acMeshDestroy(&model);
acMeshDestroy(&candidate);
}
MPI_Finalize();
return EXIT_SUCCESS;
}