1 /**
2 * Copyright 2019 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/cache/storage_manager.h"
17 #include <iomanip>
18 #include "minddata/dataset/util/log_adapter.h"
19 #include "minddata/dataset/util/path.h"
20 #include "minddata/dataset/util/random.h"
21 #include "minddata/dataset/util/services.h"
22
23 namespace mindspore {
24 namespace dataset {
GetBaseName(const std::string & prefix,int32_t file_id)25 std::string StorageManager::GetBaseName(const std::string &prefix, int32_t file_id) {
26 std::ostringstream oss;
27 oss << prefix << std::setfill('0') << std::setw(5) << file_id;
28 return oss.str();
29 }
30
ConstructFileName(const std::string & prefix,int32_t file_id,const std::string & suffix)31 std::string StorageManager::ConstructFileName(const std::string &prefix, int32_t file_id, const std::string &suffix) {
32 std::string base_name = GetBaseName(prefix, file_id);
33 return (base_name + "." + suffix);
34 }
35
AddOneContainer(int replaced_container_pos)36 Status StorageManager::AddOneContainer(int replaced_container_pos) {
37 const std::string kPrefix = "IMG";
38 const std::string kSuffix = "LB";
39 Path container_name = root_ / ConstructFileName(kPrefix, file_id_, kSuffix);
40 std::shared_ptr<StorageContainer> sc;
41 RETURN_IF_NOT_OK(StorageContainer::CreateStorageContainer(&sc, container_name.ToString()));
42 containers_.push_back(sc);
43 file_id_++;
44 if (replaced_container_pos >= 0) {
45 writable_containers_pool_[replaced_container_pos] = containers_.size() - 1;
46 } else {
47 writable_containers_pool_.push_back(containers_.size() - 1);
48 }
49 return Status::OK();
50 }
51
DoServiceStart()52 Status StorageManager::DoServiceStart() {
53 containers_.reserve(kMaxNumContainers);
54 writable_containers_pool_.reserve(pool_size_);
55 if (root_.IsDirectory()) {
56 // create multiple containers and store their index in a pool
57 CHECK_FAIL_RETURN_UNEXPECTED(pool_size_ > 0, "Expect positive pool_size_, but got:" + std::to_string(pool_size_));
58 for (auto i = 0; i < pool_size_; i++) {
59 RETURN_IF_NOT_OK(AddOneContainer());
60 }
61 } else {
62 RETURN_STATUS_UNEXPECTED("Not a directory");
63 }
64 return Status::OK();
65 }
66
Write(key_type * key,const std::vector<ReadableSlice> & buf)67 Status StorageManager::Write(key_type *key, const std::vector<ReadableSlice> &buf) {
68 RETURN_UNEXPECTED_IF_NULL(key);
69 size_t sz = 0;
70 for (auto &v : buf) {
71 sz += v.GetSize();
72 }
73 if (sz == 0) {
74 RETURN_STATUS_UNEXPECTED("Unexpected 0 length");
75 }
76 auto mt = GetRandomDevice();
77 std::shared_ptr<StorageContainer> cont;
78 key_type out_key;
79 value_type out_value;
80 bool create_new_container = false;
81 int old_container_pos = -1;
82 int last_num_container = -1;
83 do {
84 SharedLock lock_s(&rw_lock_);
85 size_t num_containers = containers_.size();
86 if (create_new_container && (num_containers == last_num_container) && (old_container_pos >= 0)) {
87 // Upgrade to exclusive lock.
88 lock_s.Upgrade();
89 create_new_container = false;
90 // Check again if someone has already added a
91 // new container after we got the x lock
92 if (containers_.size() == num_containers) {
93 // Create a new container and replace the full container in the pool with the newly created one
94 RETURN_IF_NOT_OK(AddOneContainer(old_container_pos));
95 }
96 // Refresh how many containers there are.
97 num_containers = containers_.size();
98 // Downgrade back to shared lock
99 lock_s.Downgrade();
100 }
101 if (num_containers == 0) {
102 RETURN_STATUS_UNEXPECTED("num_containers is zero");
103 }
104 // Pick a random container from the writable container pool to insert.
105 std::uniform_int_distribution<size_t> distribution(0, pool_size_ - 1);
106 size_t pos_in_pool = distribution(mt);
107 size_t cont_index = writable_containers_pool_.at(pos_in_pool);
108 cont = containers_.at(cont_index);
109 off64_t offset;
110 Status rc = cont->Insert(buf, &offset);
111 if (rc.StatusCode() == StatusCode::kMDBuddySpaceFull) {
112 create_new_container = true;
113 old_container_pos = pos_in_pool;
114 // Remember how many containers we saw. In the next iteration we will do a comparison to see
115 // if someone has already created it.
116 last_num_container = num_containers;
117 } else if (rc.IsOk()) {
118 out_value = std::make_pair(cont_index, std::make_pair(offset, sz));
119 RETURN_IF_NOT_OK(index_.insert(out_value, &out_key));
120 *key = out_key;
121 break;
122 } else {
123 return rc;
124 }
125 } while (true);
126 return Status::OK();
127 }
128
Read(StorageManager::key_type key,WritableSlice * dest,size_t * bytesRead) const129 Status StorageManager::Read(StorageManager::key_type key, WritableSlice *dest, size_t *bytesRead) const {
130 RETURN_UNEXPECTED_IF_NULL(dest);
131 auto r = index_.Search(key);
132 if (r.second) {
133 auto &it = r.first;
134 value_type v = *it;
135 size_t container_inx = v.first;
136 off_t offset = v.second.first;
137 size_t sz = v.second.second;
138 if (dest->GetSize() < sz) {
139 std::string errMsg = "Destination buffer too small. Expect at least " + std::to_string(sz) +
140 " but length = " + std::to_string(dest->GetSize());
141 RETURN_STATUS_UNEXPECTED(errMsg);
142 }
143 if (bytesRead != nullptr) {
144 *bytesRead = sz;
145 }
146 auto cont = containers_.at(container_inx);
147 RETURN_IF_NOT_OK(cont->Read(dest, offset));
148 } else {
149 RETURN_STATUS_UNEXPECTED("Key not found");
150 }
151 return Status::OK();
152 }
153
DoServiceStop()154 Status StorageManager::DoServiceStop() noexcept {
155 Status rc;
156 Status rc1;
157 for (auto const &p : containers_) {
158 // The destructor of StorageContainer is not called automatically until the use
159 // count drops to 0. But it is not always the case. We will do it ourselves.
160 rc = p.get()->Truncate();
161 if (rc.IsError()) {
162 rc1 = rc;
163 }
164 }
165 containers_.clear();
166 writable_containers_pool_.clear();
167 file_id_ = 0;
168 return rc1;
169 }
170
StorageManager(const Path & root)171 StorageManager::StorageManager(const Path &root) : root_(root), file_id_(0), index_(), pool_size_(1) {}
172
StorageManager(const Path & root,size_t pool_size)173 StorageManager::StorageManager(const Path &root, size_t pool_size)
174 : root_(root), file_id_(0), index_(), pool_size_(pool_size) {}
175
~StorageManager()176 StorageManager::~StorageManager() { (void)StorageManager::DoServiceStop(); }
177
operator <<(std::ostream & os,const StorageManager & s)178 std::ostream &operator<<(std::ostream &os, const StorageManager &s) {
179 os << "Dumping all containers ..."
180 << "\n";
181 for (auto const &p : s.containers_) {
182 os << *(p.get());
183 }
184 return os;
185 }
186 } // namespace dataset
187 } // namespace mindspore
188