• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/cache/storage_manager.h"
17 #include <iomanip>
18 #include "minddata/dataset/util/log_adapter.h"
19 #include "minddata/dataset/util/path.h"
20 #include "minddata/dataset/util/random.h"
21 #include "minddata/dataset/util/services.h"
22 
23 namespace mindspore {
24 namespace dataset {
GetBaseName(const std::string & prefix,int32_t file_id)25 std::string StorageManager::GetBaseName(const std::string &prefix, int32_t file_id) {
26   std::ostringstream oss;
27   oss << prefix << std::setfill('0') << std::setw(5) << file_id;
28   return oss.str();
29 }
30 
ConstructFileName(const std::string & prefix,int32_t file_id,const std::string & suffix)31 std::string StorageManager::ConstructFileName(const std::string &prefix, int32_t file_id, const std::string &suffix) {
32   std::string base_name = GetBaseName(prefix, file_id);
33   return (base_name + "." + suffix);
34 }
35 
AddOneContainer(int replaced_container_pos)36 Status StorageManager::AddOneContainer(int replaced_container_pos) {
37   const std::string kPrefix = "IMG";
38   const std::string kSuffix = "LB";
39   Path container_name = root_ / ConstructFileName(kPrefix, file_id_, kSuffix);
40   std::shared_ptr<StorageContainer> sc;
41   RETURN_IF_NOT_OK(StorageContainer::CreateStorageContainer(&sc, container_name.ToString()));
42   containers_.push_back(sc);
43   file_id_++;
44   if (replaced_container_pos >= 0) {
45     writable_containers_pool_[replaced_container_pos] = containers_.size() - 1;
46   } else {
47     writable_containers_pool_.push_back(containers_.size() - 1);
48   }
49   return Status::OK();
50 }
51 
DoServiceStart()52 Status StorageManager::DoServiceStart() {
53   containers_.reserve(kMaxNumContainers);
54   writable_containers_pool_.reserve(pool_size_);
55   if (root_.IsDirectory()) {
56     // create multiple containers and store their index in a pool
57     CHECK_FAIL_RETURN_UNEXPECTED(pool_size_ > 0, "Expect positive pool_size_, but got:" + std::to_string(pool_size_));
58     for (auto i = 0; i < pool_size_; i++) {
59       RETURN_IF_NOT_OK(AddOneContainer());
60     }
61   } else {
62     RETURN_STATUS_UNEXPECTED("Not a directory");
63   }
64   return Status::OK();
65 }
66 
Write(key_type * key,const std::vector<ReadableSlice> & buf)67 Status StorageManager::Write(key_type *key, const std::vector<ReadableSlice> &buf) {
68   RETURN_UNEXPECTED_IF_NULL(key);
69   size_t sz = 0;
70   for (auto &v : buf) {
71     sz += v.GetSize();
72   }
73   if (sz == 0) {
74     RETURN_STATUS_UNEXPECTED("Unexpected 0 length");
75   }
76   auto mt = GetRandomDevice();
77   std::shared_ptr<StorageContainer> cont;
78   key_type out_key;
79   value_type out_value;
80   bool create_new_container = false;
81   int old_container_pos = -1;
82   int last_num_container = -1;
83   do {
84     SharedLock lock_s(&rw_lock_);
85     size_t num_containers = containers_.size();
86     if (create_new_container && (num_containers == last_num_container) && (old_container_pos >= 0)) {
87       // Upgrade to exclusive lock.
88       lock_s.Upgrade();
89       create_new_container = false;
90       // Check again if someone has already added a
91       // new container after we got the x lock
92       if (containers_.size() == num_containers) {
93         // Create a new container and replace the full container in the pool with the newly created one
94         RETURN_IF_NOT_OK(AddOneContainer(old_container_pos));
95       }
96       // Refresh how many containers there are.
97       num_containers = containers_.size();
98       // Downgrade back to shared lock
99       lock_s.Downgrade();
100     }
101     if (num_containers == 0) {
102       RETURN_STATUS_UNEXPECTED("num_containers is zero");
103     }
104     // Pick a random container from the writable container pool to insert.
105     std::uniform_int_distribution<size_t> distribution(0, pool_size_ - 1);
106     size_t pos_in_pool = distribution(mt);
107     size_t cont_index = writable_containers_pool_.at(pos_in_pool);
108     cont = containers_.at(cont_index);
109     off64_t offset;
110     Status rc = cont->Insert(buf, &offset);
111     if (rc.StatusCode() == StatusCode::kMDBuddySpaceFull) {
112       create_new_container = true;
113       old_container_pos = pos_in_pool;
114       // Remember how many containers we saw. In the next iteration we will do a comparison to see
115       // if someone has already created it.
116       last_num_container = num_containers;
117     } else if (rc.IsOk()) {
118       out_value = std::make_pair(cont_index, std::make_pair(offset, sz));
119       RETURN_IF_NOT_OK(index_.insert(out_value, &out_key));
120       *key = out_key;
121       break;
122     } else {
123       return rc;
124     }
125   } while (true);
126   return Status::OK();
127 }
128 
Read(StorageManager::key_type key,WritableSlice * dest,size_t * bytesRead) const129 Status StorageManager::Read(StorageManager::key_type key, WritableSlice *dest, size_t *bytesRead) const {
130   RETURN_UNEXPECTED_IF_NULL(dest);
131   auto r = index_.Search(key);
132   if (r.second) {
133     auto &it = r.first;
134     value_type v = *it;
135     size_t container_inx = v.first;
136     off_t offset = v.second.first;
137     size_t sz = v.second.second;
138     if (dest->GetSize() < sz) {
139       std::string errMsg = "Destination buffer too small. Expect at least " + std::to_string(sz) +
140                            " but length = " + std::to_string(dest->GetSize());
141       RETURN_STATUS_UNEXPECTED(errMsg);
142     }
143     if (bytesRead != nullptr) {
144       *bytesRead = sz;
145     }
146     auto cont = containers_.at(container_inx);
147     RETURN_IF_NOT_OK(cont->Read(dest, offset));
148   } else {
149     RETURN_STATUS_UNEXPECTED("Key not found");
150   }
151   return Status::OK();
152 }
153 
DoServiceStop()154 Status StorageManager::DoServiceStop() noexcept {
155   Status rc;
156   Status rc1;
157   for (auto const &p : containers_) {
158     // The destructor of StorageContainer is not called automatically until the use
159     // count drops to 0. But it is not always the case. We will do it ourselves.
160     rc = p.get()->Truncate();
161     if (rc.IsError()) {
162       rc1 = rc;
163     }
164   }
165   containers_.clear();
166   writable_containers_pool_.clear();
167   file_id_ = 0;
168   return rc1;
169 }
170 
StorageManager(const Path & root)171 StorageManager::StorageManager(const Path &root) : root_(root), file_id_(0), index_(), pool_size_(1) {}
172 
StorageManager(const Path & root,size_t pool_size)173 StorageManager::StorageManager(const Path &root, size_t pool_size)
174     : root_(root), file_id_(0), index_(), pool_size_(pool_size) {}
175 
~StorageManager()176 StorageManager::~StorageManager() { (void)StorageManager::DoServiceStop(); }
177 
operator <<(std::ostream & os,const StorageManager & s)178 std::ostream &operator<<(std::ostream &os, const StorageManager &s) {
179   os << "Dumping all containers ..."
180      << "\n";
181   for (auto const &p : s.containers_) {
182     os << *(p.get());
183   }
184   return os;
185 }
186 }  // namespace dataset
187 }  // namespace mindspore
188