Started the 3D decomposition branch. Four tasks: 1) determine how to distribute the work given n processes, 2) distribute and gather the mesh to/from these processes, 3) create packing/unpacking functions, and 4) transfer packed data blocks between neighbors. Tasks 1 and 2 are done with this commit.

jpekkila
2019-12-21 12:37:01 +02:00
parent ecff5c3041
commit bad64f5307
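
For orientation, a minimal, self-contained sketch of the task 1 idea (round-robin factorization of the process count and the rank-to-coordinate mapping). It mirrors the decompose()/getPid3D() helpers added in the diff below, but it is an illustrative standalone program only, not part of the commit:

#include <stdio.h>
#include <stdlib.h>

int main(int argc, char** argv)
{
    const int nprocs = argc > 1 ? atoi(argv[1]) : 8;

    // Grow the process grid one axis at a time (x, y, z, x, ...) until the
    // product reaches nprocs; reject counts that cannot be hit exactly.
    int d[3] = {1, 1, 1};
    int axis = 0;
    while (d[0] * d[1] * d[2] < nprocs) {
        ++d[axis];
        axis = (axis + 1) % 3;
    }
    if (d[0] * d[1] * d[2] != nprocs) {
        fprintf(stderr, "Cannot decompose %d processes (next allowed: %d)\n", nprocs,
                d[0] * d[1] * d[2]);
        return EXIT_FAILURE;
    }

    // Map each linear rank to its (x, y, z) position in the process grid.
    for (int pid = 0; pid < nprocs; ++pid) {
        const int x = pid % d[0];
        const int y = (pid / d[0]) % d[1];
        const int z = pid / (d[0] * d[1]);
        printf("rank %d -> (%d, %d, %d)\n", pid, x, y, z);
    }
    return EXIT_SUCCESS;
}

With 8 processes this prints a 2 x 2 x 2 grid; with, say, 6 it refuses, matching the behavior of decompose() below.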


@@ -633,6 +633,7 @@ mod(const int a, const int b)
    return r < 0 ? r + b : r;
}
/*
static inline int
get_neighbor(const int3 offset)
{
@@ -650,7 +651,11 @@ get_neighbor(const int3 offset)
    return mod(pid + offset.x, n) + offset.y * n + offset.z * n * n;
}
*/
/*
#ifdef DECOMP_1D
// Working 1D decomp distribute/gather
static void
acDeviceDistributeMeshMPI(const AcMesh src, AcMesh* dst)
{
@@ -738,6 +743,8 @@ acDeviceGatherMeshMPI(const AcMesh src, AcMesh* dst)
        }
    }
}
#endif
*/
// 1D decomp
static AcResult static AcResult
@@ -973,6 +980,160 @@ acDeviceIntegrateStepMPI(const Device device, const AcReal dt)
    return AC_SUCCESS;
}
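// Task 1: factor nprocs into a 3D process grid by growing the axes round-robin
// (x, then y, then z). Errors out if the resulting product overshoots nprocs.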
static int3
decompose(const int target)
{
    int decomposition[] = {1, 1, 1};
    int axis = 0;
    while (decomposition[0] * decomposition[1] * decomposition[2] < target) {
        ++decomposition[axis];
        axis = (axis + 1) % 3;
    }
    const int found = decomposition[0] * decomposition[1] * decomposition[2];
    if (found != target) {
        fprintf(stderr, "Invalid number of processes! Cannot decompose the problem domain!\n");
        fprintf(stderr, "Target nprocs: %d. Next allowed: %d\n", target, found);
        ERROR("Invalid nprocs");
        return (int3){-1, -1, -1};
    }
    else {
        return (int3){decomposition[0], decomposition[1], decomposition[2]};
    }
}
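// Map a linear MPI rank to its (x, y, z) position in the process grid and back.
// getPid1D wraps each coordinate with mod(), so asking for a neighbor past the
// domain edge yields the rank of the periodic neighbor.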
static int3
getPid3D(const int pid, const int3 decomposition)
{
    const int3 pid3d = (int3){
        mod(pid, decomposition.x),
        mod(pid / decomposition.x, decomposition.y),
        (pid / (decomposition.x * decomposition.y)),
    };
    return pid3d;
}
static int
getPid1D(const int3 pid, const int3 decomposition)
{
    return mod(pid.x, decomposition.x) +                     //
           mod(pid.y, decomposition.y) * decomposition.x +   //
           mod(pid.z, decomposition.z) * decomposition.x * decomposition.y;
}
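// Task 2a: rank 0 scatters the full mesh. Each receiving rank gets one contiguous
// submesh-sized block (acVertexBufferSize of the submesh, ghost zones included)
// starting at its subdomain's corner index in the full mesh; no packing yet.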
static void
acDeviceDistributeMeshMPI(const AcMesh src, const int3 decomposition, AcMesh* dst)
{
    MPI_Barrier(MPI_COMM_WORLD);
    printf("Distributing mesh...\n");
    MPI_Datatype datatype = MPI_FLOAT;
    if (sizeof(AcReal) == 8)
        datatype = MPI_DOUBLE;
    int pid, num_processes;
    MPI_Comm_rank(MPI_COMM_WORLD, &pid);
    MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
    const size_t count = acVertexBufferSize(dst->info);
    for (int i = 0; i < NUM_VTXBUF_HANDLES; ++i) {
        if (pid == 0) {
            // Communicate to self
            assert(dst);
            memcpy(&dst->vertex_buffer[i][0], //
                   &src.vertex_buffer[i][0],  //
                   count * sizeof(src.vertex_buffer[i][0]));
            // Communicate to others
            for (int j = 1; j < num_processes; ++j) {
                const int3 target_pid = getPid3D(j, decomposition);
                const int3 offset = (int3){
                    src.info.int_params[AC_nx] / decomposition.x,
                    src.info.int_params[AC_ny] / decomposition.y,
                    src.info.int_params[AC_nz] / decomposition.z,
                };
                const size_t src_idx = acVertexBufferIdx(target_pid.x * offset.x,
                                                         target_pid.y * offset.y,
                                                         target_pid.z * offset.z, src.info);
                MPI_Send(&src.vertex_buffer[i][src_idx], count, datatype, j, 0, MPI_COMM_WORLD);
            }
        }
        else {
            assert(dst);
            // Recv
            const size_t dst_idx = 0;
            MPI_Status status;
            MPI_Recv(&dst->vertex_buffer[i][dst_idx], count, datatype, 0, 0, MPI_COMM_WORLD,
                     &status);
        }
    }
}
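// Task 2b: inverse of the scatter above. Rank 0 keeps its own submesh and receives
// every other rank's buffer (halos included) into the corresponding offset of the
// full mesh.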
static void
acDeviceGatherMeshMPI(const AcMesh src, const int3 decomposition, AcMesh* dst)
{
    MPI_Barrier(MPI_COMM_WORLD);
    printf("Gathering mesh...\n");
    MPI_Datatype datatype = MPI_FLOAT;
    if (sizeof(AcReal) == 8)
        datatype = MPI_DOUBLE;
    int pid, num_processes;
    MPI_Comm_rank(MPI_COMM_WORLD, &pid);
    MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
    size_t count = acVertexBufferSize(src.info);
    for (int i = 0; i < NUM_VTXBUF_HANDLES; ++i) {
        // Communicate to self
        if (pid == 0) {
            assert(dst);
            memcpy(&dst->vertex_buffer[i][0], //
                   &src.vertex_buffer[i][0],  //
                   count * sizeof(src.vertex_buffer[i][0]));
            for (int j = 1; j < num_processes; ++j) {
                // Recv
                // NOTE! Gathers halos also! Must be up to date!
                // Not well-defined which GPU it gets the boundary from!
                const int3 target_pid = getPid3D(j, decomposition);
                const int3 offset = (int3){
                    dst->info.int_params[AC_nx] / decomposition.x,
                    dst->info.int_params[AC_ny] / decomposition.y,
                    dst->info.int_params[AC_nz] / decomposition.z,
                };
                const size_t dst_idx = acVertexBufferIdx(target_pid.x * offset.x,
                                                         target_pid.y * offset.y,
                                                         target_pid.z * offset.z, dst->info);
                assert(dst_idx + count <= acVertexBufferSize(dst->info));
                MPI_Status status;
                MPI_Recv(&dst->vertex_buffer[i][dst_idx], count, datatype, j, 0, MPI_COMM_WORLD,
                         &status);
            }
        }
        else {
            // Send
            const size_t src_idx = 0;
            assert(src_idx + count <= acVertexBufferSize(src.info));
            MPI_Send(&src.vertex_buffer[i][src_idx], count, datatype, 0, 0, MPI_COMM_WORLD);
        }
    }
}
static void
acDeviceCommunicateHalos(void)
{
    // TODO
    WARNING("acDeviceCommunicateHalos not yet implemented. Tests will fail (bounds must be "
            "up-to-date before calling acDeviceGatherMeshMPI)");
}
// From Astaroth Utils
#include "src/utils/config_loader.h"
#include "src/utils/memory.h"
@@ -984,6 +1145,217 @@ acDeviceIntegrateStepMPI(const Device device, const AcReal dt)
#include <vector>
// --smpiargs="-gpu"
// 3D decomposition
AcResult
acDeviceRunMPITest(void)
{
    // If 1, runs the strong scaling tests and verification.
    // Verification is disabled when benchmarking weak scaling because the
    // whole grid is too large to fit into memory.
#define BENCH_STRONG_SCALING (1)
    MPI_Init(NULL, NULL);
    int nprocs, pid;
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &pid);
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int name_len;
    MPI_Get_processor_name(processor_name, &name_len);
    printf("Processor %s. Process %d of %d.\n", processor_name, pid, nprocs);
    // Create model and candidate meshes
    AcMeshInfo info;
    acLoadConfig(AC_DEFAULT_CONFIG, &info);
    // Large mesh dim
    const int nn = 64;
    info.int_params[AC_nx] = info.int_params[AC_ny] = nn;
    info.int_params[AC_nz] = BENCH_STRONG_SCALING ? nn : nn * nprocs;
    info.real_params[AC_inv_dsx] = AcReal(1.0) / info.real_params[AC_dsx];
    info.real_params[AC_inv_dsy] = AcReal(1.0) / info.real_params[AC_dsy];
    info.real_params[AC_inv_dsz] = AcReal(1.0) / info.real_params[AC_dsz];
    info.real_params[AC_cs2_sound] = info.real_params[AC_cs_sound] * info.real_params[AC_cs_sound];
    acUpdateConfig(&info);
    acPrintMeshInfo(info);
#if BENCH_STRONG_SCALING
    AcMesh model, candidate;
    // Master CPU
    if (pid == 0) {
        acMeshCreate(info, &model);
        acMeshCreate(info, &candidate);
        acMeshRandomize(&model);
        acMeshRandomize(&candidate);
    }
#endif
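    // Split the global grid across ranks: decompose() gives the process grid and
    // each rank owns an (nx/dx, ny/dy, nz/dz) subdomain of it.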
    /// DECOMPOSITION
    AcMeshInfo submesh_info = info;
    const int3 decomposition = decompose(nprocs);
    const int3 pid3d = (int3){
        pid % decomposition.x,
        (pid / decomposition.x) % decomposition.y,
        (pid / (decomposition.x * decomposition.y)),
    };
    printf("Decomposition: %d, %d, %d\n", decomposition.x, decomposition.y, decomposition.z);
    printf("Process %d: (%d, %d, %d)\n", pid, pid3d.x, pid3d.y, pid3d.z);
    ERRCHK(info.int_params[AC_nx] % decomposition.x == 0);
    ERRCHK(info.int_params[AC_ny] % decomposition.y == 0);
    ERRCHK(info.int_params[AC_nz] % decomposition.z == 0);
    submesh_info.int_params[AC_nx] = info.int_params[AC_nx] / decomposition.x;
    submesh_info.int_params[AC_ny] = info.int_params[AC_ny] / decomposition.y;
    submesh_info.int_params[AC_nz] = info.int_params[AC_nz] / decomposition.z;
    submesh_info.int3_params[AC_global_grid_n] = (int3){
        info.int_params[AC_nx],
        info.int_params[AC_ny],
        info.int_params[AC_nz],
    };
    submesh_info.int3_params[AC_multigpu_offset] = (int3){-1, -1, -1}; // TODO
    WARNING("AC_multigpu_offset not yet implemented");
    acUpdateConfig(&submesh_info);
    ERRCHK_ALWAYS(is_valid(submesh_info.real_params[AC_inv_dsx]));
    ERRCHK_ALWAYS(is_valid(submesh_info.real_params[AC_cs2_sound]));
    //
    AcMesh submesh;
    acMeshCreate(submesh_info, &submesh);
    acMeshRandomize(&submesh);
#if BENCH_STRONG_SCALING
    acDeviceDistributeMeshMPI(model, decomposition, &submesh);
#endif
    // Init the GPU
    int devices_per_node = -1;
    cudaGetDeviceCount(&devices_per_node);
    Device device;
    acDeviceCreate(pid % devices_per_node, submesh_info, &device);
    acDeviceLoadMesh(device, STREAM_DEFAULT, submesh);
    MPI_Barrier(MPI_COMM_WORLD);
    // Attempt to enable peer access with all neighbors in the node
    for (int i = 0; i < devices_per_node; ++i) {
        cudaSetDevice(device->id);
        WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(i, 0));
    }
    /*
    // Attempt to enable peer access to the most expensive neighbors (sides)
    const int left  = getPid1D(pid3d + (int3){-1, 0, 0}, decomposition);
    const int right = getPid1D(pid3d + (int3){1, 0, 0}, decomposition);
    const int up    = getPid1D(pid3d + (int3){0, 1, 0}, decomposition);
    const int down  = getPid1D(pid3d + (int3){0, -1, 0}, decomposition);
    const int front = getPid1D(pid3d + (int3){0, 0, 1}, decomposition);
    const int back  = getPid1D(pid3d + (int3){0, 0, -1}, decomposition);
    cudaSetDevice(device->id);
    WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(left, 0));
    WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(right, 0));
    WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(up, 0));
    WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(down, 0));
    WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(front, 0));
    WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(back, 0));
    */
    // Verification start ///////////////////////////////////////////////////////////////////////
#if BENCH_STRONG_SCALING
    {
        // const float dt = FLT_EPSILON; // TODO
        // acDeviceIntegrateStepMPI(device, dt); // TODO
        // acDeviceBoundStepMPI(device); TODO
        acDeviceCommunicateHalos();
        acDeviceSynchronizeStream(device, STREAM_ALL);
        acDeviceStoreMesh(device, STREAM_DEFAULT, &submesh);
        acDeviceGatherMeshMPI(submesh, decomposition, &candidate);
        if (pid == 0) {
            // acModelIntegrateStep(model, FLT_EPSILON); // TODO
            acMeshApplyPeriodicBounds(&model);
            const bool valid = acVerifyMesh(model, candidate);
            acMeshDestroy(&model);
            acMeshDestroy(&candidate);
            // Write out
            char buf[256];
            sprintf(buf, "nprocs_%d_result_%s", nprocs, valid ? "valid" : "INVALID_CHECK_OUTPUT");
            FILE* fp = fopen(buf, "w");
            ERRCHK_ALWAYS(fp);
            fclose(fp);
        }
    }
#endif
    // Verification end ///////////////////////////////////////////////////////////////////////
    /*
    // TODO
    // Warmup
    for (int i = 0; i < 10; ++i)
        acDeviceIntegrateStepMPI(device, 0);
    acDeviceSynchronizeStream(device, STREAM_ALL);
    MPI_Barrier(MPI_COMM_WORLD);
    // Benchmark start ///////////////////////////////////////////////////////////////////////
    const int num_iters = 10;
    std::vector<double> results;
    results.reserve(num_iters);
    Timer total_time;
    timer_reset(&total_time);
    Timer step_time;
    for (int i = 0; i < num_iters; ++i) {
        timer_reset(&step_time);
        const AcReal dt = FLT_EPSILON;
        acDeviceIntegrateStepMPI(device, dt);
        acDeviceSynchronizeStream(device, STREAM_ALL);
        MPI_Barrier(MPI_COMM_WORLD);
        results.push_back(timer_diff_nsec(step_time) / 1e6);
    }
    const double ms_elapsed = timer_diff_nsec(total_time) / 1e6;
    const double nth_percentile = 0.90;
    std::sort(results.begin(), results.end(),
              [](const double& a, const double& b) { return a < b; });
    if (pid == 0) {
        printf("vertices: %d^3, iterations: %d\n", nn, num_iters);
        printf("Total time: %f ms\n", ms_elapsed);
        printf("Time per step: %f ms\n", ms_elapsed / num_iters);
        const size_t nth_index = int(nth_percentile * num_iters);
        printf("%dth percentile per step: %f ms\n", int(100 * nth_percentile), results[nth_index]);
        // Write out
        char buf[256];
        sprintf(buf, "nprocs_%d_result_%s.bench", nprocs, BENCH_STRONG_SCALING ? "strong" : "weak");
        FILE* fp = fopen(buf, "w");
        ERRCHK_ALWAYS(fp);
        fprintf(fp, "nprocs, percentile (%dth)\n", int(100 * nth_percentile));
        fprintf(fp, "%d, %g\n", nprocs, results[nth_index]);
        fclose(fp);
    }
    // Benchmark end ///////////////////////////////////////////////////////////////////////
    */
    // Finalize
    acDeviceDestroy(device);
    acMeshDestroy(&submesh);
    MPI_Finalize();
    return AC_SUCCESS;
}
/*
// Working, 1D decomposition
AcResult
acDeviceRunMPITest(void)
{
@@ -1008,8 +1380,8 @@ acDeviceRunMPITest(void)
    acLoadConfig(AC_DEFAULT_CONFIG, &info);
    // Large mesh dim
    const int nn = 512;
    const int nn = 256;
    const int num_iters = 100;
    const int num_iters = 10;
    info.int_params[AC_nx] = info.int_params[AC_ny] = nn;
    info.int_params[AC_nz] = BENCH_STRONG_SCALING ? nn : nn * num_processes;
    info.real_params[AC_inv_dsx] = AcReal(1.0) / info.real_params[AC_dsx];
@@ -1159,177 +1531,8 @@ acDeviceRunMPITest(void)
    MPI_Finalize();
    return AC_SUCCESS;
}
/*
AcResult
acDeviceRunMPITest(void)
{
    int num_processes, pid;
    MPI_Init(NULL, NULL);
    // int provided;
    // MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided); // Hybrid MP + MPI
    // ERRCHK_ALWAYS(provided == MPI_THREAD_MULTIPLE);
    MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
    MPI_Comm_rank(MPI_COMM_WORLD, &pid);
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int name_len;
    MPI_Get_processor_name(processor_name, &name_len);
    printf("Processor %s. Process %d of %d.\n", processor_name, pid, num_processes);
#ifdef MPIX_CUDA_AWARE_SUPPORT
    if (MPIX_Query_cuda_support())
        printf("CUDA-aware MPI supported (MPIX)\n");
    else
        WARNING("CUDA-aware MPI not supported with this MPI library (MPIX)\n");
#else
    printf("MPIX_CUDA_AWARE_SUPPORT was not defined. Do not know whether CUDA-aware MPI is "
           "supported\n");
#endif
    if (getenv("MPICH_RDMA_ENABLED_CUDA") && atoi(getenv("MPICH_RDMA_ENABLED_CUDA")))
        printf("CUDA-aware MPI supported (MPICH)\n");
    else
        WARNING("MPICH not used or this MPI library does not support CUDA-aware MPI\n");
    // Create model and candidate meshes
    AcMeshInfo info;
    acLoadConfig(AC_DEFAULT_CONFIG, &info);
    // Large mesh dim
    const int nn = 128;
    const int num_iters = 1;
    info.int_params[AC_nx] = info.int_params[AC_ny] = info.int_params[AC_nz] = nn;
    info.real_params[AC_inv_dsx] = AcReal(1.0) / info.real_params[AC_dsx];
    info.real_params[AC_inv_dsy] = AcReal(1.0) / info.real_params[AC_dsy];
    info.real_params[AC_inv_dsz] = AcReal(1.0) / info.real_params[AC_dsz];
    info.real_params[AC_cs2_sound] = info.real_params[AC_cs_sound] * info.real_params[AC_cs_sound];
    acUpdateConfig(&info);
    ERRCHK_ALWAYS(is_valid(info.real_params[AC_inv_dsx]));
    ERRCHK_ALWAYS(is_valid(info.real_params[AC_cs2_sound]));
    acPrintMeshInfo(info);
#define VERIFY (1)
#if VERIFY
    AcMesh model, candidate;
    // Master CPU
    if (pid == 0) {
        acMeshCreate(info, &model);
        acMeshCreate(info, &candidate);
        acMeshRandomize(&model);
        acMeshRandomize(&candidate);
    }
#endif
    ERRCHK_ALWAYS(info.int_params[AC_nz] % num_processes == 0);
    /// DECOMPOSITION
    AcMeshInfo submesh_info = info;
    const int submesh_nz = info.int_params[AC_nz] / num_processes;
    submesh_info.int_params[AC_nz] = submesh_nz;
    submesh_info.int3_params[AC_global_grid_n] = (int3){
        info.int_params[AC_nx],
        info.int_params[AC_ny],
        info.int_params[AC_nz],
    };
    submesh_info.int3_params[AC_multigpu_offset] = (int3){0, 0, pid * submesh_nz};
    acUpdateConfig(&submesh_info);
    ERRCHK_ALWAYS(is_valid(submesh_info.real_params[AC_inv_dsx]));
    ERRCHK_ALWAYS(is_valid(submesh_info.real_params[AC_cs2_sound]));
    //
    AcMesh submesh;
    acMeshCreate(submesh_info, &submesh);
    acMeshRandomize(&submesh);
#if VERIFY
    acDeviceDistributeMeshMPI(model, &submesh);
#endif
    ////////////////////////////////////////////////////////////////////////////////////////////////
    int devices_per_node = -1;
    cudaGetDeviceCount(&devices_per_node);
    Device device;
    acDeviceCreate(pid % devices_per_node, submesh_info, &device);
    acDeviceLoadMesh(device, STREAM_DEFAULT, submesh);
    // Warmup
    for (int i = 0; i < 0; ++i) {
        acDeviceIntegrateStepMPI(device, 0);
    }
    acDeviceSynchronizeStream(device, STREAM_ALL);
    MPI_Barrier(MPI_COMM_WORLD);
    // Benchmark
    std::vector<double> results;
    results.reserve(num_iters);
    Timer total_time;
    timer_reset(&total_time);
    Timer step_time;
    for (int i = 0; i < num_iters; ++i) {
        timer_reset(&step_time);
        acDeviceIntegrateStepMPI(device, FLT_EPSILON);
        acDeviceSynchronizeStream(device, STREAM_ALL);
        MPI_Barrier(MPI_COMM_WORLD);
        results.push_back(timer_diff_nsec(step_time) / 1e6);
    }
    const double ms_elapsed = timer_diff_nsec(total_time) / 1e6;
    const double nth_percentile = 0.95;
    std::sort(results.begin(), results.end(),
              [](const double& a, const double& b) { return a < b; });
    if (pid == 0) {
        printf("vertices: %d^3, iterations: %d\n", nn, num_iters);
        printf("Total time: %f ms\n", ms_elapsed);
        printf("Time per step: %f ms\n", ms_elapsed / num_iters);
        const size_t nth_index = int(nth_percentile * num_iters);
        printf("%dth percentile per step: %f ms\n", int(100 * nth_percentile), results[nth_index]);
        // Write out
        char buf[256];
        sprintf(buf, "nprocs_%d_result.bench", num_processes);
        FILE* fp = fopen(buf, "w");
        ERRCHK_ALWAYS(fp);
        fprintf(fp, "num_processes, percentile (%dth)\n", int(100 * nth_percentile));
        fprintf(fp, "%d, %g\n", num_processes, results[nth_index]);
        fclose(fp);
    }
    ////////////////////////////// Timer end
    acDeviceBoundStepMPI(device);
    acDeviceStoreMesh(device, STREAM_DEFAULT, &submesh);
    acDeviceDestroy(device);
    ////////////////////////////////////////////////////////////////////////////////////////////////
#if VERIFY
    acDeviceGatherMeshMPI(submesh, &candidate);
#endif
    acMeshDestroy(&submesh);
#if VERIFY
    // Master CPU
    if (pid == 0) {
        for (int i = 0; i < num_iters; ++i) {
            acModelIntegrateStep(model, FLT_EPSILON);
        }
        acMeshApplyPeriodicBounds(&model);
        acVerifyMesh(model, candidate);
        acMeshDestroy(&model);
        acMeshDestroy(&candidate);
    }
#endif
    MPI_Finalize();
    return AC_FAILURE;
}
*/
#else
AcResult
acDeviceRunMPITest(void)