blob: b65872c22219603a9a96cdc66a4511055c8bc308 [file] [log] [blame]
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <folly/futures/Future.h>
#include <folly/futures/InlineExecutor.h>
#include <folly/futures/ManualExecutor.h>
#include <folly/futures/DrivableExecutor.h>
#include <folly/Baton.h>
#include <folly/MPMCQueue.h>
#include <thread>
#include <unistd.h>
#include <deque>
#include <mutex>
#include <condition_variable>
using namespace folly;
// DrivableExecutor adapter over a shared ManualExecutor.
//
// add() forwards work to the wrapped executor; drive() calls wait() and then
// run() on it, so a caller using getVia()/waitVia() with this waiter pumps the
// wrapped executor.
struct ManualWaiter : public DrivableExecutor {
  // Takes shared ownership of `ex`; the parameter is moved into the member to
  // avoid an extra reference-count round trip.
  explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex)
      : ex(std::move(ex)) {}

  // Hands `f` to the wrapped executor. `f` is a by-value sink parameter, so it
  // is moved rather than copied a second time.
  void add(Func f) override {
    ex->add(std::move(f));
  }

  // Calls wait() followed by run() on the wrapped executor.
  void drive() override {
    ex->wait();
    ex->run();
  }

  // The wrapped executor (shared with whoever constructed this waiter).
  std::shared_ptr<ManualExecutor> ex;
};
// Fixture providing two ManualExecutors:
//  - westExecutor is driven by the test body through `waiter`;
//  - eastExecutor is driven by a background thread started in the constructor
//    and stopped/joined in the destructor.
struct ViaFixture : public testing::Test {
  ViaFixture()
      : westExecutor(new ManualExecutor),
        eastExecutor(new ManualExecutor),
        waiter(new ManualWaiter(westExecutor)),
        done(false) {
    // Background thread: keep driving the east executor until asked to stop.
    t = std::thread([this] {
      ManualWaiter eastDriver(eastExecutor);
      while (!done) {
        eastDriver.drive();
      }
    });
  }

  ~ViaFixture() override {
    done = true;
    // Enqueue a no-op so the east thread has something to run and gets a
    // chance to re-check `done`.
    eastExecutor->add([] {});
    t.join();
  }

