Provisional distributed SpMV

Author: Carl William Pearson
Date:   2021-06-11 14:43:16 -06:00
parent 2f82c65bbe
commit a7c755d899
7 changed files with 184 additions and 116 deletions


@@ -173,6 +173,7 @@ public:
}
void pack_x_async() {
assert(xSendBuf_.size() == xSendIdx_.size());
scatter<<<100,128, 0, packStream_>>>(xSendBuf_.view(), lx_.view(), xSendIdx_.view());
}
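
For context, pack_x_async() relies on a scatter kernel that copies the local x entries named by xSendIdx_ into the contiguous send buffer. A minimal sketch of that operation, assuming raw device pointers instead of the repo's view() type:

    // Hedged sketch, not the repo's kernel: grid-stride loop, buf[i] = x[idx[i]].
    __global__ void scatter(float *buf, const float *x, const int *idx, int n) {
      for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n;
           i += gridDim.x * blockDim.x) {
        buf[i] = x[idx[i]];
      }
    }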
@@ -182,9 +183,12 @@ public:
void send_x_async() {
std::cerr << "send_x_async(): send to " << sendParams_.size() << " ranks\n";
// send to neighbors who want it
for (auto &p : sendParams_) {
int tag = 0;
assert(xSendBuf_.size() >= p.displ + p.count);
MPI_Isend(xSendBuf_.data() + p.displ, p.count, MPI_FLOAT, p.dst, tag, comm_, &p.req);
}
}
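
send_x_async() reads dst, displ, count, and req from each element of sendParams_. The struct is defined elsewhere in the repo; a guess at its shape, inferred only from the fields used here:

    // Assumed layout, inferred from the usage in send_x_async(); the real definition may differ.
    struct SendParam {
      int dst;          // destination rank
      int displ;        // starting offset into xSendBuf_
      int count;        // number of floats sent to dst
      MPI_Request req;  // request for the outstanding MPI_Isend
    };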
@@ -209,8 +213,10 @@ public:
CUDA_RUNTIME(cudaStreamSynchronize(kernelStream_));
}
void launch_local_spmv() {}
void launch_remote_spmv() {}
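
launch_local_spmv() and launch_remote_spmv() are still stubs in this commit. The kernel they would eventually launch is an ordinary one-thread-per-row CSR SpMV; a sketch with illustrative names (rowPtr/colInd/val are not the repo's members):

    __global__ void csr_spmv(int numRows, const int *rowPtr, const int *colInd,
                             const float *val, const float *x, float *y) {
      int row = blockIdx.x * blockDim.x + threadIdx.x;
      if (row < numRows) {
        float acc = 0.0f;
        for (int j = rowPtr[row]; j < rowPtr[row + 1]; ++j) {
          acc += val[j] * x[colInd[j]];
        }
        y[row] += acc;  // accumulate (assumes y was zeroed) so local and remote parts can share y
      }
    }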
~RowPartSpmv() {
CUDA_RUNTIME(cudaStreamDestroy(kernelStream_)); kernelStream_ = 0;
CUDA_RUNTIME(cudaStreamDestroy(packStream_)); packStream_ = 0;
}
/* create from a matrix at root
*/
@@ -218,11 +224,14 @@ public:
const CsrMat<Where::host> &wholeA,
const int root,
MPI_Comm comm
) {
) : comm_(comm) {
CUDA_RUNTIME(cudaStreamCreate(&kernelStream_));
CUDA_RUNTIME(cudaStreamCreate(&packStream_));
int rank, size;
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &size);
MPI_Comm_rank(comm_, &rank);
MPI_Comm_size(comm_, &size);
CsrMat<Where::host> a;
if (root == rank) {
@@ -231,27 +240,34 @@ public:
for (size_t dst = 0; dst < size; ++dst) {
if (root != dst) {
std::cerr << "send A to " << dst << "\n";
send_matrix(dst, 0, std::move(as[dst]), MPI_COMM_WORLD);
send_matrix(dst, 0, std::move(as[dst]), comm_);
}
}
a = as[rank];
} else {
std::cerr << "recv A at " << rank << "\n";
a = receive_matrix(rank, 0, MPI_COMM_WORLD);
a = receive_matrix(rank, 0, comm_);
}
// split row part of a into local and global
SplitCooMat scm = split_local_remote(a, comm);
la_ = std::move(scm.local);
ra_ = std::move(scm.remote);
assert(la_.nnz() + ra_.nnz() == a.nnz() && "lost a non-zero during split");
loff_ = scm.loff;
// create local part of x array
// undefined entries
Range xrange = get_partition(a.num_cols(), rank, size);
lx_ = Array<Where::device, float>(xrange.extent());
ly_ = Array<Where::device, float>(la_.num_rows());
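
The extent of lx_ comes from get_partition(a.num_cols(), rank, size). Range and get_partition live elsewhere in the repo; a sketch of an even contiguous block partition, assuming that is what the helper computes:

    struct Range { int lb, ub; int extent() const { return ub - lb; } };
    Range get_partition(int n, int rank, int size) {
      // assumption: the first (n % size) ranks each get one extra column
      int base = n / size, rem = n % size;
      int lb = rank * base + (rank < rem ? rank : rem);
      int ub = lb + base + (rank < rem ? 1 : 0);
      return Range{lb, ub};
    }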
// create remote part of x array
// one entry per remote column
rx_ = Array<Where::device,float>(scm.globals.size());
if (0 == rx_.size()) {
std::cerr << "WARN: not receiving anything\n";
}
// determine which columns needed from others
std::map<int, std::vector<int>> recvCols;
@@ -261,6 +277,24 @@ public:
recvCols[src].push_back(c);
}
#if 1
for (int r = 0; r < size; ++r) {
MPI_Barrier(comm_);
if (r == rank) {
std::cerr << "rank " << rank << "recvCols:\n";
for (auto it = recvCols.begin(); it != recvCols.end(); ++it) {
std::cerr << "from " << it->first << ": ";
for (auto &c : it->second) {
std::cerr << c << " ";
}
std::cerr << "\n";
}
}
MPI_Barrier(comm_);
}
#endif
// create receive parameters
int offset = 0;
for (auto it = recvCols.begin(); it != recvCols.end(); ++it) {
@@ -272,15 +306,36 @@ public:
recvParams_.push_back(param);
}
#if 1
for (int r = 0; r < size; ++r) {
MPI_Barrier(comm_);
if (r == rank) {
std::cerr << "rank " << rank << " recvParams:\n";
for (RecvParam &p : recvParams_) {
std::cerr
<< "src=" << p.src
<< " displ=" << p.displ
<< " count=" << p.count
<< "\n";
}
}
MPI_Barrier(comm_);
}
#endif
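
The debug print above names the fields of RecvParam; a guess at its definition from this usage (a request member for the matching MPI_Irecv presumably exists too, but is not visible in this diff):

    // Assumed layout; the repo's actual definition may differ.
    struct RecvParam {
      int src;    // rank that owns the needed x entries
      int displ;  // starting offset into rx_
      int count;  // number of floats expected from src
    };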
// tell others which cols I need (send 0 if nothing)
std::vector<MPI_Request> reqs(size);
for (int dest = 0; dest < size; ++dest) {
auto it = recvCols.find(dest);
if (it != recvCols.end()) {
MPI_Isend(it->second.data(), it->second.size(), MPI_INT, dest, 0, comm, &reqs[dest]);
assert(it->second.data());
MPI_Isend(it->second.data(), it->second.size(), MPI_INT, dest, 0, comm_, &reqs[dest]);
} else {
int _;
MPI_Isend(&_, 0, MPI_INT, dest, 0, comm, &reqs[dest]);
MPI_Isend(&_ /*junk*/, 0, MPI_INT, dest, 0, comm_, &reqs[dest]);
}
}
@@ -293,10 +348,10 @@ public:
MPI_Get_count(&status, MPI_INT, &count);
if (count != 0) {
sendCols[src].resize(count);
MPI_Recv(sendCols[src].data(), count, MPI_INT, src, 0, comm, MPI_STATUS_IGNORE);
MPI_Recv(sendCols[src].data(), count, MPI_INT, src, 0, comm_, MPI_STATUS_IGNORE);
} else {
int _;
MPI_Recv(&_, 0, MPI_INT, src, 0, comm, MPI_STATUS_IGNORE);
MPI_Recv(&_, 0, MPI_INT, src, 0, comm_, MPI_STATUS_IGNORE);
}
}
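
The status consumed by MPI_Get_count() above comes from a probe that falls outside this hunk. The pattern is the standard probe-then-receive handshake; a sketch of the assumed loop head, for context:

    // Assumption: one probe per incoming column-request message, matching the MPI_Isend loop above.
    MPI_Status status;
    MPI_Probe(MPI_ANY_SOURCE, 0, comm_, &status);  // learn the sender and message size
    int src = status.MPI_SOURCE;
    int count;
    MPI_Get_count(&status, MPI_INT, &count);       // then size sendCols[src] before MPI_Recv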
@@ -310,19 +365,20 @@ public:
param.dst = it->first;
for (int gc : it->second) {
int lc = gc - scm.loff;
assert(lc >= 0);
assert(lc < lx_.size());
offsets.push_back(lc);
}
param.count = offsets.size() - param.displ;
sendParams_.push_back(param);
}
// device version of offsets for packing
xSendIdx_ = offsets;
// buffer that x values will be placed into for sending
xSendBuf_.resize(xSendIdx_.size());
assert(la_.size() > 0);
assert(ra_.size() > 0); // remote A
assert(lx_.size() > 0);
assert(rx_.size() > 0);
assert(ly_.size() > 0);
}
};
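
Taken together, the methods in this diff suggest a per-iteration call sequence like the one below. The wait for remote x values is not shown in this commit, so its placement here is an assumption:

    RowPartSpmv spmv(wholeA, /*root*/ 0, MPI_COMM_WORLD);  // scatter A and build the comm plan
    spmv.pack_x_async();        // scatter kernel fills xSendBuf_ on packStream_
    spmv.send_x_async();        // MPI_Isend a slice of xSendBuf_ to each neighbor
    spmv.launch_local_spmv();   // overlap: y += A_local * lx_ (stub in this commit)
    // ... receive remote x values into rx_ ...
    spmv.launch_remote_spmv();  // y += A_remote * rx_ (stub in this commit)
    // the cudaStreamSynchronize(kernelStream_) shown above would then conclude the iteration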