Added a simplified and cleaned-up 3D domain decomposition MPI implementation. Tested to work at least up to 2x2x2 nodes.

This commit is contained in:
jpekkila
2020-01-17 15:22:23 +02:00
parent 975a15f7f4
commit ff6a7155e5


@@ -581,3 +581,652 @@ acDeviceReduceVec(const Device device, const Stream stream, const ReductionType
#if PACKED_DATA_TRANSFERS // TODO DEPRECATED, see AC_MPI_ENABLED instead
// Functions for calling packed data transfers
#endif
#if AC_MPI_ENABLED == 0
AcResult
acDeviceRunMPITest(void)
{
WARNING("MPI was not enabled but acDeviceRunMPITest() was called");
return AC_FAILURE;
}
#else // MPI_ENABLED ///////////////////////////////////////////////////////////////////////////////
#include <mpi.h>
// Kernels
#include "kernels/packing.cuh"
// From Astaroth Utils
#include "src/utils/config_loader.h"
#include "src/utils/memory.h"
#include "src/utils/verification.h"
static int
mod(const int a, const int b)
{
const int r = a % b;
return r < 0 ? r + b : r;
}
static int
getPid(const int3 pid, const int3 decomposition)
{
return mod(pid.x, decomposition.x) + //
mod(pid.y, decomposition.y) * decomposition.x + //
mod(pid.z, decomposition.z) * decomposition.x * decomposition.y;
}
static int3
getPid3D(const int pid, const int3 decomposition)
{
const int3 pid3d = (int3){
mod(pid, decomposition.x),
mod(pid / decomposition.x, decomposition.y),
(pid / (decomposition.x * decomposition.y)),
};
return pid3d;
}
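// getPid() and getPid3D() are inverses of each other for any rank inside the
// decomposition. A minimal sanity-check sketch (illustrative only; the helper
// name and the 2x2x2 decomposition below are not part of the original commit):
static void
acCheckPidMappingExample(void)
{
    const int3 decomp = (int3){2, 2, 2};
    for (int pid = 0; pid < decomp.x * decomp.y * decomp.z; ++pid) {
        const int3 pid3d = getPid3D(pid, decomp);
        ERRCHK_ALWAYS(getPid(pid3d, decomp) == pid);
    }
}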
static int3
decompose(const int target)
{
int decomposition[] = {1, 1, 1};
int axis = 0;
while (decomposition[0] * decomposition[1] * decomposition[2] < target) {
++decomposition[axis];
axis = (axis + 1) % 3;
}
const int found = decomposition[0] * decomposition[1] * decomposition[2];
if (found != target) {
fprintf(stderr, "Invalid number of processes! Cannot decompose the problem domain!\n");
fprintf(stderr, "Target nprocs: %d. Next allowed: %d\n", target, found);
ERROR("Invalid nprocs");
return (int3){-1, -1, -1};
}
else {
return (int3){decomposition[0], decomposition[1], decomposition[2]};
}
}
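// Worked example of the round-robin growth above: starting from {1, 1, 1} the
// loop visits {2, 1, 1}, {2, 2, 1}, {2, 2, 2}, {3, 2, 2}, {3, 3, 2}, {3, 3, 3},
// {4, 3, 3}, ... so the accepted process counts are 1, 2, 4, 8, 12, 18, 27, 36, ...
// Any other nprocs (e.g. 6, even though 2 x 3 x 1 = 6) overshoots the target and
// falls through to the error branch.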
static PackedData
acCreatePackedData(const int3 dims)
{
PackedData data = {};
data.dims = dims;
const size_t bytes = dims.x * dims.y * dims.z * sizeof(data.data[0]) * NUM_VTXBUF_HANDLES;
ERRCHK_CUDA_ALWAYS(cudaMalloc((void**)&data.data, bytes));
return data;
}
static AcResult
acDestroyPackedData(PackedData* data)
{
data->dims = (int3){-1, -1, -1};
cudaFree(data->data);
data->data = NULL;
return AC_SUCCESS;
}
static PackedData
acCreatePackedDataHost(const int3 dims)
{
PackedData data = {};
data.dims = dims;
const size_t bytes = dims.x * dims.y * dims.z * sizeof(data.data[0]) * NUM_VTXBUF_HANDLES;
data.data = (AcReal*)malloc(bytes);
ERRCHK_ALWAYS(data.data);
return data;
}
static void
acTransferPackedDataToHost(const PackedData ddata, PackedData* hdata)
{
const size_t bytes = ddata.dims.x * ddata.dims.y * ddata.dims.z * sizeof(ddata.data[0]) *
NUM_VTXBUF_HANDLES;
ERRCHK_CUDA_ALWAYS(cudaMemcpy(hdata->data, ddata.data, bytes, cudaMemcpyDeviceToHost));
}
static void
acTransferPackedDataToDevice(const PackedData hdata, PackedData* ddata)
{
const size_t bytes = hdata.dims.x * hdata.dims.y * hdata.dims.z * sizeof(hdata.data[0]) *
NUM_VTXBUF_HANDLES;
ERRCHK_CUDA_ALWAYS(cudaMemcpy(ddata->data, hdata.data, bytes, cudaMemcpyHostToDevice));
}
static AcResult
acDestroyPackedDataHost(PackedData* data)
{
data->dims = (int3){-1, -1, -1};
free(data->data);
data->data = NULL;
return AC_SUCCESS;
}
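// The helpers above are used further below in the following pattern: pack a
// halo block on the device, stage it through a host buffer, hand it to MPI,
// and unpack on the receiving side. A sketch of the lifecycle (buffer names
// are illustrative; acKernelPackData/acKernelUnpackData come from
// kernels/packing.cuh):
//
//   PackedData d_send = acCreatePackedData(dims);      // device staging buffer
//   PackedData h_send = acCreatePackedDataHost(dims);  // host staging buffer
//   acKernelPackData(stream, device->vba, src_offset, d_send); // gather block on device
//   acTransferPackedDataToHost(d_send, &h_send);       // device -> host copy
//   // MPI_Isend(h_send.data, ...) / MPI_Irecv(h_recv.data, ...) / MPI_Wait(...)
//   acTransferPackedDataToDevice(h_recv, &d_recv);     // host -> device copy
//   acKernelUnpackData(stream, d_recv, dst_offset, device->vba); // scatter into halo
//   acDestroyPackedDataHost(&h_send);                  // plus the recv-side buffers
//   acDestroyPackedData(&d_send);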
// TODO: do with packed data
static AcResult
acDeviceDistributeMeshMPI(const AcMesh src, const int3 decomposition, AcMesh* dst)
{
MPI_Barrier(MPI_COMM_WORLD);
printf("Distributing mesh...\n");
fflush(stdout);
MPI_Datatype datatype = MPI_FLOAT;
if (sizeof(AcReal) == 8)
datatype = MPI_DOUBLE;
int pid, nprocs;
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
ERRCHK_ALWAYS(dst);
// Submesh nn
const int3 nn = (int3){
dst->info.int_params[AC_nx],
dst->info.int_params[AC_ny],
dst->info.int_params[AC_nz],
};
// Submesh mm (currently unused in this function)
const int3 mm = (int3){
dst->info.int_params[AC_mx],
dst->info.int_params[AC_my],
dst->info.int_params[AC_mz],
};
// Send to self
if (pid == 0) {
for (int vtxbuf = 0; vtxbuf < NUM_VTXBUF_HANDLES; ++vtxbuf) {
// For pencils
for (int k = NGHOST; k < NGHOST + nn.z; ++k) {
for (int j = NGHOST; j < NGHOST + nn.y; ++j) {
const int i = NGHOST;
const int count = nn.x;
const int src_idx = acVertexBufferIdx(i, j, k, src.info);
const int dst_idx = acVertexBufferIdx(i, j, k, dst->info);
memcpy(&dst->vertex_buffer[vtxbuf][dst_idx], //
&src.vertex_buffer[vtxbuf][src_idx], //
count * sizeof(src.vertex_buffer[vtxbuf][0]));
}
}
}
}
for (int vtxbuf = 0; vtxbuf < NUM_VTXBUF_HANDLES; ++vtxbuf) {
// For pencils
for (int k = NGHOST; k < NGHOST + nn.z; ++k) {
for (int j = NGHOST; j < NGHOST + nn.y; ++j) {
const int i = NGHOST;
const int count = nn.x;
if (pid != 0) {
const int dst_idx = acVertexBufferIdx(i, j, k, dst->info);
// Recv
MPI_Status status;
MPI_Recv(&dst->vertex_buffer[vtxbuf][dst_idx], count, datatype, 0, 0,
MPI_COMM_WORLD, &status);
}
else {
for (int tgt_pid = 1; tgt_pid < nprocs; ++tgt_pid) {
const int3 tgt_pid3d = getPid3D(tgt_pid, decomposition);
const int src_idx = acVertexBufferIdx(i + tgt_pid3d.x * nn.x, //
j + tgt_pid3d.y * nn.y, //
k + tgt_pid3d.z * nn.z, //
src.info);
// Send
MPI_Send(&src.vertex_buffer[vtxbuf][src_idx], count, datatype, tgt_pid, 0,
MPI_COMM_WORLD);
}
}
}
}
}
return AC_SUCCESS;
}
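// Worked example of the index arithmetic above (hypothetical numbers, assuming
// NGHOST = 3 for illustration): with a 2x2x2 decomposition of a 128^3 global
// grid, each rank owns a 64^3 submesh. The rank at pid3d = (1, 0, 1) receives
// the x-pencil (j, k) of its submesh from the global row starting at
// (NGHOST + 1*64, j + 0*64, k + 1*64) = (67, j, k + 64) in the source mesh.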
// TODO: do with packed data
static AcResult
acDeviceGatherMeshMPI(const AcMesh src, const int3 decomposition, AcMesh* dst)
{
MPI_Barrier(MPI_COMM_WORLD);
printf("Gathering mesh...\n");
fflush(stdout);
MPI_Datatype datatype = MPI_FLOAT;
if (sizeof(AcReal) == 8)
datatype = MPI_DOUBLE;
int pid, nprocs;
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
if (pid == 0)
ERRCHK_ALWAYS(dst);
// Submesh nn
const int3 nn = (int3){
src.info.int_params[AC_nx],
src.info.int_params[AC_ny],
src.info.int_params[AC_nz],
};
// Submesh mm
const int3 mm = (int3){
src.info.int_params[AC_mx],
src.info.int_params[AC_my],
src.info.int_params[AC_mz],
};
// Send to self
if (pid == 0) {
for (int vtxbuf = 0; vtxbuf < NUM_VTXBUF_HANDLES; ++vtxbuf) {
// For pencils
for (int k = 0; k < mm.z; ++k) {
for (int j = 0; j < mm.y; ++j) {
const int i = 0;
const int count = mm.x;
const int src_idx = acVertexBufferIdx(i, j, k, src.info);
const int dst_idx = acVertexBufferIdx(i, j, k, dst->info);
memcpy(&dst->vertex_buffer[vtxbuf][dst_idx], //
&src.vertex_buffer[vtxbuf][src_idx], //
count * sizeof(src.vertex_buffer[vtxbuf][0]));
}
}
}
}
for (int vtxbuf = 0; vtxbuf < NUM_VTXBUF_HANDLES; ++vtxbuf) {
// For pencils
for (int k = 0; k < mm.z; ++k) {
for (int j = 0; j < mm.y; ++j) {
const int i = 0;
const int count = mm.x;
if (pid != 0) {
// Send
const int src_idx = acVertexBufferIdx(i, j, k, src.info);
MPI_Send(&src.vertex_buffer[vtxbuf][src_idx], count, datatype, 0, 0,
MPI_COMM_WORLD);
}
else {
for (int tgt_pid = 1; tgt_pid < nprocs; ++tgt_pid) {
const int3 tgt_pid3d = getPid3D(tgt_pid, decomposition);
const int dst_idx = acVertexBufferIdx(i + tgt_pid3d.x * nn.x, //
j + tgt_pid3d.y * nn.y, //
k + tgt_pid3d.z * nn.z, //
dst->info);
// Recv
MPI_Status status;
MPI_Recv(&dst->vertex_buffer[vtxbuf][dst_idx], count, datatype, tgt_pid, 0,
MPI_COMM_WORLD, &status);
}
}
}
}
}
return AC_SUCCESS;
}
static AcResult
acDeviceCommunicateBlocksMPI(const Device device, //
const int3* a0s, // Src idx inside comp. domain
const int3* b0s, // Dst idx inside bound zone
const int mapping_count, // Num a0s and b0s
const int3 dims) // Block size
{
cudaSetDevice(device->id);
acDeviceSynchronizeStream(device, STREAM_ALL);
MPI_Barrier(MPI_COMM_WORLD);
MPI_Datatype datatype = MPI_FLOAT;
if (sizeof(AcReal) == 8)
datatype = MPI_DOUBLE;
int nprocs, pid;
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
const int3 decomp = decompose(nprocs);
const int3 nn = (int3){
device->local_config.int_params[AC_nx],
device->local_config.int_params[AC_ny],
device->local_config.int_params[AC_nz],
};
for (int k = -1; k <= 1; ++k) {
for (int j = -1; j <= 1; ++j) {
for (int i = -1; i <= 1; ++i) {
if (i == 0 && j == 0 && k == 0)
continue;
for (int a_idx = 0; a_idx < mapping_count; ++a_idx) {
for (int b_idx = 0; b_idx < mapping_count; ++b_idx) {
const int3 neighbor = (int3){i, j, k};
const int3 a0 = a0s[a_idx];
// const int3 a1 = a0 + dims;
const int3 b0 = a0 - neighbor * nn;
// const int3 b1 = a1 - neighbor * nn;
if (b0s[b_idx].x == b0.x && b0s[b_idx].y == b0.y && b0s[b_idx].z == b0.z) {
const size_t count = dims.x * dims.y * dims.z * NUM_VTXBUF_HANDLES;
PackedData src = acCreatePackedData(dims);
PackedData dst = acCreatePackedData(dims);
const cudaStream_t stream = device->streams[STREAM_DEFAULT];
acKernelPackData(stream, device->vba, a0, src);
acDeviceSynchronizeStream(device, STREAM_DEFAULT);
// Host ////////////////////////////////////////////////
PackedData src_host = acCreatePackedDataHost(dims);
PackedData dst_host = acCreatePackedDataHost(dims);
acTransferPackedDataToHost(src, &src_host);
acDeviceSynchronizeStream(device, STREAM_ALL);
MPI_Barrier(MPI_COMM_WORLD);
////////////////////////////////////////////////////////
const int3 pid3d = getPid3D(pid, decomp);
MPI_Request send_req, recv_req;
MPI_Isend(src_host.data, count, datatype,
getPid(pid3d + neighbor, decomp), b_idx, MPI_COMM_WORLD,
&send_req);
MPI_Irecv(dst_host.data, count, datatype,
getPid(pid3d - neighbor, decomp), b_idx, MPI_COMM_WORLD,
&recv_req);
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
MPI_Wait(&recv_req, MPI_STATUS_IGNORE);
// Host ////////////////////////////////////////////////
acTransferPackedDataToDevice(dst_host, &dst);
acDeviceSynchronizeStream(device, STREAM_ALL);
acDestroyPackedDataHost(&src_host);
acDestroyPackedDataHost(&dst_host);
////////////////////////////////////////////////////////
acKernelUnpackData(stream, dst, b0, device->vba);
acDeviceSynchronizeStream(device, STREAM_DEFAULT);
acDestroyPackedData(&src);
acDestroyPackedData(&dst);
}
}
}
}
}
}
return AC_SUCCESS;
}
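// Worked example of the block-matching rule above: take the source corner
// a0 = (nn.x, nn.y, nn.z) and the neighbor direction (1, 1, 1). Then
// b0 = a0 - neighbor*nn = (0, 0, 0): the block at the high corner of the local
// computational domain is packed and sent to the rank at pid3d + (1, 1, 1),
// while the matching block received from pid3d - (1, 1, 1) is unpacked into
// the local low ghost corner at (0, 0, 0).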
static AcResult
acDeviceCommunicateHalosMPI(const Device device)
{
const int3 nn = (int3){
device->local_config.int_params[AC_nx],
device->local_config.int_params[AC_ny],
device->local_config.int_params[AC_nz],
};
{ // Corners
const int3 a0s[] = {
(int3){NGHOST, NGHOST, NGHOST}, //
(int3){nn.x, NGHOST, NGHOST}, //
(int3){NGHOST, nn.y, NGHOST}, //
(int3){nn.x, nn.y, NGHOST}, //
(int3){NGHOST, NGHOST, nn.z}, //
(int3){nn.x, NGHOST, nn.z}, //
(int3){NGHOST, nn.y, nn.z}, //
(int3){nn.x, nn.y, nn.z},
};
const int3 b0s[] = {
(int3){0, 0, 0},
(int3){NGHOST + nn.x, 0, 0},
(int3){0, NGHOST + nn.y, 0},
(int3){NGHOST + nn.x, NGHOST + nn.y, 0},
(int3){0, 0, NGHOST + nn.z},
(int3){NGHOST + nn.x, 0, NGHOST + nn.z},
(int3){0, NGHOST + nn.y, NGHOST + nn.z},
(int3){NGHOST + nn.x, NGHOST + nn.y, NGHOST + nn.z},
};
const int3 dims = (int3){NGHOST, NGHOST, NGHOST};
const int mapping_count = sizeof(a0s) / sizeof(a0s[0]);
acDeviceCommunicateBlocksMPI(device, a0s, b0s, mapping_count, dims);
}
{ // Edges X
const int3 a0s[] = {
(int3){NGHOST, NGHOST, NGHOST}, //
(int3){NGHOST, nn.y, NGHOST}, //
(int3){NGHOST, NGHOST, nn.z}, //
(int3){NGHOST, nn.y, nn.z}, //
};
const int3 b0s[] = {
(int3){NGHOST, 0, 0},
(int3){NGHOST, NGHOST + nn.y, 0},
(int3){NGHOST, 0, NGHOST + nn.z},
(int3){NGHOST, NGHOST + nn.y, NGHOST + nn.z},
};
const int3 dims = (int3){nn.x, NGHOST, NGHOST};
const int mapping_count = sizeof(a0s) / sizeof(a0s[0]);
acDeviceCommunicateBlocksMPI(device, a0s, b0s, mapping_count, dims);
}
{ // Edges Y
const int3 a0s[] = {
(int3){NGHOST, NGHOST, NGHOST}, //
(int3){nn.x, NGHOST, NGHOST}, //
(int3){NGHOST, NGHOST, nn.z}, //
(int3){nn.x, NGHOST, nn.z}, //
};
const int3 b0s[] = {
(int3){0, NGHOST, 0},
(int3){NGHOST + nn.x, NGHOST, 0},
(int3){0, NGHOST, NGHOST + nn.z},
(int3){NGHOST + nn.x, NGHOST, NGHOST + nn.z},
};
const int3 dims = (int3){NGHOST, nn.y, NGHOST};
const int mapping_count = sizeof(a0s) / sizeof(a0s[0]);
acDeviceCommunicateBlocksMPI(device, a0s, b0s, mapping_count, dims);
}
{ // Edges Z
const int3 a0s[] = {
(int3){NGHOST, NGHOST, NGHOST}, //
(int3){nn.x, NGHOST, NGHOST}, //
(int3){NGHOST, nn.y, NGHOST}, //
(int3){nn.x, nn.y, NGHOST}, //
};
const int3 b0s[] = {
(int3){0, 0, NGHOST},
(int3){NGHOST + nn.x, 0, NGHOST},
(int3){0, NGHOST + nn.y, NGHOST},
(int3){NGHOST + nn.x, NGHOST + nn.y, NGHOST},
};
const int3 dims = (int3){NGHOST, NGHOST, nn.z};
const int mapping_count = sizeof(a0s) / sizeof(a0s[0]);
acDeviceCommunicateBlocksMPI(device, a0s, b0s, mapping_count, dims);
}
{ // Sides XY
const int3 a0s[] = {
(int3){NGHOST, NGHOST, NGHOST}, //
(int3){NGHOST, NGHOST, nn.z}, //
};
const int3 b0s[] = {
(int3){NGHOST, NGHOST, 0}, //
(int3){NGHOST, NGHOST, NGHOST + nn.z}, //
};
const int3 dims = (int3){nn.x, nn.y, NGHOST};
const int mapping_count = sizeof(a0s) / sizeof(a0s[0]);
acDeviceCommunicateBlocksMPI(device, a0s, b0s, mapping_count, dims);
}
{ // Sides XZ
const int3 a0s[] = {
(int3){NGHOST, NGHOST, NGHOST}, //
(int3){NGHOST, nn.y, NGHOST}, //
};
const int3 b0s[] = {
(int3){NGHOST, 0, NGHOST}, //
(int3){NGHOST, NGHOST + nn.y, NGHOST}, //
};
const int3 dims = (int3){nn.x, NGHOST, nn.z};
const int mapping_count = sizeof(a0s) / sizeof(a0s[0]);
acDeviceCommunicateBlocksMPI(device, a0s, b0s, mapping_count, dims);
}
{ // Sides YZ
const int3 a0s[] = {
(int3){NGHOST, NGHOST, NGHOST}, //
(int3){nn.x, NGHOST, NGHOST}, //
};
const int3 b0s[] = {
(int3){0, NGHOST, NGHOST}, //
(int3){NGHOST + nn.x, NGHOST, NGHOST}, //
};
const int3 dims = (int3){NGHOST, nn.y, nn.z};
const int mapping_count = sizeof(a0s) / sizeof(a0s[0]);
acDeviceCommunicateBlocksMPI(device, a0s, b0s, mapping_count, dims);
}
return AC_SUCCESS;
}
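// Sanity check on the coverage above: 8 corner blocks + 12 edge blocks
// (4 each for the X, Y and Z edges) + 6 side blocks (2 each for XY, XZ and YZ)
// = 26 block types, one for every neighbor direction of the 3x3x3 stencil
// except (0, 0, 0), which is the local computational domain itself.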
AcResult
acDeviceRunMPITest(void)
{
MPI_Init(NULL, NULL);
int nprocs, pid;
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
char processor_name[MPI_MAX_PROCESSOR_NAME];
int name_len;
MPI_Get_processor_name(processor_name, &name_len);
printf("Processor %s. Process %d of %d.\n", processor_name, pid, nprocs);
// Create model and candidate meshes
AcMeshInfo info;
acLoadConfig(AC_DEFAULT_CONFIG, &info);
// Some real params must be calculated (for the MHD case) // TODO DANGEROUS
info.real_params[AC_inv_dsx] = AcReal(1.0) / info.real_params[AC_dsx];
info.real_params[AC_inv_dsy] = AcReal(1.0) / info.real_params[AC_dsy];
info.real_params[AC_inv_dsz] = AcReal(1.0) / info.real_params[AC_dsz];
info.real_params[AC_cs2_sound] = info.real_params[AC_cs_sound] * info.real_params[AC_cs_sound];
AcMesh model, candidate;
// Master CPU
if (pid == 0) {
acMeshCreate(info, &model);
acMeshCreate(info, &candidate);
acMeshRandomize(&model);
acMeshRandomize(&candidate);
}
/// DECOMPOSITION & SUBMESH ///////////////////////////////////
AcMeshInfo submesh_info = info;
const int3 decomposition = decompose(nprocs);
const int3 pid3d = getPid3D(pid, decomposition);
printf("Decomposition: %d, %d, %d\n", decomposition.x, decomposition.y, decomposition.z);
printf("Process %d: (%d, %d, %d)\n", pid, pid3d.x, pid3d.y, pid3d.z);
ERRCHK_ALWAYS(info.int_params[AC_nx] % decomposition.x == 0);
ERRCHK_ALWAYS(info.int_params[AC_ny] % decomposition.y == 0);
ERRCHK_ALWAYS(info.int_params[AC_nz] % decomposition.z == 0);
submesh_info.int_params[AC_nx] = info.int_params[AC_nx] / decomposition.x;
submesh_info.int_params[AC_ny] = info.int_params[AC_ny] / decomposition.y;
submesh_info.int_params[AC_nz] = info.int_params[AC_nz] / decomposition.z;
submesh_info.int3_params[AC_global_grid_n] = (int3){
info.int_params[AC_nx],
info.int_params[AC_ny],
info.int_params[AC_nz],
};
submesh_info.int3_params[AC_multigpu_offset] = (int3){-1, -1, -1}; // TODO
WARNING("AC_multigpu_offset not yet implemented");
acUpdateConfig(&submesh_info);
ERRCHK_ALWAYS(is_valid(submesh_info.real_params[AC_inv_dsx]));
ERRCHK_ALWAYS(is_valid(submesh_info.real_params[AC_cs2_sound]));
AcMesh submesh;
acMeshCreate(submesh_info, &submesh);
acMeshRandomize(&submesh);
////////////////////////////////////////////////////////////////
// GPU INIT ////////////////////////////////////////////////////
int devices_per_node = -1;
cudaGetDeviceCount(&devices_per_node);
Device device;
acDeviceCreate(pid % devices_per_node, submesh_info, &device);
// TODO enable peer access
////////////////////////////////////////////////////////////////
// DISTRIBUTE & LOAD //////////////////////////////////////////
acDeviceDistributeMeshMPI(model, decomposition, &submesh);
acDeviceLoadMesh(device, STREAM_DEFAULT, submesh);
///////////////////////////////////////////////////////////////
// SYNC //////////////////////////////////////////////////////
acDeviceSynchronizeStream(device, STREAM_ALL);
MPI_Barrier(MPI_COMM_WORLD);
//////////////////////////////////////////////////////////////
// INTEGRATION & BOUNDCONDS////////////////////////////////////
acDeviceCommunicateHalosMPI(device);
///////////////////////////////////////////////////////////////
// STORE & GATHER /////////////////////////////////////////////
MPI_Barrier(MPI_COMM_WORLD);
acDeviceStoreMesh(device, STREAM_DEFAULT, &submesh);
acDeviceSynchronizeStream(device, STREAM_DEFAULT);
acDeviceGatherMeshMPI(submesh, decomposition, &candidate);
//////////////////////////////////////////////////////////////
// VERIFY ////////////////////////////////////////////////////
if (pid == 0) {
acMeshApplyPeriodicBounds(&model);
const bool valid = acVerifyMesh(model, candidate);
acMeshDestroy(&model);
acMeshDestroy(&candidate);
}
//////////////////////////////////////////////////////////////
// DESTROY ///////////////////////////////////////////////////
acDeviceDestroy(device);
acMeshDestroy(&submesh);
MPI_Finalize();
//////////////////////////////////////////////////////////////
return AC_SUCCESS;
}
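// A sketch of how the test might be launched (the binary name is only an
// example and depends on the local build; mpirun flags depend on the MPI
// installation):
//
//   # 8 ranks -> decompose() picks a 2x2x2 decomposition
//   mpirun -np 8 ./mpitest
//
// Note that MPI_Init() and MPI_Finalize() are called inside
// acDeviceRunMPITest() itself, so the caller must not initialize MPI
// separately.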
#endif // MPI_ENABLED //////////////////////////////////////////////////////////////////////////////