blob: d8e82c2e6888710bce88a75c3f158f2befbe1232 [file] [log] [blame]
// Copyright (C) 2014 Ian Forbed
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/config.hpp>
#if ! defined BOOST_NO_CXX11_DECLTYPE
#define BOOST_RESULT_OF_USE_DECLTYPE
#endif
#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_EXECUTORS
#include <exception>
#include <boost/thread/thread.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
#include <boost/core/lightweight_test.hpp>
// Queue type exercised by every test in this file: a synchronized priority
// queue holding plain ints.
typedef boost::concurrent::sync_priority_queue<int> sync_pq;
// Thread entry point: waits on the barrier `go`, then pulls a single
// element from `q` and returns it.
int call_pull(sync_pq* q, boost::barrier* go)
{
  boost::barrier& start_line = *go;
  start_line.wait();

  const int pulled = q->pull();
  return pulled;
}
// Thread entry point: waits on the barrier `go`, then pushes `val`
// onto `q`.
void call_push(sync_pq* q, boost::barrier* go, int val)
{
  boost::barrier& start_line = *go;
  start_line.wait();

  sync_pq& queue = *q;
  queue.push(val);
}
// Concurrent-pull test.
//
// Pushes the values 0..n-1 from the calling thread, closes the queue, and
// then starts n threads that each perform one pull after meeting at a
// barrier. Because the queue is closed before any pull happens, this
// exercises pulling from a closed but non-empty queue. The queue must be
// empty once every thread has been joined.
void test_pull(const int n)
{
  sync_pq pq;
  BOOST_TEST(pq.empty());

  // Fill the queue single-threaded.
  int next_value = 0;
  while(next_value < n)
  {
    pq.push(next_value);
    ++next_value;
  }
  BOOST_TEST(!pq.empty());
  BOOST_TEST_EQ(pq.size(), static_cast<std::size_t>(n));

  pq.close();
  BOOST_TEST(pq.closed());

  // One barrier slot per puller so that all pulls start together.
  boost::barrier b(n);
  boost::thread_group tg;
  for(int worker = 0; worker < n; ++worker)
  {
    tg.create_thread(boost::bind(call_pull, &pq, &b));
  }
  tg.join_all();

  BOOST_TEST(pq.empty());
}
// Concurrent-push test.
//
// Starts n threads that meet at a barrier and then each push one distinct
// value (0..n-1). After joining them the queue must be non-empty and hold
// exactly n elements.
void test_push(const int n)
{
  sync_pq pq;
  BOOST_TEST(pq.empty());

  // One barrier slot per pusher so that all pushes start together.
  boost::barrier b(n);
  boost::thread_group tg;
  for(int value = 0; value < n; ++value)
  {
    tg.create_thread(boost::bind(call_push, &pq, &b, value));
  }
  tg.join_all();

  BOOST_TEST(!pq.empty());
  BOOST_TEST_EQ(pq.size(), static_cast<std::size_t>(n));
}
// Mixed push/pull test.
//
// Starts n pulling threads and n pushing threads that all meet at a single
// barrier sized for 2*n participants. Every pushed value should be consumed
// by exactly one puller, so the queue must be empty (size 0) after the join.
void test_both(const int n)
{
  sync_pq pq;
  BOOST_TEST(pq.empty());

  boost::barrier b(2*n);
  boost::thread_group tg;
  int pair_index = 0;
  while(pair_index < n)
  {
    // Create one consumer followed by one producer per iteration.
    tg.create_thread(boost::bind(call_pull, &pq, &b));
    tg.create_thread(boost::bind(call_push, &pq, &b, pair_index));
    ++pair_index;
  }
  tg.join_all();

  BOOST_TEST(pq.empty());
  BOOST_TEST_EQ(pq.size(), static_cast<std::size_t>(0));
}
// Pushes every integer in the half-open range [begin, end) onto `q`, in
// ascending order. Does nothing when begin >= end.
void push_range(sync_pq* q, const int begin, const int end)
{
  int value = begin;
  while(value < end)
  {
    q->push(value);
    ++value;
  }
}
// Worker loop: repeatedly pulls a value from `q` and adds it to `*sum`.
//
// The loop only ends when an operation throws something derived from
// std::exception. The callers in this file rely on pull() throwing once the
// queue has been closed and drained, which is how they stop these workers.
void atomic_pull(sync_pq* q, boost::atomic<int>* sum)
{
  for(;;)
  {
    try
    {
      const int val = q->pull();
      sum->fetch_add(val);
    }
    // The exception object itself is not needed: leave it unnamed and
    // catch by const reference to avoid an unused-variable warning.
    catch(const std::exception&)
    {
      break;
    }
  }
}
/**
 * This test computes the sum of the integers 1..$limit using $n threads for
 * the push operation and $n threads for the pull-and-accumulate operation.
 * Each push thread pushes one contiguous slice of 1..$limit onto the queue,
 * while each pull thread pulls from the queue and adds the value to an
 * atomic int. The queue is closed only after every push thread has been
 * joined, which lets the pull threads finish once the queue is empty.
 * At the end of execution the value of atomic<int> $sum should equal
 * limit*(limit+1)/2, the closed-form solution to this problem.
 *
 * $n is the number of push threads and also the number of pull threads.
 */
