From 46ad9da8c88b1651aa5231f382ae0a324908d518 Mon Sep 17 00:00:00 2001
From: jpekkila
Date: Tue, 15 Oct 2019 17:00:44 +0300
Subject: [PATCH] Pulled some stuff from the mpi branch

---
 src/mpitest/main.c | 202 +++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 193 insertions(+), 9 deletions(-)

diff --git a/src/mpitest/main.c b/src/mpitest/main.c
index 824d8a8..600e14a 100644
--- a/src/mpitest/main.c
+++ b/src/mpitest/main.c
@@ -34,27 +34,211 @@
 #include "src/utils/memory.h"
 #include "src/utils/verification.h"
 
+// Splits the mesh into num_processes slabs along z and sends one
+// submesh-sized block (interior planes plus NGHOST ghost planes on both
+// sides) to each process. Only the root (pid 0) reads src.
+static void
+distribute_mesh(const AcMesh* src, AcMesh* dst)
+{
+    MPI_Barrier(MPI_COMM_WORLD);
+    printf("Distributing mesh...\n");
+
+    MPI_Datatype datatype = MPI_FLOAT;
+    if (sizeof(AcReal) == 8)
+        datatype = MPI_DOUBLE;
+
+    int pid, num_processes;
+    MPI_Comm_rank(MPI_COMM_WORLD, &pid);
+    MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
+
+    const size_t count = acVertexBufferSize(dst->info);
+    for (int i = 0; i < NUM_VTXBUF_HANDLES; ++i) {
+
+        if (pid == 0) {
+            // Communicate to self
+            assert(src);
+            assert(dst);
+            memcpy(&dst->vertex_buffer[i][0], //
+                   &src->vertex_buffer[i][0], //
+                   count * sizeof(src->vertex_buffer[i][0]));
+
+            // Communicate to others
+            for (int j = 1; j < num_processes; ++j) {
+                assert(src);
+
+                // Slab j starts at z = j * nz / num_processes in the full
+                // mesh; the block sent covers it plus its ghost planes.
+                const size_t src_idx = acVertexBufferIdx(
+                    0, 0, j * src->info.int_params[AC_nz] / num_processes, src->info);
+
+                MPI_Send(&src->vertex_buffer[i][src_idx], count, datatype, j, 0, MPI_COMM_WORLD);
+            }
+        }
+        else {
+            assert(dst);
+
+            // Recv
+            const size_t dst_idx = 0;
+            MPI_Status status;
+            MPI_Recv(&dst->vertex_buffer[i][dst_idx], count, datatype, 0, 0, MPI_COMM_WORLD,
+                     &status);
+        }
+    }
+}
+
+// Inverse of distribute_mesh: the root copies or receives each process's
+// submesh-sized slab back into the full mesh. Neighbouring slabs overlap by
+// 2 * NGHOST ghost planes; later receives simply overwrite that overlap.
+static void
+gather_mesh(const AcMesh* src, AcMesh* dst)
+{
+    MPI_Barrier(MPI_COMM_WORLD);
+    printf("Gathering mesh...\n");
+    MPI_Datatype datatype = MPI_FLOAT;
+    if (sizeof(AcReal) == 8)
+        datatype = MPI_DOUBLE;
+
+    int pid, num_processes;
+    MPI_Comm_rank(MPI_COMM_WORLD, &pid);
+    MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
+
+    const size_t count = acVertexBufferSize(src->info);
+
+    for (int i = 0; i < NUM_VTXBUF_HANDLES; ++i) {
+        // Communicate to self
+        if (pid == 0) {
+            assert(src);
+            assert(dst);
+            memcpy(&dst->vertex_buffer[i][0], //
+                   &src->vertex_buffer[i][0], //
+                   count * sizeof(src->vertex_buffer[i][0]));
+
+            for (int j = 1; j < num_processes; ++j) {
+                // Recv
+                const size_t dst_idx = acVertexBufferIdx(
+                    0, 0, j * dst->info.int_params[AC_nz] / num_processes, dst->info);
+
+                assert(dst_idx + count <= acVertexBufferSize(dst->info));
+                MPI_Status status;
+                MPI_Recv(&dst->vertex_buffer[i][dst_idx], count, datatype, j, 0, MPI_COMM_WORLD,
+                         &status);
+            }
+        }
+        else {
+            // Send
+            const size_t src_idx = 0;
+
+            assert(src_idx + count <= acVertexBufferSize(src->info));
+            MPI_Send(&src->vertex_buffer[i][src_idx], count, datatype, 0, 0, MPI_COMM_WORLD);
+        }
+    }
+}
+
+// Exchanges the z ghost zones with the neighbouring processes, with periodic
+// wrap-around at the ends of the process ring.
+static void
+communicate_halos(AcMesh* submesh)
+{
+    MPI_Barrier(MPI_COMM_WORLD);
+    printf("Communicating halos...\n");
+    MPI_Datatype datatype = MPI_FLOAT;
+    if (sizeof(AcReal) == 8)
+        datatype = MPI_DOUBLE;
+
+    int pid, num_processes;
+    MPI_Comm_rank(MPI_COMM_WORLD, &pid);
+    MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
+
+    // Cast avoids int overflow for large plane sizes
+    const size_t count = (size_t)submesh->info.int_params[AC_mx] *
+                         submesh->info.int_params[AC_my] * NGHOST;
+
+    for (int i = 0; i < NUM_VTXBUF_HANDLES; ++i) {
+        { // Front
+            // ...|ooooxxx|... -> xxx|ooooooo|...
+            // Send the last NGHOST computational planes to the next rank and
+            // receive the previous rank's planes into the front ghost zone.
+            const size_t src_idx = acVertexBufferIdx(0, 0, submesh->info.int_params[AC_nz],
+                                                     submesh->info);
+            const size_t dst_idx = acVertexBufferIdx(0, 0, 0, submesh->info);
+            const int send_pid   = (pid + 1) % num_processes;
+            const int recv_pid   = (pid + num_processes - 1) % num_processes;
+
+            MPI_Request request;
+            MPI_Isend(&submesh->vertex_buffer[i][src_idx], count, datatype, send_pid, i,
+                      MPI_COMM_WORLD, &request);
+
+            MPI_Status status;
+            MPI_Recv(&submesh->vertex_buffer[i][dst_idx], count, datatype, recv_pid, i,
+                     MPI_COMM_WORLD, &status);
+
+            MPI_Wait(&request, &status);
+        }
+        { // Back
+            // ...|ooooooo|xxx <- ...|xxxoooo|...
+            // Send the first NGHOST computational planes to the previous rank
+            // and receive the next rank's planes into the back ghost zone.
+            const size_t src_idx = acVertexBufferIdx(0, 0, NGHOST, submesh->info);
+            const size_t dst_idx = acVertexBufferIdx(0, 0, NGHOST + submesh->info.int_params[AC_nz],
+                                                     submesh->info);
+            const int send_pid   = (pid + num_processes - 1) % num_processes;
+            const int recv_pid   = (pid + 1) % num_processes;
+
+            // The tag offset keeps the two directions from matching each
+            // other's messages.
+            MPI_Request request;
+            MPI_Isend(&submesh->vertex_buffer[i][src_idx], count, datatype, send_pid,
+                      NUM_VTXBUF_HANDLES + i, MPI_COMM_WORLD, &request);
+
+            MPI_Status status;
+            MPI_Recv(&submesh->vertex_buffer[i][dst_idx], count, datatype, recv_pid,
+                     NUM_VTXBUF_HANDLES + i, MPI_COMM_WORLD, &status);
+
+            MPI_Wait(&request, &status);
+        }
+    }
+}
+
 int main(void)
 {
+    int num_processes, process_id;
     MPI_Init(NULL, NULL);
+    MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
+    MPI_Comm_rank(MPI_COMM_WORLD, &process_id);
+
+    char processor_name[MPI_MAX_PROCESSOR_NAME];
+    int name_len;
+    MPI_Get_processor_name(processor_name, &name_len);
+    printf("Processor %s. Process %d of %d.\n", processor_name, process_id, num_processes);
 
     AcMeshInfo info;
     acLoadConfig(AC_DEFAULT_CONFIG, &info);
 
-    AcMesh model;
-    acMeshCreate(info, &model);
+    // model and candidate live on the master rank only; the other ranks pass
+    // their (uninitialized) addresses around but never dereference them.
+    AcMesh model, candidate, submesh;
 
-    AcMesh candidate;
-    acMeshCreate(info, &candidate);
+    // Master CPU
+    if (process_id == 0) {
+        acMeshCreate(info, &model);
+        acMeshRandomize(&model);
+        acMeshCreate(info, &candidate);
+    }
 
-    acMeshSet(1, &model);
-    acMeshSet(1.00000005, &candidate);
+    // 1D decomposition along z: nz must divide evenly among the processes
+    assert(info.int_params[AC_nz] % num_processes == 0);
 
-    acVerifyMesh(model, candidate);
+    AcMeshInfo submesh_info = info;
+    submesh_info.int_params[AC_nz] /= num_processes;
+    acUpdateConfig(&submesh_info);
+    acMeshCreate(submesh_info, &submesh);
+
+    distribute_mesh(&model, &submesh);
+    // Not needed yet: distribute_mesh already fills the ghost planes from
+    // the full mesh, so the distribute-gather round trip verifies as is.
+    // communicate_halos(&submesh);
+    gather_mesh(&submesh, &candidate);
+
+    acMeshDestroy(&submesh);
+
+    // Master CPU
+    if (process_id == 0) {
+        acVerifyMesh(model, candidate);
+        acMeshDestroy(&model);
+        acMeshDestroy(&candidate);
+    }
+
+    // GPU
+    /*
+    Device device;
+    acDeviceCreate(process_id, info, &device);
+
+    acDeviceLoadMesh(device, STREAM_DEFAULT, model);
+
+    acDeviceStoreMesh(device, STREAM_DEFAULT, &candidate);
+    acDeviceDestroy(device);
+    */
 
-    acMeshDestroy(&model);
-    acMeshDestroy(&candidate);
     MPI_Finalize();
     return EXIT_SUCCESS;
 }
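
Note on communicate_halos: the Isend/Recv/Wait triple used for each direction
is deadlock-safe, but the same paired exchange can be expressed with a single
MPI_Sendrecv call. A minimal sketch of the Front case, reusing the src_idx,
dst_idx, count, tag, and neighbour variables from the loop body (untested,
shown for comparison only):

    MPI_Sendrecv(&submesh->vertex_buffer[i][src_idx], count, datatype, send_pid, i,
                 &submesh->vertex_buffer[i][dst_idx], count, datatype, recv_pid, i,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);

This drops the explicit request bookkeeping and lets the MPI library schedule
both transfers together.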
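
Note on distribute_mesh/gather_mesh: the point-to-point loops send overlapping,
submesh-sized blocks, which rules out a direct MPI_Scatterv (the MPI standard
says the counts and displacements should not cause any root location to be
read more than once). If the ghost planes were instead filled by
communicate_halos after distribution, the disjoint interior slabs could be
scattered collectively. The sketch below assumes exactly that;
distribute_mesh_collective is a hypothetical helper, not part of this patch,
and <stdlib.h> is assumed to be included for malloc/free:

    static void
    distribute_mesh_collective(const AcMesh* src, AcMesh* dst)
    {
        int pid, num_processes;
        MPI_Comm_rank(MPI_COMM_WORLD, &pid);
        MPI_Comm_size(MPI_COMM_WORLD, &num_processes);

        const MPI_Datatype datatype = sizeof(AcReal) == 8 ? MPI_DOUBLE : MPI_FLOAT;

        // Interior slab only: mx * my * (nz / num_processes) elements per rank.
        // dst->info.int_params[AC_nz] already holds the per-process nz.
        const int block = dst->info.int_params[AC_mx] * dst->info.int_params[AC_my] *
                          dst->info.int_params[AC_nz];

        int* sendcounts = NULL;
        int* displs     = NULL;
        if (pid == 0) {
            sendcounts = malloc(num_processes * sizeof(int));
            displs     = malloc(num_processes * sizeof(int));
            for (int j = 0; j < num_processes; ++j) {
                sendcounts[j] = block;
                // Skip the front ghost zone; successive slabs are disjoint.
                displs[j] = (int)acVertexBufferIdx(
                    0, 0, NGHOST + j * src->info.int_params[AC_nz] / num_processes, src->info);
            }
        }

        for (int i = 0; i < NUM_VTXBUF_HANDLES; ++i) {
            const size_t dst_idx = acVertexBufferIdx(0, 0, NGHOST, dst->info);
            MPI_Scatterv(pid == 0 ? src->vertex_buffer[i] : NULL, sendcounts, displs,
                         datatype, &dst->vertex_buffer[i][dst_idx], block, datatype, 0,
                         MPI_COMM_WORLD);
        }

        if (pid == 0) {
            free(sendcounts);
            free(displs);
        }
    }

The matching gather would use MPI_Gatherv with the same counts and
displacements.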