Added functions for pinning memory that is sent over the network. TODO: pack to and from pinned memory selectively (currently P2P results are overwritten by the contents of the pinned buffers)

This commit is contained in:
jpekkila
2020-04-06 14:09:12 +03:00
parent cc9d3f1b9c
commit 37f1c841a3
2 changed files with 229 additions and 0 deletions

View File

@@ -488,6 +488,20 @@ getPid3D(const int pid, const int3 decomposition)
return pid3d;
}
/** Note: assumes that contiguous pids are on the same node and there is one process per GPU. I.e.
* pids are linearly mapped i + j * dx + k * dx * dy. */
static bool
onTheSameNode(const int pid_a, const int pid_b)
{
    int devices_per_node = -1;
    cudaGetDeviceCount(&devices_per_node);

    const int node_a = pid_a / devices_per_node;
    const int node_b = pid_b / devices_per_node;

    return node_a == node_b;
}
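For illustration only, a standalone sketch of the rank-to-node mapping assumed above (hypothetical values, not part of the commit): with 4 devices per node, ranks 0-3 land on node 0 and ranks 4-7 on node 1.

/* Hypothetical standalone check of the pid -> node mapping; node_of mirrors the
 * integer division used in onTheSameNode, with devices_per_node assumed to be 4. */
#include <assert.h>

static int node_of(const int pid, const int devices_per_node) { return pid / devices_per_node; }

int main(void)
{
    assert(node_of(3, 4) == 0);             // ranks 0..3 share node 0
    assert(node_of(4, 4) == 1);             // ranks 4..7 share node 1
    assert(node_of(3, 4) != node_of(4, 4)); // ranks 3 and 4 are on different nodes
    return 0;
}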
static int3
decompose(const int target)
{
@@ -530,6 +544,10 @@ acCreatePackedData(const int3 dims)
    const size_t bytes = dims.x * dims.y * dims.z * sizeof(data.data[0]) * NUM_VTXBUF_HANDLES;
    ERRCHK_CUDA_ALWAYS(cudaMalloc((void**)&data.data, bytes));

#if AC_MPI_RT_PINNING
    ERRCHK_CUDA_ALWAYS(cudaMallocHost((void**)&data.data_pinned, bytes));
#endif // AC_MPI_RT_PINNING

#if AC_MPI_UNIDIRECTIONAL_COMM
    ERRCHK_ALWAYS(MPI_Win_create(data.data, bytes, sizeof(AcReal), MPI_INFO_NULL, MPI_COMM_WORLD,
                                 &data.win) == MPI_SUCCESS);
@@ -542,6 +560,10 @@ acCreatePackedData(const int3 dims)
static AcResult
acDestroyPackedData(PackedData* data)
{
#if AC_MPI_RT_PINNING
    // data_pinned was allocated with cudaMallocHost and must be released with cudaFreeHost
    cudaFreeHost(data->data_pinned);
#endif // AC_MPI_RT_PINNING

#if AC_MPI_UNIDIRECTIONAL_COMM
    MPI_Win_free(&data->win);
#endif // AC_MPI_UNIDIRECTIONAL_COMM
@@ -611,6 +633,30 @@ acTransferPackedDataToDevice(const Device device, const cudaStream_t stream, con
}
#endif // MPI_GPUDIRECT_DISABLED
#if AC_MPI_RT_PINNING
static void
acPinPackedData(const Device device, const cudaStream_t stream, PackedData* packed)
{
    cudaSetDevice(device->id);

    const size_t bytes = packed->dims.x * packed->dims.y * packed->dims.z * sizeof(AcReal) *
                         NUM_VTXBUF_HANDLES;
    // packed->data_pinned is page-locked host memory (cudaMallocHost), so the copy is
    // device-to-host; cudaMemcpyDefault lets the runtime infer the direction
    ERRCHK_CUDA(cudaMemcpyAsync(packed->data_pinned, packed->data, bytes, cudaMemcpyDefault,
                                stream));
}

static void
acUnpinPackedData(const Device device, const cudaStream_t stream, PackedData* packed)
{
    cudaSetDevice(device->id);

    const size_t bytes = packed->dims.x * packed->dims.y * packed->dims.z * sizeof(AcReal) *
                         NUM_VTXBUF_HANDLES;
    ERRCHK_CUDA(cudaMemcpyAsync(packed->data, packed->data_pinned, bytes, cudaMemcpyDefault,
                                stream));
}
#endif // AC_MPI_RT_PINNING
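One possible direction for the TODO in the commit message (packing to and from pinned memory selectively so that P2P results are not overwritten) is sketched below. This is only a sketch: the `pinned` flag is an assumed addition to PackedData and does not exist in this commit.

/* Hypothetical sketch: track whether the pinned staging buffer actually holds
 * received data, and skip the copy-back otherwise so that halos that arrived
 * directly via P2P into packed->data are not clobbered. Assumes a `bool pinned`
 * field added to PackedData (not part of this commit). */
static void
acUnpinPackedDataSelective(const Device device, const cudaStream_t stream, PackedData* packed)
{
    if (!packed->pinned) // nothing was staged through pinned memory; keep the P2P result
        return;

    cudaSetDevice(device->id);
    const size_t bytes = packed->dims.x * packed->dims.y * packed->dims.z * sizeof(AcReal) *
                         NUM_VTXBUF_HANDLES;
    ERRCHK_CUDA(cudaMemcpyAsync(packed->data, packed->data_pinned, bytes, cudaMemcpyDefault,
                                stream));
    packed->pinned = false;
}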
// TODO: do with packed data
static AcResult
acDeviceDistributeMeshMPI(const AcMesh src, const int3 decomposition, AcMesh* dst)
@@ -904,6 +950,24 @@ acTransferCommDataToDevice(const Device device, CommData* data)
}
#endif
#if AC_MPI_RT_PINNING
static void
acPinCommData(const Device device, CommData* data)
{
    cudaSetDevice(device->id);

    for (size_t i = 0; i < data->count; ++i)
        acPinPackedData(device, data->streams[i], &data->srcs[i]);
}

static void
acUnpinCommData(const Device device, CommData* data)
{
    cudaSetDevice(device->id);

    for (size_t i = 0; i < data->count; ++i)
        acUnpinPackedData(device, data->streams[i], &data->dsts[i]);
}
#endif
#if AC_MPI_UNIDIRECTIONAL_COMM
static AcResult
acTransferCommData(const Device device, //
@@ -1057,6 +1121,146 @@ acTransferCommDataWait(const CommData data)
// NOP
}
#elif AC_MPI_RT_PINNING
static AcResult
acTransferCommData(const Device device, //
                   const int3* a0s,     // Src idx inside comp. domain
                   const int3* b0s,     // Dst idx inside bound zone
                   CommData* data)
{
    cudaSetDevice(device->id);

    MPI_Datatype datatype = MPI_FLOAT;
    if (sizeof(AcReal) == 8)
        datatype = MPI_DOUBLE;

    int nprocs, pid;
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &pid);

    const int3 decomp = decompose(nprocs);

    const int3 nn = (int3){
        device->local_config.int_params[AC_nx],
        device->local_config.int_params[AC_ny],
        device->local_config.int_params[AC_nz],
    };

    const int3 dims         = data->dims;
    const size_t blockcount = data->count;
    // Post receives: an intra-node neighbor delivers directly into the device buffer
    // (P2P), whereas an inter-node neighbor delivers into the pinned host buffer
    for (int k = -1; k <= 1; ++k) {
        for (int j = -1; j <= 1; ++j) {
            for (int i = -1; i <= 1; ++i) {
                if (i == 0 && j == 0 && k == 0)
                    continue;

                for (size_t a_idx = 0; a_idx < blockcount; ++a_idx) {
                    for (size_t b_idx = 0; b_idx < blockcount; ++b_idx) {
                        const int3 neighbor = (int3){i, j, k};

                        const int3 a0 = a0s[a_idx];
                        // const int3 a1 = a0 + dims;

                        const int3 b0 = a0 - neighbor * nn;
                        // const int3 b1 = a1 - neighbor * nn;

                        if (b0s[b_idx].x == b0.x && b0s[b_idx].y == b0.y && b0s[b_idx].z == b0.z) {
                            const size_t count = dims.x * dims.y * dims.z * NUM_VTXBUF_HANDLES;

#if MPI_GPUDIRECT_DISABLED
                            PackedData src = data->srcs_host[a_idx];
#else
                            PackedData src = data->srcs[a_idx];
#endif
#if MPI_GPUDIRECT_DISABLED
                            PackedData dst = data->dsts_host[b_idx];
#else
                            PackedData dst = data->dsts[b_idx];
#endif

                            const int3 pid3d    = getPid3D(pid, decomp);
                            const int npid_recv = getPid(pid3d - neighbor, decomp);
                            const int npid_send = getPid(pid3d + neighbor, decomp);

                            if (onTheSameNode(pid, npid_recv)) {
                                MPI_Irecv(dst.data, count, datatype, npid_recv, b_idx,
                                          MPI_COMM_WORLD, &data->recv_reqs[b_idx]);
                            }
                            else {
                                MPI_Irecv(dst.data_pinned, count, datatype, npid_recv, b_idx,
                                          MPI_COMM_WORLD, &data->recv_reqs[b_idx]);
                            }
                        }
                    }
                }
            }
        }
    }
    // Post sends: same-node neighbors are sent the device buffer directly,
    // other nodes receive the pinned host copy staged by acPinPackedData
    for (int k = -1; k <= 1; ++k) {
        for (int j = -1; j <= 1; ++j) {
            for (int i = -1; i <= 1; ++i) {
                if (i == 0 && j == 0 && k == 0)
                    continue;

                for (size_t a_idx = 0; a_idx < blockcount; ++a_idx) {
                    for (size_t b_idx = 0; b_idx < blockcount; ++b_idx) {
                        const int3 neighbor = (int3){i, j, k};

                        const int3 a0 = a0s[a_idx];
                        // const int3 a1 = a0 + dims;

                        const int3 b0 = a0 - neighbor * nn;
                        // const int3 b1 = a1 - neighbor * nn;

                        if (b0s[b_idx].x == b0.x && b0s[b_idx].y == b0.y && b0s[b_idx].z == b0.z) {
                            const size_t count = dims.x * dims.y * dims.z * NUM_VTXBUF_HANDLES;

#if MPI_GPUDIRECT_DISABLED
                            PackedData src = data->srcs_host[a_idx];
#else
                            PackedData src = data->srcs[a_idx];
#endif
#if MPI_GPUDIRECT_DISABLED
                            PackedData dst = data->dsts_host[b_idx];
#else
                            PackedData dst = data->dsts[b_idx];
#endif

                            const int3 pid3d    = getPid3D(pid, decomp);
                            const int npid_recv = getPid(pid3d - neighbor, decomp);
                            const int npid_send = getPid(pid3d + neighbor, decomp);

                            // Ensure packing (and pinning) on this stream has completed
                            cudaStreamSynchronize(data->streams[a_idx]);
                            if (onTheSameNode(pid, npid_send)) {
                                MPI_Isend(src.data, count, datatype, npid_send, b_idx,
                                          MPI_COMM_WORLD, &data->send_reqs[b_idx]);
                            }
                            else {
                                MPI_Isend(src.data_pinned, count, datatype, npid_send, b_idx,
                                          MPI_COMM_WORLD, &data->send_reqs[b_idx]);
                            }
                        }
                    }
                }
            }
        }
    }

    return AC_SUCCESS;
}
static void
acTransferCommDataWait(const CommData data)
{
    for (size_t i = 0; i < data.count; ++i) {
        MPI_Wait(&data.send_reqs[i], MPI_STATUS_IGNORE);
        MPI_Wait(&data.recv_reqs[i], MPI_STATUS_IGNORE);
    }
}
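Equivalently, assuming send_reqs and recv_reqs are contiguous arrays holding exactly data.count requests each, the wait loop could be expressed with MPI_Waitall (a sketch, not part of this commit):

/* Sketch of an equivalent wait using MPI_Waitall; assumes the request arrays
 * are contiguous and of length data.count. */
static void
acTransferCommDataWaitAll(const CommData data)
{
    MPI_Waitall((int)data.count, data.recv_reqs, MPI_STATUSES_IGNORE);
    MPI_Waitall((int)data.count, data.send_reqs, MPI_STATUSES_IGNORE);
}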
#else
static AcResult
acTransferCommData(const Device device, //
@@ -1520,6 +1724,16 @@ acGridIntegrate(const Stream stream, const AcReal dt)
    acPackCommData(device, sidexz_a0s, &sidexz_data);
    acPackCommData(device, sideyz_a0s, &sideyz_data);

#if AC_MPI_RT_PINNING
    acPinCommData(device, &corner_data);
    acPinCommData(device, &edgex_data);
    acPinCommData(device, &edgey_data);
    acPinCommData(device, &edgez_data);
    acPinCommData(device, &sidexy_data);
    acPinCommData(device, &sidexz_data);
    acPinCommData(device, &sideyz_data);
#endif // AC_MPI_RT_PINNING

    //////////// INNER INTEGRATION //////////////
    {
        const int3 m1 = (int3){2 * NGHOST, 2 * NGHOST, 2 * NGHOST};
@@ -1566,6 +1780,16 @@ acGridIntegrate(const Stream stream, const AcReal dt)
    acTransferCommDataToDevice(device, &sideyz_data);
#endif

#if AC_MPI_RT_PINNING
    acUnpinCommData(device, &corner_data);
    acUnpinCommData(device, &edgex_data);
    acUnpinCommData(device, &edgey_data);
    acUnpinCommData(device, &edgez_data);
    acUnpinCommData(device, &sidexy_data);
    acUnpinCommData(device, &sidexz_data);
    acUnpinCommData(device, &sideyz_data);
#endif // AC_MPI_RT_PINNING

    acUnpackCommData(device, corner_b0s, &corner_data);
    acUnpackCommData(device, edgex_b0s, &edgex_data);
    acUnpackCommData(device, edgey_b0s, &edgey_data);

View File

@@ -5,12 +5,17 @@
#include <mpi.h>
#define AC_MPI_UNIDIRECTIONAL_COMM (0)
#define AC_MPI_RT_PINNING (1)
#endif // AC_MPI_ENABLED
typedef struct {
    int3 dims;
    AcReal* data;
#if (AC_MPI_ENABLED && AC_MPI_RT_PINNING)
    AcReal* data_pinned;
#endif // (AC_MPI_ENABLED && AC_MPI_RT_PINNING)
#if (AC_MPI_ENABLED && AC_MPI_UNIDIRECTIONAL_COMM)
    MPI_Win win; // MPI window for RMA
#endif // (AC_MPI_ENABLED && AC_MPI_UNIDIRECTIONAL_COMM)
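For context, a minimal standalone illustration (an assumed example, not project code) of the staging pattern data_pinned is used for: packed device data is copied into page-locked host memory, which MPI implementations can transfer without an extra internal staging copy, and which must be released with cudaFreeHost.

/* Standalone sketch: allocate a device buffer and a page-locked host staging
 * buffer, copy device -> pinned host (as would precede an MPI_Isend), and
 * release the pinned buffer with cudaFreeHost. */
#include <cuda_runtime.h>

int pinned_staging_example(void)
{
    const size_t bytes = 1024 * sizeof(double);
    double *d_buf = NULL, *h_pinned = NULL;

    if (cudaMalloc((void**)&d_buf, bytes) != cudaSuccess)
        return -1;
    if (cudaMallocHost((void**)&h_pinned, bytes) != cudaSuccess) { // page-locked host memory
        cudaFree(d_buf);
        return -1;
    }

    // Stage the packed device data into pinned host memory
    cudaMemcpy(h_pinned, d_buf, bytes, cudaMemcpyDeviceToHost);

    cudaFreeHost(h_pinned); // pinned host memory is released with cudaFreeHost, not cudaFree
    cudaFree(d_buf);
    return 0;
}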