diff --git a/CMakeLists.txt b/CMakeLists.txt index c1238f8..93cc614 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -113,10 +113,17 @@ if (MPI_FOUND) set_cxx_standard(persistent) endif() +if (MPI_FOUND) + add_executable(send-recv send_recv.cpp) + target_link_libraries(send-recv MPI::MPI_CXX) + set_cxx_options(send-recv) + set_cxx_standard(send-recv) +endif() + if (MPI_FOUND AND CMAKE_CUDA_COMPILER) add_executable(one-sided-gpu one_sided_gpu.cpp) target_link_libraries(one-sided-gpu MPI::MPI_CXX) target_link_libraries(one-sided-gpu CUDA::cudart) set_cxx_options(one-sided-gpu) set_cxx_standard(one-sided-gpu) -endif() \ No newline at end of file +endif() diff --git a/send_recv.cpp b/send_recv.cpp new file mode 100644 index 0000000..5a0f477 --- /dev/null +++ b/send_recv.cpp @@ -0,0 +1,147 @@ +#include +#include +#include + +#include +#include +#include + +const float sample_target = 200e-6; + +struct Sample { + double raw; + double norm; +}; + +static Sample get_sample(int perSample, void *buf, int bytes, int rank, int other, MPI_Comm comm) { + Sample sample; + int tag = 0; + MPI_Barrier(comm); + double start = MPI_Wtime(); + for (int i = 0; i < perSample; ++i) { + if (0 == rank) { + MPI_Send(buf, bytes, MPI_BYTE, other, tag, comm); + MPI_Recv(buf, bytes, MPI_BYTE, other, tag, comm, MPI_STATUS_IGNORE); + } else if (1 == rank) { + MPI_Recv(buf, bytes, MPI_BYTE, other, tag, comm, MPI_STATUS_IGNORE); + MPI_Send(buf, bytes, MPI_BYTE, other, tag, comm); + } + } + double stop = MPI_Wtime(); + sample.raw = stop-start; + sample.norm = sample.raw / perSample; + return sample; +} + +int main(int argc, char **argv) { + // Initialize the MPI environment + MPI_Init(&argc, &argv); + + // Get the number of processes + int size, rank; + MPI_Comm_size(MPI_COMM_WORLD, &size); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + + if (size < 2) { + printf("need at least 2 ranks!\n"); + exit(1); + } + + int other = (rank + 1) % 2; + int numIters = 100; + + std::vector sweep{ + 1, + 64, + 128, + 256, + 512, + 1 * 1024, + 2 * 1024, + 4 * 1024, + 8 * 1024, + 16 * 1024, + 32 * 1024, + 64 * 1024, + 128 * 1024, + 256 * 1024, + 512 * 1024, + 1 * 1024 * 1024, + 2 * 1024 * 1024, + 4 * 1024 * 1024, + 8 * 1024 * 1024, + 16 * 1024 * 1024, + 32 * 1024 * 1024, + 64 * 1024 * 1024, + 128 * 1024 * 1024, + 256 * 1024 * 1024, + }; + + if (0 == rank) { + printf("bytes,min,max,avg,med\n"); + } + + for (size_t bytes : sweep) { + std::vector samples(numIters); + char *buf = new char[bytes]; + if (mlock(buf, bytes)) { + perror("error locking memory"); + } + + // try to reach 200us / sample + int perSample = 1; + for (int i = 0; i < 10; ++i) { + double sample = get_sample(perSample, buf, bytes, rank, other, MPI_COMM_WORLD).raw; + // estimate number of measurements per sample + int guess = sample_target / sample + /*rounding*/0.5; + // close half the distance to this estimate + perSample += (guess - perSample) * 0.5; + if (perSample < 1) perSample = 1; + MPI_Bcast(&perSample, 1, MPI_INT, 0, MPI_COMM_WORLD); + } + + if (0 == rank) { + fprintf(stderr, "sample averaged over %d iterations\n", perSample); + } + + for (int i = 0; i < numIters; ++i) { + samples[i] = get_sample(perSample, buf, bytes, rank, other, MPI_COMM_WORLD).norm; + } + + // each sample is the max time observed + MPI_Allreduce(MPI_IN_PLACE, samples.data(), numIters, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); + + // bubble sort + bool changed = true; + while (changed) { + changed = false; + for (int i = 0; i < numIters - 1; ++i) { + if (samples[i] > samples[i+1]) { + double tmp = samples[i+1]; + samples[i+1] = samples[i]; + samples[i] = tmp; + changed = true; + } + } + } + + // average + double avg = 0; + for (int i = 0; i < numIters; ++i) { + avg += samples[i]; + } + avg /= numIters; + + if (0 == rank) { + printf("%lu,%e,%e,%e,%e\n", bytes, samples[0], samples[numIters-1], avg, samples[numIters/2]); + } + + if (munlock(buf, bytes)) { + perror("error unlocking memory"); + } + delete[] buf; + } + + MPI_Finalize(); + return 0; +}