/*
 * Copyright 2015 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <gtest/gtest.h>

#include <folly/futures/Future.h>
#include <folly/futures/InlineExecutor.h>
#include <folly/futures/ManualExecutor.h>
#include <folly/futures/DrivableExecutor.h>
#include <folly/Baton.h>
#include <folly/MPMCQueue.h>

#include <thread>
#include <unistd.h>
#include <deque>
#include <mutex>
#include <condition_variable>

using namespace folly;

struct ManualWaiter : public DrivableExecutor {
  explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex) : ex(ex) {}

  void add(Func f) override {
    ex->add(f);
  }

  void drive() override {
    ex->wait();
    ex->run();
  }

  std::shared_ptr<ManualExecutor> ex;
};

struct ViaFixture : public testing::Test {
  ViaFixture() :
    westExecutor(new ManualExecutor),
    eastExecutor(new ManualExecutor),
    waiter(new ManualWaiter(westExecutor)),
    done(false)
  {
    t = std::thread([=] {
        ManualWaiter eastWaiter(eastExecutor);
        while (!done)
          eastWaiter.drive();
      });
  }

  ~ViaFixture() override {
    done = true;
    eastExecutor->add([=]() { });
    t.join();
  }

  void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
    eastExecutor->add([=]() {
      cob(a + b);
    });
  }

  std::shared_ptr<ManualExecutor> westExecutor;
  std::shared_ptr<ManualExecutor> eastExecutor;
  std::shared_ptr<ManualWaiter> waiter;
  InlineExecutor inlineExecutor;
  std::atomic<bool> done;
  std::thread t;
};

TEST(Via, exceptionOnLaunch) {
  auto future = makeFuture<int>(std::runtime_error("E"));
  EXPECT_THROW(future.value(), std::runtime_error);
}

TEST(Via, thenValue) {
  auto future = makeFuture(std::move(1))
    .then([](Try<int>&& t) {
      return t.value() == 1;
    })
    ;

  EXPECT_TRUE(future.value());
}

TEST(Via, thenFuture) {
  auto future = makeFuture(1)
    .then([](Try<int>&& t) {
      return makeFuture(t.value() == 1);
    });
  EXPECT_TRUE(future.value());
}

static Future<std::string> doWorkStatic(Try<std::string>&& t) {
  return makeFuture(t.value() + ";static");
}

TEST(Via, thenFunction) {
  struct Worker {
    Future<std::string> doWork(Try<std::string>&& t) {
      return makeFuture(t.value() + ";class");
    }
    static Future<std::string> doWorkStatic(Try<std::string>&& t) {
      return makeFuture(t.value() + ";class-static");
    }
  } w;

  auto f = makeFuture(std::string("start"))
    .then(doWorkStatic)
    .then(Worker::doWorkStatic)
    .then(&Worker::doWork, &w)
    ;

  EXPECT_EQ(f.value(), "start;static;class-static;class");
}

TEST_F(ViaFixture, threadHops) {
  auto westThreadId = std::this_thread::get_id();
  auto f = via(eastExecutor.get()).then([=](Try<Unit>&& t) {
    EXPECT_NE(std::this_thread::get_id(), westThreadId);
    return makeFuture<int>(1);
  }).via(westExecutor.get()
  ).then([=](Try<int>&& t) {
    EXPECT_EQ(std::this_thread::get_id(), westThreadId);
    return t.value();
  });
  EXPECT_EQ(f.getVia(waiter.get()), 1);
}

TEST_F(ViaFixture, chainVias) {
  auto westThreadId = std::this_thread::get_id();
  auto f = via(eastExecutor.get()).then([=]() {
    EXPECT_NE(std::this_thread::get_id(), westThreadId);
    return 1;
  }).then([=](int val) {
    return makeFuture(val).via(westExecutor.get())
      .then([=](int val) mutable {
        EXPECT_EQ(std::this_thread::get_id(), westThreadId);
        return val + 1;
      });
  }).then([=](int val) {
    // even though ultimately the future that triggers this one executed in
    // the west thread, this then() inherited the executor from its
    // predecessor, ie the eastExecutor.
    EXPECT_NE(std::this_thread::get_id(), westThreadId);
    return val + 1;
  }).via(westExecutor.get()).then([=](int val) {
    // go back to west, so we can wait on it
    EXPECT_EQ(std::this_thread::get_id(), westThreadId);
    return val + 1;
  });

  EXPECT_EQ(f.getVia(waiter.get()), 4);
}

TEST_F(ViaFixture, bareViaAssignment) {
  auto f = via(eastExecutor.get());
}
TEST_F(ViaFixture, viaAssignment) {
  // via()&&
  auto f = makeFuture().via(eastExecutor.get());
  // via()&
  auto f2 = f.via(eastExecutor.get());
}

TEST(Via, chain1) {
  EXPECT_EQ(42,
            makeFuture()
            .thenMulti([] { return 42; })
            .get());
}

TEST(Via, chain3) {
  int count = 0;
  auto f = makeFuture().thenMulti(
      [&]{ count++; return 3.14159; },
      [&](double) { count++; return std::string("hello"); },
      [&]{ count++; return makeFuture(42); });
  EXPECT_EQ(42, f.get());
  EXPECT_EQ(3, count);
}

struct PriorityExecutor : public Executor {
  void add(Func f) override {}

  void addWithPriority(Func f, int8_t priority) override {
    int mid = getNumPriorities() / 2;
    int p = priority < 0 ?
            std::max(0, mid + priority) :
            std::min(getNumPriorities() - 1, mid + priority);
    EXPECT_LT(p, 3);
    EXPECT_GE(p, 0);
    if (p == 0) {
      count0++;
    } else if (p == 1) {
      count1++;
    } else if (p == 2) {
      count2++;
    }
    f();
  }

  uint8_t getNumPriorities() const override {
    return 3;
  }

  int count0{0};
  int count1{0};
  int count2{0};
};

TEST(Via, priority) {
  PriorityExecutor exe;
  via(&exe, -1).then([]{});
  via(&exe, 0).then([]{});
  via(&exe, 1).then([]{});
  via(&exe, 42).then([]{});  // overflow should go to max priority
  via(&exe, -42).then([]{}); // underflow should go to min priority
  via(&exe).then([]{});      // default to mid priority
  via(&exe, Executor::LO_PRI).then([]{});
  via(&exe, Executor::HI_PRI).then([]{});
  EXPECT_EQ(3, exe.count0);
  EXPECT_EQ(2, exe.count1);
  EXPECT_EQ(3, exe.count2);
}

TEST_F(ViaFixture, chainX1) {
  EXPECT_EQ(42,
            makeFuture()
            .thenMultiWithExecutor(eastExecutor.get(),[] { return 42; })
            .get());
}

TEST_F(ViaFixture, chainX3) {
  auto westThreadId = std::this_thread::get_id();
  int count = 0;
  auto f = via(westExecutor.get()).thenMultiWithExecutor(
      eastExecutor.get(),
      [&]{
        EXPECT_NE(std::this_thread::get_id(), westThreadId);
        count++; return 3.14159;
      },
      [&](double) { count++; return std::string("hello"); },
      [&]{ count++; })
    .then([&](){
        EXPECT_EQ(std::this_thread::get_id(), westThreadId);
        return makeFuture(42);
    });
  EXPECT_EQ(42, f.getVia(waiter.get()));
  EXPECT_EQ(3, count);
}

TEST(Via, then2) {
  ManualExecutor x1, x2;
  bool a = false, b = false, c = false;
  via(&x1)
    .then([&]{ a = true; })
    .then(&x2, [&]{ b = true; })
    .then([&]{ c = true; });

  EXPECT_FALSE(a);
  EXPECT_FALSE(b);

  x1.run();
  EXPECT_TRUE(a);
  EXPECT_FALSE(b);
  EXPECT_FALSE(c);

  x2.run();
  EXPECT_TRUE(b);
  EXPECT_FALSE(c);

  x1.run();
  EXPECT_TRUE(c);
}

TEST(Via, then2Variadic) {
  struct Foo { bool a = false; void foo(Try<Unit>) { a = true; } };
  Foo f;
  ManualExecutor x;
  makeFuture().then(&x, &Foo::foo, &f);
  EXPECT_FALSE(f.a);
  x.run();
  EXPECT_TRUE(f.a);
}

#ifndef __APPLE__ // TODO #7372389
/// Simple executor that does work in another thread
class ThreadExecutor : public Executor {
  std::mutex mx;
  std::condition_variable cond;
  std::deque<Func> funcs2;
  std::atomic<bool> done {false};
  folly::Baton<> baton;
  std::thread worker;

  void work() {
    baton.post();
    Func fn;
    std::unique_lock<std::mutex> lock(mx);
    while (!done) {
      while (!funcs2.empty()) {
        fn = std::move(funcs2.front());
        funcs2.pop_front();
        lock.unlock();
        fn();
        lock.lock();
      }
      if (!done && funcs2.empty())
          cond.wait(lock);
    }
  }

 public:
  explicit ThreadExecutor(size_t n = 1024)
  {
    worker = std::thread(std::bind(&ThreadExecutor::work, this));
  }

  ~ThreadExecutor() override {
    {
        std::lock_guard<std::mutex> lock(mx);
        done = true;
        cond.notify_one();
    }
    worker.join();
  }

  void add(Func fn) override {
      std::lock_guard<std::mutex> lock(mx);
      funcs2.push_back(std::move(fn));
      cond.notify_one();
  }

  void waitForStartup() {
    baton.wait();
  }
};

TEST(Via, viaThenGetWasRacy) {
  ThreadExecutor x;
  std::unique_ptr<int> val = folly::via(&x)
    .then([] {
            return folly::make_unique<int>(42);
        })
    .get();
  ASSERT_TRUE(!!val);
  EXPECT_EQ(42, *val);
}

TEST(Via, callbackRace) {
  ThreadExecutor x;

  auto fn = [&x]{
    auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
    std::vector<Future<Unit>> futures;

    for (auto& p : *promises) {
      futures.emplace_back(
        p.getFuture()
        .via(&x)
        .then([](Try<Unit>&&){}));
    }

    x.waitForStartup();
    x.add([promises]{
      for (auto& p : *promises) {
        p.setValue();
      }
    });

    return collectAll(futures);
  };

  fn().wait();
}
#endif

class DummyDrivableExecutor : public DrivableExecutor {
 public:
  void add(Func f) override {}
  void drive() override { ran = true; }
  bool ran{false};
};

TEST(Via, getVia) {
  {
    // non-void
    ManualExecutor x;
    auto f = via(&x).then([]{ return true; });
    EXPECT_TRUE(f.getVia(&x));
  }

  {
    // void
    ManualExecutor x;
    auto f = via(&x).then();
    f.getVia(&x);
  }

  {
    DummyDrivableExecutor x;
    auto f = makeFuture(true);
    EXPECT_TRUE(f.getVia(&x));
    EXPECT_FALSE(x.ran);
  }
}

TEST(Via, waitVia) {
  {
    ManualExecutor x;
    auto f = via(&x).then();
    EXPECT_FALSE(f.isReady());
    f.waitVia(&x);
    EXPECT_TRUE(f.isReady());
  }

  {
    // try rvalue as well
    ManualExecutor x;
    auto f = via(&x).then().waitVia(&x);
    EXPECT_TRUE(f.isReady());
  }

  {
    DummyDrivableExecutor x;
    makeFuture(true).waitVia(&x);
    EXPECT_FALSE(x.ran);
  }
}

TEST(Via, viaRaces) {
  ManualExecutor x;
  Promise<Unit> p;
  auto tid = std::this_thread::get_id();
  bool done = false;

  std::thread t1([&] {
    p.getFuture()
      .via(&x)
      .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
      .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
      .then([&](Try<Unit>&&) { done = true; });
  });

  std::thread t2([&] {
    p.setValue();
  });

  while (!done) x.run();
  t1.join();
  t2.join();
}

TEST(ViaFunc, liftsVoid) {
  ManualExecutor x;
  int count = 0;
  Future<Unit> f = via(&x, [&]{ count++; });

  EXPECT_EQ(0, count);
  x.run();
  EXPECT_EQ(1, count);
}

TEST(ViaFunc, value) {
  ManualExecutor x;
  EXPECT_EQ(42, via(&x, []{ return 42; }).getVia(&x));
}

TEST(ViaFunc, exception) {
  ManualExecutor x;
  EXPECT_THROW(
    via(&x, []() -> int { throw std::runtime_error("expected"); })
      .getVia(&x),
    std::runtime_error);
}

TEST(ViaFunc, future) {
  ManualExecutor x;
  EXPECT_EQ(42, via(&x, []{ return makeFuture(42); })
            .getVia(&x));
}

TEST(ViaFunc, voidFuture) {
  ManualExecutor x;
  int count = 0;
  via(&x, [&]{ count++; }).getVia(&x);
  EXPECT_EQ(1, count);
}

TEST(ViaFunc, isSticky) {
  ManualExecutor x;
  int count = 0;

  auto f = via(&x, [&]{ count++; });
  x.run();

  f.then([&]{ count++; });
  EXPECT_EQ(1, count);
  x.run();
  EXPECT_EQ(2, count);
}
