1 /** 2 * Copyright 2020 Huawei Technologies Co., Ltd 3 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 8 * http://www.apache.org/licenses/LICENSE-2.0 9 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_SERVICE_H_ 18 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_SERVICE_H_ 19 20 #include <algorithm> 21 #include <atomic> 22 #include <memory> 23 #include <mutex> 24 #include <string> 25 #include <type_traits> 26 #include <utility> 27 #include <vector> 28 29 #include "minddata/dataset/core/global_context.h" 30 #include "minddata/dataset/core/tensor.h" 31 #include "minddata/dataset/engine/cache/cache_request.h" 32 #include "minddata/dataset/engine/cache/cache_pool.h" 33 #include "minddata/dataset/util/arena.h" 34 #include "minddata/dataset/util/btree.h" 35 #include "minddata/dataset/util/service.h" 36 #include "minddata/dataset/util/services.h" 37 #include "minddata/dataset/util/system_pool.h" 38 39 namespace mindspore { 40 namespace dataset { 41 /// \brief A cache service for storing/fetching buffers to in memory cache and may spill to disk the cache service is 42 /// created to support spilling 43 class CacheService : public Service { 44 public: 45 friend class CacheServer; 46 47 /// \brief Constructor 48 /// \param mem_sz Memory size to be set aside for the in memory cache. 0 means unlimited 49 /// \param root Spill path. Empty string means no spilling 50 /// \param generate_id If the cache service should generate row id for buffer that is cached. 51 /// For non-mappable dataset, this should be set to true. 52 CacheService(uint64_t mem_sz, const std::string &root, bool generate_id); 53 ~CacheService() override; 54 55 Status DoServiceStart() override; 56 Status DoServiceStop() override; 57 58 /// \brief Main function to cache a row which is in form a series of buffers. 59 /// The first buffer is a Google flatbuffer which describes the rest of the buffers followed. 60 /// \param[in] buf Vector of buffer 61 /// \param[out] row_id_generated The row id assigned to this row if any 62 /// \return Status object 63 Status CacheRow(const std::vector<const void *> &buf, row_id_type *row_id_generated); 64 65 /// \brief A fast version of CacheRow where all the data is already in one contiguous piece. 66 /// \param src Slice of the data 67 /// \param row_id_generated 68 /// \return Status object 69 Status FastCacheRow(const ReadableSlice &src, row_id_type *row_id_generated); 70 71 /// \brief This function is used in preparation for batch fetching. 72 /// It calculates how much memory we should allocate and which row id are present, etc. 73 /// All needed results are stored in the flat buffer. 74 /// \return Status object 75 Status PreBatchFetch(connection_id_type connection_id, const std::vector<row_id_type> &v, 76 const std::shared_ptr<flatbuffers::FlatBufferBuilder> &); 77 78 /// \brief Getter function 79 /// \return Spilling path 80 Path GetSpillPath() const; 81 /// \brief A structure returned from the cache server for statistics request. 82 class ServiceStat { 83 public: 84 using state_type = std::underlying_type<CacheServiceState>::type; ServiceStat()85 ServiceStat() : state_(0) {} 86 ~ServiceStat() = default; 87 CachePool::CacheStat stat_{}; 88 state_type state_; 89 }; 90 /// \brief Statistics for the current service 91 /// \param[in/out] A pointer to a pre-allocated ServiceStat structure 92 /// \return Status Object 93 Status GetStat(ServiceStat *); 94 /// \brief Return the current state GetState()95 CacheServiceState GetState() const { return st_.load(); } 96 /// \brief Cache schema 97 /// \param buf A Google Flatbuffer that contains the schema 98 /// \param len size of the buffer 99 /// \return Status object 100 Status CacheSchema(const void *buf, int64_t len); 101 /// \brief Fetch schema 102 /// \param out A contiguous memory that contains the serialized form of schema. 103 /// \return Status object 104 Status FetchSchema(std::string *out) const; 105 /// \brief Return a set of keys that are definitely cache miss 106 /// \return Status object 107 Status FindKeysMiss(std::vector<row_id_type> *out); 108 /// \brief Overload the << operator to print a cache service 109 /// \param out std::ostream 110 /// \param cs A cache service 111 /// \return std::ostream 112 friend std::ostream &operator<<(std::ostream &out, const CacheService &cs); 113 /// \brief Every cache service has a cookie. If the cookie of a CacheClient matches this cookie, this CacheClient 114 /// is the creator 115 /// \return Cookie cookie()116 std::string cookie() const { return cookie_; } 117 /// \brief If this cache service generates row id for buffer cached, it is divided into two phases, a build phase and 118 /// a read phase. 119 /// \return True if has two phases. HasBuildPhase()120 bool HasBuildPhase() const { return generate_id_; } 121 /// \brief Change from write phase to read phase. Only the creator of this service is allowed to make this call. 122 /// \return Status object 123 Status BuildPhaseDone(); 124 /// \brief For kToggleWriteMode request 125 Status ToggleWriteMode(bool on_off); 126 127 private: 128 mutable RWLock rw_lock_; 129 std::string root_; 130 uint64_t cache_mem_sz_; 131 std::shared_ptr<CachePool> cp_; 132 std::atomic<row_id_type> next_id_; 133 bool generate_id_; 134 std::string cookie_; 135 std::atomic<int32_t> num_clients_; 136 std::atomic<CacheServiceState> st_; 137 std::string schema_; 138 std::shared_ptr<NumaMemoryPool> numa_pool_; 139 // We also cache the result from calling FindKeysMiss because it is expensive. Besides user make 140 // this request after we hit memory full or disk full. So the result is unlikely to change. 141 std::mutex get_key_miss_mux_; 142 std::shared_ptr<std::vector<row_id_type>> key_miss_results_; 143 /// \brief Private function to generate a row id 144 /// \return Row id assigned. GetNextRowId()145 row_id_type GetNextRowId() { return next_id_.fetch_add(1); } 146 147 Status InternalFetchRow(const FetchRowMsg *p); 148 }; 149 } // namespace dataset 150 } // namespace mindspore 151 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_SERVICE_H_ 152