/**
 * Copyright 2019 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <algorithm>
#include "utils/ms_utils.h"
#include "minddata/dataset/engine/cache/cache_pool.h"
#include "minddata/dataset/engine/cache/cache_server.h"
#include "minddata/dataset/util/services.h"

namespace mindspore {
namespace dataset {
CachePool::CachePool(std::shared_ptr<NumaMemoryPool> mp, const std::string &root)
    : mp_(std::move(mp)), root_(root), subfolder_(Services::GetUniqueID()), sm_(nullptr), tree_(nullptr) {
  // Initialize soft memory cap to the current available memory on the machine.
  soft_mem_limit_ = CacheServerHW::GetAvailableMemory();
  temp_mem_usage_ = 0;
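  // Lower bound of available memory: once free memory would drop below this, Insert() stops
  // caching new rows in memory.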
  min_avail_mem_ = static_cast<uint64_t>(CacheServerHW::GetTotalSystemMemory() * (1.0 - mp_->GetMemoryCapRatio()));
}

Status CachePool::DoServiceStart() {
  tree_ = std::make_shared<data_index>();
  // If we are given a disk path, set up the StorageManager
  if (!root_.ToString().empty()) {
    Path spill = GetSpillPath();
    RETURN_IF_NOT_OK(spill.CreateDirectories());
    auto &cs = CacheServer::GetInstance();
    sm_ = std::make_shared<StorageManager>(spill, cs.GetNumWorkers());
    RETURN_IF_NOT_OK(sm_->ServiceStart());
    MS_LOG(INFO) << "CachePool will use disk folder: " << spill.ToString();
  }
  return Status::OK();
}

Status CachePool::DoServiceStop() {
  Status rc;
  Status rc2;
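  // rc2 remembers the first error we encounter; teardown continues even if a step fails.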
  if (sm_ != nullptr) {
    rc = sm_->ServiceStop();
    if (rc.IsError()) {
      rc2 = rc;
    }
  }
  sm_.reset();

  // We used to free the memory allocated for each DataLocator individually, but since all of it
  // comes from the NumaMemoryPool, we skip that here and release the whole NumaMemoryPool instead.
  // Otherwise we would have to release each buffer in the DataLocator one by one.

  tree_.reset();
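  // Clean up the spill area on disk: remove every spilled file and then the spill directory itself,
  // preserving the first error encountered.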
  if (!root_.ToString().empty()) {
    Path spill = GetSpillPath();
    auto it = Path::DirIterator::OpenDirectory(&spill);
    while (it->HasNext()) {
      rc = it->Next().Remove();
      if (rc.IsError() && rc2.IsOk()) {
        rc2 = rc;
      }
    }
    rc = spill.Remove();
    if (rc.IsError() && rc2.IsOk()) {
      rc2 = rc;
    }
  }
  return rc2;
}

CachePool::~CachePool() noexcept { (void)ServiceStop(); }

Status CachePool::Insert(CachePool::key_type key, const std::vector<ReadableSlice> &buf) {
  DataLocator bl;
  Status rc;
  size_t sz = 0;
  // We will consolidate all the slices into one piece.
  for (auto &v : buf) {
    sz += v.GetSize();
  }
  bl.sz = sz;
  // If the required memory exceeds what is available, return an OOM status. To avoid the cache
  // server process being killed or the machine crashing, we keep a lower bound on available
  // memory and stop caching once the remaining available memory falls below it. (The default
  // lower bound is 20% of physical RAM.)
  if (soft_mem_limit_ - temp_mem_usage_ - static_cast<uint64_t>(sz) < min_avail_mem_) {
    MS_LOG(WARNING) << "Memory usage will exceed the upper bound limit of: " << min_avail_mem_
                    << ". The cache server will not cache any more data.";
    rc = Status(StatusCode::kMDOutOfMemory, __LINE__, __FILE__);
  } else {
    rc = mp_->Allocate(sz, reinterpret_cast<void **>(&bl.ptr));
    // Refresh the soft limit and reset the usage counter every time roughly 100M of memory has been used.
    if (temp_mem_usage_ + sz >= kMemoryCapAdjustInterval) {
      soft_mem_limit_ = CacheServerHW::GetAvailableMemory();
      temp_mem_usage_ = 0;
    }
  }
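  // On a successful allocation: update the usage counter, record which numa node the buffer landed
  // on, and copy the incoming slices into it. On OOM, fall back to spilling the row to disk.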
  if (rc.IsOk()) {
    temp_mem_usage_ += sz;
    // Record which numa node we allocated from. It only makes sense if the policy is kOnNode.
    if (CacheServerHW::numa_enabled()) {
      auto &cs = CacheServer::GetInstance();
      auto node_id = cs.GetHWControl()->GetMyNode();
      bl.node_id = mp_->FindNode(bl.ptr);
      CHECK_FAIL_RETURN_UNEXPECTED(bl.node_id != -1, "Allocator is not from numa memory pool");
      bl.node_hit = (bl.node_id == node_id);
    }
    // We will do a piecewise copy.
    WritableSlice dest(bl.ptr, bl.sz);
    size_t pos = 0;
    for (auto &v : buf) {
      WritableSlice out(dest, pos);
      rc = WritableSlice::Copy(&out, v);
      if (rc.IsError()) {
        break;
      }
      pos += v.GetSize();
    }
    if (rc.IsError()) {
      mp_->Deallocate(bl.ptr);
      bl.ptr = nullptr;
      return rc;
    }
  } else if (rc == StatusCode::kMDOutOfMemory) {
    // If we are out of memory, write to disk.
    if (sm_ != nullptr) {
      MS_LOG(DEBUG) << "Spill to disk directly ... " << bl.sz << " bytes.";
      RETURN_IF_NOT_OK(sm_->Write(&bl.storage_key, buf));
    } else {
      // If we are asked to spill to disk but no storage has been set up, simply return
      // out of memory.
      return Status(StatusCode::kMDOutOfMemory, __LINE__, __FILE__, "Not enough storage for cache server to cache data");
    }
  } else {
    return rc;
  }
  // Insert into the B+ tree. We may still get an out-of-memory error, so we need to catch it.
  try {
    rc = tree_->DoInsert(key, bl);
  } catch (const std::bad_alloc &e) {
    rc = Status(StatusCode::kMDOutOfMemory, __LINE__, __FILE__);
  }
  // A duplicate key is treated as an error and we also free the memory.
  if (rc.IsError() && bl.ptr != nullptr) {
    mp_->Deallocate(bl.ptr);
    bl.ptr = nullptr;
    return rc;
  }
  return rc;
}

Status CachePool::Read(CachePool::key_type key, WritableSlice *dest, size_t *bytesRead) const {
  RETURN_UNEXPECTED_IF_NULL(dest);
  auto r = tree_->Search(key);
  if (r.second) {
    auto &it = r.first;
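    // A row either lives in memory (ptr != nullptr) or has been spilled to disk under storage_key;
    // read it back from whichever location holds it.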
    if (it->ptr != nullptr) {
      ReadableSlice src(it->ptr, it->sz);
      RETURN_IF_NOT_OK(WritableSlice::Copy(dest, src));
    } else if (sm_ != nullptr) {
      size_t expectedLength = 0;
      RETURN_IF_NOT_OK(sm_->Read(it->storage_key, dest, &expectedLength));
      if (expectedLength != it->sz) {
        MS_LOG(ERROR) << "Unexpected length. Read " << expectedLength << ". Expected " << it->sz << "."
                      << " Internal key: " << key << "\n";
        RETURN_STATUS_UNEXPECTED("Length mismatch. See log file for details.");
      }
    }
    if (bytesRead != nullptr) {
      *bytesRead = it->sz;
    }
  } else {
    RETURN_STATUS_UNEXPECTED("Key not found");
  }
  return Status::OK();
}

Path CachePool::GetSpillPath() const {
  auto spill = Path(root_) / subfolder_;
  return spill;
}

CachePool::CacheStat CachePool::GetStat(bool GetMissingKeys) const {
  tree_->LockShared();  // Prevent any node split while we search.
  CacheStat cs{-1, -1, 0, 0, 0, 0};
  int64_t total_sz = 0;
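  // Walk the whole tree once: count rows cached in memory vs. spilled to disk, track numa hits,
  // and (when GetMissingKeys is set) record the gaps between consecutive keys.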
  if (tree_->begin() != tree_->end()) {
    cs.min_key = tree_->begin().key();
    cs.max_key = cs.min_key;  // will adjust later.
    for (auto it = tree_->begin(); it != tree_->end(); ++it) {
      it.LockShared();
      total_sz += it.value().sz;
      if (it.value().ptr != nullptr) {
        ++cs.num_mem_cached;
      } else {
        ++cs.num_disk_cached;
      }
      if (it.value().node_hit) {
        ++cs.num_numa_hit;
      }
      auto cur_key = it.key();
      if (GetMissingKeys) {
        for (auto i = cs.max_key + 1; i < cur_key; ++i) {
          cs.gap.push_back(i);
        }
      }
      cs.max_key = cur_key;
      it.Unlock();
    }
  }
  if (total_sz > 0) {
    // Integer arithmetic; no need to cast to float or double.
    cs.average_cache_sz = total_sz / (cs.num_disk_cached + cs.num_mem_cached);
    if (cs.average_cache_sz == 0) {
      cs.average_cache_sz = 1;
    }
  }
  tree_->Unlock();
  return cs;
}

Status CachePool::GetDataLocator(key_type key, const std::shared_ptr<flatbuffers::FlatBufferBuilder> &fbb,
                                 flatbuffers::Offset<DataLocatorMsg> *out) const {
  RETURN_UNEXPECTED_IF_NULL(out);
  auto r = tree_->Search(key);
  if (r.second) {
    auto &it = r.first;
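    // Key found: pack the row's size, its numa node id and the raw buffer address into the
    // locator message.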
    DataLocatorMsgBuilder bld(*fbb);
    bld.add_key(key);
    bld.add_size(it->sz);
    bld.add_node_id(it->node_id);
    bld.add_addr(reinterpret_cast<int64_t>(it->ptr));
    auto offset = bld.Finish();
    *out = offset;
  } else {
    // Key not in the cache.
    auto offset = CreateDataLocatorMsg(*fbb, key, 0, 0, 0);
    *out = offset;
  }
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore