| // -*- C++ -*- |
| |
| // Copyright (C) 2004-2008 The Trustees of Indiana University. |
| // Copyright (C) 2007 Douglas Gregor <doug.gregor@gmail.com> |
| // Copyright (C) 2007 Matthias Troyer <troyer@boost-consulting.com> |
| |
| // Use, modification and distribution is subject to the Boost Software |
| // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at |
| // http://www.boost.org/LICENSE_1_0.txt) |
| |
| // Authors: Douglas Gregor |
| // Andrew Lumsdaine |
| // Matthias Troyer |
| |
| //#define PBGL_PROCESS_GROUP_DEBUG |
| |
| #ifndef BOOST_GRAPH_USE_MPI |
| #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" |
| #endif |
| |
| #include <cassert> |
| #include <algorithm> |
| #include <boost/graph/parallel/detail/untracked_pair.hpp> |
| #include <numeric> |
| #include <iterator> |
| #include <functional> |
| #include <vector> |
| #include <queue> |
| #include <stack> |
| #include <boost/graph/distributed/detail/tag_allocator.hpp> |
| #include <stdio.h> |
| |
| // #define PBGL_PROCESS_GROUP_DEBUG |
| |
| #ifdef PBGL_PROCESS_GROUP_DEBUG |
| # include <iostream> |
| #endif |
| |
| namespace boost { namespace graph { namespace distributed { |
| |
| struct mpi_process_group::impl |
| { |
| |
| typedef mpi_process_group::message_header message_header; |
| typedef mpi_process_group::outgoing_messages outgoing_messages; |
| |
| /** |
| * Stores the incoming messages from a particular processor. |
| * |
| * @todo Evaluate whether we should use a deque instance, which |
| * would reduce could reduce the cost of "receiving" messages and |
| allow us to deallocate memory earlier, but increases the time |
| spent in the synchronization step. |
| */ |
| struct incoming_messages { |
| incoming_messages(); |
| ~incoming_messages() {} |
| |
| std::vector<message_header> headers; |
| buffer_type buffer; |
| std::vector<std::vector<message_header>::iterator> next_header; |
| }; |
| |
| struct batch_request { |
| MPI_Request request; |
| buffer_type buffer; |
| }; |
| |
| // send once we have a certain number of messages or bytes in the buffer |
| // these numbers need to be tuned, we keep them small at first for testing |
| std::size_t batch_header_number; |
| std::size_t batch_buffer_size; |
| std::size_t batch_message_size; |
| |
| /** |
| * The actual MPI communicator used to transmit data. |
| */ |
| boost::mpi::communicator comm; |
| |
| /** |
| * The MPI communicator used to transmit out-of-band replies. |
| */ |
| boost::mpi::communicator oob_reply_comm; |
| |
| /// Outgoing message information, indexed by destination processor. |
| std::vector<outgoing_messages> outgoing; |
| |
| /// Incoming message information, indexed by source processor. |
| std::vector<incoming_messages> incoming; |
| |
| /// The numbers of processors that have entered a synchronization stage |
| std::vector<int> processors_synchronizing_stage; |
| |
| /// The synchronization stage of a processor |
| std::vector<int> synchronizing_stage; |
| |
| /// Number of processors still sending messages |
| std::vector<int> synchronizing_unfinished; |
| |
| /// Number of batches sent since last synchronization stage |
| std::vector<int> number_sent_batches; |
| |
| /// Number of batches received minus number of expected batches |
| std::vector<int> number_received_batches; |
| |
| |
| /// The context of the currently-executing trigger, or @c trc_none |
| /// if no trigger is executing. |
| trigger_receive_context trigger_context; |
| |
| /// Non-zero indicates that we're processing batches |
| /// Increment this when processing patches, |
| /// decrement it when you're done. |
| int processing_batches; |
| |
| /** |
| * Contains all of the active blocks corresponding to attached |
| * distributed data structures. |
| */ |
| blocks_type blocks; |
| |
| /// Whether we are currently synchronizing |
| bool synchronizing; |
| |
| /// The MPI requests for posted sends of oob messages |
| std::vector<MPI_Request> requests; |
| |
| /// The MPI buffers for posted irecvs of oob messages |
| std::map<int,buffer_type> buffers; |
| |
| /// Queue for message batches received while already processing messages |
| std::queue<std::pair<int,outgoing_messages> > new_batches; |
| /// Maximum encountered size of the new_batches queue |
| std::size_t max_received; |
| |
| /// The MPI requests and buffers for batchess being sent |
| std::list<batch_request> sent_batches; |
| /// Maximum encountered size of the sent_batches list |
| std::size_t max_sent; |
| |
| /// Pre-allocated requests in a pool |
| std::vector<batch_request> batch_pool; |
| /// A stack controlling which batches are available |
| std::stack<std::size_t> free_batches; |
| |
| void free_sent_batches(); |
| |
| // Tag allocator |
| detail::tag_allocator allocated_tags; |
| |
| impl(std::size_t num_headers, std::size_t buffers_size, |
| communicator_type parent_comm); |
| ~impl(); |
| |
| private: |
| void set_batch_size(std::size_t header_num, std::size_t buffer_sz); |
| }; |
| |
| inline trigger_receive_context mpi_process_group::trigger_context() const |
| { |
| return impl_->trigger_context; |
| } |
| |
| template<typename T> |
| void |
| mpi_process_group::send_impl(int dest, int tag, const T& value, |
| mpl::true_ /*is_mpi_datatype*/) const |
| { |
| assert(tag < msg_reserved_first || tag > msg_reserved_last); |
| |
| impl::outgoing_messages& outgoing = impl_->outgoing[dest]; |
| |
| // Start constructing the message header |
| impl::message_header header; |
| header.source = process_id(*this); |
| header.tag = tag; |
| header.offset = outgoing.buffer.size(); |
| |
| boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer); |
| oa << value; |
| |
| #ifdef PBGL_PROCESS_GROUP_DEBUG |
| std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " |
| << tag << ", bytes = " << packed_size << std::endl; |
| #endif |
| |
| // Store the header |
| header.bytes = outgoing.buffer.size() - header.offset; |
| outgoing.headers.push_back(header); |
| |
| maybe_send_batch(dest); |
| } |
| |
| |
| template<typename T> |
| void |
| mpi_process_group::send_impl(int dest, int tag, const T& value, |
| mpl::false_ /*is_mpi_datatype*/) const |
| { |
| assert(tag < msg_reserved_first || tag > msg_reserved_last); |
| |
| impl::outgoing_messages& outgoing = impl_->outgoing[dest]; |
| |
| // Start constructing the message header |
| impl::message_header header; |
| header.source = process_id(*this); |
| header.tag = tag; |
| header.offset = outgoing.buffer.size(); |
| |
| // Serialize into the buffer |
| boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer); |
| out << value; |
| |
| // Store the header |
| header.bytes = outgoing.buffer.size() - header.offset; |
| outgoing.headers.push_back(header); |
| maybe_send_batch(dest); |
| |
| #ifdef PBGL_PROCESS_GROUP_DEBUG |
| std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " |
| << tag << ", bytes = " << header.bytes << std::endl; |
| #endif |
| } |
| |
| template<typename T> |
| inline void |
| send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, |
| int tag, const T& value) |
| { |
| pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value, |
| boost::mpi::is_mpi_datatype<T>()); |
| } |
| |
| template<typename T> |
| typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type |
| send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, |
| int tag, const T values[], std::size_t n) |
| { |
| pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), |
| boost::serialization::make_array(values,n), |
| boost::mpl::true_()); |
| } |
| |
| template<typename T> |
| typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type |
| mpi_process_group:: |
| array_send_impl(int dest, int tag, const T values[], std::size_t n) const |
| { |
| assert(tag < msg_reserved_first || tag > msg_reserved_last); |
| |
| impl::outgoing_messages& outgoing = impl_->outgoing[dest]; |
| |
| // Start constructing the message header |
| impl::message_header header; |
| header.source = process_id(*this); |
| header.tag = tag; |
| header.offset = outgoing.buffer.size(); |
| |
| // Serialize into the buffer |
| boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer); |
| out << n; |
| |
| for (std::size_t i = 0; i < n; ++i) |
| out << values[i]; |
| |
| // Store the header |
| header.bytes = outgoing.buffer.size() - header.offset; |
| outgoing.headers.push_back(header); |
| maybe_send_batch(dest); |
| |
| #ifdef PBGL_PROCESS_GROUP_DEBUG |
| std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " |
| << tag << ", bytes = " << header.bytes << std::endl; |
| #endif |
| } |
| |
| template<typename T> |
| typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type |
| send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, |
| int tag, const T values[], std::size_t n) |
| { |
| pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), |
| values, n); |
| } |
| |
| template<typename InputIterator> |
| void |
| send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, |
| int tag, InputIterator first, InputIterator last) |
| { |
| typedef typename std::iterator_traits<InputIterator>::value_type value_type; |
| std::vector<value_type> values(first, last); |
| if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0); |
| else send(pg, dest, tag, &values[0], values.size()); |
| } |
| |
| template<typename T> |
| bool |
| mpi_process_group::receive_impl(int source, int tag, T& value, |
| mpl::true_ /*is_mpi_datatype*/) const |
| { |
| #ifdef PBGL_PROCESS_GROUP_DEBUG |
| std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = " |
| << tag << std::endl; |
| #endif |
| |
| impl::incoming_messages& incoming = impl_->incoming[source]; |
| |
| // Find the next header with the right tag |
| std::vector<impl::message_header>::iterator header = |
| incoming.next_header[my_block_number()]; |
| while (header != incoming.headers.end() && header->tag != tag) ++header; |
| |
| // If no header is found, notify the caller |
| if (header == incoming.headers.end()) return false; |
| |
| // Unpack the data |
| if (header->bytes > 0) { |
| boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer, |
| archive::no_header, header->offset); |
| ia >> value; |
| } |
| |
| // Mark this message as received |
| header->tag = -1; |
| |
| // Move the "next header" indicator to the next unreceived message |
| while (incoming.next_header[my_block_number()] != incoming.headers.end() |
| && incoming.next_header[my_block_number()]->tag == -1) |
| ++incoming.next_header[my_block_number()]; |
| |
| if (incoming.next_header[my_block_number()] == incoming.headers.end()) { |
| bool finished = true; |
| for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) { |
| if (incoming.next_header[i] != incoming.headers.end()) finished = false; |
| } |
| |
| if (finished) { |
| std::vector<impl::message_header> no_headers; |
| incoming.headers.swap(no_headers); |
| buffer_type empty_buffer; |
| incoming.buffer.swap(empty_buffer); |
| for (std::size_t i = 0; i < incoming.next_header.size(); ++i) |
| incoming.next_header[i] = incoming.headers.end(); |
| } |
| } |
| |
| return true; |
| } |
| |
| template<typename T> |
| bool |
| mpi_process_group::receive_impl(int source, int tag, T& value, |
| mpl::false_ /*is_mpi_datatype*/) const |
| { |
| impl::incoming_messages& incoming = impl_->incoming[source]; |
| |
| // Find the next header with the right tag |
| std::vector<impl::message_header>::iterator header = |
| incoming.next_header[my_block_number()]; |
| while (header != incoming.headers.end() && header->tag != tag) ++header; |
| |
| // If no header is found, notify the caller |
| if (header == incoming.headers.end()) return false; |
| |
| // Deserialize the data |
| boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer, |
| archive::no_header, header->offset); |
| in >> value; |
| |
| // Mark this message as received |
| header->tag = -1; |
| |
| // Move the "next header" indicator to the next unreceived message |
| while (incoming.next_header[my_block_number()] != incoming.headers.end() |
| && incoming.next_header[my_block_number()]->tag == -1) |
| ++incoming.next_header[my_block_number()]; |
| |
| if (incoming.next_header[my_block_number()] == incoming.headers.end()) { |
| bool finished = true; |
| for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) { |
| if (incoming.next_header[i] != incoming.headers.end()) finished = false; |
| } |
| |
| if (finished) { |
| std::vector<impl::message_header> no_headers; |
| incoming.headers.swap(no_headers); |
| buffer_type empty_buffer; |
| incoming.buffer.swap(empty_buffer); |
| for (std::size_t i = 0; i < incoming.next_header.size(); ++i) |
| incoming.next_header[i] = incoming.headers.end(); |
| } |
| } |
| |
| return true; |
| } |
| |
| template<typename T> |
| typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type |
| mpi_process_group:: |
| array_receive_impl(int source, int tag, T* values, std::size_t& n) const |
| { |
| impl::incoming_messages& incoming = impl_->incoming[source]; |
| |
| // Find the next header with the right tag |
| std::vector<impl::message_header>::iterator header = |
| incoming.next_header[my_block_number()]; |
| while (header != incoming.headers.end() && header->tag != tag) ++header; |
| |
| // If no header is found, notify the caller |
| if (header == incoming.headers.end()) return false; |
| |
| // Deserialize the data |
| boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer, |
| archive::no_header, header->offset); |
| std::size_t num_sent; |
| in >> num_sent; |
| if (num_sent > n) |
| std::cerr << "ERROR: Have " << num_sent << " items but only space for " |
| << n << " items\n"; |
| |
| for (std::size_t i = 0; i < num_sent; ++i) |
| in >> values[i]; |
| n = num_sent; |
| |
| // Mark this message as received |
| header->tag = -1; |
| |
| // Move the "next header" indicator to the next unreceived message |
| while (incoming.next_header[my_block_number()] != incoming.headers.end() |
| && incoming.next_header[my_block_number()]->tag == -1) |
| ++incoming.next_header[my_block_number()]; |
| |
| if (incoming.next_header[my_block_number()] == incoming.headers.end()) { |
| bool finished = true; |
| for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) { |
| if (incoming.next_header[i] != incoming.headers.end()) finished = false; |
| } |
| |
| if (finished) { |
| std::vector<impl::message_header> no_headers; |
| incoming.headers.swap(no_headers); |
| buffer_type empty_buffer; |
| incoming.buffer.swap(empty_buffer); |
| for (std::size_t i = 0; i < incoming.next_header.size(); ++i) |
| incoming.next_header[i] = incoming.headers.end(); |
| } |
| } |
| |
| return true; |
| } |
| |
| // Construct triggers |
| template<typename Type, typename Handler> |
| void mpi_process_group::trigger(int tag, const Handler& handler) |
| { |
| assert(block_num); |
| install_trigger(tag,my_block_number(),shared_ptr<trigger_base>( |
| new trigger_launcher<Type, Handler>(*this, tag, handler))); |
| } |
| |
| template<typename Type, typename Handler> |
| void mpi_process_group::trigger_with_reply(int tag, const Handler& handler) |
| { |
| assert(block_num); |
| install_trigger(tag,my_block_number(),shared_ptr<trigger_base>( |
| new reply_trigger_launcher<Type, Handler>(*this, tag, handler))); |
| } |
| |
| template<typename Type, typename Handler> |
| void mpi_process_group::global_trigger(int tag, const Handler& handler, |
| std::size_t sz) |
| { |
| if (sz==0) // normal trigger |
| install_trigger(tag,0,shared_ptr<trigger_base>( |
| new global_trigger_launcher<Type, Handler>(*this, tag, handler))); |
| else // trigger with irecv |
| install_trigger(tag,0,shared_ptr<trigger_base>( |
| new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,sz))); |
| |
| } |
| |
| namespace detail { |
| |
| template<typename Type> |
| void do_oob_receive(mpi_process_group const& self, |
| int source, int tag, Type& data, mpl::true_ /*is_mpi_datatype*/) |
| { |
| using boost::mpi::get_mpi_datatype; |
| |
| //self.impl_->comm.recv(source,tag,data); |
| MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag, self.impl_->comm, |
| MPI_STATUS_IGNORE); |
| } |
| |
| template<typename Type> |
| void do_oob_receive(mpi_process_group const& self, |
| int source, int tag, Type& data, mpl::false_ /*is_mpi_datatype*/) |
| { |
| // self.impl_->comm.recv(source,tag,data); |
| // Receive the size of the data packet |
| boost::mpi::status status; |
| status = self.impl_->comm.probe(source, tag); |
| |
| #if BOOST_VERSION >= 103600 |
| int size = status.count<boost::mpi::packed>().get(); |
| #else |
| int size; |
| MPI_Status& mpi_status = status; |
| MPI_Get_count(&mpi_status, MPI_PACKED, &size); |
| #endif |
| |
| // Receive the data packed itself |
| boost::mpi::packed_iarchive in(self.impl_->comm); |
| in.resize(size); |
| MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm, |
| MPI_STATUS_IGNORE); |
| |
| // Deserialize the data |
| in >> data; |
| } |
| |
| template<typename Type> |
| void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data) |
| { |
| do_oob_receive(self, source, tag, data, |
| boost::mpi::is_mpi_datatype<Type>()); |
| } |
| |
| |
| } // namespace detail |
| |
| |
| template<typename Type, typename Handler> |
| void |
| mpi_process_group::trigger_launcher<Type, Handler>:: |
| receive(mpi_process_group const&, int source, int tag, |
| trigger_receive_context context, int block) const |
| { |
| #ifdef PBGL_PROCESS_GROUP_DEBUG |
| std::cerr << (out_of_band? "OOB trigger" : "Trigger") |
| << " receive from source " << source << " and tag " << tag |
| << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; |
| #endif |
| |
| Type data; |
| |
| if (context == trc_out_of_band) { |
| // Receive the message directly off the wire |
| int realtag = self.encode_tag( |
| block == -1 ? self.my_block_number() : block, tag); |
| detail::do_oob_receive(self,source,realtag,data); |
| } |
| else |
| // Receive the message out of the local buffer |
| boost::graph::distributed::receive(self, source, tag, data); |
| |
| // Pass the message off to the handler |
| handler(source, tag, data, context); |
| } |
| |
| template<typename Type, typename Handler> |
| void |
| mpi_process_group::reply_trigger_launcher<Type, Handler>:: |
| receive(mpi_process_group const&, int source, int tag, |
| trigger_receive_context context, int block) const |
| { |
| #ifdef PBGL_PROCESS_GROUP_DEBUG |
| std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger") |
| << " receive from source " << source << " and tag " << tag |
| << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; |
| #endif |
| assert(context == trc_out_of_band); |
| |
| boost::parallel::detail::untracked_pair<int, Type> data; |
| |
| // Receive the message directly off the wire |
| int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block, |
| tag); |
| detail::do_oob_receive(self, source, realtag, data); |
| |
| // Pass the message off to the handler and send the result back to |
| // the source. |
| send_oob(self, source, data.first, |
| handler(source, tag, data.second, context), -2); |
| } |
| |
| template<typename Type, typename Handler> |
| void |
| mpi_process_group::global_trigger_launcher<Type, Handler>:: |
| receive(mpi_process_group const& self, int source, int tag, |
| trigger_receive_context context, int block) const |
| { |
| #ifdef PBGL_PROCESS_GROUP_DEBUG |
| std::cerr << (out_of_band? "OOB trigger" : "Trigger") |
| << " receive from source " << source << " and tag " << tag |
| << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; |
| #endif |
| |
| Type data; |
| |
| if (context == trc_out_of_band) { |
| // Receive the message directly off the wire |
| int realtag = self.encode_tag( |
| block == -1 ? self.my_block_number() : block, tag); |
| detail::do_oob_receive(self,source,realtag,data); |
| } |
| else |
| // Receive the message out of the local buffer |
| boost::graph::distributed::receive(self, source, tag, data); |
| |
| // Pass the message off to the handler |
| handler(self, source, tag, data, context); |
| } |
| |
| |
| template<typename Type, typename Handler> |
| void |
| mpi_process_group::global_irecv_trigger_launcher<Type, Handler>:: |
| receive(mpi_process_group const& self, int source, int tag, |
| trigger_receive_context context, int block) const |
| { |
| #ifdef PBGL_PROCESS_GROUP_DEBUG |
| std::cerr << (out_of_band? "OOB trigger" : "Trigger") |
| << " receive from source " << source << " and tag " << tag |
| << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; |
| #endif |
| |
| Type data; |
| |
| if (context == trc_out_of_band) { |
| return; |
| } |
| assert (context == trc_irecv_out_of_band); |
| |
| // force posting of new MPI_Irecv, even though buffer is already allocated |
| boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]); |
| ia >> data; |
| // Start a new receive |
| prepare_receive(self,tag,true); |
| // Pass the message off to the handler |
| handler(self, source, tag, data, context); |
| } |
| |
| |
| template<typename Type, typename Handler> |
| void |
| mpi_process_group::global_irecv_trigger_launcher<Type, Handler>:: |
| prepare_receive(mpi_process_group const& self, int tag, bool force) const |
| { |
| #ifdef PBGL_PROCESS_GROUP_DEBUG |
| std::cerr << ("Posting Irecv for trigger") |
| << " receive with tag " << tag << std::endl; |
| #endif |
| if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) { |
| self.impl_->buffers[tag].resize(buffer_size); |
| force = true; |
| } |
| assert(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size); |
| |
| //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >); |
| if (force) { |
| self.impl_->requests.push_back(MPI_Request()); |
| MPI_Request* request = &self.impl_->requests.back(); |
| MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size, |
| MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request); |
| } |
| } |
| |
| |
| template<typename T> |
| inline mpi_process_group::process_id_type |
| receive(const mpi_process_group& pg, int tag, T& value) |
| { |
| for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { |
| if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), |
| value, boost::mpi::is_mpi_datatype<T>())) |
| return source; |
| } |
| assert (false); |
| } |
| |
| template<typename T> |
| typename |
| enable_if<boost::mpi::is_mpi_datatype<T>, |
| std::pair<mpi_process_group::process_id_type, std::size_t> >::type |
| receive(const mpi_process_group& pg, int tag, T values[], std::size_t n) |
| { |
| for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { |
| bool result = |
| pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), |
| boost::serialization::make_array(values,n), |
| boost::mpl::true_()); |
| if (result) |
| return std::make_pair(source, n); |
| } |
| assert(false); |
| } |
| |
| template<typename T> |
| typename |
| disable_if<boost::mpi::is_mpi_datatype<T>, |
| std::pair<mpi_process_group::process_id_type, std::size_t> >::type |
| receive(const mpi_process_group& pg, int tag, T values[], std::size_t n) |
| { |
| for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { |
| if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), |
| values, n)) |
| return std::make_pair(source, n); |
| } |
| assert(false); |
| } |
| |
| template<typename T> |
| mpi_process_group::process_id_type |
| receive(const mpi_process_group& pg, |
| mpi_process_group::process_id_type source, int tag, T& value) |
| { |
| if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), |
| value, boost::mpi::is_mpi_datatype<T>())) |
| return source; |
| else { |
| fprintf(stderr, |
| "Process %d failed to receive a message from process %d with tag %d in block %d.\n", |
| process_id(pg), source, tag, pg.my_block_number()); |
| |
| assert(false); |
| exit(1); |
| } |
| } |
| |
| template<typename T> |
| typename |
| enable_if<boost::mpi::is_mpi_datatype<T>, |
| std::pair<mpi_process_group::process_id_type, std::size_t> >::type |
| receive(const mpi_process_group& pg, int source, int tag, T values[], |
| std::size_t n) |
| { |
| if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), |
| boost::serialization::make_array(values,n), |
| boost::mpl::true_())) |
| return std::make_pair(source,n); |
| else { |
| fprintf(stderr, |
| "Process %d failed to receive a message from process %d with tag %d in block %d.\n", |
| process_id(pg), source, tag, pg.my_block_number()); |
| |
| assert(false); |
| exit(1); |
| } |
| } |
| |
| template<typename T> |
| typename |
| disable_if<boost::mpi::is_mpi_datatype<T>, |
| std::pair<mpi_process_group::process_id_type, std::size_t> >::type |
| receive(const mpi_process_group& pg, int source, int tag, T values[], |
| std::size_t n) |
| { |
| pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), |
| values, n); |
| |
| return std::make_pair(source, n); |
| } |
| |
| template<typename T, typename BinaryOperation> |
| T* |
| all_reduce(const mpi_process_group& pg, T* first, T* last, T* out, |
| BinaryOperation bin_op) |
| { |
| synchronize(pg); |
| |
| bool inplace = first == out; |
| |
| if (inplace) out = new T [last-first]; |
| |
| boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg), |
| boost::mpi::comm_attach), |
| first, last-first, out, bin_op); |
| |
| if (inplace) { |
| std::copy(out, out + (last-first), first); |
| delete [] out; |
| return last; |
| } |
| |
| return out; |
| } |
| |
| template<typename T> |
| void |
| broadcast(const mpi_process_group& pg, T& val, |
| mpi_process_group::process_id_type root) |
| { |
| // broadcast the seed |
| boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach); |
| boost::mpi::broadcast(comm,val,root); |
| } |
| |
| |
| template<typename T, typename BinaryOperation> |
| T* |
| scan(const mpi_process_group& pg, T* first, T* last, T* out, |
| BinaryOperation bin_op) |
| { |
| synchronize(pg); |
| |
| bool inplace = first == out; |
| |
| if (inplace) out = new T [last-first]; |
| |
| boost::mpi::scan(communicator(pg), first, last-first, out, bin_op); |
| |
| if (inplace) { |
| std::copy(out, out + (last-first), first); |
| delete [] out; |
| return last; |
| } |
| |
| return out; |
| } |
| |
| |
| template<typename InputIterator, typename T> |
| void |
| all_gather(const mpi_process_group& pg, InputIterator first, |
| InputIterator last, std::vector<T>& out) |
| { |
| synchronize(pg); |
| |
| // Stick a copy of the local values into a vector, so we can broadcast it |
| std::vector<T> local_values(first, last); |
| |
| // Collect the number of vertices stored in each process |
| int size = local_values.size(); |
| std::vector<int> sizes(num_processes(pg)); |
| int result = MPI_Allgather(&size, 1, MPI_INT, |
| &sizes[0], 1, MPI_INT, |
| communicator(pg)); |
| assert(result == MPI_SUCCESS); |
| |
| // Adjust sizes based on the number of bytes |
| std::transform(sizes.begin(), sizes.end(), sizes.begin(), |
| std::bind2nd(std::multiplies<int>(), sizeof(T))); |
| |
| // Compute displacements |
| std::vector<int> displacements; |
| displacements.reserve(sizes.size() + 1); |
| displacements.push_back(0); |
| std::partial_sum(sizes.begin(), sizes.end(), |
| std::back_inserter(displacements)); |
| |
| // Gather all of the values |
| out.resize(displacements.back() / sizeof(T)); |
| if (!out.empty()) { |
| result = MPI_Allgatherv(local_values.empty()? (void*)&local_values |
| /* local results */: (void*)&local_values[0], |
| local_values.size() * sizeof(T), |
| MPI_BYTE, |
| &out[0], &sizes[0], &displacements[0], MPI_BYTE, |
| communicator(pg)); |
| } |
| assert(result == MPI_SUCCESS); |
| } |
| |
| template<typename InputIterator> |
| mpi_process_group |
| process_subgroup(const mpi_process_group& pg, |
| InputIterator first, InputIterator last) |
| { |
| /* |
| boost::mpi::group current_group = communicator(pg).group(); |
| boost::mpi::group new_group = current_group.include(first,last); |
| boost::mpi::communicator new_comm(communicator(pg),new_group); |
| return mpi_process_group(new_comm); |
| */ |
| std::vector<int> ranks(first, last); |
| |
| MPI_Group current_group; |
| int result = MPI_Comm_group(communicator(pg), ¤t_group); |
| assert(result == MPI_SUCCESS); |
| |
| MPI_Group new_group; |
| result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group); |
| assert(result == MPI_SUCCESS); |
| |
| MPI_Comm new_comm; |
| result = MPI_Comm_create(communicator(pg), new_group, &new_comm); |
| assert(result == MPI_SUCCESS); |
| |
| result = MPI_Group_free(&new_group); |
| assert(result == MPI_SUCCESS); |
| result = MPI_Group_free(¤t_group); |
| assert(result == MPI_SUCCESS); |
| |
| if (new_comm != MPI_COMM_NULL) { |
| mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach)); |
| result = MPI_Comm_free(&new_comm); |
| assert(result == 0); |
| return result_pg; |
| } else { |
| return mpi_process_group(mpi_process_group::create_empty()); |
| } |
| |
| } |
| |
| |
| template<typename Receiver> |
| Receiver* mpi_process_group::get_receiver() |
| { |
| return impl_->blocks[my_block_number()]->on_receive |
| .template target<Receiver>(); |
| } |
| |
| template<typename T> |
| typename enable_if<boost::mpi::is_mpi_datatype<T> >::type |
| receive_oob(const mpi_process_group& pg, |
| mpi_process_group::process_id_type source, int tag, T& value, int block) |
| { |
| using boost::mpi::get_mpi_datatype; |
| |
| // Determine the actual message we expect to receive, and which |
| // communicator it will come by. |
| std::pair<boost::mpi::communicator, int> actual |
| = pg.actual_communicator_and_tag(tag, block); |
| |
| // Post a non-blocking receive that waits until we complete this request. |
| MPI_Request request; |
| MPI_Irecv(&value, 1, get_mpi_datatype<T>(value), |
| source, actual.second, actual.first, &request); |
| |
| int done = 0; |
| do { |
| MPI_Test(&request, &done, MPI_STATUS_IGNORE); |
| if (!done) |
| pg.poll(/*wait=*/false, block); |
| } while (!done); |
| } |
| |
| template<typename T> |
| typename disable_if<boost::mpi::is_mpi_datatype<T> >::type |
| receive_oob(const mpi_process_group& pg, |
| mpi_process_group::process_id_type source, int tag, T& value, int block) |
| { |
| // Determine the actual message we expect to receive, and which |
| // communicator it will come by. |
| std::pair<boost::mpi::communicator, int> actual |
| = pg.actual_communicator_and_tag(tag, block); |
| |
| boost::optional<boost::mpi::status> status; |
| do { |
| status = actual.first.iprobe(source, actual.second); |
| if (!status) |
| pg.poll(); |
| } while (!status); |
| |
| //actual.first.recv(status->source(), status->tag(),value); |
| |
| // Allocate the receive buffer |
| boost::mpi::packed_iarchive in(actual.first); |
| |
| #if BOOST_VERSION >= 103600 |
| in.resize(status->count<boost::mpi::packed>().get()); |
| #else |
| int size; |
| MPI_Status mpi_status = *status; |
| MPI_Get_count(&mpi_status, MPI_PACKED, &size); |
| in.resize(size); |
| #endif |
| |
| // Receive the message data |
| MPI_Recv(in.address(), in.size(), MPI_PACKED, |
| status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE); |
| |
| // Unpack the message data |
| in >> value; |
| } |
| |
| |
| template<typename SendT, typename ReplyT> |
| typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type |
| send_oob_with_reply(const mpi_process_group& pg, |
| mpi_process_group::process_id_type dest, |
| int tag, const SendT& send_value, ReplyT& reply_value, |
| int block) |
| { |
| detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag(); |
| send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair( |
| (int)reply_tag, send_value), block); |
| receive_oob(pg, dest, reply_tag, reply_value); |
| } |
| |
| template<typename SendT, typename ReplyT> |
| typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type |
| send_oob_with_reply(const mpi_process_group& pg, |
| mpi_process_group::process_id_type dest, |
| int tag, const SendT& send_value, ReplyT& reply_value, |
| int block) |
| { |
| detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag(); |
| send_oob(pg, dest, tag, |
| boost::parallel::detail::make_untracked_pair((int)reply_tag, |
| send_value), block); |
| receive_oob(pg, dest, reply_tag, reply_value); |
| } |
| |
| } } } // end namespace boost::graph::distributed |