  // Schedules `cob(a + b)` on the east executor.
  void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
    eastExecutor->add([a, b, cob]() { cob(a + b); });
  }

  std::shared_ptr<ManualExecutor> westExecutor;
  std::shared_ptr<ManualExecutor> eastExecutor;
  std::shared_ptr<ManualWaiter> waiter;
  InlineExecutor inlineExecutor;
  std::atomic<bool> done; // set by the destructor to stop the east thread
  std::thread t;          // thread that drives eastExecutor
};
// A future created from an exception rethrows it from value().
TEST(Via, exceptionOnLaunch) {
  auto failed = makeFuture<int>(std::runtime_error("E"));
  EXPECT_THROW(failed.value(), std::runtime_error);
}
// then() with a callback that returns a plain value.
TEST(Via, thenValue) {
  auto isOne = makeFuture(std::move(1)).then([](Try<int>&& result) {
    return result.value() == 1;
  });

  EXPECT_TRUE(isOne.value());
}
// then() with a callback that itself returns a future.
TEST(Via, thenFuture) {
  auto isOne = makeFuture(1).then([](Try<int>&& result) {
    return makeFuture(result.value() == 1);
  });

  EXPECT_TRUE(isOne.value());
}
// Free-function continuation: appends ";static" to the incoming string and
// returns it wrapped in a future.
static Future<std::string> doWorkStatic(Try<std::string>&& t) {
  std::string tagged = t.value() + ";static";
  return makeFuture(std::move(tagged));
}
// then() accepts a free function, a static member function and a
// pointer-to-member plus instance; each stage appends its own tag.
TEST(Via, thenFunction) {
  struct Worker {
    Future<std::string> doWork(Try<std::string>&& t) {
      return makeFuture(t.value() + ";class");
    }
    static Future<std::string> doWorkStatic(Try<std::string>&& t) {
      return makeFuture(t.value() + ";class-static");
    }
  } worker;

  auto chained = makeFuture(std::string("start"))
                     .then(doWorkStatic)
                     .then(Worker::doWorkStatic)
                     .then(&Worker::doWork, &worker);

  EXPECT_EQ(chained.value(), "start;static;class-static;class");
}
// First continuation is expected to run off the test thread (east executor),
// the second one back on the test thread (west executor, driven by getVia).
TEST_F(ViaFixture, threadHops) {
  auto testThreadId = std::this_thread::get_id();

  auto hopped = via(eastExecutor.get())
                    .then([=](Try<Unit>&&) {
                      EXPECT_NE(std::this_thread::get_id(), testThreadId);
                      return makeFuture<int>(1);
                    })
                    .via(westExecutor.get())
                    .then([=](Try<int>&& result) {
                      EXPECT_EQ(std::this_thread::get_id(), testThreadId);
                      return result.value();
                    });

  EXPECT_EQ(hopped.getVia(waiter.get()), 1);
}
// Chains several continuations that alternate between the east and west
// executors and checks, per stage, which thread the callback ran on.
TEST_F(ViaFixture, chainVias) {
  auto testThreadId = std::this_thread::get_id();

  auto chained =
      via(eastExecutor.get())
          .then([=]() {
            EXPECT_NE(std::this_thread::get_id(), testThreadId);
            return 1;
          })
          .then([=](int outer) {
            return makeFuture(outer)
                .via(westExecutor.get())
                .then([=](int inner) mutable {
                  EXPECT_EQ(std::this_thread::get_id(), testThreadId);
                  return inner + 1;
                });
          })
          .then([=](int soFar) {
            // Although the future that triggers this callback completed on
            // the west thread, this then() inherited the executor of its
            // predecessor, i.e. the eastExecutor.
            EXPECT_NE(std::this_thread::get_id(), testThreadId);
            return soFar + 1;
          })
          .via(westExecutor.get())
          .then([=](int soFar) {
            // Hop back to west so the test thread can wait on the result.
            EXPECT_EQ(std::this_thread::get_id(), testThreadId);
            return soFar + 1;
          });

  EXPECT_EQ(chained.getVia(waiter.get()), 4);
}
// The result of a bare via(executor) can be stored in a local.
TEST_F(ViaFixture, bareViaAssignment) {
  auto viaFuture = via(eastExecutor.get());
}
// Exercises both the rvalue and lvalue forms of Future::via().
TEST_F(ViaFixture, viaAssignment) {
  // via()&& -- called on a temporary
  auto fromTemporary = makeFuture().via(eastExecutor.get());
  // via()& -- called on a named future
  auto fromNamed = fromTemporary.via(eastExecutor.get());
}
// thenMulti() with a single callback yields that callback's result.
TEST(Via, chain1) {
  EXPECT_EQ(42, makeFuture().thenMulti([] { return 42; }).get());
}
// thenMulti() with three callbacks: all three run, and the final result is the
// value of the future returned by the last one.
TEST(Via, chain3) {
  int invocations = 0;

  auto chained = makeFuture().thenMulti(
      [&] {
        invocations++;
        return 3.14159;
      },
      [&](double) {
        invocations++;
        return std::string("hello");
      },
      [&] {
        invocations++;
        return makeFuture(42);
      });

  EXPECT_EQ(42, chained.get());
  EXPECT_EQ(3, invocations);
}
// Executor with three priority levels that counts how many tasks were
// submitted at each level and runs every prioritized task inline.
struct PriorityExecutor : public Executor {
  // Plain add() discards the task without running it.
  void add(Func /* f */) override {}

  // Maps the relative `priority` onto a level index around the middle level,
  // clamped to [0, getNumPriorities() - 1], bumps the matching counter and
  // then runs `f` immediately.
  void addWithPriority(Func f, int8_t priority) override {
    const int mid = getNumPriorities() / 2;

    int p;
    if (priority < 0) {
      p = std::max(0, mid + priority);
    } else {
      p = std::min(getNumPriorities() - 1, mid + priority);
    }

    EXPECT_LT(p, 3);
    EXPECT_GE(p, 0);

    switch (p) {
      case 0:
        count0++;
        break;
      case 1:
        count1++;
        break;
      case 2:
        count2++;
        break;
      default:
        break;
    }

    f();
  }

  uint8_t getNumPriorities() const override {
    return 3;
  }

