• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/cache/storage_container.h"
17 #include <unistd.h>
18 #include <vector>
19 #include "minddata/dataset/util/log_adapter.h"
20 #include "minddata/dataset/util/path.h"
21 #include "minddata/dataset/util/status.h"
22 
23 namespace mindspore {
24 namespace dataset {
Create()25 Status StorageContainer::Create() {
26   RETURN_IF_NOT_OK(BuddySpace::CreateBuddySpace(&bs_));
27   RETURN_IF_NOT_OK(cont_.CreateFile(&fd_));
28   is_open_ = true;
29   MS_LOG(INFO) << "Container " << cont_ << " created";
30   return Status::OK();
31 }
32 
Open()33 Status StorageContainer::Open() noexcept {
34   std::lock_guard<std::mutex> lck(mutex_);
35   // Check again
36   if (!is_open_) {
37     RETURN_IF_NOT_OK(cont_.OpenFile(&fd_));
38     is_open_ = true;
39   }
40   return Status::OK();
41 }
42 
Close()43 Status StorageContainer::Close() noexcept {
44   if (is_open_) {
45     std::lock_guard<std::mutex> lck(mutex_);
46     // Check again
47     if (is_open_) {
48       RETURN_IF_NOT_OK(cont_.CloseFile(fd_));
49       is_open_ = false;
50       fd_ = -1;
51     }
52   }
53   return Status::OK();
54 }
55 
Read(WritableSlice * dest,off64_t offset) const56 Status StorageContainer::Read(WritableSlice *dest, off64_t offset) const noexcept {
57   MS_ASSERT(is_open_);
58   RETURN_UNEXPECTED_IF_NULL(dest);
59   auto sz = dest->GetSize();
60 #if defined(_WIN32) || defined(_WIN64)
61   // Doesn't seem there is any pread64 on mingw.
62   // So we will do a seek and then a read under
63   // a protection of mutex.
64   std::lock_guard<std::mutex> lck(mutex_);
65   auto seek_err = lseek(fd_, offset, SEEK_SET);
66   if (seek_err < 0) {
67     RETURN_STATUS_UNEXPECTED(strerror(errno));
68   }
69   auto r_sz = read(fd_, dest->GetMutablePointer(), sz);
70 #elif defined(__APPLE__)
71   auto r_sz = pread(fd_, dest->GetMutablePointer(), sz, offset);
72 #else
73   auto r_sz = pread64(fd_, dest->GetMutablePointer(), sz, offset);
74 #endif
75   if (r_sz != sz) {
76     errno_t err = (r_sz == 0) ? EOF : errno;
77     RETURN_STATUS_UNEXPECTED(strerror(err));
78   }
79   return Status::OK();
80 }
81 
Write(const ReadableSlice & dest,off64_t offset) const82 Status StorageContainer::Write(const ReadableSlice &dest, off64_t offset) const noexcept {
83   MS_ASSERT(is_open_);
84   auto sz = dest.GetSize();
85 #if defined(_WIN32) || defined(_WIN64)
86   // Doesn't seem there is any pwrite64 on mingw.
87   // So we will do a seek and then a read under
88   // a protection of mutex.
89   std::lock_guard<std::mutex> lck(mutex_);
90   auto seek_err = lseek(fd_, offset, SEEK_SET);
91   if (seek_err < 0) {
92     RETURN_STATUS_UNEXPECTED(strerror(errno));
93   }
94   auto r_sz = write(fd_, dest.GetPointer(), sz);
95 #elif defined(__APPLE__)
96   auto r_sz = pwrite(fd_, dest.GetPointer(), sz, offset);
97 #else
98   auto r_sz = pwrite64(fd_, dest.GetPointer(), sz, offset);
99 #endif
100   if (r_sz != sz) {
101     errno_t err = (r_sz == 0) ? EOF : errno;
102     if (errno == ENOSPC) {
103       RETURN_STATUS_ERROR(StatusCode::kMDNoSpace, "no space left.");
104     } else {
105       RETURN_STATUS_UNEXPECTED(strerror(err));
106     }
107   }
108   return Status::OK();
109 }
110 
Insert(const std::vector<ReadableSlice> & buf,off64_t * offset)111 Status StorageContainer::Insert(const std::vector<ReadableSlice> &buf, off64_t *offset) noexcept {
112   size_t sz = 0;
113   for (auto &v : buf) {
114     sz += v.GetSize();
115   }
116   if (sz == 0) {
117     RETURN_STATUS_UNEXPECTED("Unexpected 0 length");
118   }
119   if (sz > bs_->GetMaxSize()) {
120     RETURN_STATUS_UNEXPECTED("Request size too big");
121   }
122   BSpaceDescriptor bspd{0};
123   addr_t addr = 0;
124   RETURN_IF_NOT_OK(bs_->Alloc(sz, &bspd, &addr));
125   *offset = static_cast<off64_t>(addr);
126   // We will do piecewise copy of the data to a large buffer
127   std::string mem;
128   try {
129     mem.resize(sz);
130     CHECK_FAIL_RETURN_UNEXPECTED(mem.capacity() >= sz, "Programming error");
131   } catch (const std::bad_alloc &e) {
132     return Status(StatusCode::kMDOutOfMemory);
133   }
134   WritableSlice all(mem.data(), sz);
135   size_t pos = 0;
136   for (auto &v : buf) {
137     WritableSlice row_data(all, pos);
138     RETURN_IF_NOT_OK(WritableSlice::Copy(&row_data, v));
139     pos += v.GetSize();
140   }
141   // Write all data to disk at once
142   RETURN_IF_NOT_OK(Write(all, addr));
143   return Status::OK();
144 }
145 
Truncate() const146 Status StorageContainer::Truncate() const noexcept {
147   if (is_open_) {
148     RETURN_IF_NOT_OK(cont_.TruncateFile(fd_));
149     MS_LOG(INFO) << "Container " << cont_ << " truncated";
150   }
151   return Status::OK();
152 }
153 
~StorageContainer()154 StorageContainer::~StorageContainer() noexcept {
155   (void)Truncate();
156   (void)Close();
157 }
158 
operator <<(std::ostream & os,const StorageContainer & s)159 std::ostream &operator<<(std::ostream &os, const StorageContainer &s) {
160   os << "File path : " << s.cont_ << "\n" << *(s.bs_.get());
161   return os;
162 }
163 
CreateStorageContainer(std::shared_ptr<StorageContainer> * out_sc,const std::string & path)164 Status StorageContainer::CreateStorageContainer(std::shared_ptr<StorageContainer> *out_sc, const std::string &path) {
165   Status rc;
166   auto sc = new (std::nothrow) StorageContainer(path);
167   if (sc == nullptr) {
168     return Status(StatusCode::kMDOutOfMemory);
169   }
170   rc = sc->Create();
171   if (rc.IsOk()) {
172     (*out_sc).reset(sc);
173   } else {
174     delete sc;
175   }
176   return rc;
177 }
178 }  // namespace dataset
179 }  // namespace mindspore
180