| /* |
| * Copyright 2015 Facebook, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #pragma once |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <limits> |
| #include <chrono> |
| |
| #include <folly/Likely.h> |
| |
| namespace folly { |
| |
| /** Threadsafe TokenBucket implementation, based on the idea of |
| * converting tokens into time and maintaining state as a timestamp relative to |
| * now. The number of tokens available is represented by the delta between now |
| * and the timestamp, and the 'burst' is represented by the maximum delta. |
| */ |
| class TokenBucket { |
| private: |
| std::atomic<double> time_; |
| std::atomic<double> secondsPerToken_; |
| std::atomic<double> secondsPerBurst_; |
| |
| public: |
| TokenBucket(double rate, double burst, double nowInSeconds) noexcept |
| : time_(nowInSeconds) { |
| reset(rate, burst, nowInSeconds); |
| } |
| |
| void reset(double rate, double burst, double nowInSeconds) noexcept { |
| double tokens = available(nowInSeconds); |
| |
| secondsPerToken_.store( |
| 1.0 / rate - std::numeric_limits<double>::epsilon(), |
| std::memory_order_relaxed); |
| |
| secondsPerBurst_.store( |
| burst / rate + std::numeric_limits<double>::epsilon(), |
| std::memory_order_relaxed); |
| |
| set_capacity(tokens, nowInSeconds); |
| } |
| |
| void set_capacity(double tokens, double nowInSeconds) noexcept { |
| const double secondsPerToken = std::atomic_load_explicit( |
| &secondsPerToken_, std::memory_order_relaxed); |
| |
| const double secondsPerBurst = std::atomic_load_explicit( |
| &secondsPerBurst_, std::memory_order_relaxed); |
| |
| double newTime = nowInSeconds - std::min( |
| tokens * secondsPerToken, secondsPerBurst); |
| |
| time_.store(newTime, std::memory_order_relaxed); |
| } |
| |
| // If there are `tokens` avilable at `nowInSeconds`, consume them and |
| // return true. Otherwise, return false. |
| // |
| // This implementation is written in a lock-free manner using a |
| // compare-and-exchange loop, with branch prediction optimized to minimize |
| // time spent in the 'success' case which performs a write. |
| bool consume(double tokens, double nowInSeconds) noexcept { |
| const double secondsNeeded = tokens * std::atomic_load_explicit( |
| &secondsPerToken_, std::memory_order_relaxed); |
| |
| const double minTime = nowInSeconds - std::atomic_load_explicit( |
| &secondsPerBurst_, std::memory_order_relaxed); |
| |
| double oldTime = |
| std::atomic_load_explicit(&time_, std::memory_order_relaxed); |
| double newTime = oldTime; |
| |
| // Limit the number of available tokens to 'burst'. We don't need to do |
| // this inside the loop because if we iterate more than once another |
| // caller will have performed an update that also covered this |
| // calculation. Also, tell the compiler to optimize branch prediction to |
| // minimize time spent between reads and writes in the success case |
| if (UNLIKELY(minTime > oldTime)) { |
| newTime = minTime; |
| } |
| |
| while (true) { |
| newTime += secondsNeeded; |
| |
| // Optimize for the write-contention case, to minimize the impact of |
| // branch misprediction on other threads |
| if (UNLIKELY(newTime > nowInSeconds)) { |
| return false; |
| } |
| |
| // Optimize for the write-contention case, to minimize the impact of |
| // branch misprediction on other threads |
| if (LIKELY(std::atomic_compare_exchange_weak_explicit( |
| &time_, &oldTime, newTime, |
| std::memory_order_relaxed, std::memory_order_relaxed))) { |
| return true; |
| } |
| |
| newTime = oldTime; |
| } |
| |
| return true; |
| } |
| |
| // Similar to consume, but will always consume some number of tokens. |
| double consumeOrDrain(double tokens, double nowInSeconds) noexcept { |
| const double secondsPerToken = std::atomic_load_explicit( |
| &secondsPerToken_, std::memory_order_relaxed); |
| |
| const double secondsNeeded = tokens * secondsPerToken; |
| const double minTime = nowInSeconds - std::atomic_load_explicit( |
| &secondsPerBurst_, std::memory_order_relaxed); |
| |
| double oldTime = |
| std::atomic_load_explicit(&time_, std::memory_order_relaxed); |
| double newTime = oldTime; |
| |
| |
| // Limit the number of available tokens to 'burst'. |
| // Also, tell the compiler to optimize branch prediction to |
| // minimize time spent between reads and writes in the success case |
| if (UNLIKELY(minTime > oldTime)) { |
| newTime = minTime; |
| } |
| |
| double consumed; |
| |
| newTime += secondsNeeded; |
| |
| consumed = (newTime - nowInSeconds) / secondsPerToken; |
| time_.store(newTime, std::memory_order_relaxed); |
| |
| return consumed; |
| } |
| |
| double available(double nowInSeconds = defaultClockNow()) const noexcept { |
| double time = |
| std::atomic_load_explicit(&time_, std::memory_order_relaxed); |
| |
| double deltaTime = std::min( |
| std::atomic_load_explicit(&secondsPerBurst_, |
| std::memory_order_relaxed), |
| nowInSeconds - time); |
| |
| return std::max(0.0, deltaTime / std::atomic_load_explicit( |
| &secondsPerToken_, std::memory_order_relaxed)); |
| } |
| |
| static double defaultClockNow() { |
| return std::chrono::duration_cast<std::chrono::microseconds>( |
| std::chrono::steady_clock::now().time_since_epoch() |
| ).count() / 1000000.0; |
| } |
| }; |
| |
| } |