  int count0{0}; // tasks submitted at level 0
  int count1{0}; // tasks submitted at level 1
  int count2{0}; // tasks submitted at level 2
};
// via(executor, priority) forwards the priority to addWithPriority(); the
// counters on PriorityExecutor record which level each call landed on.
TEST(Via, priority) {
  PriorityExecutor executor;

  via(&executor, -1).then([] {});
  via(&executor, 0).then([] {});
  via(&executor, 1).then([] {});
  via(&executor, 42).then([] {});  // overflow should go to max priority
  via(&executor, -42).then([] {}); // underflow should go to min priority
  via(&executor).then([] {});      // default to mid priority
  via(&executor, Executor::LO_PRI).then([] {});
  via(&executor, Executor::HI_PRI).then([] {});

  EXPECT_EQ(3, executor.count0);
  EXPECT_EQ(2, executor.count1);
  EXPECT_EQ(3, executor.count2);
}
// thenMultiWithExecutor() with a single callback scheduled on the east
// executor; get() returns the callback's result.
TEST_F(ViaFixture, chainX1) {
  auto chained = makeFuture().thenMultiWithExecutor(
      eastExecutor.get(), [] { return 42; });

  EXPECT_EQ(42, chained.get());
}
// Three callbacks are passed to thenMultiWithExecutor() with the east
// executor (the first checks it is off the test thread); the trailing then()
// is expected to run back on the test thread.
TEST_F(ViaFixture, chainX3) {
  auto testThreadId = std::this_thread::get_id();
  int invocations = 0;

  auto chained = via(westExecutor.get())
                     .thenMultiWithExecutor(
                         eastExecutor.get(),
                         [&] {
                           EXPECT_NE(std::this_thread::get_id(), testThreadId);
                           invocations++;
                           return 3.14159;
                         },
                         [&](double) {
                           invocations++;
                           return std::string("hello");
                         },
                         [&] { invocations++; })
                     .then([&]() {
                       EXPECT_EQ(std::this_thread::get_id(), testThreadId);
                       return makeFuture(42);
                     });

  EXPECT_EQ(42, chained.getVia(waiter.get()));
  EXPECT_EQ(3, invocations);
}
// then(executor, callback) runs only that one callback on the given executor:
// the callback after it is observed to run when the original executor is run
// again, not when the one-off executor is run.
TEST(Via, then2) {
  ManualExecutor primary, oneOff;
  bool ranFirst = false, ranSecond = false, ranThird = false;

  via(&primary)
      .then([&] { ranFirst = true; })
      .then(&oneOff, [&] { ranSecond = true; })
      .then([&] { ranThird = true; });

  // Nothing runs until an executor is driven.
  EXPECT_FALSE(ranFirst);
  EXPECT_FALSE(ranSecond);

  primary.run();
  EXPECT_TRUE(ranFirst);
  EXPECT_FALSE(ranSecond);
  EXPECT_FALSE(ranThird);

  oneOff.run();
  EXPECT_TRUE(ranSecond);
  EXPECT_FALSE(ranThird);

  primary.run();
  EXPECT_TRUE(ranThird);
}
// then(executor, member-function, instance): the member function is invoked
// only once the executor is run.
TEST(Via, then2Variadic) {
  struct Foo {
    bool called = false;
    void foo(Try<Unit>) {
      called = true;
    }
  };

  Foo target;
  ManualExecutor executor;

  makeFuture().then(&executor, &Foo::foo, &target);
  EXPECT_FALSE(target.called);

  executor.run();
  EXPECT_TRUE(target.called);
}
#ifndef __APPLE__ // TODO #7372389
/// Simple executor that does work in another thread
class ThreadExecutor : public Executor {
std::mutex mx;
std::condition_variable cond;
std::deque<Func> funcs2;
std::atomic<bool> done {false};
folly::Baton<> baton;
std::thread worker;
void work() {
baton.post();
Func fn;
std::unique_lock<std::mutex> lock(mx);
while (!done) {
while (!funcs2.empty()) {
fn = std::move(funcs2.front());
funcs2.pop_front();
lock.unlock();
fn();
lock.lock();
}
if (!done && funcs2.empty())
cond.wait(lock);
}
}
public:
explicit ThreadExecutor(size_t n = 1024)
{
worker = std::thread(std::bind(&ThreadExecutor::work, this));
}
~ThreadExecutor() override {
{
std::lock_guard<std::mutex> lock(mx);
done = true;
cond.notify_one();
}
worker.join();
}
void add(Func fn) override {
std::lock_guard<std::mutex> lock(mx);
funcs2.push_back(std::move(fn));
cond.notify_one();
}
void waitForStartup() {
baton.wait();
}
};
// via().then().get() against an executor running on another thread must
// deliver the move-only result intact.
TEST(Via, viaThenGetWasRacy) {
  ThreadExecutor executor;

  std::unique_ptr<int> boxed =
      folly::via(&executor)
          .then([] { return folly::make_unique<int>(42); })
          .get();

  ASSERT_TRUE(!!boxed);
  EXPECT_EQ(42, *boxed);
}
// Attaches continuations (via the threaded executor) to four promises, then
// fulfils all of the promises from a task on that same executor and waits for
// every continuation to finish.
TEST(Via, callbackRace) {
  ThreadExecutor executor;

  auto launch = [&executor] {
    auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
    std::vector<Future<Unit>> continuations;

    for (auto& promise : *promises) {
      continuations.emplace_back(
          promise.getFuture().via(&executor).then([](Try<Unit>&&) {}));
    }

    executor.waitForStartup();
    // The shared_ptr capture keeps the promises alive for the queued task.
    executor.add([promises] {
      for (auto& promise : *promises) {
        promise.setValue();
      }
    });

    return collectAll(continuations);
  };

  launch().wait();
}
#endif
// DrivableExecutor stub: add() discards the task, drive() only records that it
// was called.
class DummyDrivableExecutor : public DrivableExecutor {
 public:
  void add(Func /* f */) override {}

