1 /**
2 * Copyright 2019 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/cache/storage_manager.h"
17
18 #include <iomanip>
19
20 #include "utils/ms_utils.h"
21 #include "minddata/dataset/util/log_adapter.h"
22 #include "minddata/dataset/util/path.h"
23 #include "minddata/dataset/util/random.h"
24 #include "minddata/dataset/util/services.h"
25
26 namespace mindspore {
27 namespace dataset {
GetBaseName(const std::string & prefix,int32_t file_id)28 std::string StorageManager::GetBaseName(const std::string &prefix, int32_t file_id) {
29 std::ostringstream oss;
30 oss << prefix << std::setfill('0') << std::setw(5) << file_id;
31 return oss.str();
32 }
33
ConstructFileName(const std::string & prefix,int32_t file_id,const std::string & suffix)34 std::string StorageManager::ConstructFileName(const std::string &prefix, int32_t file_id, const std::string &suffix) {
35 std::string base_name = GetBaseName(prefix, file_id);
36 return (base_name + "." + suffix);
37 }
38
AddOneContainer(int replaced_container_pos)39 Status StorageManager::AddOneContainer(int replaced_container_pos) {
40 const std::string kPrefix = "IMG";
41 const std::string kSuffix = "LB";
42 Path container_name = root_ / ConstructFileName(kPrefix, file_id_, kSuffix);
43 std::shared_ptr<StorageContainer> sc;
44 RETURN_IF_NOT_OK(StorageContainer::CreateStorageContainer(&sc, container_name.ToString()));
45 containers_.push_back(sc);
46 file_id_++;
47 if (replaced_container_pos >= 0) {
48 writable_containers_pool_[replaced_container_pos] = containers_.size() - 1;
49 } else {
50 writable_containers_pool_.push_back(containers_.size() - 1);
51 }
52 return Status::OK();
53 }
54
DoServiceStart()55 Status StorageManager::DoServiceStart() {
56 containers_.reserve(kMaxNumContainers);
57 writable_containers_pool_.reserve(pool_size_);
58 if (root_.IsDirectory()) {
59 // create multiple containers and store their index in a pool
60 CHECK_FAIL_RETURN_UNEXPECTED(pool_size_ > 0, "Expect positive pool_size_, but got:" + std::to_string(pool_size_));
61 for (int i = 0; i < pool_size_; i++) {
62 RETURN_IF_NOT_OK(AddOneContainer());
63 }
64 } else {
65 RETURN_STATUS_UNEXPECTED("Not a directory");
66 }
67 return Status::OK();
68 }
69
Write(key_type * key,const std::vector<ReadableSlice> & buf)70 Status StorageManager::Write(key_type *key, const std::vector<ReadableSlice> &buf) {
71 RETURN_UNEXPECTED_IF_NULL(key);
72 size_t sz = 0;
73 for (auto &v : buf) {
74 sz += v.GetSize();
75 }
76 if (sz == 0) {
77 RETURN_STATUS_UNEXPECTED("Unexpected 0 length");
78 }
79 auto mt = GetRandomDevice();
80 std::shared_ptr<StorageContainer> cont;
81 key_type out_key;
82 value_type out_value;
83 bool create_new_container = false;
84 int old_container_pos = -1;
85 size_t last_num_container = -1;
86 do {
87 SharedLock lock_s(&rw_lock_);
88 size_t num_containers = containers_.size();
89 if (create_new_container && (num_containers == last_num_container) && (old_container_pos >= 0)) {
90 // Upgrade to exclusive lock.
91 lock_s.Upgrade();
92 create_new_container = false;
93 // Check again if someone has already added a
94 // new container after we got the x lock
95 if (containers_.size() == num_containers) {
96 // Create a new container and replace the full container in the pool with the newly created one
97 RETURN_IF_NOT_OK(AddOneContainer(old_container_pos));
98 }
99 // Refresh how many containers there are.
100 num_containers = containers_.size();
101 // Downgrade back to shared lock
102 lock_s.Downgrade();
103 }
104 if (num_containers == 0) {
105 RETURN_STATUS_UNEXPECTED("num_containers is zero");
106 }
107 // Pick a random container from the writable container pool to insert.
108 std::uniform_int_distribution<int> distribution(0, pool_size_ - 1);
109 int pos_in_pool = distribution(mt);
110 int cont_index = writable_containers_pool_.at(pos_in_pool);
111 cont = containers_.at(cont_index);
112 off64_t offset;
113 Status rc = cont->Insert(buf, &offset);
114 if (rc.StatusCode() == StatusCode::kMDBuddySpaceFull) {
115 create_new_container = true;
116 old_container_pos = pos_in_pool;
117 // Remember how many containers we saw. In the next iteration we will do a comparison to see
118 // if someone has already created it.
119 last_num_container = num_containers;
120 } else if (rc.IsOk()) {
121 out_value = std::make_pair(cont_index, std::make_pair(offset, sz));
122 RETURN_IF_NOT_OK(index_.insert(out_value, &out_key));
123 *key = out_key;
124 break;
125 } else {
126 return rc;
127 }
128 } while (true);
129 return Status::OK();
130 }
131
Read(StorageManager::key_type key,WritableSlice * dest,size_t * bytesRead) const132 Status StorageManager::Read(StorageManager::key_type key, WritableSlice *dest, size_t *bytesRead) const {
133 RETURN_UNEXPECTED_IF_NULL(dest);
134 auto r = index_.Search(key);
135 if (r.second) {
136 auto &it = r.first;
137 value_type v = *it;
138 int container_inx = v.first;
139 off_t offset = v.second.first;
140 size_t sz = v.second.second;
141 if (dest->GetSize() < sz) {
142 std::string errMsg = "Destination buffer too small. Expect at least " + std::to_string(sz) +
143 " but length = " + std::to_string(dest->GetSize());
144 RETURN_STATUS_UNEXPECTED(errMsg);
145 }
146 if (bytesRead != nullptr) {
147 *bytesRead = sz;
148 }
149 auto cont = containers_.at(container_inx);
150 RETURN_IF_NOT_OK(cont->Read(dest, offset));
151 } else {
152 RETURN_STATUS_UNEXPECTED("Key not found");
153 }
154 return Status::OK();
155 }
156
DoServiceStop()157 Status StorageManager::DoServiceStop() noexcept {
158 Status rc;
159 Status rc1;
160 for (auto const &p : containers_) {
161 // The destructor of StorageContainer is not called automatically until the use
162 // count drops to 0. But it is not always the case. We will do it ourselves.
163 rc = p.get()->Truncate();
164 if (rc.IsError()) {
165 rc1 = rc;
166 }
167 }
168 containers_.clear();
169 writable_containers_pool_.clear();
170 file_id_ = 0;
171 return rc1;
172 }
173
StorageManager(const Path & root)174 StorageManager::StorageManager(const Path &root) : root_(root), file_id_(0), index_(), pool_size_(1) {}
175
StorageManager(const Path & root,int pool_size)176 StorageManager::StorageManager(const Path &root, int pool_size)
177 : root_(root), file_id_(0), index_(), pool_size_(pool_size) {}
178
~StorageManager()179 StorageManager::~StorageManager() { (void)StorageManager::DoServiceStop(); }
180
operator <<(std::ostream & os,const StorageManager & s)181 std::ostream &operator<<(std::ostream &os, const StorageManager &s) {
182 os << "Dumping all containers ..."
183 << "\n";
184 for (auto const &p : s.containers_) {
185 os << *(p.get());
186 }
187 return os;
188 }
189 } // namespace dataset
190 } // namespace mindspore
191