| // Copyright (C) 2005-2006 The Trustees of Indiana University. |
| |
| // Use, modification and distribution is subject to the Boost Software |
| // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at |
| // http://www.boost.org/LICENSE_1_0.txt) |
| |
| // Authors: Douglas Gregor |
| // Andrew Lumsdaine |
| |
| // |
| // Implements redistribution of vertices for a distributed adjacency |
| // list. This file should not be included by users. It will be |
| // included by the distributed adjacency list header. |
| // |
| |
| #ifndef BOOST_GRAPH_USE_MPI |
| #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" |
| #endif |
| |
| #include <boost/pending/container_traits.hpp> |
| |
| namespace boost { namespace detail { namespace parallel { |
| |
| /* This structure contains a (vertex or edge) descriptor that is being |
| moved from one processor to another. It contains the properties for |
| that descriptor (if any). |
| */ |
| template<typename Descriptor, typename DescriptorProperty> |
| struct redistributed_descriptor : maybe_store_property<DescriptorProperty> |
| { |
| typedef maybe_store_property<DescriptorProperty> inherited; |
| |
| redistributed_descriptor() { } |
| |
| redistributed_descriptor(const Descriptor& v, const DescriptorProperty& p) |
| : inherited(p), descriptor(v) { } |
| |
| Descriptor descriptor; |
| |
| private: |
| friend class boost::serialization::access; |
| |
| template<typename Archiver> |
| void serialize(Archiver& ar, unsigned int /*version*/) |
| { |
| ar & boost::serialization::base_object<inherited>(*this) |
| & unsafe_serialize(descriptor); |
| } |
| }; |
| |
| /* Predicate that returns true if the target has migrated. */ |
| template<typename VertexProcessorMap, typename Graph> |
| struct target_migrated_t |
| { |
| typedef typename graph_traits<Graph>::vertex_descriptor Vertex; |
| typedef typename graph_traits<Graph>::edge_descriptor Edge; |
| |
| target_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g) |
| : vertex_to_processor(vertex_to_processor), g(g) { } |
| |
| bool operator()(Edge e) const |
| { |
| typedef global_descriptor<Vertex> DVertex; |
| processor_id_type owner = get(edge_target_processor_id, g, e); |
| return get(vertex_to_processor, DVertex(owner, target(e, g))) != owner; |
| } |
| |
| private: |
| VertexProcessorMap vertex_to_processor; |
| const Graph& g; |
| }; |
| |
| template<typename VertexProcessorMap, typename Graph> |
| inline target_migrated_t<VertexProcessorMap, Graph> |
| target_migrated(VertexProcessorMap vertex_to_processor, const Graph& g) |
| { return target_migrated_t<VertexProcessorMap, Graph>(vertex_to_processor, g); } |
| |
| /* Predicate that returns true if the source of an in-edge has migrated. */ |
| template<typename VertexProcessorMap, typename Graph> |
| struct source_migrated_t |
| { |
| typedef typename graph_traits<Graph>::vertex_descriptor Vertex; |
| typedef typename graph_traits<Graph>::edge_descriptor Edge; |
| |
| source_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g) |
| : vertex_to_processor(vertex_to_processor), g(g) { } |
| |
| bool operator()(stored_in_edge<Edge> e) const |
| { |
| return get(vertex_to_processor, DVertex(e.source_processor, source(e.e, g))) |
| != e.source_processor; |
| } |
| |
| private: |
| VertexProcessorMap vertex_to_processor; |
| const Graph& g; |
| }; |
| |
| template<typename VertexProcessorMap, typename Graph> |
| inline source_migrated_t<VertexProcessorMap, Graph> |
| source_migrated(VertexProcessorMap vertex_to_processor, const Graph& g) |
| { return source_migrated_t<VertexProcessorMap, Graph>(vertex_to_processor, g); } |
| |
| /* Predicate that returns true if the target has migrated. */ |
| template<typename VertexProcessorMap, typename Graph> |
| struct source_or_target_migrated_t |
| { |
| typedef typename graph_traits<Graph>::edge_descriptor Edge; |
| |
| source_or_target_migrated_t(VertexProcessorMap vertex_to_processor, |
| const Graph& g) |
| : vertex_to_processor(vertex_to_processor), g(g) { } |
| |
| bool operator()(Edge e) const |
| { |
| return get(vertex_to_processor, source(e, g)) != source(e, g).owner |
| || get(vertex_to_processor, target(e, g)) != target(e, g).owner; |
| } |
| |
| private: |
| VertexProcessorMap vertex_to_processor; |
| const Graph& g; |
| }; |
| |
| template<typename VertexProcessorMap, typename Graph> |
| inline source_or_target_migrated_t<VertexProcessorMap, Graph> |
| source_or_target_migrated(VertexProcessorMap vertex_to_processor, |
| const Graph& g) |
| { |
| typedef source_or_target_migrated_t<VertexProcessorMap, Graph> result_type; |
| return result_type(vertex_to_processor, g); |
| } |
| |
| } } // end of namespace detail::parallel |
| |
| template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS> |
| template<typename VertexProcessorMap> |
| void |
| PBGL_DISTRIB_ADJLIST_TYPE |
| ::request_in_neighbors(vertex_descriptor v, |
| VertexProcessorMap vertex_to_processor, |
| bidirectionalS) |
| { |
| BGL_FORALL_INEDGES_T(v, e, *this, graph_type) |
| request(vertex_to_processor, source(e, *this)); |
| } |
| |
| template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS> |
| template<typename VertexProcessorMap> |
| void |
| PBGL_DISTRIB_ADJLIST_TYPE |
| ::remove_migrated_in_edges(vertex_descriptor v, |
| VertexProcessorMap vertex_to_processor, |
| bidirectionalS) |
| { |
| graph_detail::erase_if(get(vertex_in_edges, base())[v.local], |
| source_migrated(vertex_to_processor, base())); |
| } |
| |
| template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS> |
| template<typename VertexProcessorMap> |
| void |
| PBGL_DISTRIB_ADJLIST_TYPE |
| ::redistribute(VertexProcessorMap vertex_to_processor) |
| { |
| using boost::parallel::inplace_all_to_all; |
| |
| // When we have stable descriptors, we only move those descriptors |
| // that actually need to be moved. Otherwise, we essentially have to |
| // regenerate the entire graph. |
| const bool has_stable_descriptors = |
| is_same<typename config_type::vertex_list_selector, listS>::value |
| || is_same<typename config_type::vertex_list_selector, setS>::value |
| || is_same<typename config_type::vertex_list_selector, multisetS>::value; |
| |
| typedef detail::parallel::redistributed_descriptor<vertex_descriptor, |
| vertex_property_type> |
| redistributed_vertex; |
| typedef detail::parallel::redistributed_descriptor<edge_descriptor, |
| edge_property_type> |
| redistributed_edge; |
| typedef std::pair<vertices_size_type, edges_size_type> num_relocated_pair; |
| |
| vertex_iterator vi, vi_end; |
| edge_iterator ei, ei_end; |
| |
| process_group_type pg = process_group(); |
| |
| // Initial synchronization makes sure that we have all of our ducks |
| // in a row. We don't want any outstanding add/remove messages |
| // coming in mid-redistribution! |
| synchronize(process_group_); |
| |
| // We cannot cope with eviction of ghost cells |
| vertex_to_processor.set_max_ghost_cells(0); |
| |
| process_id_type p = num_processes(pg); |
| |
| // Send vertices and edges to the processor where they will |
| // actually reside. This requires O(|V| + |E|) communication |
| std::vector<std::vector<redistributed_vertex> > redistributed_vertices(p); |
| std::vector<std::vector<redistributed_edge> > redistributed_edges(p); |
| |
| // Build the sets of relocated vertices for each process and then do |
| // an all-to-all transfer. |
| for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; ++vi) { |
| if (!has_stable_descriptors |
| || get(vertex_to_processor, *vi) != vi->owner) { |
| redistributed_vertices[get(vertex_to_processor, *vi)] |
| .push_back(redistributed_vertex(*vi, get(vertex_all_t(), base(), |
| vi->local))); |
| } |
| |
| // When our descriptors are stable, we need to determine which |
| // adjacent descriptors are stable to determine which edges will |
| // be removed. |
| if (has_stable_descriptors) { |
| BGL_FORALL_OUTEDGES_T(*vi, e, *this, graph_type) |
| request(vertex_to_processor, target(e, *this)); |
| request_in_neighbors(*vi, vertex_to_processor, directed_selector()); |
| } |
| } |
| |
| inplace_all_to_all(pg, redistributed_vertices); |
| |
| // If we have stable descriptors, we need to know where our neighbor |
| // vertices are moving. |
| if (has_stable_descriptors) |
| synchronize(vertex_to_processor); |
| |
| // Build the sets of relocated edges for each process and then do |
| // an all-to-all transfer. |
| for (boost::tie(ei, ei_end) = edges(*this); ei != ei_end; ++ei) { |
| vertex_descriptor src = source(*ei, *this); |
| vertex_descriptor tgt = target(*ei, *this); |
| if (!has_stable_descriptors |
| || get(vertex_to_processor, src) != src.owner |
| || get(vertex_to_processor, tgt) != tgt.owner) |
| redistributed_edges[get(vertex_to_processor, source(*ei, *this))] |
| .push_back(redistributed_edge(*ei, get(edge_all_t(), base(), |
| ei->local))); |
| } |
| inplace_all_to_all(pg, redistributed_edges); |
| |
| // A mapping from old vertex descriptors to new vertex |
| // descriptors. This is an STL map partly because I'm too lazy to |
| // build a real property map (which is hard in the general case) but |
| // also because it won't try to look in the graph itself, because |
| // the keys are all vertex descriptors that have been invalidated. |
| std::map<vertex_descriptor, vertex_descriptor> old_to_new_vertex_map; |
| |
| if (has_stable_descriptors) { |
| // Clear out all vertices and edges that will have moved. There |
| // are several stages to this. |
| |
| // First, eliminate all outgoing edges from the (local) vertices |
| // that have been moved or whose targets have been moved. |
| BGL_FORALL_VERTICES_T(v, *this, graph_type) { |
| if (get(vertex_to_processor, v) != v.owner) { |
| clear_out_edges(v.local, base()); |
| clear_in_edges_local(v, directed_selector()); |
| } else { |
| remove_out_edge_if(v.local, |
| target_migrated(vertex_to_processor, base()), |
| base()); |
| remove_migrated_in_edges(v, vertex_to_processor, directed_selector()); |
| } |
| } |
| |
| // Next, eliminate locally-stored edges that have migrated (for |
| // undirected graphs). |
| graph_detail::erase_if(local_edges_, |
| source_or_target_migrated(vertex_to_processor, *this)); |
| |
| // Eliminate vertices that have migrated |
| for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; /* in loop */) { |
| if (get(vertex_to_processor, *vi) != vi->owner) |
| remove_vertex((*vi++).local, base()); |
| else { |
| // Add the identity relation for vertices that have not migrated |
| old_to_new_vertex_map[*vi] = *vi; |
| ++vi; |
| } |
| } |
| } else { |
| // Clear out the local graph: the entire graph is in transit |
| clear(); |
| } |
| |
| // Add the new vertices to the graph. When we do so, update the old |
| // -> new vertex mapping both locally and for the owner of the "old" |
| // vertex. |
| { |
| typedef std::pair<vertex_descriptor, vertex_descriptor> mapping_pair; |
| std::vector<std::vector<mapping_pair> > mappings(p); |
| |
| for (process_id_type src = 0; src < p; ++src) { |
| for (typename std::vector<redistributed_vertex>::iterator vi = |
| redistributed_vertices[src].begin(); |
| vi != redistributed_vertices[src].end(); ++vi) { |
| vertex_descriptor new_vertex = |
| add_vertex(vi->get_property(), *this); |
| old_to_new_vertex_map[vi->descriptor] = new_vertex; |
| mappings[vi->descriptor.owner].push_back(mapping_pair(vi->descriptor, |
| new_vertex)); |
| } |
| |
| redistributed_vertices[src].clear(); |
| } |
| |
| inplace_all_to_all(pg, mappings); |
| |
| // Add the mappings we were sent into the old->new map. |
| for (process_id_type src = 0; src < p; ++src) |
| old_to_new_vertex_map.insert(mappings[src].begin(), mappings[src].end()); |
| } |
| |
| // Get old->new vertex mappings for all of the vertices we need to |
| // know about. |
| |
| // TBD: An optimization here might involve sending the |
| // request-response pairs without an explicit request step (for |
| // bidirectional and undirected graphs). However, it may not matter |
| // all that much given the cost of redistribution. |
| { |
| std::vector<std::vector<vertex_descriptor> > vertex_map_requests(p); |
| std::vector<std::vector<vertex_descriptor> > vertex_map_responses(p); |
| |
| // We need to know about all of the vertices incident on edges |
| // that have been relocated to this processor. Tell each processor |
| // what each other processor needs to know. |
| for (process_id_type src = 0; src < p; ++src) |
| for (typename std::vector<redistributed_edge>::iterator ei = |
| redistributed_edges[src].begin(); |
| ei != redistributed_edges[src].end(); ++ei) { |
| vertex_descriptor need_vertex = target(ei->descriptor, *this); |
| if (old_to_new_vertex_map.find(need_vertex) |
| == old_to_new_vertex_map.end()) |
| { |
| old_to_new_vertex_map[need_vertex] = need_vertex; |
| vertex_map_requests[need_vertex.owner].push_back(need_vertex); |
| } |
| } |
| inplace_all_to_all(pg, |
| vertex_map_requests, |
| vertex_map_responses); |
| |
| // Process the requests made for vertices we own. Then perform yet |
| // another all-to-all swap. This one matches the requests we've |
| // made to the responses we were given. |
| for (process_id_type src = 0; src < p; ++src) |
| for (typename std::vector<vertex_descriptor>::iterator vi = |
| vertex_map_responses[src].begin(); |
| vi != vertex_map_responses[src].end(); ++vi) |
| *vi = old_to_new_vertex_map[*vi]; |
| inplace_all_to_all(pg, vertex_map_responses); |
| |
| // Matching the requests to the responses, update the old->new |
| // vertex map for all of the vertices we will need to know. |
| for (process_id_type src = 0; src < p; ++src) { |
| typedef typename std::vector<vertex_descriptor>::size_type size_type; |
| for (size_type i = 0; i < vertex_map_requests[src].size(); ++i) { |
| old_to_new_vertex_map[vertex_map_requests[src][i]] = |
| vertex_map_responses[src][i]; |
| } |
| } |
| } |
| |
| // Add edges to the graph by mapping the source and target. |
| for (process_id_type src = 0; src < p; ++src) { |
| for (typename std::vector<redistributed_edge>::iterator ei = |
| redistributed_edges[src].begin(); |
| ei != redistributed_edges[src].end(); ++ei) { |
| add_edge(old_to_new_vertex_map[source(ei->descriptor, *this)], |
| old_to_new_vertex_map[target(ei->descriptor, *this)], |
| ei->get_property(), |
| *this); |
| } |
| |
| redistributed_edges[src].clear(); |
| } |
| |
| // Be sure that edge-addition messages are received now, completing |
| // the graph. |
| synchronize(process_group_); |
| |
| this->distribution().clear(); |
| |
| detail::parallel::maybe_initialize_vertex_indices(vertices(base()), |
| get(vertex_index, base())); |
| } |
| |
| } // end namespace boost |