| // Copyright (C) 2004-2006 The Trustees of Indiana University. |
| |
| // Use, modification and distribution is subject to the Boost Software |
| // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at |
| // http://www.boost.org/LICENSE_1_0.txt) |
| |
| // Authors: Douglas Gregor |
| // Nick Edmonds |
| // Andrew Lumsdaine |
| #include <boost/property_map/parallel/distributed_property_map.hpp> |
| #include <boost/graph/parallel/detail/untracked_pair.hpp> |
| #include <boost/type_traits/is_base_and_derived.hpp> |
| #include <boost/bind.hpp> |
| #include <boost/graph/parallel/simple_trigger.hpp> |
| |
| #ifndef BOOST_GRAPH_USE_MPI |
| #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" |
| #endif |
| |
| namespace boost { namespace parallel { |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| template<typename Reduce> |
| PBGL_DISTRIB_PMAP |
| ::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global, |
| const StorageMap& pm, const Reduce& reduce) |
| : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver)) |
| { |
| typedef handle_message<Reduce> Handler; |
| |
| data->ghost_cells.reset(new ghost_cells_type()); |
| data->reset = &data_t::template do_reset<Reduce>; |
| data->process_group.replace_handler(Handler(data, reduce)); |
| data->process_group.template get_receiver<Handler>() |
| ->setup_triggers(data->process_group); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| PBGL_DISTRIB_PMAP::~distributed_property_map() { } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| template<typename Reduce> |
| void |
| PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce) |
| { |
| typedef handle_message<Reduce> Handler; |
| data->process_group.replace_handler(Handler(data, reduce)); |
| Handler* handler = data->process_group.template get_receiver<Handler>(); |
| assert(handler); |
| handler->setup_triggers(data->process_group); |
| data->get_default_value = reduce; |
| data->has_default_resolver = Reduce::non_default_resolver; |
| int model = data->model; |
| data->reset = &data_t::template do_reset<Reduce>; |
| set_consistency_model(model); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| void PBGL_DISTRIB_PMAP::prune_ghost_cells() const |
| { |
| if (data->max_ghost_cells == 0) |
| return; |
| |
| while (data->ghost_cells->size() > data->max_ghost_cells) { |
| // Evict the last ghost cell |
| |
| if (data->model & cm_flush) { |
| // We need to flush values when we evict them. |
| boost::parallel::detail::untracked_pair<key_type, value_type> const& victim |
| = data->ghost_cells->back(); |
| send(data->process_group, get(data->global, victim.first).first, |
| property_map_put, victim); |
| } |
| |
| // Actually remove the ghost cell |
| data->ghost_cells->pop_back(); |
| } |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| typename PBGL_DISTRIB_PMAP::value_type& |
| PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const |
| { |
| // Index by key |
| ghost_cells_key_index_type const& key_index |
| = data->ghost_cells->template get<1>(); |
| |
| // Search for the ghost cell by key, and project back to the sequence |
| iterator ghost_cell |
| = data->ghost_cells->template project<0>(key_index.find(key)); |
| if (ghost_cell == data->ghost_cells->end()) { |
| value_type value; |
| if (data->has_default_resolver) |
| // Since we have a default resolver, use it to create a default |
| // value for this ghost cell. |
| value = data->get_default_value(key); |
| else if (request_if_missing) |
| // Request the actual value of this key from its owner |
| send_oob_with_reply(data->process_group, get(data->global, key).first, |
| property_map_get, key, value); |
| else |
| value = value_type(); |
| |
| // Create a ghost cell containing the new value |
| ghost_cell |
| = data->ghost_cells->push_front(std::make_pair(key, value)).first; |
| |
| // If we need to, prune the ghost cells |
| if (data->max_ghost_cells > 0) |
| prune_ghost_cells(); |
| } else if (data->max_ghost_cells > 0) |
| // Put this cell at the beginning of the MRU list |
| data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell); |
| |
| return const_cast<value_type&>(ghost_cell->second); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| template<typename Reduce> |
| void |
| PBGL_DISTRIB_PMAP |
| ::handle_message<Reduce>::operator()(process_id_type source, int tag) |
| { |
| assert(false); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| template<typename Reduce> |
| void |
| PBGL_DISTRIB_PMAP::handle_message<Reduce>:: |
| handle_put(int /*source*/, int /*tag*/, |
| const boost::parallel::detail::untracked_pair<key_type, value_type>& req, trigger_receive_context) |
| { |
| using boost::get; |
| |
| shared_ptr<data_t> data(data_ptr); |
| |
| owner_local_pair p = get(data->global, req.first); |
| assert(p.first == process_id(data->process_group)); |
| |
| detail::maybe_put(data->storage, p.second, |
| reduce(req.first, |
| get(data->storage, p.second), |
| req.second)); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| template<typename Reduce> |
| typename PBGL_DISTRIB_PMAP::value_type |
| PBGL_DISTRIB_PMAP::handle_message<Reduce>:: |
| handle_get(int source, int /*tag*/, const key_type& key, |
| trigger_receive_context) |
| { |
| using boost::get; |
| |
| shared_ptr<data_t> data(data_ptr); |
| assert(data); |
| |
| owner_local_pair p = get(data->global, key); |
| return get(data->storage, p.second); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| template<typename Reduce> |
| void |
| PBGL_DISTRIB_PMAP::handle_message<Reduce>:: |
| handle_multiget(int source, int tag, const std::vector<key_type>& keys, |
| trigger_receive_context) |
| { |
| shared_ptr<data_t> data(data_ptr); |
| assert(data); |
| |
| typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value; |
| std::vector<key_value> results; |
| std::size_t n = keys.size(); |
| results.reserve(n); |
| |
| using boost::get; |
| |
| for (std::size_t i = 0; i < n; ++i) { |
| local_key_type local_key = get(data->global, keys[i]).second; |
| results.push_back(key_value(keys[i], get(data->storage, local_key))); |
| } |
| send(data->process_group, source, property_map_multiget_reply, results); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| template<typename Reduce> |
| void |
| PBGL_DISTRIB_PMAP::handle_message<Reduce>:: |
| handle_multiget_reply |
| (int source, int tag, |
| const std::vector<boost::parallel::detail::untracked_pair<key_type, value_type> >& msg, |
| trigger_receive_context) |
| { |
| shared_ptr<data_t> data(data_ptr); |
| assert(data); |
| |
| // Index by key |
| ghost_cells_key_index_type const& key_index |
| = data->ghost_cells->template get<1>(); |
| |
| std::size_t n = msg.size(); |
| for (std::size_t i = 0; i < n; ++i) { |
| // Search for the ghost cell by key, and project back to the sequence |
| iterator position |
| = data->ghost_cells->template project<0>(key_index.find(msg[i].first)); |
| |
| if (position != data->ghost_cells->end()) |
| const_cast<value_type&>(position->second) = msg[i].second; |
| } |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| template<typename Reduce> |
| void |
| PBGL_DISTRIB_PMAP::handle_message<Reduce>:: |
| handle_multiput |
| (int source, int tag, |
| const std::vector<unsafe_pair<local_key_type, value_type> >& values, |
| trigger_receive_context) |
| { |
| using boost::get; |
| |
| shared_ptr<data_t> data(data_ptr); |
| assert(data); |
| |
| std::size_t n = values.size(); |
| for (std::size_t i = 0; i < n; ++i) { |
| local_key_type local_key = values[i].first; |
| value_type local_value = get(data->storage, local_key); |
| detail::maybe_put(data->storage, values[i].first, |
| reduce(values[i].first, |
| local_value, |
| values[i].second)); |
| } |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| template<typename Reduce> |
| void |
| PBGL_DISTRIB_PMAP::handle_message<Reduce>:: |
| setup_triggers(process_group_type& pg) |
| { |
| using boost::graph::parallel::simple_trigger; |
| |
| simple_trigger(pg, property_map_put, this, &handle_message::handle_put); |
| simple_trigger(pg, property_map_get, this, &handle_message::handle_get); |
| simple_trigger(pg, property_map_multiget, this, |
| &handle_message::handle_multiget); |
| simple_trigger(pg, property_map_multiget_reply, this, |
| &handle_message::handle_multiget_reply); |
| simple_trigger(pg, property_map_multiput, this, |
| &handle_message::handle_multiput); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| void |
| PBGL_DISTRIB_PMAP |
| ::on_synchronize::operator()() |
| { |
| int stage=0; // we only get called at the start now |
| shared_ptr<data_t> data(data_ptr); |
| assert(data); |
| |
| // Determine in which stage backward consistency messages should be sent. |
| int backward_stage = -1; |
| if (data->model & cm_backward) { |
| if (data->model & cm_flush) backward_stage = 1; |
| else backward_stage = 0; |
| } |
| |
| // Flush results in first stage |
| if (stage == 0 && data->model & cm_flush) |
| data->flush(); |
| |
| // Backward consistency |
| if (stage == backward_stage && !(data->model & (cm_clear | cm_reset))) |
| data->refresh_ghost_cells(); |
| |
| // Optionally clear results |
| if (data->model & cm_clear) |
| data->clear(); |
| |
| // Optionally reset results |
| if (data->model & cm_reset) { |
| if (data->reset) ((*data).*data->reset)(); |
| } |
| } |
| |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| void |
| PBGL_DISTRIB_PMAP::set_consistency_model(int model) |
| { |
| data->model = model; |
| |
| int stages = 1; |
| bool need_on_synchronize = (model != cm_forward); |
| |
| // Backward consistency is a two-stage process. |
| if (model & cm_backward) { |
| if (model & cm_flush) stages = 3; |
| else stages = 2; |
| |
| // For backward consistency to work, we absolutely cannot throw |
| // away any ghost cells. |
| data->max_ghost_cells = 0; |
| } |
| |
| // attach the on_synchronize handler. |
| if (need_on_synchronize) |
| data->process_group.replace_on_synchronize_handler(on_synchronize(data)); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| void |
| PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells) |
| { |
| if ((data->model & cm_backward) && max_ghost_cells > 0) |
| boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: " |
| "cannot limit ghost-cell usage with a backward " |
| "consistency model")); |
| |
| if (max_ghost_cells == 1) |
| // It is not safe to have only 1 ghost cell; the cell() method |
| // will fail. |
| max_ghost_cells = 2; |
| |
| data->max_ghost_cells = max_ghost_cells; |
| prune_ghost_cells(); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| void PBGL_DISTRIB_PMAP::clear() |
| { |
| data->clear(); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| void PBGL_DISTRIB_PMAP::data_t::clear() |
| { |
| ghost_cells->clear(); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| void PBGL_DISTRIB_PMAP::reset() |
| { |
| if (data->reset) ((*data).*data->reset)(); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| void PBGL_DISTRIB_PMAP::flush() |
| { |
| data->flush(); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells() |
| { |
| using boost::get; |
| |
| std::vector<std::vector<key_type> > keys; |
| keys.resize(num_processes(process_group)); |
| |
| // Collect the set of keys for which we will request values |
| for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) |
| keys[get(global, i->first).first].push_back(i->first); |
| |
| // Send multiget requests to each of the other processors |
| typedef typename ProcessGroup::process_size_type process_size_type; |
| process_size_type n = num_processes(process_group); |
| process_id_type id = process_id(process_group); |
| for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) { |
| if (!keys[p].empty()) |
| send(process_group, p, property_map_multiget, keys[p]); |
| } |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| void PBGL_DISTRIB_PMAP::data_t::flush() |
| { |
| using boost::get; |
| |
| int n = num_processes(process_group); |
| std::vector<std::vector<unsafe_pair<local_key_type, value_type> > > values; |
| values.resize(n); |
| |
| // Collect all of the flushed values |
| for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) { |
| std::pair<int, local_key_type> g = get(global, i->first); |
| values[g.first].push_back(std::make_pair(g.second, i->second)); |
| } |
| |
| // Transmit flushed values |
| for (int p = 0; p < n; ++p) { |
| if (!values[p].empty()) |
| send(process_group, p, property_map_multiput, values[p]); |
| } |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| void PBGL_DISTRIB_PMAP::do_synchronize() |
| { |
| if (data->model & cm_backward) { |
| synchronize(data->process_group); |
| return; |
| } |
| |
| // Request refreshes of the values of our ghost cells |
| data->refresh_ghost_cells(); |
| |
| // Allows all of the multigets to get to their destinations |
| synchronize(data->process_group); |
| |
| // Allows all of the multiget responses to get to their destinations |
| synchronize(data->process_group); |
| } |
| |
| template<typename ProcessGroup, typename GlobalMap, typename StorageMap> |
| template<typename Resolver> |
| void PBGL_DISTRIB_PMAP::data_t::do_reset() |
| { |
| Resolver* resolver = get_default_value.template target<Resolver>(); |
| assert(resolver); |
| |
| for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) |
| const_cast<value_type&>(i->second) = (*resolver)(i->first); |
| } |
| |
| } } // end namespace boost::parallel |