| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007 | // -*- C++ -*-// Copyright (C) 2004-2008 The Trustees of Indiana University.// Copyright (C) 2007  Douglas Gregor <doug.gregor@gmail.com>// Copyright (C) 2007  Matthias Troyer  <troyer@boost-consulting.com>// Use, modification and distribution is subject to the Boost Software// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at// http://www.boost.org/LICENSE_1_0.txt)//  Authors: Douglas Gregor//           Andrew Lumsdaine//           Matthias Troyer//#define PBGL_PROCESS_GROUP_DEBUG#ifndef BOOST_GRAPH_USE_MPI#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"#endif#include <boost/assert.hpp>#include <algorithm>#include <boost/graph/parallel/detail/untracked_pair.hpp>#include <numeric>#include <iterator>#include <functional>#include <vector>#include <queue>#include <stack>#include <boost/graph/distributed/detail/tag_allocator.hpp>#include <stdio.h>// #define PBGL_PROCESS_GROUP_DEBUG#ifdef PBGL_PROCESS_GROUP_DEBUG#  include <iostream>#endifnamespace boost { namespace graph { namespace distributed {struct mpi_process_group::impl{    typedef mpi_process_group::message_header message_header;  typedef mpi_process_group::outgoing_messages outgoing_messages;  /**   * Stores the incoming messages from a particular processor.   *   * @todo Evaluate whether we should use a deque instance, which   * would reduce could reduce the cost of "receiving" messages and     allow us to deallocate memory earlier, but increases the time     spent in the synchronization step.   */  struct incoming_messages {    incoming_messages();    ~incoming_messages() {}    std::vector<message_header> headers;    buffer_type                 buffer;    std::vector<std::vector<message_header>::iterator> next_header;  };  struct batch_request {    MPI_Request request;    buffer_type buffer;  };  // send once we have a certain number of messages or bytes in the buffer  // these numbers need to be tuned, we keep them small at first for testing  std::size_t batch_header_number;  std::size_t batch_buffer_size;  std::size_t batch_message_size;    /**   * The actual MPI communicator used to transmit data.   */  boost::mpi::communicator             comm;  /**   * The MPI communicator used to transmit out-of-band replies.   */  boost::mpi::communicator             oob_reply_comm;  /// Outgoing message information, indexed by destination processor.  std::vector<outgoing_messages> outgoing;  /// Incoming message information, indexed by source processor.  std::vector<incoming_messages> incoming;  /// The numbers of processors that have entered a synchronization stage  std::vector<int> processors_synchronizing_stage;    /// The synchronization stage of a processor  std::vector<int> synchronizing_stage;  /// Number of processors still sending messages  std::vector<int> synchronizing_unfinished;    /// Number of batches sent since last synchronization stage  std::vector<int> number_sent_batches;    /// Number of batches received minus number of expected batches  std::vector<int> number_received_batches;    /// The context of the currently-executing trigger, or @c trc_none  /// if no trigger is executing.  trigger_receive_context trigger_context;  /// Non-zero indicates that we're processing batches  /// Increment this when processing patches,  /// decrement it when you're done.  int processing_batches;  /**   * Contains all of the active blocks corresponding to attached   * distributed data structures.   */  blocks_type blocks;  /// Whether we are currently synchronizing  bool synchronizing;  /// The MPI requests for posted sends of oob messages  std::vector<MPI_Request> requests;    /// The MPI buffers for posted irecvs of oob messages  std::map<int,buffer_type> buffers;  /// Queue for message batches received while already processing messages  std::queue<std::pair<int,outgoing_messages> > new_batches;  /// Maximum encountered size of the new_batches queue  std::size_t max_received;  /// The MPI requests and buffers for batchess being sent  std::list<batch_request> sent_batches;  /// Maximum encountered size of the sent_batches list  std::size_t max_sent;  /// Pre-allocated requests in a pool  std::vector<batch_request> batch_pool;  /// A stack controlling which batches are available  std::stack<std::size_t> free_batches;  void free_sent_batches();    // Tag allocator  detail::tag_allocator allocated_tags;  impl(std::size_t num_headers, std::size_t buffers_size,       communicator_type parent_comm);  ~impl();  private:  void set_batch_size(std::size_t header_num, std::size_t buffer_sz);};inline trigger_receive_context mpi_process_group::trigger_context() const{  return impl_->trigger_context;}template<typename T>voidmpi_process_group::send_impl(int dest, int tag, const T& value,                             mpl::true_ /*is_mpi_datatype*/) const{  BOOST_ASSERT(tag <  msg_reserved_first || tag > msg_reserved_last);  impl::outgoing_messages& outgoing = impl_->outgoing[dest];  // Start constructing the message header  impl::message_header header;  header.source = process_id(*this);  header.tag = tag;  header.offset = outgoing.buffer.size();    boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer);  oa << value;#ifdef PBGL_PROCESS_GROUP_DEBUG  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "            << tag << ", bytes = " << packed_size << std::endl;#endif  // Store the header  header.bytes = outgoing.buffer.size() - header.offset;  outgoing.headers.push_back(header);  maybe_send_batch(dest);}template<typename T>voidmpi_process_group::send_impl(int dest, int tag, const T& value,                             mpl::false_ /*is_mpi_datatype*/) const{  BOOST_ASSERT(tag <  msg_reserved_first || tag > msg_reserved_last);  impl::outgoing_messages& outgoing = impl_->outgoing[dest];  // Start constructing the message header  impl::message_header header;  header.source = process_id(*this);  header.tag = tag;  header.offset = outgoing.buffer.size();  // Serialize into the buffer  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);  out << value;  // Store the header  header.bytes = outgoing.buffer.size() - header.offset;  outgoing.headers.push_back(header);  maybe_send_batch(dest);#ifdef PBGL_PROCESS_GROUP_DEBUG  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "            << tag << ", bytes = " << header.bytes << std::endl;#endif}template<typename T>inline voidsend(const mpi_process_group& pg, mpi_process_group::process_id_type dest,     int tag, const T& value){  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value,               boost::mpi::is_mpi_datatype<T>());}template<typename T>typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::typesend(const mpi_process_group& pg, mpi_process_group::process_id_type dest,     int tag, const T values[], std::size_t n){  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),                 boost::serialization::make_array(values,n),                  boost::mpl::true_());}template<typename T>typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::typempi_process_group::array_send_impl(int dest, int tag, const T values[], std::size_t n) const{  BOOST_ASSERT(tag <  msg_reserved_first || tag > msg_reserved_last);  impl::outgoing_messages& outgoing = impl_->outgoing[dest];  // Start constructing the message header  impl::message_header header;  header.source = process_id(*this);  header.tag = tag;  header.offset = outgoing.buffer.size();  // Serialize into the buffer  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);  out << n;  for (std::size_t i = 0; i < n; ++i)    out << values[i];  // Store the header  header.bytes = outgoing.buffer.size() - header.offset;  outgoing.headers.push_back(header);  maybe_send_batch(dest);#ifdef PBGL_PROCESS_GROUP_DEBUG  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "            << tag << ", bytes = " << header.bytes << std::endl;#endif}template<typename T>typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::typesend(const mpi_process_group& pg, mpi_process_group::process_id_type dest,     int tag, const T values[], std::size_t n){  pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),                      values, n);}template<typename InputIterator>voidsend(const mpi_process_group& pg, mpi_process_group::process_id_type dest,     int tag, InputIterator first, InputIterator last){  typedef typename std::iterator_traits<InputIterator>::value_type value_type;  std::vector<value_type> values(first, last);  if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0);  else send(pg, dest, tag, &values[0], values.size());}template<typename T>boolmpi_process_group::receive_impl(int source, int tag, T& value,                                mpl::true_ /*is_mpi_datatype*/) const{#ifdef PBGL_PROCESS_GROUP_DEBUG  std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = "            << tag << std::endl;#endif  impl::incoming_messages& incoming = impl_->incoming[source];  // Find the next header with the right tag  std::vector<impl::message_header>::iterator header =    incoming.next_header[my_block_number()];  while (header != incoming.headers.end() && header->tag != tag) ++header;  // If no header is found, notify the caller  if (header == incoming.headers.end()) return false;  // Unpack the data  if (header->bytes > 0) {    boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer,                                    archive::no_header, header->offset);    ia >> value;  }  // Mark this message as received  header->tag = -1;  // Move the "next header" indicator to the next unreceived message  while (incoming.next_header[my_block_number()] != incoming.headers.end()         && incoming.next_header[my_block_number()]->tag == -1)    ++incoming.next_header[my_block_number()];  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {    bool finished = true;    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {      if (incoming.next_header[i] != incoming.headers.end()) finished = false;    }    if (finished) {      std::vector<impl::message_header> no_headers;      incoming.headers.swap(no_headers);      buffer_type empty_buffer;      incoming.buffer.swap(empty_buffer);      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)        incoming.next_header[i] = incoming.headers.end();    }  }  return true;}template<typename T>boolmpi_process_group::receive_impl(int source, int tag, T& value,                                mpl::false_ /*is_mpi_datatype*/) const{  impl::incoming_messages& incoming = impl_->incoming[source];  // Find the next header with the right tag  std::vector<impl::message_header>::iterator header =    incoming.next_header[my_block_number()];  while (header != incoming.headers.end() && header->tag != tag) ++header;  // If no header is found, notify the caller  if (header == incoming.headers.end()) return false;  // Deserialize the data  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,                                  archive::no_header, header->offset);  in >> value;  // Mark this message as received  header->tag = -1;  // Move the "next header" indicator to the next unreceived message  while (incoming.next_header[my_block_number()] != incoming.headers.end()         && incoming.next_header[my_block_number()]->tag == -1)    ++incoming.next_header[my_block_number()];  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {    bool finished = true;    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {      if (incoming.next_header[i] != incoming.headers.end()) finished = false;    }    if (finished) {      std::vector<impl::message_header> no_headers;      incoming.headers.swap(no_headers);      buffer_type empty_buffer;      incoming.buffer.swap(empty_buffer);      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)        incoming.next_header[i] = incoming.headers.end();    }  }  return true;}template<typename T>typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::typempi_process_group::array_receive_impl(int source, int tag, T* values, std::size_t& n) const{  impl::incoming_messages& incoming = impl_->incoming[source];  // Find the next header with the right tag  std::vector<impl::message_header>::iterator header =    incoming.next_header[my_block_number()];  while (header != incoming.headers.end() && header->tag != tag) ++header;  // If no header is found, notify the caller  if (header == incoming.headers.end()) return false;  // Deserialize the data  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,                                  archive::no_header, header->offset);  std::size_t num_sent;  in >> num_sent;  if (num_sent > n)    std::cerr << "ERROR: Have " << num_sent << " items but only space for "              << n << " items\n";  for (std::size_t i = 0; i < num_sent; ++i)    in >> values[i];  n = num_sent;  // Mark this message as received  header->tag = -1;  // Move the "next header" indicator to the next unreceived message  while (incoming.next_header[my_block_number()] != incoming.headers.end()         && incoming.next_header[my_block_number()]->tag == -1)    ++incoming.next_header[my_block_number()];  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {    bool finished = true;    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {      if (incoming.next_header[i] != incoming.headers.end()) finished = false;    }    if (finished) {      std::vector<impl::message_header> no_headers;      incoming.headers.swap(no_headers);      buffer_type empty_buffer;      incoming.buffer.swap(empty_buffer);      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)        incoming.next_header[i] = incoming.headers.end();    }  }  return true;}// Construct triggerstemplate<typename Type, typename Handler>void mpi_process_group::trigger(int tag, const Handler& handler){  BOOST_ASSERT(block_num);  install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(    new trigger_launcher<Type, Handler>(*this, tag, handler)));}template<typename Type, typename Handler>void mpi_process_group::trigger_with_reply(int tag, const Handler& handler){  BOOST_ASSERT(block_num);  install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(    new reply_trigger_launcher<Type, Handler>(*this, tag, handler)));}template<typename Type, typename Handler>void mpi_process_group::global_trigger(int tag, const Handler& handler,       std::size_t sz){  if (sz==0) // normal trigger    install_trigger(tag,0,shared_ptr<trigger_base>(      new global_trigger_launcher<Type, Handler>(*this, tag, handler)));  else // trigger with irecv    install_trigger(tag,0,shared_ptr<trigger_base>(      new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,sz)));  }namespace detail {template<typename Type>void  do_oob_receive(mpi_process_group const& self,    int source, int tag, Type& data, mpl::true_ /*is_mpi_datatype*/) {  using boost::mpi::get_mpi_datatype;  //self.impl_->comm.recv(source,tag,data);  MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag, self.impl_->comm,           MPI_STATUS_IGNORE);}template<typename Type>void do_oob_receive(mpi_process_group const& self,    int source, int tag, Type& data, mpl::false_ /*is_mpi_datatype*/) {  //  self.impl_->comm.recv(source,tag,data);  // Receive the size of the data packet  boost::mpi::status status;  status = self.impl_->comm.probe(source, tag);#if BOOST_VERSION >= 103600  int size = status.count<boost::mpi::packed>().get();#else  int size;  MPI_Status& mpi_status = status;  MPI_Get_count(&mpi_status, MPI_PACKED, &size);#endif  // Receive the data packed itself  boost::mpi::packed_iarchive in(self.impl_->comm);  in.resize(size);  MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm,       MPI_STATUS_IGNORE);  // Deserialize the data  in >> data;}template<typename Type>void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data) {  do_oob_receive(self, source, tag, data,                           boost::mpi::is_mpi_datatype<Type>());}} // namespace detailtemplate<typename Type, typename Handler>void mpi_process_group::trigger_launcher<Type, Handler>::receive(mpi_process_group const&, int source, int tag,         trigger_receive_context context, int block) const{#ifdef PBGL_PROCESS_GROUP_DEBUG  std::cerr << (out_of_band? "OOB trigger" : "Trigger")             << " receive from source " << source << " and tag " << tag        << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;#endif  Type data;  if (context == trc_out_of_band) {    // Receive the message directly off the wire    int realtag  = self.encode_tag(      block == -1 ? self.my_block_number() : block, tag);    detail::do_oob_receive(self,source,realtag,data);  }  else    // Receive the message out of the local buffer    boost::graph::distributed::receive(self, source, tag, data);  // Pass the message off to the handler  handler(source, tag, data, context);}template<typename Type, typename Handler>void mpi_process_group::reply_trigger_launcher<Type, Handler>::receive(mpi_process_group const&, int source, int tag,         trigger_receive_context context, int block) const{#ifdef PBGL_PROCESS_GROUP_DEBUG  std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger")             << " receive from source " << source << " and tag " << tag        << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;#endif  BOOST_ASSERT(context == trc_out_of_band);  boost::parallel::detail::untracked_pair<int, Type> data;  // Receive the message directly off the wire  int realtag  = self.encode_tag(block == -1 ? self.my_block_number() : block,                                 tag);  detail::do_oob_receive(self, source, realtag, data);  // Pass the message off to the handler and send the result back to  // the source.  send_oob(self, source, data.first,            handler(source, tag, data.second, context), -2);}template<typename Type, typename Handler>void mpi_process_group::global_trigger_launcher<Type, Handler>::receive(mpi_process_group const& self, int source, int tag,         trigger_receive_context context, int block) const{#ifdef PBGL_PROCESS_GROUP_DEBUG  std::cerr << (out_of_band? "OOB trigger" : "Trigger")             << " receive from source " << source << " and tag " << tag        << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;#endif  Type data;  if (context == trc_out_of_band) {    // Receive the message directly off the wire    int realtag  = self.encode_tag(      block == -1 ? self.my_block_number() : block, tag);    detail::do_oob_receive(self,source,realtag,data);  }  else    // Receive the message out of the local buffer    boost::graph::distributed::receive(self, source, tag, data);  // Pass the message off to the handler  handler(self, source, tag, data, context);}template<typename Type, typename Handler>void mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::receive(mpi_process_group const& self, int source, int tag,         trigger_receive_context context, int block) const{#ifdef PBGL_PROCESS_GROUP_DEBUG  std::cerr << (out_of_band? "OOB trigger" : "Trigger")             << " receive from source " << source << " and tag " << tag        << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;#endif  Type data;  if (context == trc_out_of_band) {    return;  }  BOOST_ASSERT (context == trc_irecv_out_of_band);  // force posting of new MPI_Irecv, even though buffer is already allocated  boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]);  ia >> data;  // Start a new receive  prepare_receive(self,tag,true);  // Pass the message off to the handler  handler(self, source, tag, data, context);}template<typename Type, typename Handler>void mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::prepare_receive(mpi_process_group const& self, int tag, bool force) const{#ifdef PBGL_PROCESS_GROUP_DEBUG std::cerr << ("Posting Irecv for trigger")       << " receive with tag " << tag << std::endl;#endif  if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) {    self.impl_->buffers[tag].resize(buffer_size);    force = true;  }  BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size);    //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >);  if (force) {    self.impl_->requests.push_back(MPI_Request());    MPI_Request* request = &self.impl_->requests.back();    MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size,               MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request);  }}template<typename T>inline mpi_process_group::process_id_typereceive(const mpi_process_group& pg, int tag, T& value){  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {    if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),                        value, boost::mpi::is_mpi_datatype<T>()))      return source;  }  BOOST_ASSERT (false);}template<typename T>typename   enable_if<boost::mpi::is_mpi_datatype<T>,             std::pair<mpi_process_group::process_id_type, std::size_t> >::typereceive(const mpi_process_group& pg, int tag, T values[], std::size_t n){  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {    bool result =      pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),                 boost::serialization::make_array(values,n),                 boost::mpl::true_());    if (result)       return std::make_pair(source, n);  }  BOOST_ASSERT(false);}template<typename T>typename   disable_if<boost::mpi::is_mpi_datatype<T>,              std::pair<mpi_process_group::process_id_type, std::size_t> >::typereceive(const mpi_process_group& pg, int tag, T values[], std::size_t n){  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {    if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),                              values, n))      return std::make_pair(source, n);  }  BOOST_ASSERT(false);}template<typename T>mpi_process_group::process_id_typereceive(const mpi_process_group& pg,        mpi_process_group::process_id_type source, int tag, T& value){  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),                      value, boost::mpi::is_mpi_datatype<T>()))    return source;  else {    fprintf(stderr,            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",            process_id(pg), source, tag, pg.my_block_number());    BOOST_ASSERT(false);    abort();  }}template<typename T>typename   enable_if<boost::mpi::is_mpi_datatype<T>,             std::pair<mpi_process_group::process_id_type, std::size_t> >::typereceive(const mpi_process_group& pg, int source, int tag, T values[],         std::size_t n){  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),                      boost::serialization::make_array(values,n),                       boost::mpl::true_()))    return std::make_pair(source,n);  else {    fprintf(stderr,            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",            process_id(pg), source, tag, pg.my_block_number());    BOOST_ASSERT(false);    abort();  }}template<typename T>typename   disable_if<boost::mpi::is_mpi_datatype<T>,              std::pair<mpi_process_group::process_id_type, std::size_t> >::typereceive(const mpi_process_group& pg, int source, int tag, T values[],         std::size_t n){  pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),                        values, n);  return std::make_pair(source, n);}template<typename T, typename BinaryOperation>T*all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,           BinaryOperation bin_op){  synchronize(pg);  bool inplace = first == out;  if (inplace) out = new T [last-first];  boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg),                                                  boost::mpi::comm_attach),                          first, last-first, out, bin_op);  if (inplace) {    std::copy(out, out + (last-first), first);    delete [] out;    return last;  }  return out;}template<typename T>voidbroadcast(const mpi_process_group& pg, T& val,           mpi_process_group::process_id_type root){  // broadcast the seed    boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach);  boost::mpi::broadcast(comm,val,root);}template<typename T, typename BinaryOperation>T*scan(const mpi_process_group& pg, T* first, T* last, T* out,           BinaryOperation bin_op){  synchronize(pg);  bool inplace = first == out;  if (inplace) out = new T [last-first];  boost::mpi::scan(communicator(pg), first, last-first, out, bin_op);  if (inplace) {    std::copy(out, out + (last-first), first);    delete [] out;    return last;  }  return out;}template<typename InputIterator, typename T>voidall_gather(const mpi_process_group& pg, InputIterator first,           InputIterator last, std::vector<T>& out){  synchronize(pg);  // Stick a copy of the local values into a vector, so we can broadcast it  std::vector<T> local_values(first, last);  // Collect the number of vertices stored in each process  int size = local_values.size();  std::vector<int> sizes(num_processes(pg));  int result = MPI_Allgather(&size, 1, MPI_INT,                             &sizes[0], 1, MPI_INT,                             communicator(pg));  BOOST_ASSERT(result == MPI_SUCCESS);  // Adjust sizes based on the number of bytes  std::transform(sizes.begin(), sizes.end(), sizes.begin(),                 std::bind2nd(std::multiplies<int>(), sizeof(T)));  // Compute displacements  std::vector<int> displacements;  displacements.reserve(sizes.size() + 1);  displacements.push_back(0);  std::partial_sum(sizes.begin(), sizes.end(),                   std::back_inserter(displacements));  // Gather all of the values  out.resize(displacements.back() / sizeof(T));  if (!out.empty()) {    result = MPI_Allgatherv(local_values.empty()? (void*)&local_values                            /* local results */: (void*)&local_values[0],                            local_values.size() * sizeof(T),                            MPI_BYTE,                            &out[0], &sizes[0], &displacements[0], MPI_BYTE,                            communicator(pg));  }  BOOST_ASSERT(result == MPI_SUCCESS);}template<typename InputIterator>mpi_process_groupprocess_subgroup(const mpi_process_group& pg,                 InputIterator first, InputIterator last){/*  boost::mpi::group current_group = communicator(pg).group();  boost::mpi::group new_group = current_group.include(first,last);  boost::mpi::communicator new_comm(communicator(pg),new_group);  return mpi_process_group(new_comm);*/  std::vector<int> ranks(first, last);  MPI_Group current_group;  int result = MPI_Comm_group(communicator(pg), ¤t_group);  BOOST_ASSERT(result == MPI_SUCCESS);  MPI_Group new_group;  result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group);  BOOST_ASSERT(result == MPI_SUCCESS);  MPI_Comm new_comm;  result = MPI_Comm_create(communicator(pg), new_group, &new_comm);  BOOST_ASSERT(result == MPI_SUCCESS);  result = MPI_Group_free(&new_group);  BOOST_ASSERT(result == MPI_SUCCESS);  result = MPI_Group_free(¤t_group);  BOOST_ASSERT(result == MPI_SUCCESS);  if (new_comm != MPI_COMM_NULL) {    mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach));    result = MPI_Comm_free(&new_comm);    BOOST_ASSERT(result == 0);    return result_pg;  } else {    return mpi_process_group(mpi_process_group::create_empty());  }}template<typename Receiver>Receiver* mpi_process_group::get_receiver(){  return impl_->blocks[my_block_number()]->on_receive           .template target<Receiver>();}template<typename T>typename enable_if<boost::mpi::is_mpi_datatype<T> >::typereceive_oob(const mpi_process_group& pg,             mpi_process_group::process_id_type source, int tag, T& value, int block){  using boost::mpi::get_mpi_datatype;  // Determine the actual message we expect to receive, and which  // communicator it will come by.  std::pair<boost::mpi::communicator, int> actual    = pg.actual_communicator_and_tag(tag, block);  // Post a non-blocking receive that waits until we complete this request.  MPI_Request request;  MPI_Irecv(&value, 1, get_mpi_datatype<T>(value),              source, actual.second, actual.first, &request);   int done = 0;  do {    MPI_Test(&request, &done, MPI_STATUS_IGNORE);    if (!done)      pg.poll(/*wait=*/false, block);  } while (!done);}template<typename T>typename disable_if<boost::mpi::is_mpi_datatype<T> >::typereceive_oob(const mpi_process_group& pg,             mpi_process_group::process_id_type source, int tag, T& value, int block){  // Determine the actual message we expect to receive, and which  // communicator it will come by.  std::pair<boost::mpi::communicator, int> actual    = pg.actual_communicator_and_tag(tag, block);  boost::optional<boost::mpi::status> status;  do {    status = actual.first.iprobe(source, actual.second);    if (!status)      pg.poll();  } while (!status);  //actual.first.recv(status->source(), status->tag(),value);  // Allocate the receive buffer  boost::mpi::packed_iarchive in(actual.first);#if BOOST_VERSION >= 103600  in.resize(status->count<boost::mpi::packed>().get());#else  int size;  MPI_Status mpi_status = *status;  MPI_Get_count(&mpi_status, MPI_PACKED, &size);  in.resize(size);#endif    // Receive the message data  MPI_Recv(in.address(), in.size(), MPI_PACKED,           status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE);    // Unpack the message data  in >> value;}template<typename SendT, typename ReplyT>typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::typesend_oob_with_reply(const mpi_process_group& pg,                     mpi_process_group::process_id_type dest,                    int tag, const SendT& send_value, ReplyT& reply_value,                    int block){  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();  send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair(        (int)reply_tag, send_value), block);  receive_oob(pg, dest, reply_tag, reply_value);}template<typename SendT, typename ReplyT>typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::typesend_oob_with_reply(const mpi_process_group& pg,                     mpi_process_group::process_id_type dest,                    int tag, const SendT& send_value, ReplyT& reply_value,                    int block){  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();  send_oob(pg, dest, tag,            boost::parallel::detail::make_untracked_pair((int)reply_tag,                                                         send_value), block);  receive_oob(pg, dest, reply_tag, reply_value);}} } } // end namespace boost::graph::distributed
 |