/**
 * Copyright 2020-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 #include "minddata/dataset/engine/datasetops/cache_merge_op.h"
17
18 #include "minddata/dataset/core/config_manager.h"
19 #include "minddata/dataset/include/dataset/constants.h"
20 #include "minddata/dataset/core/global_context.h"
21 #include "minddata/dataset/engine/execution_tree.h"
22 #include "minddata/dataset/util/system_pool.h"
23 #include "minddata/dataset/util/task_manager.h"
24
25 namespace mindspore {
26 namespace dataset {
27 CacheMergeOp::~CacheMergeOp() = default;
void CacheMergeOp::Print(std::ostream &out, bool show_all) const {
  if (!show_all) {
    // Call the super class for displaying any common 1-liner info
    ParallelOp::Print(out, show_all);
    // Then show any custom derived-internal 1-liner info for this op
    out << "\n";
  } else {
    // Call the super class for displaying any common detailed info
    ParallelOp::Print(out, show_all);
    // Then show any custom derived-internal stuff
    out << "\n\n";
  }
}

CacheMergeOp::CacheMergeOp(int32_t numWorkers, int32_t opConnectorSize, int32_t numCleaners,
                           std::shared_ptr<CacheClient> cache_client)
    : ParallelOp(numWorkers, opConnectorSize),
      num_cleaners_(numCleaners),
      cache_client_(std::move(cache_client)),
      cache_missing_rows_(true) {}

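// Master thread for the cache hit stream: sets up the io queue, launches the cache miss master,
// the cache miss workers and the cleaner threads, then feeds rows from the cache hit child to the
// worker input queues.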
Status CacheMergeOp::operator()() {
  // A queue of row ids that lets the cleaner send cache miss rows to the cache server.
  // We don't want a small queue, as that would block the parallel op workers.
  // A row id is an 8-byte integer, so a bigger queue doesn't consume much memory.
  static const int32_t queue_sz = 512;
  io_que_ = std::make_unique<Queue<row_id_type>>(queue_sz);
  RETURN_IF_NOT_OK(io_que_->Register(tree_->AllTasks()));

  RETURN_IF_NOT_OK(RegisterAndLaunchThreads());

  RETURN_IF_NOT_OK(
    tree_->LaunchWorkers(1, std::bind(&CacheMergeOp::CacheMissMaster, this), Name() + "::CacheMissMaster", id()));
  RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_,
                                        std::bind(&CacheMergeOp::CacheMissWorkerEntry, this, std::placeholders::_1),
                                        Name() + "::CacheMissWorkerEntry", id()));

  // Dedicated cleaner threads to move TensorRows from the pool to the cache server
  for (auto i = 0; i < num_cleaners_; ++i) {
    RETURN_IF_NOT_OK(
      tree_->AllTasks()->CreateAsyncTask("Cleaner", std::bind(&CacheMergeOp::Cleaner, this), nullptr, id()));
  }
  TaskManager::FindMe()->Post();
  TensorRow new_row;
  auto child_iterator = std::make_unique<ChildIterator>(this, 0, kCacheHitChildIdx);
  int64_t ctr = 0;
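  // Distribute rows from the cache hit child round-robin to the worker input queues until eof.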
  do {
    RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&new_row));
    RETURN_IF_NOT_OK(worker_in_queues_[static_cast<const int>(ctr++ % num_workers_)]->EmplaceBack(std::move(new_row)));
  } while (!new_row.eof());

  return Status::OK();
}

// Each parallel worker will pop from the CacheHit stream. If there is a missing TensorRow, we will wait
// until it shows up in the pool.
Status CacheMergeOp::WorkerEntry(int32_t worker_id) {
  TaskManager::FindMe()->Post();
  TensorRow new_row;
  RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&new_row));
  while (!new_row.eof()) {
    if (new_row.eoe()) {
      RETURN_IF_NOT_OK(EoeReceived(worker_id));
      RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&new_row));
    } else {
      if (new_row.empty()) {
        auto row_id = new_row.getId();
        // Block until the row shows up in the pool.
        RETURN_IF_NOT_OK(cache_miss_.PopFront(row_id, &new_row));
      }
      RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(std::move(new_row)));

      RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&new_row));
    }
  }
  RETURN_IF_NOT_OK(EofReceived(worker_id));
  return Status::OK();
}

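// Master thread for the cache miss stream: caches the schema at the server once, then pulls rows
// from the cache miss child and distributes them round-robin to the cache miss workers.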
Status CacheMergeOp::CacheMissMaster() {
  missWorkers_in_queues_.Init(num_workers_, oc_queue_size_);
  RETURN_IF_NOT_OK(missWorkers_in_queues_.Register(tree_->AllTasks()));
  TaskManager::FindMe()->Post();
  RETURN_IF_NOT_OK(cache_client_->CacheSchema(column_name_id_map()));
  TensorRow new_row;
  auto child_iterator = std::make_unique<ChildIterator>(this, 0, kCacheMissChildIdx);
  int64_t ctr = 0;
  do {
    RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&new_row));
    RETURN_IF_NOT_OK(
      missWorkers_in_queues_[static_cast<const int>(ctr++ % num_workers_)]->EmplaceBack(std::move(new_row)));
  } while (!new_row.eof());
  return Status::OK();
}

Status CacheMergeOp::CacheMissWorkerEntry(int32_t workerId) {
  TaskManager::FindMe()->Post();
  // We will simply pop TensorRows from the stream, insert them into the pool, and
  // wake up any worker that is waiting on a missing TensorRow.
  // If we see an eoe, ignore it. For eof, we exit.
  // The schema has already been cached at the server by the CacheMissMaster thread,
  // and should have been computed at prepare time.

  TensorRow new_row;
  RETURN_IF_NOT_OK(missWorkers_in_queues_[workerId]->PopFront(&new_row));
  while (!new_row.eof()) {
    if (new_row.eoe()) {
      // Ignore it.
      MS_LOG(DEBUG) << "Ignore eoe";
      // However, we need to flush any leftover rows from the async write buffer. Any error
      // here merely stops further caching; the pipeline will continue.
      Status rc = cache_client_->FlushAsyncWriteBuffer();
      if (rc.IsError()) {
        cache_missing_rows_ = false;
        if (rc == StatusCode::kMDOutOfMemory || rc == StatusCode::kMDNoSpace) {
          cache_client_->ServerRunningOutOfResources();
        } else {
          MS_LOG(INFO) << "Async row flushing not successful: " << rc.ToString();
        }
      }
    } else {
      row_id_type row_id = new_row.getId();
      if (row_id < 0) {
        std::string errMsg =
          "[Internal ERROR] row id should be greater than or equal to 0, but got: " + std::to_string(row_id);
        RETURN_STATUS_UNEXPECTED(errMsg);
      }
      if (cache_missing_rows_) {
        // Technically, the number of times this row shows up in the cache miss stream equals the
        // number of P() calls. However, the cleaner wants it too, so we need an extra copy.
        TensorRowCacheRequest *rq;
        RETURN_IF_NOT_OK(GetRq(row_id, &rq));
        if (rq->GetState() == TensorRowCacheRequest::State::kEmpty) {
          // We will send the request asynchronously. Most errors are
          // simply ignored and we continue.
          Status rc = rq->AsyncSendCacheRequest(cache_client_, new_row);
          if (rc.IsOk()) {
            RETURN_IF_NOT_OK(io_que_->EmplaceBack(row_id));
          } else if (rc == StatusCode::kMDOutOfMemory || rc == StatusCode::kMDNoSpace) {
            cache_missing_rows_ = false;
            cache_client_->ServerRunningOutOfResources();
          }
        }
      }
      RETURN_IF_NOT_OK(cache_miss_.Add(row_id, std::move(new_row)));
    }
    RETURN_IF_NOT_OK(missWorkers_in_queues_[workerId]->PopFront(&new_row));
  }
  return Status::OK();
}

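// Cleaner thread: pops row ids from the io queue and verifies that their async cache write
// requests have completed at the server. A negative row id is the signal to quit.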
Status CacheMergeOp::Cleaner() {
  TaskManager::FindMe()->Post();
  while (true) {
    row_id_type row_id;
    RETURN_IF_NOT_OK(io_que_->PopFront(&row_id));
    if (row_id < 0) {
      break;
    }
    // Locate the cache request
    TensorRowCacheRequest *rq;
    RETURN_IF_NOT_OK(GetRq(row_id, &rq));
    // If already flushed, move on to the next one.
    if (rq->GetState() == TensorRowCacheRequest::State::kClean) {
      continue;
    }
    Status rc = rq->CheckCacheResult();
    if (rc.IsError()) {
      // If interrupted, time to quit.
      if (rc == StatusCode::kMDInterrupted) {
        return Status::OK();
      } else if (rc == StatusCode::kMDOutOfMemory || rc == StatusCode::kMDNoSpace) {
        // The server is hitting some limit and we will turn off caching from now on.
        cache_missing_rows_ = false;
        cache_client_->ServerRunningOutOfResources();
      } else {
        MS_LOG(INFO) << "Cache row not successful: " << rc.ToString();
        // A bad rc should not bring down the pipeline. We will simply continue and
        // change the state back to empty. We don't need a CAS from CLEAN back to EMPTY.
        rq->SetState(TensorRowCacheRequest::State::kEmpty);
      }
    }
  }
  return Status::OK();
}

// Run any common code from the super class first before adding our own specific logic
Status CacheMergeOp::PrepareOperator() {
  CHECK_FAIL_RETURN_UNEXPECTED(
    child_.size() == kNumChildren,
    "[Internal ERROR] Incorrect number of children of CacheMergeOp, required num is 2, but got:" +
      std::to_string(child_.size()));
  RETURN_IF_NOT_OK(DatasetOp::PrepareOperator());
  // Get the computed check sum from all ops in the cache miss class
  uint32_t cache_crc = DatasetOp::GenerateCRC(child_[kCacheMissChildIdx]);
  // This is a mappable cache op, so the row id's already exist and don't need to be generated.
  // Construct the cache
  const bool generate_ids = false;
  Status rc = cache_client_->CreateCache(cache_crc, generate_ids);
  if (rc.StatusCode() == StatusCode::kMDDuplicateKey) {
    // We are told the cache has been created already.
    MS_LOG(INFO) << "Cache created already";
    rc = Status::OK();
  }
  RETURN_IF_NOT_OK(rc);
  return Status::OK();
}

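// Base-class override. The merge op doesn't alter the data, so the column map is simply
// inherited from the cache miss child.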
Status CacheMergeOp::ComputeColMap() {
  CHECK_FAIL_RETURN_UNEXPECTED(child_[kCacheMissChildIdx] != nullptr, "[Internal ERROR] cache miss stream is empty.");
  if (column_name_id_map().empty()) {
    column_name_id_map_ = child_[kCacheMissChildIdx]->column_name_id_map();
  }
  CHECK_FAIL_RETURN_UNEXPECTED(!column_name_id_map().empty(),
                               "Invalid data, column_name_id_map of CacheMergeOp is empty.");
  return Status::OK();
}

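// Base-class override for handling cases when an eoe is received.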
Status CacheMergeOp::EoeReceived(int32_t worker_id) {
  // Send the eoe up.
  MS_LOG(DEBUG) << "Cache merge sending eoe";
  RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagEOE)));
  return Status::OK();
}

// Base-class override for handling cases when an eof is received.
Status CacheMergeOp::EofReceived(int32_t worker_id) {
  // Send the eof up.
  MS_LOG(DEBUG) << "Cache merge sending eof";
  RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagEOF)));
  return Status::OK();
}

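// Look up the TensorRowCacheRequest for the given row id, creating one on demand.
// Access to the request map is serialized by mux_.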
Status CacheMergeOp::GetRq(row_id_type row_id, CacheMergeOp::TensorRowCacheRequest **out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  std::unique_lock<std::mutex> lock(mux_);
  auto it = io_request_.find(row_id);
  if (it != io_request_.end()) {
    *out = it->second.GetMutablePointer();
  } else {
    // We will create a new one.
    auto alloc = SystemPool::GetAllocator<TensorRowCacheRequest>();
    auto r = io_request_.emplace(row_id, MemGuard<TensorRowCacheRequest, Allocator<TensorRowCacheRequest>>(alloc));
    if (r.second) {
      auto &mem = r.first->second;
      RETURN_IF_NOT_OK(mem.allocate(1));
      *out = mem.GetMutablePointer();
    } else {
      RETURN_STATUS_UNEXPECTED("[Internal ERROR] map insert fail.");
    }
  }
  return Status::OK();
}

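// Try to transition the request from kEmpty to kDirty and send the row to the cache server
// asynchronously. If the async write path is not supported, fall back to a serialized
// CacheRowRequest that the cleaner will wait on.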
Status CacheMergeOp::TensorRowCacheRequest::AsyncSendCacheRequest(const std::shared_ptr<CacheClient> &cc,
                                                                  const TensorRow &row) {
  auto expected = State::kEmpty;
  if (st_.compare_exchange_strong(expected, State::kDirty)) {
    // We will do a deep copy but write directly into the CacheRequest protobuf or shared memory
    Status rc = cc->AsyncWriteRow(row);
    if (rc.StatusCode() == StatusCode::kMDNotImplementedYet) {
      cleaner_copy_ = std::make_shared<CacheRowRequest>(cc.get());
      rc = cleaner_copy_->SerializeCacheRowRequest(cc.get(), row);
      if (rc.IsOk()) {
        // Send the request async. The cleaner will check the return code.
        rc = cc->PushRequest(cleaner_copy_);
      }
    } else if (rc.IsOk()) {
      // Set the state to clean even though it still sits in the cache client async buffer.
      // The cleaner will then ignore it once the state is clean.
      st_ = State::kClean;
    }
    if (rc.IsError()) {
      // Clean up the shared pointer and reset the state back to empty
      cleaner_copy_.reset();
      st_ = State::kEmpty;
    }
    return rc;
  }
  return Status::OK();
}

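// Called by the cleaner: transition the request from kDirty to kClean and wait for the
// outstanding cache row request (if any) to finish at the server.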
Status CacheMergeOp::TensorRowCacheRequest::CheckCacheResult() {
  auto expected = State::kDirty;
  if (st_.compare_exchange_strong(expected, State::kClean)) {
    // Success or not, we will release the memory.
    // We simply move it out of the structure and let it go out of scope.
    auto cache_request = std::move(cleaner_copy_);
    RETURN_IF_NOT_OK(cache_request->Wait());
    return Status::OK();
  }
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore