• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/cache/storage_container.h"
17 
18 #include <unistd.h>
19 #include <vector>
20 #include "utils/ms_utils.h"
21 #include "minddata/dataset/util/log_adapter.h"
22 #include "minddata/dataset/util/path.h"
23 #include "minddata/dataset/util/status.h"
24 
25 namespace mindspore {
26 namespace dataset {
Create()27 Status StorageContainer::Create() {
28   RETURN_IF_NOT_OK(BuddySpace::CreateBuddySpace(&bs_));
29   RETURN_IF_NOT_OK(cont_.CreateFile(&fd_));
30   is_open_ = true;
31   MS_LOG(INFO) << "Container " << cont_ << " created";
32   return Status::OK();
33 }
34 
Open()35 Status StorageContainer::Open() noexcept {
36   std::lock_guard<std::mutex> lck(mutex_);
37   // Check again
38   if (!is_open_) {
39     RETURN_IF_NOT_OK(cont_.OpenFile(&fd_));
40     is_open_ = true;
41   }
42   return Status::OK();
43 }
44 
Close()45 Status StorageContainer::Close() noexcept {
46   if (is_open_) {
47     std::lock_guard<std::mutex> lck(mutex_);
48     // Check again
49     if (is_open_) {
50       RETURN_IF_NOT_OK(cont_.CloseFile(fd_));
51       is_open_ = false;
52       fd_ = -1;
53     }
54   }
55   return Status::OK();
56 }
57 
Read(WritableSlice * dest,off64_t offset) const58 Status StorageContainer::Read(WritableSlice *dest, off64_t offset) const noexcept {
59   MS_ASSERT(is_open_);
60   RETURN_UNEXPECTED_IF_NULL(dest);
61   auto sz = dest->GetSize();
62 #if defined(_WIN32) || defined(_WIN64)
63   // Doesn't seem there is any pread64 on mingw.
64   // So we will do a seek and then a read under
65   // a protection of mutex.
66   std::lock_guard<std::mutex> lck(mutex_);
67   auto seek_err = lseek(fd_, offset, SEEK_SET);
68   if (seek_err < 0) {
69     RETURN_STATUS_UNEXPECTED(strerror(errno));
70   }
71   auto r_sz = read(fd_, dest->GetMutablePointer(), sz);
72 #elif defined(__APPLE__)
73   auto r_sz = pread(fd_, dest->GetMutablePointer(), sz, offset);
74 #else
75   auto r_sz = pread64(fd_, dest->GetMutablePointer(), sz, offset);
76 #endif
77   if (r_sz != sz) {
78     errno_t err = (r_sz == 0) ? EOF : errno;
79     RETURN_STATUS_UNEXPECTED(strerror(err));
80   }
81   return Status::OK();
82 }
83 
Write(const ReadableSlice & dest,off64_t offset) const84 Status StorageContainer::Write(const ReadableSlice &dest, off64_t offset) const noexcept {
85   MS_ASSERT(is_open_);
86   auto sz = dest.GetSize();
87 #if defined(_WIN32) || defined(_WIN64)
88   // Doesn't seem there is any pwrite64 on mingw.
89   // So we will do a seek and then a read under
90   // a protection of mutex.
91   std::lock_guard<std::mutex> lck(mutex_);
92   auto seek_err = lseek(fd_, offset, SEEK_SET);
93   if (seek_err < 0) {
94     RETURN_STATUS_UNEXPECTED(strerror(errno));
95   }
96   auto r_sz = write(fd_, dest.GetPointer(), sz);
97 #elif defined(__APPLE__)
98   auto r_sz = pwrite(fd_, dest.GetPointer(), sz, offset);
99 #else
100   auto r_sz = pwrite64(fd_, dest.GetPointer(), sz, offset);
101 #endif
102   if (r_sz != sz) {
103     errno_t err = (r_sz == 0) ? EOF : errno;
104     if (errno == ENOSPC) {
105       return Status(StatusCode::kMDNoSpace, __LINE__, __FILE__);
106     } else {
107       RETURN_STATUS_UNEXPECTED(strerror(err));
108     }
109   }
110   return Status::OK();
111 }
112 
Insert(const std::vector<ReadableSlice> & buf,off64_t * offset)113 Status StorageContainer::Insert(const std::vector<ReadableSlice> &buf, off64_t *offset) noexcept {
114   size_t sz = 0;
115   for (auto &v : buf) {
116     sz += v.GetSize();
117   }
118   if (sz == 0) {
119     RETURN_STATUS_UNEXPECTED("Unexpected 0 length");
120   }
121   if (sz > bs_->GetMaxSize()) {
122     RETURN_STATUS_UNEXPECTED("Request size too big");
123   }
124   BSpaceDescriptor bspd{0};
125   addr_t addr = 0;
126   RETURN_IF_NOT_OK(bs_->Alloc(sz, &bspd, &addr));
127   *offset = static_cast<off64_t>(addr);
128   // We will do piecewise copy of the data to a large buffer
129   std::string mem;
130   try {
131     mem.resize(sz);
132     CHECK_FAIL_RETURN_UNEXPECTED(mem.capacity() >= sz, "Programming error");
133   } catch (const std::bad_alloc &e) {
134     return Status(StatusCode::kMDOutOfMemory);
135   }
136   WritableSlice all(mem.data(), sz);
137   size_t pos = 0;
138   for (auto &v : buf) {
139     WritableSlice row_data(all, pos);
140     RETURN_IF_NOT_OK(WritableSlice::Copy(&row_data, v));
141     pos += v.GetSize();
142   }
143   // Write all data to disk at once
144   RETURN_IF_NOT_OK(Write(all, addr));
145   return Status::OK();
146 }
147 
Truncate() const148 Status StorageContainer::Truncate() const noexcept {
149   if (is_open_) {
150     RETURN_IF_NOT_OK(cont_.TruncateFile(fd_));
151     MS_LOG(INFO) << "Container " << cont_ << " truncated";
152   }
153   return Status::OK();
154 }
155 
~StorageContainer()156 StorageContainer::~StorageContainer() noexcept {
157   (void)Truncate();
158   (void)Close();
159 }
160 
operator <<(std::ostream & os,const StorageContainer & s)161 std::ostream &operator<<(std::ostream &os, const StorageContainer &s) {
162   os << "File path : " << s.cont_ << "\n" << *(s.bs_.get());
163   return os;
164 }
165 
CreateStorageContainer(std::shared_ptr<StorageContainer> * out_sc,const std::string & path)166 Status StorageContainer::CreateStorageContainer(std::shared_ptr<StorageContainer> *out_sc, const std::string &path) {
167   Status rc;
168   auto sc = new (std::nothrow) StorageContainer(path);
169   if (sc == nullptr) {
170     return Status(StatusCode::kMDOutOfMemory);
171   }
172   rc = sc->Create();
173   if (rc.IsOk()) {
174     (*out_sc).reset(sc);
175   } else {
176     delete sc;
177   }
178   return rc;
179 }
180 }  // namespace dataset
181 }  // namespace mindspore
182