1 /**
2 * Copyright 2019 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/cache/storage_container.h"
17
#include <unistd.h>
#include <cerrno>
#include <cstring>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "utils/ms_utils.h"
#include "minddata/dataset/util/log_adapter.h"
#include "minddata/dataset/util/path.h"
#include "minddata/dataset/util/status.h"
24
25 namespace mindspore {
26 namespace dataset {
Create()27 Status StorageContainer::Create() {
28 RETURN_IF_NOT_OK(BuddySpace::CreateBuddySpace(&bs_));
29 RETURN_IF_NOT_OK(cont_.CreateFile(&fd_));
30 is_open_ = true;
31 MS_LOG(INFO) << "Container " << cont_ << " created";
32 return Status::OK();
33 }
34
Open()35 Status StorageContainer::Open() noexcept {
36 std::lock_guard<std::mutex> lck(mutex_);
37 // Check again
38 if (!is_open_) {
39 RETURN_IF_NOT_OK(cont_.OpenFile(&fd_));
40 is_open_ = true;
41 }
42 return Status::OK();
43 }
44
Close()45 Status StorageContainer::Close() noexcept {
46 if (is_open_) {
47 std::lock_guard<std::mutex> lck(mutex_);
48 // Check again
49 if (is_open_) {
50 RETURN_IF_NOT_OK(cont_.CloseFile(fd_));
51 is_open_ = false;
52 fd_ = -1;
53 }
54 }
55 return Status::OK();
56 }
57
Read(WritableSlice * dest,off64_t offset) const58 Status StorageContainer::Read(WritableSlice *dest, off64_t offset) const noexcept {
59 MS_ASSERT(is_open_);
60 RETURN_UNEXPECTED_IF_NULL(dest);
61 auto sz = dest->GetSize();
62 #if defined(_WIN32) || defined(_WIN64)
63 // Doesn't seem there is any pread64 on mingw.
64 // So we will do a seek and then a read under
65 // a protection of mutex.
66 std::lock_guard<std::mutex> lck(mutex_);
67 auto seek_err = lseek(fd_, offset, SEEK_SET);
68 if (seek_err < 0) {
69 RETURN_STATUS_UNEXPECTED(strerror(errno));
70 }
71 auto r_sz = read(fd_, dest->GetMutablePointer(), sz);
72 #elif defined(__APPLE__)
73 auto r_sz = pread(fd_, dest->GetMutablePointer(), sz, offset);
74 #else
75 auto r_sz = pread64(fd_, dest->GetMutablePointer(), sz, offset);
76 #endif
77 if (r_sz != sz) {
78 errno_t err = (r_sz == 0) ? EOF : errno;
79 RETURN_STATUS_UNEXPECTED(strerror(err));
80 }
81 return Status::OK();
82 }
83
Write(const ReadableSlice & dest,off64_t offset) const84 Status StorageContainer::Write(const ReadableSlice &dest, off64_t offset) const noexcept {
85 MS_ASSERT(is_open_);
86 auto sz = dest.GetSize();
87 #if defined(_WIN32) || defined(_WIN64)
88 // Doesn't seem there is any pwrite64 on mingw.
89 // So we will do a seek and then a read under
90 // a protection of mutex.
91 std::lock_guard<std::mutex> lck(mutex_);
92 auto seek_err = lseek(fd_, offset, SEEK_SET);
93 if (seek_err < 0) {
94 RETURN_STATUS_UNEXPECTED(strerror(errno));
95 }
96 auto r_sz = write(fd_, dest.GetPointer(), sz);
97 #elif defined(__APPLE__)
98 auto r_sz = pwrite(fd_, dest.GetPointer(), sz, offset);
99 #else
100 auto r_sz = pwrite64(fd_, dest.GetPointer(), sz, offset);
101 #endif
102 if (r_sz != sz) {
103 errno_t err = (r_sz == 0) ? EOF : errno;
104 if (errno == ENOSPC) {
105 return Status(StatusCode::kMDNoSpace, __LINE__, __FILE__);
106 } else {
107 RETURN_STATUS_UNEXPECTED(strerror(err));
108 }
109 }
110 return Status::OK();
111 }
112
Insert(const std::vector<ReadableSlice> & buf,off64_t * offset)113 Status StorageContainer::Insert(const std::vector<ReadableSlice> &buf, off64_t *offset) noexcept {
114 size_t sz = 0;
115 for (auto &v : buf) {
116 sz += v.GetSize();
117 }
118 if (sz == 0) {
119 RETURN_STATUS_UNEXPECTED("Unexpected 0 length");
120 }
121 if (sz > bs_->GetMaxSize()) {
122 RETURN_STATUS_UNEXPECTED("Request size too big");
123 }
124 BSpaceDescriptor bspd{0};
125 addr_t addr = 0;
126 RETURN_IF_NOT_OK(bs_->Alloc(sz, &bspd, &addr));
127 *offset = static_cast<off64_t>(addr);
128 // We will do piecewise copy of the data to a large buffer
129 std::string mem;
130 try {
131 mem.resize(sz);
132 CHECK_FAIL_RETURN_UNEXPECTED(mem.capacity() >= sz, "Programming error");
133 } catch (const std::bad_alloc &e) {
134 return Status(StatusCode::kMDOutOfMemory);
135 }
136 WritableSlice all(mem.data(), sz);
137 size_t pos = 0;
138 for (auto &v : buf) {
139 WritableSlice row_data(all, pos);
140 RETURN_IF_NOT_OK(WritableSlice::Copy(&row_data, v));
141 pos += v.GetSize();
142 }
143 // Write all data to disk at once
144 RETURN_IF_NOT_OK(Write(all, addr));
145 return Status::OK();
146 }
147
Truncate() const148 Status StorageContainer::Truncate() const noexcept {
149 if (is_open_) {
150 RETURN_IF_NOT_OK(cont_.TruncateFile(fd_));
151 MS_LOG(INFO) << "Container " << cont_ << " truncated";
152 }
153 return Status::OK();
154 }
155
~StorageContainer()156 StorageContainer::~StorageContainer() noexcept {
157 (void)Truncate();
158 (void)Close();
159 }
160
operator <<(std::ostream & os,const StorageContainer & s)161 std::ostream &operator<<(std::ostream &os, const StorageContainer &s) {
162 os << "File path : " << s.cont_ << "\n" << *(s.bs_.get());
163 return os;
164 }
165
CreateStorageContainer(std::shared_ptr<StorageContainer> * out_sc,const std::string & path)166 Status StorageContainer::CreateStorageContainer(std::shared_ptr<StorageContainer> *out_sc, const std::string &path) {
167 Status rc;
168 auto sc = new (std::nothrow) StorageContainer(path);
169 if (sc == nullptr) {
170 return Status(StatusCode::kMDOutOfMemory);
171 }
172 rc = sc->Create();
173 if (rc.IsOk()) {
174 (*out_sc).reset(sc);
175 } else {
176 delete sc;
177 }
178 return rc;
179 }
180 } // namespace dataset
181 } // namespace mindspore
182