• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/cache/storage_manager.h"
17 
18 #include <iomanip>
19 
20 #include "utils/ms_utils.h"
21 #include "minddata/dataset/util/log_adapter.h"
22 #include "minddata/dataset/util/path.h"
23 #include "minddata/dataset/util/random.h"
24 #include "minddata/dataset/util/services.h"
25 
26 namespace mindspore {
27 namespace dataset {
GetBaseName(const std::string & prefix,int32_t file_id)28 std::string StorageManager::GetBaseName(const std::string &prefix, int32_t file_id) {
29   std::ostringstream oss;
30   oss << prefix << std::setfill('0') << std::setw(5) << file_id;
31   return oss.str();
32 }
33 
ConstructFileName(const std::string & prefix,int32_t file_id,const std::string & suffix)34 std::string StorageManager::ConstructFileName(const std::string &prefix, int32_t file_id, const std::string &suffix) {
35   std::string base_name = GetBaseName(prefix, file_id);
36   return (base_name + "." + suffix);
37 }
38 
AddOneContainer(int replaced_container_pos)39 Status StorageManager::AddOneContainer(int replaced_container_pos) {
40   const std::string kPrefix = "IMG";
41   const std::string kSuffix = "LB";
42   Path container_name = root_ / ConstructFileName(kPrefix, file_id_, kSuffix);
43   std::shared_ptr<StorageContainer> sc;
44   RETURN_IF_NOT_OK(StorageContainer::CreateStorageContainer(&sc, container_name.ToString()));
45   containers_.push_back(sc);
46   file_id_++;
47   if (replaced_container_pos >= 0) {
48     writable_containers_pool_[replaced_container_pos] = containers_.size() - 1;
49   } else {
50     writable_containers_pool_.push_back(containers_.size() - 1);
51   }
52   return Status::OK();
53 }
54 
DoServiceStart()55 Status StorageManager::DoServiceStart() {
56   containers_.reserve(kMaxNumContainers);
57   writable_containers_pool_.reserve(pool_size_);
58   if (root_.IsDirectory()) {
59     // create multiple containers and store their index in a pool
60     CHECK_FAIL_RETURN_UNEXPECTED(pool_size_ > 0, "Expect positive pool_size_, but got:" + std::to_string(pool_size_));
61     for (int i = 0; i < pool_size_; i++) {
62       RETURN_IF_NOT_OK(AddOneContainer());
63     }
64   } else {
65     RETURN_STATUS_UNEXPECTED("Not a directory");
66   }
67   return Status::OK();
68 }
69 
Write(key_type * key,const std::vector<ReadableSlice> & buf)70 Status StorageManager::Write(key_type *key, const std::vector<ReadableSlice> &buf) {
71   RETURN_UNEXPECTED_IF_NULL(key);
72   size_t sz = 0;
73   for (auto &v : buf) {
74     sz += v.GetSize();
75   }
76   if (sz == 0) {
77     RETURN_STATUS_UNEXPECTED("Unexpected 0 length");
78   }
79   auto mt = GetRandomDevice();
80   std::shared_ptr<StorageContainer> cont;
81   key_type out_key;
82   value_type out_value;
83   bool create_new_container = false;
84   int old_container_pos = -1;
85   size_t last_num_container = -1;
86   do {
87     SharedLock lock_s(&rw_lock_);
88     size_t num_containers = containers_.size();
89     if (create_new_container && (num_containers == last_num_container) && (old_container_pos >= 0)) {
90       // Upgrade to exclusive lock.
91       lock_s.Upgrade();
92       create_new_container = false;
93       // Check again if someone has already added a
94       // new container after we got the x lock
95       if (containers_.size() == num_containers) {
96         // Create a new container and replace the full container in the pool with the newly created one
97         RETURN_IF_NOT_OK(AddOneContainer(old_container_pos));
98       }
99       // Refresh how many containers there are.
100       num_containers = containers_.size();
101       // Downgrade back to shared lock
102       lock_s.Downgrade();
103     }
104     if (num_containers == 0) {
105       RETURN_STATUS_UNEXPECTED("num_containers is zero");
106     }
107     // Pick a random container from the writable container pool to insert.
108     std::uniform_int_distribution<int> distribution(0, pool_size_ - 1);
109     int pos_in_pool = distribution(mt);
110     int cont_index = writable_containers_pool_.at(pos_in_pool);
111     cont = containers_.at(cont_index);
112     off64_t offset;
113     Status rc = cont->Insert(buf, &offset);
114     if (rc.StatusCode() == StatusCode::kMDBuddySpaceFull) {
115       create_new_container = true;
116       old_container_pos = pos_in_pool;
117       // Remember how many containers we saw. In the next iteration we will do a comparison to see
118       // if someone has already created it.
119       last_num_container = num_containers;
120     } else if (rc.IsOk()) {
121       out_value = std::make_pair(cont_index, std::make_pair(offset, sz));
122       RETURN_IF_NOT_OK(index_.insert(out_value, &out_key));
123       *key = out_key;
124       break;
125     } else {
126       return rc;
127     }
128   } while (true);
129   return Status::OK();
130 }
131 
Read(StorageManager::key_type key,WritableSlice * dest,size_t * bytesRead) const132 Status StorageManager::Read(StorageManager::key_type key, WritableSlice *dest, size_t *bytesRead) const {
133   RETURN_UNEXPECTED_IF_NULL(dest);
134   auto r = index_.Search(key);
135   if (r.second) {
136     auto &it = r.first;
137     value_type v = *it;
138     int container_inx = v.first;
139     off_t offset = v.second.first;
140     size_t sz = v.second.second;
141     if (dest->GetSize() < sz) {
142       std::string errMsg = "Destination buffer too small. Expect at least " + std::to_string(sz) +
143                            " but length = " + std::to_string(dest->GetSize());
144       RETURN_STATUS_UNEXPECTED(errMsg);
145     }
146     if (bytesRead != nullptr) {
147       *bytesRead = sz;
148     }
149     auto cont = containers_.at(container_inx);
150     RETURN_IF_NOT_OK(cont->Read(dest, offset));
151   } else {
152     RETURN_STATUS_UNEXPECTED("Key not found");
153   }
154   return Status::OK();
155 }
156 
DoServiceStop()157 Status StorageManager::DoServiceStop() noexcept {
158   Status rc;
159   Status rc1;
160   for (auto const &p : containers_) {
161     // The destructor of StorageContainer is not called automatically until the use
162     // count drops to 0. But it is not always the case. We will do it ourselves.
163     rc = p.get()->Truncate();
164     if (rc.IsError()) {
165       rc1 = rc;
166     }
167   }
168   containers_.clear();
169   writable_containers_pool_.clear();
170   file_id_ = 0;
171   return rc1;
172 }
173 
StorageManager(const Path & root)174 StorageManager::StorageManager(const Path &root) : root_(root), file_id_(0), index_(), pool_size_(1) {}
175 
StorageManager(const Path & root,int pool_size)176 StorageManager::StorageManager(const Path &root, int pool_size)
177     : root_(root), file_id_(0), index_(), pool_size_(pool_size) {}
178 
~StorageManager()179 StorageManager::~StorageManager() { (void)StorageManager::DoServiceStop(); }
180 
operator <<(std::ostream & os,const StorageManager & s)181 std::ostream &operator<<(std::ostream &os, const StorageManager &s) {
182   os << "Dumping all containers ..."
183      << "\n";
184   for (auto const &p : s.containers_) {
185     os << *(p.get());
186   }
187   return os;
188 }
189 }  // namespace dataset
190 }  // namespace mindspore
191