void compute_sum(const int n)
{
  const int limit = 1000;
  sync_pq pq;
  BOOST_TEST(pq.empty());
  boost::atomic<int> sum(0);
  boost::thread_group tg1;
  boost::thread_group tg2;
  for(int i = 0; i < n; i++)
  {
    // Multiply before dividing so that the slices [begin, end) tile
    // 1..limit exactly, even when n does not divide limit evenly.
    const int begin = i*limit/n + 1;
    const int end = (i+1)*limit/n + 1;
    tg1.create_thread(boost::bind(push_range, &pq, begin, end));
    tg2.create_thread(boost::bind(atomic_pull, &pq, &sum));
  }
  tg1.join_all();
  pq.close(); //Wait until all enqueuing is done before closing.
  BOOST_TEST(pq.closed());
  tg2.join_all();
  BOOST_TEST(pq.empty());
  BOOST_TEST_EQ(sum.load(), limit*(limit+1)/2);
}
// Worker loop: repeatedly pulls a value from `q1` and pushes it onto `q2`.
//
// The loop only ends when pull() or push() throws something derived from
// std::exception. The caller in this file relies on q1->pull() throwing once
// q1 has been closed and drained, which is how it stops these workers.
void move_between_queues(sync_pq* q1, sync_pq* q2)
{
  for(;;)
  {
    try
    {
      const int val = q1->pull();
      q2->push(val);
    }
    // The exception object itself is not needed: leave it unnamed and
    // catch by const reference to avoid an unused-variable warning.
    catch(const std::exception&)
    {
      break;
    }
  }
}
/**
 * This test computes the sum of the integers 1..$limit by moving numbers
 * between two sync_priority_queues. Contiguous slices of 1..$limit are
 * pushed onto the first queue by $n threads while $n other threads pull from
 * that queue and push onto a second sync_pq. The first queue is closed only
 * after every push thread has been joined, which lets the mover threads
 * finish once it is empty. At the end the main thread checks that the values
 * come out of the second queue in descending order ($limit down to 1) and
 * sums them. The sum should match limit*(limit+1)/2, the closed-form
 * solution to this problem.
 *
 * $n is the number of push threads and also the number of mover threads.
 */
void sum_with_moving(const int n)
{
  const int limit = 1000;
  sync_pq pq1;
  sync_pq pq2;
  BOOST_TEST(pq1.empty());
  BOOST_TEST(pq2.empty());
  boost::thread_group tg1;
  boost::thread_group tg2;
  for(int i = 0; i < n; i++)
  {
    // Multiply before dividing so that the slices [begin, end) tile
    // 1..limit exactly, even when n does not divide limit evenly.
    const int begin = i*limit/n + 1;
    const int end = (i+1)*limit/n + 1;
    tg1.create_thread(boost::bind(push_range, &pq1, begin, end));
    tg2.create_thread(boost::bind(move_between_queues, &pq1, &pq2));
  }
  tg1.join_all();
  pq1.close(); //Wait until all enqueuing is done before closing.
  BOOST_TEST(pq1.closed());
  tg2.join_all();
  BOOST_TEST(pq1.empty());
  BOOST_TEST(!pq2.empty());
  // Report a lost or duplicated value up front: pq2 is still open, so the
  // drain loop below has no other way to notice a short queue.
  BOOST_TEST_EQ(pq2.size(), std::size_t(limit));
  int sum = 0;
  // Drain exactly `limit` values; they are expected largest-first.
  for(int i = limit; i > 0; i--){
    const int val = pq2.pull();
    BOOST_TEST_EQ(i,val);
    sum += val;
  }
  BOOST_TEST(pq2.empty());
  BOOST_TEST_EQ(sum, limit*(limit+1)/2);
}
// Test driver: runs the basic push/pull tests for thread counts
// 1, 2, 4, ..., 64, then the two summation tests for a fixed set of thread
// counts, and returns the Boost lightweight-test error count.
int main()
{
  int threads = 1;
  while(threads <= 64)
  {
    test_pull(threads);
    test_push(threads);
    test_both(threads);
    threads *= 2;
  }

  // Thread counts for the summation tests; each one divides 1000 evenly so
  // every pushing thread handles an equal share of the numbers.
  static const int sum_thread_counts[] = { 1, 4, 10, 25, 50 };
  const std::size_t count_total =
      sizeof(sum_thread_counts) / sizeof(sum_thread_counts[0]);

  for(std::size_t k = 0; k < count_total; ++k)
  {
    compute_sum(sum_thread_counts[k]);
  }
  for(std::size_t k = 0; k < count_total; ++k)
  {
    sum_with_moving(sum_thread_counts[k]);
  }

  return boost::report_errors();
}