/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_SERVICE_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_SERVICE_H_

#include <algorithm>
#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include "minddata/dataset/core/global_context.h"
#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/engine/cache/cache_request.h"
#include "minddata/dataset/engine/cache/cache_pool.h"
#include "minddata/dataset/util/arena.h"
#include "minddata/dataset/util/btree.h"
#include "minddata/dataset/util/service.h"
#include "minddata/dataset/util/services.h"
#include "minddata/dataset/util/system_pool.h"

namespace mindspore {
namespace dataset {
/// \brief A cache service for storing/fetching buffers in an in-memory cache, which may spill to disk if the cache
/// service is created with spilling enabled.
class CacheService : public Service {
 public:
  friend class CacheServer;

  /// \brief Constructor
  /// \param mem_sz Memory size to be set aside for the in-memory cache. 0 means unlimited
  /// \param root Spill path. Empty string means no spilling
  /// \param generate_id If the cache service should generate row ids for the buffers that are cached.
  /// For a non-mappable dataset, this should be set to true.
  CacheService(uint64_t mem_sz, const std::string &root, bool generate_id);
  ~CacheService() override;

  Status DoServiceStart() override;
  Status DoServiceStop() override;
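
  // Example (illustrative sketch only): constructing and starting a service. This assumes the Service base class
  // exposes ServiceStart()/ServiceStop() wrappers around DoServiceStart()/DoServiceStop(); in practice CacheServer
  // drives these calls.
  //
  //   CacheService cs(0, "", true);          // unlimited memory budget, no spilling, generate row ids
  //   RETURN_IF_NOT_OK(cs.ServiceStart());
  //   ...
  //   RETURN_IF_NOT_OK(cs.ServiceStop());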

  /// \brief Main function to cache a row, which comes in the form of a series of buffers.
  /// The first buffer is a Google flatbuffer that describes the buffers that follow.
  /// \param[in] buf Vector of buffers
  /// \param[out] row_id_generated The row id assigned to this row, if any
  /// \return Status object
  Status CacheRow(const std::vector<const void *> &buf, row_id_type *row_id_generated);

  /// \brief A fast version of CacheRow where all the data is already in one contiguous piece.
  /// \param src Slice of the data
  /// \param[out] row_id_generated The row id assigned to this row, if any
  /// \return Status object
  Status FastCacheRow(const ReadableSlice &src, row_id_type *row_id_generated);
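
  // Example (illustrative sketch only): handing a row to CacheRow. The names cs, fbb and tensor_data are
  // hypothetical; the first buffer must be the flatbuffer that describes the buffers that follow.
  //
  //   std::vector<const void *> buf = {fbb.GetBufferPointer(),  // flatbuffer describing the row
  //                                    tensor_data};            // raw tensor bytes that follow
  //   row_id_type id = -1;
  //   RETURN_IF_NOT_OK(cs.CacheRow(buf, &id));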

  /// \brief This function is used in preparation for batch fetching.
  /// It calculates how much memory we should allocate, which row ids are present, etc.
  /// All needed results are stored in the flat buffer.
  /// \return Status object
  Status PreBatchFetch(connection_id_type connection_id, const std::vector<row_id_type> &v,
                       const std::shared_ptr<flatbuffers::FlatBufferBuilder> &);

  /// \brief Getter function
  /// \return Spilling path
  Path GetSpillPath() const;
  /// \brief A structure returned from the cache server for a statistics request.
  class ServiceStat {
   public:
    using state_type = std::underlying_type<CacheServiceState>::type;
    ServiceStat() : state_(0) {}
    ~ServiceStat() = default;
    CachePool::CacheStat stat_{};
    state_type state_;
  };
  /// \brief Statistics for the current service
  /// \param[in/out] A pointer to a pre-allocated ServiceStat structure
  /// \return Status object
  Status GetStat(ServiceStat *);
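
  // Example (illustrative sketch only): querying statistics with a caller-owned ServiceStat, where cs is a
  // hypothetical CacheService instance.
  //
  //   CacheService::ServiceStat stat;
  //   RETURN_IF_NOT_OK(cs.GetStat(&stat));
  //   // stat.stat_ now holds the CachePool statistics and stat.state_ holds the service state.
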
  /// \brief Return the current state
  CacheServiceState GetState() const { return st_.load(); }
  /// \brief Cache schema
  /// \param buf A Google flatbuffer that contains the schema
  /// \param len Size of the buffer
  /// \return Status object
  Status CacheSchema(const void *buf, int64_t len);
  /// \brief Fetch schema
  /// \param out A contiguous block of memory that will contain the serialized form of the schema.
  /// \return Status object
  Status FetchSchema(std::string *out) const;
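
  // Example (illustrative sketch only): caching a serialized schema and fetching it back. The flatbuffer builder
  // fbb and the instance cs are hypothetical.
  //
  //   RETURN_IF_NOT_OK(cs.CacheSchema(fbb.GetBufferPointer(), fbb.GetSize()));
  //   std::string serialized_schema;
  //   RETURN_IF_NOT_OK(cs.FetchSchema(&serialized_schema));
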
  /// \brief Return a set of keys that are definitely cache misses
  /// \return Status object
  Status FindKeysMiss(std::vector<row_id_type> *out);
  /// \brief Overload the << operator to print a cache service
  /// \param out std::ostream
  /// \param cs A cache service
  /// \return std::ostream
  friend std::ostream &operator<<(std::ostream &out, const CacheService &cs);
  /// \brief Every cache service has a cookie. If the cookie of a CacheClient matches this cookie, this CacheClient
  /// is the creator
  /// \return Cookie
  std::string cookie() const { return cookie_; }
  /// \brief If this cache service generates row ids for the buffers cached, its life cycle is divided into two
  /// phases: a build phase and a read phase.
  /// \return True if it has two phases.
  bool HasBuildPhase() const { return generate_id_; }
  /// \brief Change from write phase to read phase. Only the creator of this service is allowed to make this call.
  /// \return Status object
  Status BuildPhaseDone();
  /// \brief For kToggleWriteMode request
  Status ToggleWriteMode(bool on_off);
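
  // Example (illustrative sketch only): the two-phase flow when this service generates row ids. Only the creator
  // (the CacheClient whose cookie matches cookie()) may end the build phase; cs is a hypothetical instance.
  //
  //   if (cs.HasBuildPhase()) {
  //     // ... rows are cached here via CacheRow()/FastCacheRow() (build phase) ...
  //     RETURN_IF_NOT_OK(cs.BuildPhaseDone());
  //   }
  //   // read phase: rows can now be fetched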

 private:
  mutable RWLock rw_lock_;                     // Reader/writer lock protecting concurrent access to the service
  std::string root_;                           // Spill path (empty string means no spilling)
  uint64_t cache_mem_sz_;                      // Memory budget for the in-memory cache (0 means unlimited)
  std::shared_ptr<CachePool> cp_;              // Underlying cache pool holding the cached rows
  std::atomic<row_id_type> next_id_;           // Next row id to hand out when generate_id_ is true
  bool generate_id_;                           // Whether this service generates row ids for cached buffers
  std::string cookie_;                         // Cookie identifying the creator CacheClient
  std::atomic<int32_t> num_clients_;           // Number of clients attached to this service
  std::atomic<CacheServiceState> st_;          // Current state of the service
  std::string schema_;                         // Serialized schema cached by CacheSchema
  std::shared_ptr<NumaMemoryPool> numa_pool_;  // NUMA-aware memory pool used for allocations
  // We also cache the result from calling FindKeysMiss because it is expensive. Besides, the user normally makes
  // this request only after we hit memory full or disk full, so the result is unlikely to change.
  std::mutex get_key_miss_mux_;
  std::shared_ptr<std::vector<row_id_type>> key_miss_results_;
  /// \brief Private function to generate a row id
  /// \return Row id assigned.
  row_id_type GetNextRowId() { return next_id_.fetch_add(1); }

  Status InternalFetchRow(const FetchRowMsg *p);
};
}  // namespace dataset
}  // namespace mindspore
#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_SERVICE_H_