/* * Copyright (C) 2016 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include "common/libs/time/monotonic_time.h" /** * This abstract class simulates a buffer that either fills or empties at * a specified rate. * * The simulated buffer automatically fills or empties at a specific rate. * * An item is the thing contained in the simulated buffer. Items are moved * in and out of the buffer without subdivision. * * An integral number of items must arrive / depart in each second. * This number is stored in items_per_second_ * * items_per_second * 2000000000 must fit within an int64_t. This * works if items_per_second is represented by an int32. * * The base class does have the concept of capacity, but doesn't use it. * It is included here to simplify unit testing. * * For actual use, see SimulatedInputBuffer and SimulatedOutputBuffer below. */ class SimulatedBufferBase { public: static inline int64_t divide_and_round_up(int64_t q, int64_t d) { return q / d + ((q % d) != 0); } SimulatedBufferBase( int32_t items_per_second, int64_t simulated_item_capacity, cvd::time::MonotonicTimePointFactory* clock = cvd::time::MonotonicTimePointFactory::GetInstance()) : clock_(clock), current_item_num_(0), base_item_num_(0), simulated_item_capacity_(simulated_item_capacity), items_per_second_(items_per_second), initialize_(true), paused_(false) { } virtual ~SimulatedBufferBase() { } int64_t GetCurrentItemNum() { Update(); return current_item_num_; } const cvd::time::MonotonicTimePoint GetLastUpdatedTime() const { return current_time_; } // Sleep for the given amount of time. Subclasses may override this to use // different sleep calls. // Sleep is best-effort. The code assumes that the acutal sleep time may be // greater or less than the time requested. virtual void SleepUntilTime(const cvd::time::MonotonicTimePoint& in) { struct timespec ts; in.ToTimespec(&ts); clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &ts, NULL); } // The time counter may not start at 0. Concrete classes should call this // to allow the buffer simulation to read the current time number and // initialize its internal state. virtual void Init() { if (initialize_) { clock_->FetchCurrentTime(&base_time_); current_time_ = base_time_; initialize_ = false; } } virtual void Update() { if (initialize_) { Init(); } cvd::time::MonotonicTimePoint now; clock_->FetchCurrentTime(&now); // We can't call FetchCurrentTime() in the constuctor because a subclass may // want to override it, so we initialze the times to 0. If we detect this // case go ahead and initialize to a current timestamp. if (paused_) { base_time_ += now - current_time_; current_time_ = now; return; } // Avoid potential overflow by limiting the scaling to one time second. // There is no round-off error here because the bases are adjusted for full // seconds. // There is no issue with int64 overflow because 2's compliment subtraction // is immune to overflow. // However, this does assume that kNanosecondsPerSecond * items_per_second_ // fits in an int64. cvd::time::Seconds seconds(now - base_time_); base_time_ += seconds; base_item_num_ += seconds.count() * items_per_second_; current_time_ = now; current_item_num_ = cvd::time::Nanoseconds(now - base_time_).count() * items_per_second_ / cvd::time::kNanosecondsPerSecond + base_item_num_; } // If set to true new items will not be created. bool SetPaused(bool new_state) { bool rval = paused_; Update(); paused_ = new_state; return rval; } // Calculate the TimePoint that corresponds to an item. // Caution: This may not return a correct time for items in the past. cvd::time::MonotonicTimePoint CalculateItemTime(int64_t item) { int64_t seconds = (item - base_item_num_) / items_per_second_; int64_t new_base_item_num = base_item_num_ + seconds * items_per_second_; return base_time_ + cvd::time::Seconds(seconds) + cvd::time::Nanoseconds(divide_and_round_up( (item - new_base_item_num) * cvd::time::kNanosecondsPerSecond, items_per_second_)); } // Sleep until the given item number is generated. If the generator is // paused unpause it to make the sleep finite. void SleepUntilItem(int64_t item) { if (paused_) { SetPaused(false); } cvd::time::MonotonicTimePoint desired_time = CalculateItemTime(item); while (1) { Update(); if (current_item_num_ - item >= 0) { return; } SleepUntilTime(desired_time); } } protected: // Source of the timepoints. cvd::time::MonotonicTimePointFactory* clock_; // Time when the other values in the structure were updated. cvd::time::MonotonicTimePoint current_time_; // Most recent time when there was no round-off error between the clock and // items. cvd::time::MonotonicTimePoint base_time_; // Number of the current item. int64_t current_item_num_; // Most recent item number where there was no round-off error between the // clock and items. int64_t base_item_num_; // Simulated_Item_Capacity of the buffer in items. int64_t simulated_item_capacity_; // Number of items that are created in 1s. A typical number would be 48000. int32_t items_per_second_; bool initialize_; // If true then don't generate new items. bool paused_; }; /** * This is a simulation of an output buffer that drains at a constant rate. */ class SimulatedOutputBuffer : public SimulatedBufferBase { public: SimulatedOutputBuffer( int64_t item_rate, int64_t simulated_item_capacity, cvd::time::MonotonicTimePointFactory* clock = cvd::time::MonotonicTimePointFactory::GetInstance()) : SimulatedBufferBase(item_rate, simulated_item_capacity, clock) { output_buffer_item_num_ = current_item_num_; } void Update() override { SimulatedBufferBase::Update(); if ((output_buffer_item_num_ - current_item_num_) < 0) { // We ran out of items at some point in the past. However, the // output capactiy can't be negative. output_buffer_item_num_ = current_item_num_; } } int64_t AddToOutputBuffer(int64_t num_new_items, bool block) { Update(); // The easy case: num_new_items fit in the bucket. if ((output_buffer_item_num_ + num_new_items - current_item_num_) <= simulated_item_capacity_) { output_buffer_item_num_ += num_new_items; return num_new_items; } // If we're non-blocking accept enough items to fill the output. if (!block) { int64_t used = current_item_num_ + simulated_item_capacity_ - output_buffer_item_num_; output_buffer_item_num_ = current_item_num_ + simulated_item_capacity_; return used; } int64_t new_output_buffer_item_num = output_buffer_item_num_ + num_new_items; SleepUntilItem(new_output_buffer_item_num - simulated_item_capacity_); output_buffer_item_num_ = new_output_buffer_item_num; return num_new_items; } int64_t GetNextOutputBufferItemNum() { Update(); return output_buffer_item_num_; } cvd::time::MonotonicTimePoint GetNextOutputBufferItemTime() { Update(); return CalculateItemTime(output_buffer_item_num_); } int64_t GetOutputBufferSize() { Update(); return output_buffer_item_num_ - current_item_num_; } void Drain() { SleepUntilItem(output_buffer_item_num_); } protected: int64_t output_buffer_item_num_; }; /** * Simulates an input buffer that fills at a constant rate. */ class SimulatedInputBuffer : public SimulatedBufferBase { public: SimulatedInputBuffer( int64_t item_rate, int64_t simulated_item_capacity, cvd::time::MonotonicTimePointFactory* clock = cvd::time::MonotonicTimePointFactory::GetInstance()) : SimulatedBufferBase(item_rate, simulated_item_capacity, clock) { input_buffer_item_num_ = current_item_num_; lost_input_items_ = 0; } void Update() override { SimulatedBufferBase::Update(); if ((current_item_num_ - input_buffer_item_num_) > simulated_item_capacity_) { // The buffer overflowed at some point in the past. Account for the lost // times. int64_t new_input_buffer_item_num = current_item_num_ - simulated_item_capacity_; lost_input_items_ += new_input_buffer_item_num - input_buffer_item_num_; input_buffer_item_num_ = new_input_buffer_item_num; } } int64_t RemoveFromInputBuffer(int64_t num_items_wanted, bool block) { Update(); if (!block) { int64_t num_items_available = current_item_num_ - input_buffer_item_num_; if (num_items_available < num_items_wanted) { input_buffer_item_num_ += num_items_available; return num_items_available; } else { input_buffer_item_num_ += num_items_wanted; return num_items_wanted; } } // Calculate the item number that is being claimed. Sleep until it appears. // Advancing input_buffer_item_num_ causes a negative value to be compared // to the capacity, effectively disabling the overflow detection code // in Update(). input_buffer_item_num_ += num_items_wanted; while (input_buffer_item_num_ - current_item_num_ > 0) { SleepUntilItem(input_buffer_item_num_); } return num_items_wanted; } int64_t GetLostInputItems() { Update(); int64_t rval = lost_input_items_; lost_input_items_ = 0; return rval; } protected: int64_t input_buffer_item_num_; int64_t lost_input_items_; };