| // Copyright (C) 2005-2006 Matthias Troyer |
| |
| // Use, modification and distribution is subject to the Boost Software |
| // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at |
| // http://www.boost.org/LICENSE_1_0.txt) |
| |
| // An example of a parallel Monte Carlo simulation using some nodes to produce |
| // data and others to aggregate the data |
| #include <iostream> |
| |
| #include <boost/mpi.hpp> |
| #include <boost/random/parallel.hpp> |
| #include <boost/random.hpp> |
| #include <boost/foreach.hpp> |
| #include <iostream> |
| #include <cstdlib> |
| |
| namespace mpi = boost::mpi; |
| |
| enum {sample_tag, sample_skeleton_tag, sample_broadcast_tag, quit_tag}; |
| |
| |
| void calculate_samples(int sample_length) |
| { |
| int num_samples = 100; |
| std::vector<double> sample(sample_length); |
| |
| // setup communicator by splitting |
| |
| mpi::communicator world; |
| mpi::communicator calculate_communicator = world.split(0); |
| |
| unsigned int num_calculate_ranks = calculate_communicator.size(); |
| |
| // the master of the accumulaion ranks is the first of them, hence |
| // with a rank just one after the last calculation rank |
| int master_accumulate_rank = num_calculate_ranks; |
| |
| // the master of the calculation ranks sends the skeleton of the sample |
| // to the master of the accumulation ranks |
| |
| if (world.rank()==0) |
| world.send(master_accumulate_rank,sample_skeleton_tag,mpi::skeleton(sample)); |
| |
| // next we extract the content of the sample vector, to be used in sending |
| // the content later on |
| |
| mpi::content sample_content = mpi::get_content(sample); |
| |
| // now intialize the parallel random number generator |
| |
| boost::lcg64 engine( |
| boost::random::stream_number = calculate_communicator.rank(), |
| boost::random::total_streams = calculate_communicator.size() |
| ); |
| |
| boost::variate_generator<boost::lcg64&,boost::uniform_real<> > |
| rng(engine,boost::uniform_real<>()); |
| |
| for (unsigned int i=0; i<num_samples/num_calculate_ranks+1;++i) { |
| |
| // calculate sample by filling the vector with random numbers |
| // note that std::generate will not work since it takes the generator |
| // by value, and boost::ref cannot be used as a generator. |
| // boost::ref should be fixed so that it can be used as generator |
| |
| BOOST_FOREACH(double& x, sample) |
| x = rng(); |
| |
| // send sample to accumulation ranks |
| // Ideally we want to do this as a broadcast with an inter-communicator |
| // between the calculation and accumulation ranks. MPI2 should support |
| // this, but here we present an MPI1 compatible solution. |
| |
| // send content of sample to first (master) accumulation process |
| |
| world.send(master_accumulate_rank,sample_tag,sample_content); |
| |
| // gather some results from all calculation ranks |
| |
| double local_result = sample[0]; |
| std::vector<double> gathered_results(calculate_communicator.size()); |
| mpi::all_gather(calculate_communicator,local_result,gathered_results); |
| } |
| |
| // we are done: the master tells the accumulation ranks to quit |
| if (world.rank()==0) |
| world.send(master_accumulate_rank,quit_tag); |
| } |
| |
| |
| |
| void accumulate_samples() |
| { |
| std::vector<double> sample; |
| |
| // setup the communicator for all accumulation ranks by splitting |
| |
| mpi::communicator world; |
| mpi::communicator accumulate_communicator = world.split(1); |
| |
| bool is_master_accumulate_rank = accumulate_communicator.rank()==0; |
| |
| // the master receives the sample skeleton |
| |
| if (is_master_accumulate_rank) |
| world.recv(0,sample_skeleton_tag,mpi::skeleton(sample)); |
| |
| // and broadcasts it to all accumulation ranks |
| mpi::broadcast(accumulate_communicator,mpi::skeleton(sample),0); |
| |
| // next we extract the content of the sample vector, to be used in receiving |
| // the content later on |
| |
| mpi::content sample_content = mpi::get_content(sample); |
| |
| // accumulate until quit is called |
| double sum=0.; |
| while (true) { |
| |
| |
| // the accumulation master checks whether we should quit |
| if (world.iprobe(0,quit_tag)) { |
| world.recv(0,quit_tag); |
| for (int i=1; i<accumulate_communicator.size();++i) |
| accumulate_communicator.send(i,quit_tag); |
| std::cout << sum << "\n"; |
| break; // We're done |
| } |
| |
| // the otehr accumulation ranks check whether we should quit |
| if (accumulate_communicator.iprobe(0,quit_tag)) { |
| accumulate_communicator.recv(0,quit_tag); |
| std::cout << sum << "\n"; |
| break; // We're done |
| } |
| |
| // check whether the master accumulation rank has received a sample |
| if (world.iprobe(mpi::any_source,sample_tag)) { |
| BOOST_ASSERT(is_master_accumulate_rank); |
| |
| // receive the content |
| world.recv(mpi::any_source,sample_tag,sample_content); |
| |
| // now we need to braodcast |
| // the problam is we do not have a non-blocking broadcast that we could |
| // abort if we receive a quit message from the master. We thus need to |
| // first tell all accumulation ranks to start a broadcast. If the sample |
| // is small, we could just send the sample in this message, but here we |
| // optimize the code for large samples, so that the overhead of these |
| // sends can be ignored, and we count on an optimized broadcast |
| // implementation with O(log N) complexity |
| |
| for (int i=1; i<accumulate_communicator.size();++i) |
| accumulate_communicator.send(i,sample_broadcast_tag); |
| |
| // now broadcast the contents of the sample to all accumulate ranks |
| mpi::broadcast(accumulate_communicator,sample_content,0); |
| |
| // and handle the sample by summing the appropriate value |
| sum += sample[0]; |
| } |
| |
| // the other accumulation ranks wait for a mesage to start the broadcast |
| if (accumulate_communicator.iprobe(0,sample_broadcast_tag)) { |
| BOOST_ASSERT(!is_master_accumulate_rank); |
| |
| accumulate_communicator.recv(0,sample_broadcast_tag); |
| |
| // receive broadcast of the sample contents |
| mpi::broadcast(accumulate_communicator,sample_content,0); |
| |
| // and handle the sample |
| |
| // and handle the sample by summing the appropriate value |
| sum += sample[accumulate_communicator.rank()]; |
| } |
| } |
| } |
| |
| int main(int argc, char** argv) |
| { |
| mpi::environment env(argc, argv); |
| mpi::communicator world; |
| |
| // half of the processes generate, the others accumulate |
| // the sample size is just the number of accumulation ranks |
| if (world.rank() < world.size()/2) |
| calculate_samples(world.size()-world.size()/2); |
| else |
| accumulate_samples(); |
| |
| return 0; |
| } |
| |
| |