1 /**
2 * Copyright 2019 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
#include "minddata/dataset/engine/cache/storage_container.h"
#include <unistd.h>
#include <cerrno>
#include <cstring>
#include <memory>
#include <new>
#include <string>
#include <utility>
#include <vector>
#include "minddata/dataset/util/log_adapter.h"
#include "minddata/dataset/util/path.h"
#include "minddata/dataset/util/status.h"
22
23 namespace mindspore {
24 namespace dataset {
Create()25 Status StorageContainer::Create() {
26 RETURN_IF_NOT_OK(BuddySpace::CreateBuddySpace(&bs_));
27 RETURN_IF_NOT_OK(cont_.CreateFile(&fd_));
28 is_open_ = true;
29 MS_LOG(INFO) << "Container " << cont_ << " created";
30 return Status::OK();
31 }
32
Open()33 Status StorageContainer::Open() noexcept {
34 std::lock_guard<std::mutex> lck(mutex_);
35 // Check again
36 if (!is_open_) {
37 RETURN_IF_NOT_OK(cont_.OpenFile(&fd_));
38 is_open_ = true;
39 }
40 return Status::OK();
41 }
42
Close()43 Status StorageContainer::Close() noexcept {
44 if (is_open_) {
45 std::lock_guard<std::mutex> lck(mutex_);
46 // Check again
47 if (is_open_) {
48 RETURN_IF_NOT_OK(cont_.CloseFile(fd_));
49 is_open_ = false;
50 fd_ = -1;
51 }
52 }
53 return Status::OK();
54 }
55
Read(WritableSlice * dest,off64_t offset) const56 Status StorageContainer::Read(WritableSlice *dest, off64_t offset) const noexcept {
57 MS_ASSERT(is_open_);
58 RETURN_UNEXPECTED_IF_NULL(dest);
59 auto sz = dest->GetSize();
60 #if defined(_WIN32) || defined(_WIN64)
61 // Doesn't seem there is any pread64 on mingw.
62 // So we will do a seek and then a read under
63 // a protection of mutex.
64 std::lock_guard<std::mutex> lck(mutex_);
65 auto seek_err = lseek(fd_, offset, SEEK_SET);
66 if (seek_err < 0) {
67 RETURN_STATUS_UNEXPECTED(strerror(errno));
68 }
69 auto r_sz = read(fd_, dest->GetMutablePointer(), sz);
70 #elif defined(__APPLE__)
71 auto r_sz = pread(fd_, dest->GetMutablePointer(), sz, offset);
72 #else
73 auto r_sz = pread64(fd_, dest->GetMutablePointer(), sz, offset);
74 #endif
75 if (r_sz != sz) {
76 errno_t err = (r_sz == 0) ? EOF : errno;
77 RETURN_STATUS_UNEXPECTED(strerror(err));
78 }
79 return Status::OK();
80 }
81
Write(const ReadableSlice & dest,off64_t offset) const82 Status StorageContainer::Write(const ReadableSlice &dest, off64_t offset) const noexcept {
83 MS_ASSERT(is_open_);
84 auto sz = dest.GetSize();
85 #if defined(_WIN32) || defined(_WIN64)
86 // Doesn't seem there is any pwrite64 on mingw.
87 // So we will do a seek and then a read under
88 // a protection of mutex.
89 std::lock_guard<std::mutex> lck(mutex_);
90 auto seek_err = lseek(fd_, offset, SEEK_SET);
91 if (seek_err < 0) {
92 RETURN_STATUS_UNEXPECTED(strerror(errno));
93 }
94 auto r_sz = write(fd_, dest.GetPointer(), sz);
95 #elif defined(__APPLE__)
96 auto r_sz = pwrite(fd_, dest.GetPointer(), sz, offset);
97 #else
98 auto r_sz = pwrite64(fd_, dest.GetPointer(), sz, offset);
99 #endif
100 if (r_sz != sz) {
101 errno_t err = (r_sz == 0) ? EOF : errno;
102 if (errno == ENOSPC) {
103 RETURN_STATUS_ERROR(StatusCode::kMDNoSpace, "no space left.");
104 } else {
105 RETURN_STATUS_UNEXPECTED(strerror(err));
106 }
107 }
108 return Status::OK();
109 }
110
Insert(const std::vector<ReadableSlice> & buf,off64_t * offset)111 Status StorageContainer::Insert(const std::vector<ReadableSlice> &buf, off64_t *offset) noexcept {
112 size_t sz = 0;
113 for (auto &v : buf) {
114 sz += v.GetSize();
115 }
116 if (sz == 0) {
117 RETURN_STATUS_UNEXPECTED("Unexpected 0 length");
118 }
119 if (sz > bs_->GetMaxSize()) {
120 RETURN_STATUS_UNEXPECTED("Request size too big");
121 }
122 BSpaceDescriptor bspd{0};
123 addr_t addr = 0;
124 RETURN_IF_NOT_OK(bs_->Alloc(sz, &bspd, &addr));
125 *offset = static_cast<off64_t>(addr);
126 // We will do piecewise copy of the data to a large buffer
127 std::string mem;
128 try {
129 mem.resize(sz);
130 CHECK_FAIL_RETURN_UNEXPECTED(mem.capacity() >= sz, "Programming error");
131 } catch (const std::bad_alloc &e) {
132 return Status(StatusCode::kMDOutOfMemory);
133 }
134 WritableSlice all(mem.data(), sz);
135 size_t pos = 0;
136 for (auto &v : buf) {
137 WritableSlice row_data(all, pos);
138 RETURN_IF_NOT_OK(WritableSlice::Copy(&row_data, v));
139 pos += v.GetSize();
140 }
141 // Write all data to disk at once
142 RETURN_IF_NOT_OK(Write(all, addr));
143 return Status::OK();
144 }
145
Truncate() const146 Status StorageContainer::Truncate() const noexcept {
147 if (is_open_) {
148 RETURN_IF_NOT_OK(cont_.TruncateFile(fd_));
149 MS_LOG(INFO) << "Container " << cont_ << " truncated";
150 }
151 return Status::OK();
152 }
153
~StorageContainer()154 StorageContainer::~StorageContainer() noexcept {
155 (void)Truncate();
156 (void)Close();
157 }
158
operator <<(std::ostream & os,const StorageContainer & s)159 std::ostream &operator<<(std::ostream &os, const StorageContainer &s) {
160 os << "File path : " << s.cont_ << "\n" << *(s.bs_.get());
161 return os;
162 }
163
CreateStorageContainer(std::shared_ptr<StorageContainer> * out_sc,const std::string & path)164 Status StorageContainer::CreateStorageContainer(std::shared_ptr<StorageContainer> *out_sc, const std::string &path) {
165 Status rc;
166 auto sc = new (std::nothrow) StorageContainer(path);
167 if (sc == nullptr) {
168 return Status(StatusCode::kMDOutOfMemory);
169 }
170 rc = sc->Create();
171 if (rc.IsOk()) {
172 (*out_sc).reset(sc);
173 } else {
174 delete sc;
175 }
176 return rc;
177 }
178 } // namespace dataset
179 } // namespace mindspore
180