/**
 * Copyright 2020-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/datasetops/cache_merge_op.h"

#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/include/dataset/constants.h"
#include "minddata/dataset/core/global_context.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/util/system_pool.h"
#include "minddata/dataset/util/task_manager.h"

namespace mindspore {
namespace dataset {
CacheMergeOp::~CacheMergeOp() = default;
void CacheMergeOp::Print(std::ostream &out, bool show_all) const {
  if (!show_all) {
    // Call the super class for displaying any common 1-liner info
    ParallelOp::Print(out, show_all);
    // Then show any custom derived-internal 1-liner info for this op
    out << "\n";
  } else {
    // Call the super class for displaying any common detailed info
    ParallelOp::Print(out, show_all);
    // Then show any custom derived-internal stuff
    out << "\n\n";
  }
}

CacheMergeOp::CacheMergeOp(int32_t numWorkers, int32_t opConnectorSize, int32_t numCleaners,
                           std::shared_ptr<CacheClient> cache_client)
    : ParallelOp(numWorkers, opConnectorSize),
      num_cleaners_(numCleaners),
      cache_client_(std::move(cache_client)),
      cache_missing_rows_(true) {}

Status CacheMergeOp::operator()() {
  // A queue of row ids that lets the cleaners send cache-miss rows to the cache server.
  // We don't want a small queue as this would block the parallel op workers.
  // A row id is an 8-byte integer, so a bigger queue doesn't consume much memory.
  static const int32_t queue_sz = 512;
  io_que_ = std::make_unique<Queue<row_id_type>>(queue_sz);
  RETURN_IF_NOT_OK(io_que_->Register(tree_->AllTasks()));

  RETURN_IF_NOT_OK(RegisterAndLaunchThreads());

  RETURN_IF_NOT_OK(
    tree_->LaunchWorkers(1, std::bind(&CacheMergeOp::CacheMissMaster, this), Name() + "::CacheMissMaster", id()));
  RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_,
                                        std::bind(&CacheMergeOp::CacheMissWorkerEntry, this, std::placeholders::_1),
                                        Name() + "::CacheMissWorkerEntry", id()));

  // Dedicated cleaner threads move TensorRows from the pool to the cache server
  for (auto i = 0; i < num_cleaners_; ++i) {
    RETURN_IF_NOT_OK(
      tree_->AllTasks()->CreateAsyncTask("Cleaner", std::bind(&CacheMergeOp::Cleaner, this), nullptr, id()));
  }
  TaskManager::FindMe()->Post();
  TensorRow new_row;
  auto child_iterator = std::make_unique<ChildIterator>(this, 0, kCacheHitChildIdx);
  int64_t ctr = 0;
  do {
    RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&new_row));
    RETURN_IF_NOT_OK(worker_in_queues_[static_cast<const int>(ctr++ % num_workers_)]->EmplaceBack(std::move(new_row)));
  } while (!new_row.eof());

  return Status::OK();
}

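// Overview of the thread layout set up in operator() above:
//   - this thread iterates the cache-hit child (kCacheHitChildIdx) and round-robins rows into
//     worker_in_queues_, which the WorkerEntry workers below consume;
//   - one CacheMissMaster thread fans the cache-miss child out to the CacheMissWorkerEntry workers;
//   - num_cleaners_ Cleaner tasks drain io_que_ and forward pending rows to the cache server.
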
// Each parallel worker will pop from the CacheHit stream. If there is a missing TensorRow, we will wait
// until it shows up in the pool.
Status CacheMergeOp::WorkerEntry(int32_t worker_id) {
  TaskManager::FindMe()->Post();
  TensorRow new_row;
  RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&new_row));
  while (!new_row.eof()) {
    if (new_row.eoe()) {
      RETURN_IF_NOT_OK(EoeReceived(worker_id));
      RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&new_row));
    } else {
      if (new_row.empty()) {
        auto row_id = new_row.getId();
        // Block until the row shows up in the pool.
        RETURN_IF_NOT_OK(cache_miss_.PopFront(row_id, &new_row));
      }
      RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(std::move(new_row)));

      RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&new_row));
    }
  }
  RETURN_IF_NOT_OK(EofReceived(worker_id));
  return Status::OK();
}

Status CacheMergeOp::CacheMissMaster() {
  missWorkers_in_queues_.Init(num_workers_, oc_queue_size_);
  RETURN_IF_NOT_OK(missWorkers_in_queues_.Register(tree_->AllTasks()));
  TaskManager::FindMe()->Post();
  RETURN_IF_NOT_OK(cache_client_->CacheSchema(column_name_id_map()));
  TensorRow new_row;
  auto child_iterator = std::make_unique<ChildIterator>(this, 0, kCacheMissChildIdx);
  int64_t ctr = 0;
  do {
    RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&new_row));
    RETURN_IF_NOT_OK(
      missWorkers_in_queues_[static_cast<const int>(ctr++ % num_workers_)]->EmplaceBack(std::move(new_row)));
  } while (!new_row.eof());
  return Status::OK();
}

Status CacheMergeOp::CacheMissWorkerEntry(int32_t workerId) {
  TaskManager::FindMe()->Post();
  // We simply pop TensorRows from the cache-miss stream, insert them into the pool, and
  // wake up any worker that is waiting on a missing TensorRow.
  // If we see an eoe, ignore it. For an eof, we exit.
  // Before the workers start, the schema is cached at the server (see CacheMissMaster);
  // it should already have been set up at prepare time.

  TensorRow new_row;
  RETURN_IF_NOT_OK(missWorkers_in_queues_[workerId]->PopFront(&new_row));
  while (!new_row.eof()) {
    if (new_row.eoe()) {
      // Ignore it.
      MS_LOG(DEBUG) << "Ignore eoe";
      // However, we need to flush anything left over in the async write buffer. Any error
      // here only stops further caching; the pipeline itself continues.
      Status rc = cache_client_->FlushAsyncWriteBuffer();
      if (rc.IsError()) {
        cache_missing_rows_ = false;
        if (rc == StatusCode::kMDOutOfMemory || rc == kMDNoSpace) {
          cache_client_->ServerRunningOutOfResources();
        } else {
          MS_LOG(INFO) << "Async row flushing not successful: " << rc.ToString();
        }
      }
    } else {
      row_id_type row_id = new_row.getId();
      if (row_id < 0) {
        std::string errMsg =
          "[Internal ERROR] row id should be greater than or equal to 0, but got: " + std::to_string(row_id);
        RETURN_STATUS_UNEXPECTED(errMsg);
      }
      if (cache_missing_rows_) {
        // Technically, the number of times this row shows up in the cache-miss stream equals the
        // number of P() calls. However, the cleaner wants it too, so we need an extra copy.
        TensorRowCacheRequest *rq;
        RETURN_IF_NOT_OK(GetRq(row_id, &rq));
        if (rq->GetState() == TensorRowCacheRequest::State::kEmpty) {
          // We send the request asynchronously. Most errors are simply ignored and we continue.
          Status rc = rq->AsyncSendCacheRequest(cache_client_, new_row);
          if (rc.IsOk()) {
            RETURN_IF_NOT_OK(io_que_->EmplaceBack(row_id));
          } else if (rc == StatusCode::kMDOutOfMemory || rc == kMDNoSpace) {
            cache_missing_rows_ = false;
            cache_client_->ServerRunningOutOfResources();
          }
        }
      }
      RETURN_IF_NOT_OK(cache_miss_.Add(row_id, std::move(new_row)));
    }
    RETURN_IF_NOT_OK(missWorkers_in_queues_[workerId]->PopFront(&new_row));
  }
  return Status::OK();
}

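// Note: cache_missing_rows_ acts as a soft switch. Once the server reports kMDOutOfMemory or
// kMDNoSpace, no further cache-miss rows are sent to the server, but rows are still added to the
// local pool so the pipeline keeps flowing.
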
Status CacheMergeOp::Cleaner() {
  TaskManager::FindMe()->Post();
  while (true) {
    row_id_type row_id;
    RETURN_IF_NOT_OK(io_que_->PopFront(&row_id));
    if (row_id < 0) {
      break;
    }
    // Locate the cache request
    TensorRowCacheRequest *rq;
    RETURN_IF_NOT_OK(GetRq(row_id, &rq));
    // If already flushed, move on to the next one.
    if (rq->GetState() == TensorRowCacheRequest::State::kClean) {
      continue;
    }
    Status rc = rq->CheckCacheResult();
    if (rc.IsError()) {
      // If interrupted, time to quit.
      if (rc == StatusCode::kMDInterrupted) {
        return Status::OK();
      } else if (rc == StatusCode::kMDOutOfMemory || rc == kMDNoSpace) {
        // The server is hitting some limit and we will turn off caching from now on.
        cache_missing_rows_ = false;
        cache_client_->ServerRunningOutOfResources();
      } else {
        MS_LOG(INFO) << "Cache row not successful: " << rc.ToString();
        // A bad rc should not bring down the pipeline. We simply continue and
        // change the state back to empty. We don't need a CAS from CLEAN back to EMPTY.
        rq->SetState(TensorRowCacheRequest::State::kEmpty);
      }
    }
  }
  return Status::OK();
}

Status CacheMergeOp::PrepareOperator() {  // Run any common code from the super class first before adding our own
                                          // specific logic
  CHECK_FAIL_RETURN_UNEXPECTED(
    child_.size() == kNumChildren,
    "[Internal ERROR] Incorrect number of children of CacheMergeOp, required num is 2, but got:" +
      std::to_string(child_.size()));
  RETURN_IF_NOT_OK(DatasetOp::PrepareOperator());
  // Get the computed check sum from all ops in the cache miss branch
  uint32_t cache_crc = DatasetOp::GenerateCRC(child_[kCacheMissChildIdx]);
  // This is a mappable cache op, so the row ids already exist and the server does not need to
  // generate them.
  // Construct the cache
  const bool generate_ids = false;
  Status rc = cache_client_->CreateCache(cache_crc, generate_ids);
  if (rc.StatusCode() == StatusCode::kMDDuplicateKey) {
    // We are told the cache has been created already.
    MS_LOG(INFO) << "Cache created already";
    rc = Status::OK();
  }
  RETURN_IF_NOT_OK(rc);
  return Status::OK();
}

Status CacheMergeOp::ComputeColMap() {
  CHECK_FAIL_RETURN_UNEXPECTED(child_[kCacheMissChildIdx] != nullptr, "[Internal ERROR] cache miss stream is empty.");
  if (column_name_id_map().empty()) {
    column_name_id_map_ = child_[kCacheMissChildIdx]->column_name_id_map();
  }
  CHECK_FAIL_RETURN_UNEXPECTED(!column_name_id_map().empty(),
                               "Invalid data, column_name_id_map of CacheMergeOp is empty.");
  return Status::OK();
}

Status CacheMergeOp::EoeReceived(int32_t worker_id) {
  // Send the eoe up.
  MS_LOG(DEBUG) << "Cache merge sending eoe";
  RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagEOE)));
  return Status::OK();
}

// Base-class override for handling cases when an eof is received.
Status CacheMergeOp::EofReceived(int32_t worker_id) {
  // Send the eof up.
  MS_LOG(DEBUG) << "Cache merge sending eof";
  RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagEOF)));
  return Status::OK();
}

Status CacheMergeOp::GetRq(row_id_type row_id, CacheMergeOp::TensorRowCacheRequest **out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  std::unique_lock<std::mutex> lock(mux_);
  auto it = io_request_.find(row_id);
  if (it != io_request_.end()) {
    *out = it->second.GetMutablePointer();
  } else {
    // We will create a new one.
    auto alloc = SystemPool::GetAllocator<TensorRowCacheRequest>();
    auto r = io_request_.emplace(row_id, MemGuard<TensorRowCacheRequest, Allocator<TensorRowCacheRequest>>(alloc));
    if (r.second) {
      auto &mem = r.first->second;
      RETURN_IF_NOT_OK(mem.allocate(1));
      *out = mem.GetMutablePointer();
    } else {
      RETURN_STATUS_UNEXPECTED("[Internal ERROR] map insert fail.");
    }
  }
  return Status::OK();
}

Status CacheMergeOp::TensorRowCacheRequest::AsyncSendCacheRequest(const std::shared_ptr<CacheClient> &cc,
                                                                  const TensorRow &row) {
  auto expected = State::kEmpty;
  if (st_.compare_exchange_strong(expected, State::kDirty)) {
    // We will do a deep copy but write directly into CacheRequest protobuf or shared memory
    Status rc = cc->AsyncWriteRow(row);
    if (rc.StatusCode() == StatusCode::kMDNotImplementedYet) {
      cleaner_copy_ = std::make_shared<CacheRowRequest>(cc.get());
      rc = cleaner_copy_->SerializeCacheRowRequest(cc.get(), row);
      if (rc.IsOk()) {
        // Send the request async. The cleaner will check the return code.
        rc = cc->PushRequest(cleaner_copy_);
      }
    } else if (rc.IsOk()) {
      // Set the state to clean even though it still sits in the cache client async buffer.
      // The cleaner will then ignore it once the state is clean.
      st_ = State::kClean;
    }
    if (rc.IsError()) {
      // Clean up the shared pointer and reset the state back to empty
      cleaner_copy_.reset();
      st_ = State::kEmpty;
    }
    return rc;
  }
  return Status::OK();
}

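// State transitions for TensorRowCacheRequest, as used by AsyncSendCacheRequest, CheckCacheResult
// and Cleaner():
//   kEmpty --AsyncSendCacheRequest (CAS)--> kDirty --CheckCacheResult (CAS)--> kClean
// A failed send resets the state to kEmpty; a non-fatal error in Cleaner() also sets it back to
// kEmpty so a later cache miss for the same row id can try again.
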
Status CacheMergeOp::TensorRowCacheRequest::CheckCacheResult() {
  auto expected = State::kDirty;
  if (st_.compare_exchange_strong(expected, State::kClean)) {
    // Success or not, we will release the memory.
    // We simply move it out of the structure and let it go out of scope.
    auto cache_request = std::move(cleaner_copy_);
    RETURN_IF_NOT_OK(cache_request->Wait());
    return Status::OK();
  }
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore