| // Copyright (C) 2004-2006 The Trustees of Indiana University. |
| |
| // Use, modification and distribution is subject to the Boost Software |
| // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at |
| // http://www.boost.org/LICENSE_1_0.txt) |
| |
| // Authors: Douglas Gregor |
| // Andrew Lumsdaine |
| #ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP |
| #define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP |
| |
| #ifndef BOOST_GRAPH_USE_MPI |
| #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" |
| #endif |
| |
| #include <boost/graph/parallel/process_group.hpp> |
| #include <boost/optional.hpp> |
| #include <boost/shared_ptr.hpp> |
| #include <vector> |
| |
| namespace boost { namespace graph { namespace distributed { |
| |
/// A unary predicate that always returns "true".
///
/// It is the default @c UnaryPredicate argument of @c distributed_queue.
struct always_push
{
  /// Accepts a value of any type and returns @c true without inspecting it.
  template<typename T>
  bool operator()(const T&) const { return true; }
};
| |
| |
| |
| /** A distributed queue adaptor. |
| * |
| * Class template @c distributed_queue implements a distributed queue |
| * across a process group. The distributed queue is an adaptor over an |
| * existing (local) queue, which must model the @ref Buffer |
| * concept. Each process stores a distinct copy of the local queue, |
| * from which it draws or removes elements via the @ref pop and @ref |
| * top members. |
| * |
| * The value type of the local queue must be a model of the @ref |
| * GlobalDescriptor concept. The @ref push operation of the |
| * distributed queue passes (via a message) the value to its owning |
| * processor. Thus, the elements within a particular local queue are |
| * guaranteed to have the process owning that local queue as an owner. |
| * |
| * Synchronization of distributed queues occurs in the @ref empty and |
| * @ref size functions, which will only return "empty" values (true or |
| * 0, respectively) when the entire distributed queue is empty. If the |
| * local queue is empty but the distributed queue is not, the |
| * operation will block until either condition changes. When the @ref |
| * size function of a nonempty queue returns, it returns the size of |
| * the local queue. These semantics were selected so that sequential |
| * code that processes elements in the queue via the following idiom |
| * can be parallelized via introduction of a distributed queue: |
| * |
| * distributed_queue<...> Q; |
| * Q.push(x); |
| * while (!Q.empty()) { |
| * // do something, that may push a value onto Q |
| * } |
| * |
| * In the parallel version, the initial @ref push operation will place |
| * the value @c x onto its owner's queue. All processes will |
| * synchronize at the call to empty, and only the process owning @c x |
| * will be allowed to execute the loop (@ref Q.empty() returns |
| * false). This iteration may in turn push values onto other remote |
| * queues, so when that process finishes execution of the loop body |
| * and all processes synchronize again in @ref empty, more processes |
| * may have nonempty local queues to execute. Once all local queues |
| * are empty, @ref Q.empty() returns @c false for all processes. |
| * |
| * The distributed queue can receive messages at two different times: |
| * during synchronization and when polling @ref empty. Messages are |
| * always received during synchronization, to ensure that accurate |
| * local queue sizes can be determines. However, whether @ref empty |
| * should poll for messages is specified as an option to the |
| * constructor. Polling may be desired when the order in which |
| * elements in the queue are processed is not important, because it |
| * permits fewer synchronization steps and less communication |
| * overhead. However, when more strict ordering guarantees are |
| * required, polling may be semantically incorrect. By disabling |
| * polling, one ensures that parallel execution using the idiom above |
| * will not process an element at a later "level" before an earlier |
| * "level". |
| * |
| * The distributed queue nearly models the @ref Buffer |
| * concept. However, the @ref push routine does not necessarily |
| * increase the result of @c size() by one (although the size of the |
| * global queue does increase by one). |
| */ |
| template<typename ProcessGroup, typename OwnerMap, typename Buffer, |
| typename UnaryPredicate = always_push> |
| class distributed_queue |
| { |
| typedef distributed_queue self_type; |
| |
| enum { |
| /** Message indicating a remote push. The message contains a |
| * single value x of type value_type that is to be pushed on the |
| * receiver's queue. |
| */ |
| msg_push, |
| /** Push many elements at once. */ |
| msg_multipush |
| }; |
| |
| public: |
| typedef ProcessGroup process_group_type; |
| typedef Buffer buffer_type; |
| typedef typename buffer_type::value_type value_type; |
| typedef typename buffer_type::size_type size_type; |
| |
| /** Construct a new distributed queue. |
| * |
| * Build a new distributed queue that communicates over the given @p |
| * process_group, whose local queue is initialized via @p buffer and |
| * which may or may not poll for messages. |
| */ |
| explicit |
| distributed_queue(const ProcessGroup& process_group, |
| const OwnerMap& owner, |
| const Buffer& buffer, |
| bool polling = false); |
| |
| /** Construct a new distributed queue. |
| * |
| * Build a new distributed queue that communicates over the given @p |
| * process_group, whose local queue is initialized via @p buffer and |
| * which may or may not poll for messages. |
| */ |
| explicit |
| distributed_queue(const ProcessGroup& process_group = ProcessGroup(), |
| const OwnerMap& owner = OwnerMap(), |
| const Buffer& buffer = Buffer(), |
| const UnaryPredicate& pred = UnaryPredicate(), |
| bool polling = false); |
| |
| /** Construct a new distributed queue. |
| * |
| * Build a new distributed queue that communicates over the given @p |
| * process_group, whose local queue is default-initalized and which |
| * may or may not poll for messages. |
| */ |
| distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, |
| const UnaryPredicate& pred, bool polling = false); |
| |
| /** Virtual destructor required with virtual functions. |
| * |
| */ |
| virtual ~distributed_queue() {} |
| |
| /** Push an element onto the distributed queue. |
| * |
| * The element will be sent to its owner process to be added to that |
| * process's local queue. If polling is enabled for this queue and |
| * the owner process is the current process, the value will be |
| * immediately pushed onto the local queue. |
| * |
| * Complexity: O(1) messages of size O(sizeof(value_type)) will be |
| * transmitted. |
| */ |
| void push(const value_type& x); |
| |
| /** Pop an element off the local queue. |
| * |
| * @p @c !empty() |
| */ |
| void pop() { buffer.pop(); } |
| |
| /** |
| * Return the element at the top of the local queue. |
| * |
| * @p @c !empty() |
| */ |
| value_type& top() { return buffer.top(); } |
| |
| /** |
| * \overload |
| */ |
| const value_type& top() const { return buffer.top(); } |
| |
| /** Determine if the queue is empty. |
| * |
| * When the local queue is nonempty, returns @c true. If the local |
| * queue is empty, synchronizes with all other processes in the |
| * process group until either (1) the local queue is nonempty |
| * (returns @c true) (2) the entire distributed queue is empty |
| * (returns @c false). |
| */ |
| bool empty() const; |
| |
| /** Determine the size of the local queue. |
| * |
| * The behavior of this routine is equivalent to the behavior of |
| * @ref empty, except that when @ref empty returns true this |
| * function returns the size of the local queue and when @ref empty |
| * returns false this function returns zero. |
| */ |
| size_type size() const; |
| |
| // private: |
| /** Synchronize the distributed queue and determine if all queues |
| * are empty. |
| * |
| * \returns \c true when all local queues are empty, or false if at least |
| * one of the local queues is nonempty. |
| * Defined as virtual for derived classes like depth_limited_distributed_queue. |
| */ |
| virtual bool do_synchronize() const; |
| |
| private: |
| // Setup triggers |
| void setup_triggers(); |
| |
| // Message handlers |
| void |
| handle_push(int source, int tag, const value_type& value, |
| trigger_receive_context); |
| |
| void |
| handle_multipush(int source, int tag, const std::vector<value_type>& values, |
| trigger_receive_context); |
| |
| mutable ProcessGroup process_group; |
| OwnerMap owner; |
| mutable Buffer buffer; |
| UnaryPredicate pred; |
| bool polling; |
| |
| typedef std::vector<value_type> outgoing_buffer_t; |
| typedef std::vector<outgoing_buffer_t> outgoing_buffers_t; |
| shared_ptr<outgoing_buffers_t> outgoing_buffers; |
| }; |
| |
/// Helper macro containing the normal names for the template
/// parameters to distributed_queue. Intended for use inside
/// template<...> when declaring or defining entities that are generic
/// over every distributed_queue instantiation. It is #undef'd at the
/// end of this header.
#define BOOST_DISTRIBUTED_QUEUE_PARMS \
  typename ProcessGroup, typename OwnerMap, typename Buffer, \
  typename UnaryPredicate

/// Helper macro containing the normal template-id for
/// distributed_queue, spelled with the parameter names introduced by
/// BOOST_DISTRIBUTED_QUEUE_PARMS. It is #undef'd at the end of this
/// header.
#define BOOST_DISTRIBUTED_QUEUE_TYPE \
  distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate>
| |
| /** Synchronize all processes involved with the given distributed queue. |
| * |
| * This function will synchronize all of the local queues for a given |
| * distributed queue, by ensuring that no additional messages are in |
| * transit. It is rarely required by the user, because most |
| * synchronization of distributed queues occurs via the @c empty or @c |
| * size methods. |
| */ |
| template<BOOST_DISTRIBUTED_QUEUE_PARMS> |
| inline void |
| synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q) |
| { Q.do_synchronize(); } |
| |
| /// Construct a new distributed queue. |
| template<typename ProcessGroup, typename OwnerMap, typename Buffer> |
| inline distributed_queue<ProcessGroup, OwnerMap, Buffer> |
| make_distributed_queue(const ProcessGroup& process_group, |
| const OwnerMap& owner, |
| const Buffer& buffer, |
| bool polling = false) |
| { |
| typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type; |
| return result_type(process_group, owner, buffer, polling); |
| } |
| |
| } } } // end namespace boost::graph::distributed |
| |
| #include <boost/graph/distributed/detail/queue.ipp> |
| |
| #undef BOOST_DISTRIBUTED_QUEUE_TYPE |
| #undef BOOST_DISTRIBUTED_QUEUE_PARMS |
| |
| #endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP |