From bad64f530790c9523b7ae7d4860e99248824e206 Mon Sep 17 00:00:00 2001 From: jpekkila Date: Sat, 21 Dec 2019 12:37:01 +0200 Subject: [PATCH] Started the 3D decomposition branch. Four tasks: 1) Determine how to distribute the work given n processes 2) Distribute and gather the mesh to/from these processes 3) Create packing/unpacking functions and 4) Transfer packed data blocks between neighbors. Tasks 1 and 2 done with this commit. --- src/core/device.cc | 547 +++++++++++++++++++++++++++++++-------------- 1 file changed, 375 insertions(+), 172 deletions(-) diff --git a/src/core/device.cc b/src/core/device.cc index 39e6c36..98db5f3 100644 --- a/src/core/device.cc +++ b/src/core/device.cc @@ -633,6 +633,7 @@ mod(const int a, const int b) return r < 0 ? r + b : r; } +/* static inline int get_neighbor(const int3 offset) { @@ -650,7 +651,11 @@ get_neighbor(const int3 offset) return mod(pid + offset.x, n) + offset.y * n + offset.z * n * n; } +*/ +/* +#ifdef DECOMP_1D +// Working 1D decomp distribute/gather static void acDeviceDistributeMeshMPI(const AcMesh src, AcMesh* dst) { @@ -738,6 +743,8 @@ acDeviceGatherMeshMPI(const AcMesh src, AcMesh* dst) } } } +#endif +*/ // 1D decomp static AcResult @@ -973,6 +980,160 @@ acDeviceIntegrateStepMPI(const Device device, const AcReal dt) return AC_SUCCESS; } +static int3 +decompose(const int target) +{ + int decomposition[] = {1, 1, 1}; + + int axis = 0; + while (decomposition[0] * decomposition[1] * decomposition[2] < target) { + ++decomposition[axis]; + axis = (axis + 1) % 3; + } + + const int found = decomposition[0] * decomposition[1] * decomposition[2]; + if (found != target) { + fprintf(stderr, "Invalid number of processes! Cannot decompose the problem domain!\n"); + fprintf(stderr, "Target nprocs: %d. Next allowed: %d\n", target, found); + ERROR("Invalid nprocs"); + return (int3){-1, -1, -1}; + } + else { + return (int3){decomposition[0], decomposition[1], decomposition[2]}; + } +} + +static int3 +getPid3D(const int pid, const int3 decomposition) +{ + const int3 pid3d = (int3){ + mod(pid, decomposition.x), + mod(pid / decomposition.x, decomposition.y), + (pid / (decomposition.x * decomposition.y)), + }; + return pid3d; +} + +static int +getPid1D(const int3 pid, const int3 decomposition) +{ + return mod(pid.x, decomposition.x) + // + mod(pid.y, decomposition.y) * decomposition.x + // + mod(pid.z, decomposition.z) * decomposition.x * decomposition.y; +} + +static void +acDeviceDistributeMeshMPI(const AcMesh src, const int3 decomposition, AcMesh* dst) +{ + MPI_Barrier(MPI_COMM_WORLD); + printf("Distributing mesh...\n"); + + MPI_Datatype datatype = MPI_FLOAT; + if (sizeof(AcReal) == 8) + datatype = MPI_DOUBLE; + + int pid, num_processes; + MPI_Comm_rank(MPI_COMM_WORLD, &pid); + MPI_Comm_size(MPI_COMM_WORLD, &num_processes); + + const size_t count = acVertexBufferSize(dst->info); + for (int i = 0; i < NUM_VTXBUF_HANDLES; ++i) { + + if (pid == 0) { + // Communicate to self + assert(dst); + memcpy(&dst->vertex_buffer[i][0], // + &src.vertex_buffer[i][0], // + count * sizeof(src.vertex_buffer[i][0])); + + // Communicate to others + for (int j = 1; j < num_processes; ++j) { + const int3 target_pid = getPid3D(j, decomposition); + const int3 offset = (int3){ + src.info.int_params[AC_nx] / decomposition.x, + src.info.int_params[AC_ny] / decomposition.y, + src.info.int_params[AC_nz] / decomposition.z, + }; + const size_t src_idx = acVertexBufferIdx(target_pid.x * offset.x, + target_pid.y * offset.y, + target_pid.z * offset.z, src.info); + + 
MPI_Send(&src.vertex_buffer[i][src_idx], count, datatype, j, 0, MPI_COMM_WORLD); + } + } + else { + assert(dst); + + // Recv + const size_t dst_idx = 0; + MPI_Status status; + MPI_Recv(&dst->vertex_buffer[i][dst_idx], count, datatype, 0, 0, MPI_COMM_WORLD, + &status); + } + } +} + +static void +acDeviceGatherMeshMPI(const AcMesh src, const int3 decomposition, AcMesh* dst) +{ + MPI_Barrier(MPI_COMM_WORLD); + printf("Gathering mesh...\n"); + MPI_Datatype datatype = MPI_FLOAT; + if (sizeof(AcReal) == 8) + datatype = MPI_DOUBLE; + + int pid, num_processes; + MPI_Comm_rank(MPI_COMM_WORLD, &pid); + MPI_Comm_size(MPI_COMM_WORLD, &num_processes); + + size_t count = acVertexBufferSize(src.info); + + for (int i = 0; i < NUM_VTXBUF_HANDLES; ++i) { + // Communicate to self + if (pid == 0) { + assert(dst); + memcpy(&dst->vertex_buffer[i][0], // + &src.vertex_buffer[i][0], // + count * sizeof(src.vertex_buffer[i][0])); + + for (int j = 1; j < num_processes; ++j) { + // Recv + // NOTE! Gathers halos also! Must be up to date! + // Not well-defined which GPU it gets the boundary from! + const int3 target_pid = getPid3D(j, decomposition); + const int3 offset = (int3){ + dst->info.int_params[AC_nx] / decomposition.x, + dst->info.int_params[AC_ny] / decomposition.y, + dst->info.int_params[AC_nz] / decomposition.z, + }; + const size_t dst_idx = acVertexBufferIdx(target_pid.x * offset.x, + target_pid.y * offset.y, + target_pid.z * offset.z, dst->info); + + assert(dst_idx + count <= acVertexBufferSize(dst->info)); + MPI_Status status; + MPI_Recv(&dst->vertex_buffer[i][dst_idx], count, datatype, j, 0, MPI_COMM_WORLD, + &status); + } + } + else { + // Send + const size_t src_idx = 0; + + assert(src_idx + count <= acVertexBufferSize(src.info)); + MPI_Send(&src.vertex_buffer[i][src_idx], count, datatype, 0, 0, MPI_COMM_WORLD); + } + } +} + +static void +acDeviceCommunicateHalos(void) +{ + // TODO + WARNING("acDeviceCommunicateHalos not yet implemented. Tests will fail (bounds must be " + "up-to-date before calling acDeviceGatherMeshMPI)"); +} + // From Astaroth Utils #include "src/utils/config_loader.h" #include "src/utils/memory.h" @@ -984,6 +1145,217 @@ acDeviceIntegrateStepMPI(const Device device, const AcReal dt) #include // --smpiargs="-gpu" +// 3D decomposition +AcResult +acDeviceRunMPITest(void) +{ + // If 1, runs the strong scaling tests and verification. + // Verification is disabled when benchmarking weak scaling because the + // whole grid is too large to fit into memory. +#define BENCH_STRONG_SCALING (1) + + MPI_Init(NULL, NULL); + + int nprocs, pid; + MPI_Comm_size(MPI_COMM_WORLD, &nprocs); + MPI_Comm_rank(MPI_COMM_WORLD, &pid); + + char processor_name[MPI_MAX_PROCESSOR_NAME]; + int name_len; + MPI_Get_processor_name(processor_name, &name_len); + printf("Processor %s. Process %d of %d.\n", processor_name, pid, nprocs); + + // Create model and candidate meshes + AcMeshInfo info; + acLoadConfig(AC_DEFAULT_CONFIG, &info); + + // Large mesh dim + const int nn = 64; + info.int_params[AC_nx] = info.int_params[AC_ny] = nn; + info.int_params[AC_nz] = BENCH_STRONG_SCALING ? 
nn : nn * nprocs; + info.real_params[AC_inv_dsx] = AcReal(1.0) / info.real_params[AC_dsx]; + info.real_params[AC_inv_dsy] = AcReal(1.0) / info.real_params[AC_dsy]; + info.real_params[AC_inv_dsz] = AcReal(1.0) / info.real_params[AC_dsz]; + info.real_params[AC_cs2_sound] = info.real_params[AC_cs_sound] * info.real_params[AC_cs_sound]; + acUpdateConfig(&info); + acPrintMeshInfo(info); + +#if BENCH_STRONG_SCALING + AcMesh model, candidate; + + // Master CPU + if (pid == 0) { + acMeshCreate(info, &model); + acMeshCreate(info, &candidate); + + acMeshRandomize(&model); + acMeshRandomize(&candidate); + } +#endif + + /// DECOMPOSITION + AcMeshInfo submesh_info = info; + const int3 decomposition = decompose(nprocs); + const int3 pid3d = (int3){ + pid % decomposition.x, + (pid / decomposition.x) % decomposition.y, + (pid / (decomposition.x * decomposition.y)), + }; + + printf("Decomposition: %d, %d, %d\n", decomposition.x, decomposition.y, decomposition.z); + printf("Process %d: (%d, %d, %d)\n", pid, pid3d.x, pid3d.y, pid3d.z); + ERRCHK(info.int_params[AC_nx] % decomposition.x == 0); + ERRCHK(info.int_params[AC_ny] % decomposition.y == 0); + ERRCHK(info.int_params[AC_nz] % decomposition.z == 0); + + submesh_info.int_params[AC_nx] = info.int_params[AC_nx] / decomposition.x; + submesh_info.int_params[AC_ny] = info.int_params[AC_ny] / decomposition.y; + submesh_info.int_params[AC_nz] = info.int_params[AC_nz] / decomposition.z; + submesh_info.int3_params[AC_global_grid_n] = (int3){ + info.int_params[AC_nx], + info.int_params[AC_ny], + info.int_params[AC_nz], + }; + submesh_info.int3_params[AC_multigpu_offset] = (int3){-1, -1, -1}; // TODO + WARNING("AC_multigpu_offset not yet implemented"); + acUpdateConfig(&submesh_info); + ERRCHK_ALWAYS(is_valid(submesh_info.real_params[AC_inv_dsx])); + ERRCHK_ALWAYS(is_valid(submesh_info.real_params[AC_cs2_sound])); + // + + AcMesh submesh; + acMeshCreate(submesh_info, &submesh); + acMeshRandomize(&submesh); + +#if BENCH_STRONG_SCALING + acDeviceDistributeMeshMPI(model, decomposition, &submesh); +#endif + + // Init the GPU + int devices_per_node = -1; + cudaGetDeviceCount(&devices_per_node); + + Device device; + acDeviceCreate(pid % devices_per_node, submesh_info, &device); + acDeviceLoadMesh(device, STREAM_DEFAULT, submesh); + + MPI_Barrier(MPI_COMM_WORLD); + // Attempt to enable peer access with all neighbors in the node + for (int i = 0; i < devices_per_node; ++i) { + cudaSetDevice(device->id); + WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(i, 0)); + } + /* + // Attempt to enable peer access to the most expensive neighbors (sides) + const int left = getPid1D(pid3d + (int3){-1, 0, 0}, decomposition); + const int right = getPid1D(pid3d + (int3){1, 0, 0}, decomposition); + const int up = getPid1D(pid3d + (int3){0, 1, 0}, decomposition); + const int down = getPid1D(pid3d + (int3){0, -1, 0}, decomposition); + const int front = getPid1D(pid3d + (int3){0, 0, 1}, decomposition); + const int back = getPid1D(pid3d + (int3){0, 0, -1}, decomposition); + + cudaSetDevice(device->id); + WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(left, 0)); + WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(right, 0)); + WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(up, 0)); + WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(down, 0)); + WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(front, 0)); + WARNCHK_CUDA_ALWAYS(cudaDeviceEnablePeerAccess(back, 0)); + */ + +// Verification start /////////////////////////////////////////////////////////////////////// +#if BENCH_STRONG_SCALING + { + // const 
float dt = FLT_EPSILON; // TODO + // acDeviceIntegrateStepMPI(device, dt); // TODO + // acDeviceBoundStepMPI(device); TODO + acDeviceCommunicateHalos(); + acDeviceSynchronizeStream(device, STREAM_ALL); + + acDeviceStoreMesh(device, STREAM_DEFAULT, &submesh); + acDeviceGatherMeshMPI(submesh, decomposition, &candidate); + if (pid == 0) { + // acModelIntegrateStep(model, FLT_EPSILON); // TODO + acMeshApplyPeriodicBounds(&model); + + const bool valid = acVerifyMesh(model, candidate); + acMeshDestroy(&model); + acMeshDestroy(&candidate); + + // Write out + char buf[256]; + sprintf(buf, "nprocs_%d_result_%s", nprocs, valid ? "valid" : "INVALID_CHECK_OUTPUT"); + FILE* fp = fopen(buf, "w"); + ERRCHK_ALWAYS(fp); + fclose(fp); + } + } +#endif + // Verification end /////////////////////////////////////////////////////////////////////// + + /* + // TODO + // Warmup + for (int i = 0; i < 10; ++i) + acDeviceIntegrateStepMPI(device, 0); + + acDeviceSynchronizeStream(device, STREAM_ALL); + MPI_Barrier(MPI_COMM_WORLD); + + // Benchmark start /////////////////////////////////////////////////////////////////////// + std::vector results; + results.reserve(num_iters); + + Timer total_time; + timer_reset(&total_time); + + Timer step_time; + const int num_iters = 10; + for (int i = 0; i < num_iters; ++i) { + timer_reset(&step_time); + + const AcReal dt = FLT_EPSILON; + acDeviceIntegrateStepMPI(device, dt); + acDeviceSynchronizeStream(device, STREAM_ALL); + MPI_Barrier(MPI_COMM_WORLD); + + results.push_back(timer_diff_nsec(step_time) / 1e6); + } + + const double ms_elapsed = timer_diff_nsec(total_time) / 1e6; + const double nth_percentile = 0.90; + std::sort(results.begin(), results.end(), + [](const double& a, const double& b) { return a < b; }); + + if (pid == 0) { + printf("vertices: %d^3, iterations: %d\n", nn, num_iters); + printf("Total time: %f ms\n", ms_elapsed); + printf("Time per step: %f ms\n", ms_elapsed / num_iters); + + const size_t nth_index = int(nth_percentile * num_iters); + printf("%dth percentile per step: %f ms\n", int(100 * nth_percentile), results[nth_index]); + + // Write out + char buf[256]; + sprintf(buf, "nprocs_%d_result_%s.bench", nprocs, BENCH_STRONG_SCALING ? "strong" : "weak"); + FILE* fp = fopen(buf, "w"); + ERRCHK_ALWAYS(fp); + fprintf(fp, "nprocs, percentile (%dth)\n", int(100 * nth_percentile)); + fprintf(fp, "%d, %g\n", nprocs, results[nth_index]); + fclose(fp); + } + // Benchmark end /////////////////////////////////////////////////////////////////////// + */ + + // Finalize + acDeviceDestroy(device); + acMeshDestroy(&submesh); + MPI_Finalize(); + return AC_SUCCESS; +} + +/* +// Working, 1D decomposition AcResult acDeviceRunMPITest(void) { @@ -1008,8 +1380,8 @@ acDeviceRunMPITest(void) acLoadConfig(AC_DEFAULT_CONFIG, &info); // Large mesh dim - const int nn = 512; - const int num_iters = 100; + const int nn = 256; + const int num_iters = 10; info.int_params[AC_nx] = info.int_params[AC_ny] = nn; info.int_params[AC_nz] = BENCH_STRONG_SCALING ? 
nn : nn * num_processes; info.real_params[AC_inv_dsx] = AcReal(1.0) / info.real_params[AC_dsx]; @@ -1159,177 +1531,8 @@ acDeviceRunMPITest(void) MPI_Finalize(); return AC_SUCCESS; } - -/* -AcResult -acDeviceRunMPITest(void) -{ - int num_processes, pid; - MPI_Init(NULL, NULL); - // int provided; - // MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided); // Hybrid MP + MPI - // ERRCHK_ALWAYS(provided == MPI_THREAD_MULTIPLE); - MPI_Comm_size(MPI_COMM_WORLD, &num_processes); - MPI_Comm_rank(MPI_COMM_WORLD, &pid); - - char processor_name[MPI_MAX_PROCESSOR_NAME]; - int name_len; - MPI_Get_processor_name(processor_name, &name_len); - printf("Processor %s. Process %d of %d.\n", processor_name, pid, num_processes); - -#ifdef MPIX_CUDA_AWARE_SUPPORT - if (MPIX_Query_cuda_support()) - printf("CUDA-aware MPI supported (MPIX)\n"); - else - WARNING("CUDA-aware MPI not supported with this MPI library (MPIX)\n"); -#else - printf("MPIX_CUDA_AWARE_SUPPORT was not defined. Do not know whether CUDA-aware MPI is " - "supported\n"); -#endif - - if (getenv("MPICH_RDMA_ENABLED_CUDA") && atoi(getenv("MPICH_RDMA_ENABLED_CUDA"))) - printf("CUDA-aware MPI supported (MPICH)\n"); - else - WARNING("MPICH not used or this MPI library does not support CUDA-aware MPI\n"); - - // Create model and candidate meshes - AcMeshInfo info; - acLoadConfig(AC_DEFAULT_CONFIG, &info); - - // Large mesh dim - const int nn = 128; - const int num_iters = 1; - info.int_params[AC_nx] = info.int_params[AC_ny] = info.int_params[AC_nz] = nn; - info.real_params[AC_inv_dsx] = AcReal(1.0) / info.real_params[AC_dsx]; - info.real_params[AC_inv_dsy] = AcReal(1.0) / info.real_params[AC_dsy]; - info.real_params[AC_inv_dsz] = AcReal(1.0) / info.real_params[AC_dsz]; - info.real_params[AC_cs2_sound] = info.real_params[AC_cs_sound] * info.real_params[AC_cs_sound]; - acUpdateConfig(&info); - ERRCHK_ALWAYS(is_valid(info.real_params[AC_inv_dsx])); - ERRCHK_ALWAYS(is_valid(info.real_params[AC_cs2_sound])); - acPrintMeshInfo(info); - -#define VERIFY (1) - -#if VERIFY - AcMesh model, candidate; - - // Master CPU - if (pid == 0) { - acMeshCreate(info, &model); - acMeshCreate(info, &candidate); - - acMeshRandomize(&model); - acMeshRandomize(&candidate); - } -#endif - ERRCHK_ALWAYS(info.int_params[AC_nz] % num_processes == 0); - - /// DECOMPOSITION - AcMeshInfo submesh_info = info; - const int submesh_nz = info.int_params[AC_nz] / num_processes; - submesh_info.int_params[AC_nz] = submesh_nz; - submesh_info.int3_params[AC_global_grid_n] = (int3){ - info.int_params[AC_nx], - info.int_params[AC_ny], - info.int_params[AC_nz], - }; - submesh_info.int3_params[AC_multigpu_offset] = (int3){0, 0, pid * submesh_nz}; - acUpdateConfig(&submesh_info); - ERRCHK_ALWAYS(is_valid(submesh_info.real_params[AC_inv_dsx])); - ERRCHK_ALWAYS(is_valid(submesh_info.real_params[AC_cs2_sound])); - // - - AcMesh submesh; - acMeshCreate(submesh_info, &submesh); - acMeshRandomize(&submesh); -#if VERIFY - acDeviceDistributeMeshMPI(model, &submesh); -#endif - - //////////////////////////////////////////////////////////////////////////////////////////////// - int devices_per_node = -1; - cudaGetDeviceCount(&devices_per_node); - - Device device; - acDeviceCreate(pid % devices_per_node, submesh_info, &device); - acDeviceLoadMesh(device, STREAM_DEFAULT, submesh); - - // Warmup - for (int i = 0; i < 0; ++i) { - acDeviceIntegrateStepMPI(device, 0); - } - acDeviceSynchronizeStream(device, STREAM_ALL); - MPI_Barrier(MPI_COMM_WORLD); - - // Benchmark - std::vector results; - 
results.reserve(num_iters); - - Timer total_time; - timer_reset(&total_time); - - Timer step_time; - for (int i = 0; i < num_iters; ++i) { - timer_reset(&step_time); - acDeviceIntegrateStepMPI(device, FLT_EPSILON); - acDeviceSynchronizeStream(device, STREAM_ALL); - MPI_Barrier(MPI_COMM_WORLD); - results.push_back(timer_diff_nsec(step_time) / 1e6); - } - - const double ms_elapsed = timer_diff_nsec(total_time) / 1e6; - const double nth_percentile = 0.95; - std::sort(results.begin(), results.end(), - [](const double& a, const double& b) { return a < b; }); - - if (pid == 0) { - printf("vertices: %d^3, iterations: %d\n", nn, num_iters); - printf("Total time: %f ms\n", ms_elapsed); - printf("Time per step: %f ms\n", ms_elapsed / num_iters); - - const size_t nth_index = int(nth_percentile * num_iters); - printf("%dth percentile per step: %f ms\n", int(100 * nth_percentile), results[nth_index]); - - // Write out - char buf[256]; - sprintf(buf, "nprocs_%d_result.bench", num_processes); - FILE* fp = fopen(buf, "w"); - ERRCHK_ALWAYS(fp); - fprintf(fp, "num_processes, percentile (%dth)\n", int(100 * nth_percentile)); - fprintf(fp, "%d, %g\n", num_processes, results[nth_index]); - fclose(fp); - } - - ////////////////////////////// Timer end - acDeviceBoundStepMPI(device); - acDeviceStoreMesh(device, STREAM_DEFAULT, &submesh); - acDeviceDestroy(device); - //////////////////////////////////////////////////////////////////////////////////////////////// - -#if VERIFY - acDeviceGatherMeshMPI(submesh, &candidate); -#endif - acMeshDestroy(&submesh); - -#if VERIFY - // Master CPU - if (pid == 0) { - for (int i = 0; i < num_iters; ++i) { - acModelIntegrateStep(model, FLT_EPSILON); - } - acMeshApplyPeriodicBounds(&model); - - acVerifyMesh(model, candidate); - acMeshDestroy(&model); - acMeshDestroy(&candidate); - } -#endif - - MPI_Finalize(); - return AC_FAILURE; -} */ + #else AcResult acDeviceRunMPITest(void)
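
For reference, below is a minimal standalone sketch of the rank-to-block mapping that the new decompose()/getPid3D()/getPid1D() helpers in this patch implement (tasks 1 and 2 of the commit message). It is not part of the patch: the Int3 struct and the grid/rank values in main() are illustrative stand-ins for the Astaroth int3 type and the AC_nx/AC_ny/AC_nz parameters, and the sketch only mirrors the mapping logic so it can be compiled and inspected on its own.

// Standalone sketch (not part of the patch): rank <-> 3D process-grid mapping
// and per-rank block offsets, mirroring decompose(), getPid3D() and getPid1D().
#include <cassert>
#include <cstdio>

struct Int3 {
    int x, y, z;
};

// Positive modulo, matching the mod() helper in device.cc.
static int mod(const int a, const int b)
{
    const int r = a % b;
    return r < 0 ? r + b : r;
}

// Grow the process grid one axis at a time until it holds `target` ranks.
// As in the patch, only exact factorizations are accepted.
static Int3 decompose(const int target)
{
    int d[] = {1, 1, 1};
    int axis = 0;
    while (d[0] * d[1] * d[2] < target) {
        ++d[axis];
        axis = (axis + 1) % 3;
    }
    assert(d[0] * d[1] * d[2] == target && "Invalid nprocs: cannot decompose");
    return {d[0], d[1], d[2]};
}

// Rank -> 3D coordinates in the process grid (x fastest, z slowest).
static Int3 getPid3D(const int pid, const Int3 d)
{
    return {mod(pid, d.x), mod(pid / d.x, d.y), pid / (d.x * d.y)};
}

// 3D coordinates -> rank, with periodic wrapping for neighbor lookups.
static int getPid1D(const Int3 p, const Int3 d)
{
    return mod(p.x, d.x) + mod(p.y, d.y) * d.x + mod(p.z, d.z) * d.x * d.y;
}

int main(void)
{
    const int nprocs = 8;            // illustrative rank count
    const Int3 nn    = {64, 64, 64}; // illustrative global grid (stand-in for AC_nx/ny/nz)

    const Int3 d = decompose(nprocs);
    printf("decomposition: %d x %d x %d\n", d.x, d.y, d.z);

    for (int pid = 0; pid < nprocs; ++pid) {
        const Int3 p = getPid3D(pid, d);
        // Offset of this rank's sub-block in the global mesh, analogous to the
        // offsets computed in acDeviceDistributeMeshMPI/acDeviceGatherMeshMPI.
        const Int3 offset = {p.x * nn.x / d.x, p.y * nn.y / d.y, p.z * nn.z / d.z};
        assert(getPid1D(p, d) == pid); // the mapping must round-trip
        printf("rank %d -> (%d, %d, %d), block offset (%d, %d, %d)\n", pid, p.x, p.y, p.z,
               offset.x, offset.y, offset.z);
    }
    return 0;
}

Compiled with e.g. g++ -std=c++11 and run as-is, the sketch reports a 2 x 2 x 2 process grid for 8 ranks, with block offsets at multiples of 32 along each axis of the 64^3 grid; these are the same sub-block origins that the distribute/gather routines above index into the global mesh with.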