  void drive() override {
    ran = true;
  }

  bool ran{false}; // true once drive() has been called
};
// getVia() drives the supplied executor until the future is ready; for a
// future that is already ready the executor must not be driven at all.
TEST(Via, getVia) {
  {
    // non-void
    ManualExecutor executor;
    auto truthy = via(&executor).then([] { return true; });
    EXPECT_TRUE(truthy.getVia(&executor));
  }

  {
    // void
    ManualExecutor executor;
    auto unitFuture = via(&executor).then();
    unitFuture.getVia(&executor);
  }

  {
    // already fulfilled: drive() must never be invoked
    DummyDrivableExecutor dummy;
    auto ready = makeFuture(true);
    EXPECT_TRUE(ready.getVia(&dummy));
    EXPECT_FALSE(dummy.ran);
  }
}
// waitVia() drives the supplied executor until the future is ready, for both
// named and temporary futures; an already-ready future must not drive it.
TEST(Via, waitVia) {
  {
    ManualExecutor executor;
    auto pending = via(&executor).then();
    EXPECT_FALSE(pending.isReady());
    pending.waitVia(&executor);
    EXPECT_TRUE(pending.isReady());
  }

  {
    // try rvalue as well
    ManualExecutor executor;
    auto waited = via(&executor).then().waitVia(&executor);
    EXPECT_TRUE(waited.isReady());
  }

  {
    // already fulfilled: drive() must never be invoked
    DummyDrivableExecutor dummy;
    makeFuture(true).waitVia(&dummy);
    EXPECT_FALSE(dummy.ran);
  }
}
// One thread builds a via() chain on a promise's future while another thread
// fulfils the promise. The test thread keeps running the executor until the
// last callback fires; the first two callbacks check they run on the test
// thread.
TEST(Via, viaRaces) {
  ManualExecutor executor;
  Promise<Unit> promise;
  auto testThreadId = std::this_thread::get_id();
  bool finished = false;

  std::thread chainBuilder([&] {
    promise.getFuture()
        .via(&executor)
        .then([&](Try<Unit>&&) {
          EXPECT_EQ(testThreadId, std::this_thread::get_id());
        })
        .then([&](Try<Unit>&&) {
          EXPECT_EQ(testThreadId, std::this_thread::get_id());
        })
        .then([&](Try<Unit>&&) { finished = true; });
  });

  std::thread fulfiller([&] { promise.setValue(); });

  while (!finished) {
    executor.run();
  }

  chainBuilder.join();
  fulfiller.join();
}
// via(executor, func) with a void-returning func yields Future<Unit>, and the
// func does not run until the executor is run.
TEST(ViaFunc, liftsVoid) {
  ManualExecutor executor;
  int calls = 0;

  Future<Unit> lifted = via(&executor, [&] { calls++; });
  EXPECT_EQ(0, calls);

  executor.run();
  EXPECT_EQ(1, calls);
}
// via(executor, func) delivers the func's return value.
TEST(ViaFunc, value) {
  ManualExecutor executor;
  auto answer = via(&executor, [] { return 42; });
  EXPECT_EQ(42, answer.getVia(&executor));
}
// An exception thrown by the func surfaces from getVia().
TEST(ViaFunc, exception) {
  ManualExecutor executor;
  auto thrower = []() -> int { throw std::runtime_error("expected"); };

  EXPECT_THROW(via(&executor, thrower).getVia(&executor), std::runtime_error);
}
// A func that returns a future has its result unwrapped.
TEST(ViaFunc, future) {
  ManualExecutor executor;
  auto unwrapped = via(&executor, [] { return makeFuture(42); });
  EXPECT_EQ(42, unwrapped.getVia(&executor));
}
// getVia() on the future of a void func runs the func exactly once.
TEST(ViaFunc, voidFuture) {
  ManualExecutor executor;
  int calls = 0;

  via(&executor, [&] { calls++; }).getVia(&executor);

  EXPECT_EQ(1, calls);
}
// The executor set by via(executor, func) sticks: a then() attached after the
// future has completed still waits for the executor to be run.
TEST(ViaFunc, isSticky) {
  ManualExecutor executor;
  int calls = 0;

  auto first = via(&executor, [&] { calls++; });
  executor.run();

  first.then([&] { calls++; });
  // The continuation is only queued at this point, not yet executed.
  EXPECT_EQ(1, calls);

  executor.run();
  EXPECT_EQ(2, calls);
}