arbeit
Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Namespace Members | Compound Members | File Members

MPICommunicator.cpp

Go to the documentation of this file.
00001 //
00002 //------------------------------------------------------------------------
00003 //
00004 // File   : MPICommunicator.cpp
00005 // Author : Patrick McCormick (pat@acl.lanl.gov)
00006 // 
00007 // $Date: 2003/08/04 04:57:49 $
00008 // $Revision: 1.16 $
00009 //
00010 // 
00011 // This class implements the details of an MPI based communicator 
00012 // object.  Most of the details in this class and its implementation
00013 // are taken straight from MPI. 
00014 //
00015 // TODO: At the moment we use a map to keep tabs on async message handles.
00016 // This is necessary because we had to keep tabs on more than just the 
00017 // MPI specific handle -- we need to update the KokoBuffer to reflect the
00018 // size of the message received...  This leads to a couple of issues in 
00019 // the current implementation:
00020 //
00021 //    * The current implementation allocates an AsyncMesgHandle class 
00022 //      per async call -- if this turns out to be an issue the easiest
00023 //      fix is to build a free list of AsyncMesgHandle objects that can
00024 //      be reused.
00025 //
00026 //    * We need to write some code to test drive the async messages.  The
00027 //      map approach has not been validated with real tests yet. 
00028 //
00029 //    * The current code uses an unsigned integer value for the message
00030 //      handle data.  It attempts to catch a wrap-around condition when
00031 //      a maximum unsigned int value is reached but perhaps it might be 
00032 //      better to consider unsigned long values instead?
00033 //
00034 //------------------------------------------------------------------------
00035 
00036 #include <iostream>
00037 #include <mpi.h>
00038 
00039 #include "MPICommunicator.h"
00040 
00041 const KokoTag KOKO_ANY_TAG             = MPI_ANY_TAG;
00042 const KokoProcID KOKO_ANY_ID           = MPI_ANY_SOURCE;
00043 const KokoMesgHandle KOKO_NULL_HANDLE  = 0;
00044 
00045 class AsyncMesgHandle {
00046   public:
00047   KokoBuffer    *_kbuf;    // The buffer that was handed to ARecv().
00048   // Handle some Windows vs. Linux crud...  This may actually be
00049   // an MPI implementation detail. 
00050 #ifdef WIN32
00051   unsigned int  _handle;
00052 #else
00053   void         *_handle;
00054 #endif
00055 };
00056 
00057 using namespace std;
00058 
00059 // -----------------------
00060 // --- MPICommunicator ---
00061 // -----------------------
00062 //
00063 //
00064 //
00065 MPICommunicator::MPICommunicator(void)
00066 {
00067   int flag;
00068 
00069   // TODO: Add error checking here! 
00070   MPI_Init(NULL, NULL);
00071   MPI_Comm_size(MPI_COMM_WORLD, &_nprocs);
00072   MPI_Comm_rank(MPI_COMM_WORLD, &_id);
00073 
00074   //cerr << "MPICommunicator: starting up...\n";
00075   MPI_Attr_get(MPI_COMM_WORLD,MPI_TAG_UB, &_maxtag, &flag);
00076   if (flag == false)
00077    cerr << "MPICommunicator::MPICommunicator() : Warning! Unable to get " 
00078         << "maximum tag value.\n";
00079    _handlecount = 0;
00080 }
00081 
00082 
00083 // ------------------------
00084 // --- ~MPICommunicator ---
00085 // ------------------------
00086 //
00087 // 
00088 //
00089 MPICommunicator::~MPICommunicator(void)
00090 {
00091   MPI_Finalize();
00092 
00093   // Clean out the active handle map.
00094   AMHIterator first = _handlemap.begin();
00095   AMHIterator last  = _handlemap.end();
00096   while(first != last) {
00097     delete (*first).second;
00098     first++;
00099   }
00100   _handlemap.clear();
00101 }
00102 
00103 
00104 // ---------------
00105 // --- Barrier ---
00106 // ---------------
00107 //
00108 //
00109 //
00110 bool MPICommunicator::Barrier(void)
00111 {
00112   int err = MPI_Barrier(MPI_COMM_WORLD);
00113 
00114   if (err != MPI_SUCCESS) {
00115     cerr << "MPICommunicator::Barrier() : Error on barrier.\n";
00116     ErrorMesg(err);
00117     return false;
00118   } else {
00119     return true;
00120   }
00121 }
00122 
00123 
00124 // ------------
00125 // --- Send ---
00126 // ------------
00127 //
00128 //
00129 //
00130 bool MPICommunicator::Send(KokoProcID id, KokoBuffer &buf, KokoTag tag)
00131 {
00132   if (tag > _maxtag) {
00133     cerr << "MPICommunicator::Send() : KokoTag value execeeds upper bound.\n";
00134     cerr << "\tKokoTags must be <= " << _maxtag << endl;
00135     return false;
00136   }
00137 
00138   if (buf.Locked()) {
00139     cerr << "MPICommunicator::Send() : KokoBuffer is locked.\n";
00140     return false;
00141   }
00142 
00143   int err = MPI_Send(buf, buf.Size(), MPI_UNSIGNED_CHAR, id, tag, 
00144                      MPI_COMM_WORLD);
00145 
00146   if (err != MPI_SUCCESS) {
00147     cerr << "MPICommunicator::Send() : Error sending message.\n";
00148     ErrorMesg(err);
00149     return false;
00150   } else {
00151     return true;
00152   }
00153 }
00154 
00155 
00156 // ------------
00157 // --- Recv ---
00158 // ------------
00159 //
00160 //
00161 //
00162 int MPICommunicator::Recv(KokoProcID id, KokoBuffer &buf, KokoTag tag)
00163 {
00164   if (tag > _maxtag) {
00165     cerr << "MPICommunicator::Recv() : KokoTag value exceeds upper bounds.\n";
00166     cerr << "\tKokoTags must be <= " << _maxtag << endl;
00167     return 0;
00168   }
00169 
00170   if (buf.Locked()) {
00171     cerr << "MPICommunicator::Recv() : KokoBuffer is locked.\n";
00172     return false;
00173   }
00174 
00175   MPI_Status status;
00176 
00177   int err = MPI_Recv(buf, buf.TotalSize(), MPI_UNSIGNED_CHAR, id, tag, 
00178                      MPI_COMM_WORLD, &status);
00179 
00180   if (err != MPI_SUCCESS) {
00181     cerr << "MPICommunicator::Recv() : Error receiving message.\n";
00182     ErrorMesg(err);
00183     return 0;
00184   } else {
00185     MPI_Get_count(&status, MPI_UNSIGNED_CHAR, &_recv_bytes);
00186     _recv_tag = status.MPI_TAG;
00187     _recv_src = status.MPI_SOURCE;
00188     buf.SetCurSize(_recv_bytes); // Update the buffer's size...
00189     return _recv_bytes;
00190   }
00191 }
00192 
00193 
00194 // ----------------
00195 // --- SendRecv ---
00196 // ----------------
00197 //
00198 //
00199 //
00200 int MPICommunicator::SendRecv(int dest, KokoBuffer &sendbuf, KokoTag stag,
00201                               int  src, KokoBuffer &recvbuf, KokoTag rtag)
00202 {
00203   if (stag > _maxtag || rtag > _maxtag) {
00204     cerr << "MPICommunicator::SendRecv() : Warning, send or recveive tag "
00205          << "exceeds upper bound.\n";
00206     cerr << "\tKokoTags must be <= " << _maxtag << endl;
00207     return 0;
00208   }
00209 
00210   if (sendbuf.Locked()) {
00211     cerr << "MPICommunicator::SendRecv() : send KokoBuffer is locked.\n";
00212     return false;
00213   }
00214 
00215   if (recvbuf.Locked()) {
00216     cerr << "MPICommunicator::SendRecv() : send KokoBuffer is locked.\n";
00217     return false;
00218   }
00219 
00220   MPI_Status status;
00221   int err = MPI_Sendrecv(sendbuf, sendbuf.Size(), MPI_UNSIGNED_CHAR, dest, 
00222                          stag, recvbuf, recvbuf.Size(), MPI_UNSIGNED_CHAR, 
00223                          src, rtag, MPI_COMM_WORLD, &status);
00224 
00225   if (err != MPI_SUCCESS) {
00226     cerr << "MPICommunicator::SendRecv() : Error sending/receiving message.\n";
00227     ErrorMesg(err);
00228     return 0;
00229   } else {
00230     MPI_Get_count(&status, MPI_UNSIGNED_CHAR, &_recv_bytes);
00231     _recv_tag = status.MPI_TAG;
00232     _recv_src = status.MPI_SOURCE;
00233     recvbuf.SetCurSize(_recv_bytes); // Update the buffer's size... 
00234     return _recv_bytes;
00235   }
00236 }
00237 
00238 // -------------
00239 // --- ASend ---
00240 // -------------
00241 //
00242 //
00243 //
00244 KokoMesgHandle MPICommunicator::ASend(KokoProcID id, KokoBuffer &buf, 
00245                                       KokoTag tag)
00246 {
00247   if (tag > _maxtag) {
00248     cerr << "MPICommunicator::ASend() : Warning, tag exceeds upper bound.\n";
00249     cerr << "\tKokoTags must be <= " << _maxtag << endl;
00250     return KOKO_NULL_HANDLE;
00251   }
00252 
00253   if (buf.Locked()) {
00254     cerr << "MPICommunicator::ASend() : send KokoBuffer is locked.\n";
00255     return false;
00256   }
00257 
00258   MPI_Request req;
00259   int err = MPI_Isend(buf, buf.Size(), MPI_UNSIGNED_CHAR, id, tag, 
00260                       MPI_COMM_WORLD, &req);
00261 
00262   if (err != MPI_SUCCESS) {
00263     cerr << "MPICommunicator::ASend() : Error sending message.\n";
00264     ErrorMesg(err);
00265     return KOKO_NULL_HANDLE;
00266   } else {
00267     AsyncMesgHandle *amhand = new AsyncMesgHandle;
00268     amhand->_handle = req;
00269     amhand->_kbuf   = &buf;
00270     _handlecount++;
00271     if (_handlecount == 0) // we've wrapped around a full uint's worth
00272       _handlecount = 1;    //   of messages... (whew!)
00273     AMHPair amhpair(_handlecount, amhand);
00274     AMHBool flag = _handlemap.insert(amhpair);
00275 
00276     buf.Lock(); // Lock the buffer down until the async operation successfully 
00277                 //  completes.
00278 
00279     if (flag.second == true) 
00280       return amhpair.first;
00281     else 
00282       return KOKO_NULL_HANDLE;
00283   }
00284 }
00285 
00286 
00287 // -------------
00288 // --- ARecv ---
00289 // -------------
00290 //
00291 //
00292 //
00293 KokoMesgHandle MPICommunicator::ARecv(KokoProcID id, KokoBuffer &buf, 
00294                                       KokoTag tag)
00295 {
00296   if (tag > _maxtag) {
00297     cerr << "MPICommunicator::ARecv() : Warning, tag exceeds upper bound.\n";
00298     cerr << "\tKokoTags must be <= " << _maxtag << endl;
00299     return KOKO_NULL_HANDLE;
00300   }
00301 
00302   if (buf.Locked()) {
00303     cerr << "MPICommunicator::ARecv() : send KokoBuffer is locked.\n";
00304     return false;
00305   }
00306 
00307   MPI_Request req;
00308   int err = MPI_Irecv(buf, buf.Size(), MPI_UNSIGNED_CHAR, id, tag, 
00309                       MPI_COMM_WORLD, &req);
00310   
00311   if (err != MPI_SUCCESS) {
00312     cerr << "MPICommunicator::ARecv() : Error receiving message.\n";
00313     ErrorMesg(err);
00314     return KOKO_NULL_HANDLE;
00315   } else {
00316     AsyncMesgHandle *amhand = new AsyncMesgHandle;
00317     amhand->_handle = req;
00318     amhand->_kbuf   = &buf;
00319     _handlecount++;
00320     if (_handlecount == 0) // we've wrapped around a full uint's worth
00321       _handlecount = 1;    //   of messages... (whew!)
00322     AMHPair amhpair(_handlecount, amhand);
00323     AMHBool flag = _handlemap.insert(amhpair);
00324 
00325     buf.Lock();  // Lock the buffer down until the async operation successfully
00326                  //   completes.
00327 
00328     if (flag.second == true)
00329       return amhpair.first;
00330     else
00331       return KOKO_NULL_HANDLE;
00332   }
00333 }
00334 
00335 
00336 // ------------
00337 // --- Wait ---
00338 // ------------
00339 //
00340 //
00341 //
00342 bool MPICommunicator::Wait(KokoMesgHandle &handle)
00343 {
00344   if (handle == KOKO_NULL_HANDLE) {
00345     cerr << "MPICommunicator::Wait() : Warning!  Invalid message handle.\n";
00346     return true;
00347   }
00348 
00349   MPI_Status status;
00350   // Danger Will Robinson!   This cast isn't great but it works... 
00351   int err = MPI_Wait((MPI_Request *)&handle, &status); 
00352 
00353   if (err != MPI_SUCCESS) {
00354     cerr << "MPICommunicator::Wait() : Error waiting...\n";
00355     ErrorMesg(err);
00356     return true;
00357   }
00358 
00359   // Set recv_bytes to the number of bytes of the completed
00360   // operation. 
00361   MPI_Get_count(&status, MPI_UNSIGNED_CHAR, &_recv_bytes);
00362 
00363   // We're done waiting for the message -- clean the handle 
00364   // out of the map. 
00365 
00366   // TODO: We might want to consider making a
00367   // free list of AsyncMesgHandle objects to reuse vs. always
00368   // using dynamic allocation. 
00369   AMHIterator it = _handlemap.find(handle);
00370   if (it != _handlemap.end()) {
00371     (*it).second->_kbuf->SetCurSize(_recv_bytes);
00372     (*it).second->_kbuf->Unlock();  // Async op. done, unlock buffer. 
00373     delete (*it).second;
00374     _handlemap.erase(it);
00375   } else {
00376     cerr << "MPICommunicator::Wait() : Warning! Message handle not found!\n";
00377     cerr << "\tMessage recevied successfully but we may have a corrupt map.\n";
00378   }
00379 
00380   handle = KOKO_NULL_HANDLE;
00381   
00382   return false;
00383 }
00384 
00385 
00386 // ------------
00387 // --- Test ---
00388 // ------------
00389 //
00390 //
00391 //
00392 bool MPICommunicator::Test(KokoMesgHandle &handle)
00393 {
00394   if (handle == KOKO_NULL_HANDLE) {
00395     cerr << "MPICommunicator::Test() : Warning, invalid message handle.\n";
00396     return false;
00397   }
00398 
00399   int        flag;
00400   MPI_Status status;
00401   // Danger Will Robinson!  This cast isn't great but it works... 
00402   int err = MPI_Test((MPI_Request *)&handle, &flag, &status); 
00403   
00404   if (err != MPI_SUCCESS) {
00405     cerr << "MPICommunicator::Test() : Error testing completion status.\n";
00406     ErrorMesg(err);
00407     return false;
00408   }
00409 
00410   if (flag) {
00411     // operation is finished, store the size of the transfer
00412     // in recv_bytes. 
00413 
00414     // TODO: Need to add a handle to the koko buffer to the 
00415     // message handle class/struct so we can update the 
00416     // buffer's size...
00417     MPI_Get_count(&status, MPI_UNSIGNED_CHAR, &_recv_bytes);
00418 
00419     // We've recevied the message -- clean the handle 
00420     // out of the map. 
00421 
00422     // TODO: We might want to consider making a
00423     // free list of AsyncMesgHandle objects to reuse vs. always
00424     // using dynamic allocation. 
00425     AMHIterator it = _handlemap.find(handle);
00426     if (it != _handlemap.end()) {
00427       (*it).second->_kbuf->SetCurSize(_recv_bytes);
00428       (*it).second->_kbuf->Unlock(); // Async op. done, unlock buffer. 
00429       delete (*it).second;
00430       _handlemap.erase(it);
00431     } else {
00432       cerr << "MPICommunicator::Test() : Warning! Message handle not found!\n";
00433       cerr << "\tMessage received but we may have a corrupt map.\n";
00434     }
00435 
00436     return true;
00437   } else {
00438     return false;
00439   }
00440 }
00441 
00442 
00443 // -----------------
00444 // --- Broadcast ---
00445 // -----------------
00446 //
00447 //
00448 //
00449 bool MPICommunicator::Broadcast(KokoBuffer &buf, KokoProcID root, KokoTag tag)
00450 {
00451 
00452   if (buf.Locked()) {
00453     cerr << "MPICommunicator::Broadcast() : KokoBuffer is locked.\n";
00454     return false;
00455   }
00456 
00457   // 
00458   // MPI doesn't support tags in broadcast.  In order to support them
00459   // we use the Tag region in the KokoBuffer object.   If we are the
00460   // root (sending) node in the broadcast we stuff the tag value into
00461   // the buffer. 
00462   //
00463   if (root == this->ID())
00464     buf.SetTag(tag);
00465 
00466   // Call MPI's broadcast function -- note that all processors must use the 
00467   // same 'root' value.   Also note that under MPI it must be the case that
00468   // all buffers have the same size on all processes (buf.size() should be
00469   // identical).
00470   int err = MPI_Bcast(buf.GetBuffer(), buf.TotalSize(), MPI_UNSIGNED_CHAR, 
00471                       root, MPI_COMM_WORLD);
00472 
00473   if (err != MPI_SUCCESS) {
00474     cerr << "MPICommunicator::Broadcast() : Error sending broadcast.\n";
00475     ErrorMesg(err);
00476     return false;
00477   } else {
00478     // If we were not the originator of the broadcast we snag the 
00479     // tag value out of the buffer. 
00480     if (this->ID() != root) {
00481       _recv_tag = buf.GetTag();
00482       if (!buf.SetCurSize(buf.TotalSize())) {
00483         cerr << "MPICommunicator::Broadcast() : Warning! "
00484              << "Can't set buffer size.\n";
00485       }
00486     }
00487     return true;
00488   }
00489 }
00490 
00491 
00492 // -----------------
00493 // --- ErrorMesg ---
00494 // -----------------
00495 //
00496 //
00497 //
00498 void MPICommunicator::ErrorMesg(int code)
00499 {
00500   char mesg[MPI_MAX_ERROR_STRING];
00501   int  len;
00502   MPI_Error_string(code, mesg, &len);
00503   cerr << "MPI Error: " << mesg << endl;
00504 }

Send questions, comments, and bug reports to:
jmk