• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h"
17 #include "utils/ms_utils.h"
18 #include "minddata/dataset/core/config_manager.h"
19 #include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h"
20 #include "minddata/dataset/engine/execution_tree.h"
21 
22 namespace mindspore {
23 namespace dataset {
MappableLeafOp(int32_t num_wkrs,int32_t queue_size,std::shared_ptr<SamplerRT> sampler)24 MappableLeafOp::MappableLeafOp(int32_t num_wkrs, int32_t queue_size, std::shared_ptr<SamplerRT> sampler)
25     : ParallelOp(num_wkrs, queue_size, std::move(sampler)),
26       sample_ids_(nullptr),
27       curr_row_(0),
28       prepared_data_{false},
29       eof_handled_{false} {}
30 
#ifdef ENABLE_PYTHON
// Reads the file at `path` into `*tensor`.
// When `decrypt` is null or Python None, the file is loaded directly with Tensor::CreateFromFile. Otherwise the
// Python callable `decrypt(path)` is invoked while holding the GIL, and the bytes it returns are copied into a
// 1-D DE_UINT8 tensor whose length is the number of returned bytes.
// @param path    Path of the file to read.
// @param tensor  Output tensor; must not be null.
// @param decrypt Optional Python function that receives the path and returns the decrypted bytes.
// @return kMDPythonInterpreterFailure if the interpreter is already finalized; kMDPyFuncException if the Python
//         function raises, or returns an object that cannot be consumed as bytes; otherwise the status of the
//         tensor construction.
Status MappableLeafOp::ImageDecrypt(const std::string &path, std::shared_ptr<Tensor> *tensor,
                                    const py::function &decrypt) {
  RETURN_UNEXPECTED_IF_NULL(tensor);
  if (decrypt == nullptr || py::isinstance<py::none>(decrypt)) {
    RETURN_IF_NOT_OK(Tensor::CreateFromFile(path, tensor));
  } else {
    // Acquire Python GIL before touching any Python object.
    py::gil_scoped_acquire gil_acquire;
    if (Py_IsInitialized() == 0) {
      RETURN_STATUS_ERROR(StatusCode::kMDPythonInterpreterFailure, "[Internal ERROR] Python Interpreter is finalized.");
    }
    try {
      // pybind11's checked py::object -> py::bytes conversion throws py::type_error (a py::builtin_exception,
      // not a py::error_already_set) when the user function returns something other than bytes; that case is
      // handled by the second catch clause below instead of escaping as a raw C++ exception.
      py::bytes ret_py_obj = decrypt(path);
      int64_t num_bytes = static_cast<int64_t>(len(ret_py_obj));
      CHECK_FAIL_RETURN_UNEXPECTED(num_bytes < kDeMaxDim,
                                   "The length of decrypted bytes returned by the decryption function exceeds the "
                                   "maximum value of int64, check path: " +
                                     path);
      std::string ret_str = ret_py_obj;
      RETURN_IF_NOT_OK(Tensor::CreateFromMemory(TensorShape{num_bytes}, DataType(DataType::DE_UINT8),
                                                reinterpret_cast<const uchar *>(ret_str.c_str()), num_bytes, tensor));
    } catch (const py::error_already_set &e) {
      RETURN_STATUS_ERROR(StatusCode::kMDPyFuncException, e.what());
    } catch (const py::builtin_exception &e) {
      RETURN_STATUS_ERROR(StatusCode::kMDPyFuncException,
                          "The decryption function failed or did not return bytes, check path: " + path +
                            ". Error: " + e.what());
    }
  }
  return Status::OK();
}
#endif
60 
61 // Main logic, Register Queue with TaskGroup, launch all threads and do the functor's work
operator ()()62 Status MappableLeafOp::operator()() {
63   // Registering and launching worker threads have to be before in sync with caller (i.e., before FindMe()::Post())
64   RETURN_IF_NOT_OK(RegisterAndLaunchThreads());
65   // Initialize callback
66   RETURN_IF_NOT_OK(callback_manager_.Init(this));
67   // Synchronize with TaskManager
68   TaskManager::FindMe()->Post();
69   RETURN_IF_NOT_OK(InitOp());
70 
71   int64_t ep_step = 0, total_step = 0;
72   RETURN_IF_NOT_OK(callback_manager_.Begin(CallbackParam(0, ep_step, total_step)));
73   TensorRow sample_row;
74   RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row));
75   for (;;) {  // each iteration is 1 repeat (usually =1 epoch, unless we have a repeat node above us), breaks when
76               // IsLastIteration() is true
77     if (op_current_repeats_ % GetOpNumRepeatsPerEpoch() == 0) {
78       ep_step = 0;
79       RETURN_IF_NOT_OK(callback_manager_.EpochBegin(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));
80     }
81     while (sample_row.eoe() == false) {
82       std::shared_ptr<Tensor> sample_ids = sample_row[0];
83       for (auto itr = sample_ids->begin<int64_t>(); itr != sample_ids->end<int64_t>(); ++itr) {
84         if ((*itr) >= num_rows_) {
85           MS_LOG(WARNING) << "Skipping sample with ID: " << *itr << " since it is out of bound: " << num_rows_;
86           continue;  // index out of bound, skipping
87         }
88         ep_step++;
89         total_step++;
90         RETURN_IF_NOT_OK(callback_manager_.StepBegin(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));
91         RETURN_IF_NOT_OK(worker_in_queues_[NextWorkerID()]->Add(std::make_unique<IOBlock>(*itr, IOBlock::kFlagNone)));
92       }
93       RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row));
94     }
95     RETURN_IF_NOT_OK(worker_in_queues_[NextWorkerID()]->Add(std::make_unique<IOBlock>(IOBlock::kFlagEOE)));
96     if (!IsLastIteration()) {
97       // If not the last repeat, self-reset and go to loop again.
98       RETURN_IF_NOT_OK(Reset());
99       RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row));
100     } else {
101       break;
102     }
103     UpdateRepeatAndEpochCounter();
104   }
105   RETURN_IF_NOT_OK(worker_in_queues_[NextWorkerID()]->Add(std::make_unique<IOBlock>(IOBlock::kFlagEOF)));
106   for (int32_t i = 0; i < num_workers_; ++i) {
107     RETURN_IF_NOT_OK(SendQuitFlagToWorker(NextWorkerID()));
108   }
109   return Status::OK();
110 }
111 
112 // Reset Sampler and wakeup Master thread (functor)
Reset()113 Status MappableLeafOp::Reset() {
114   MS_LOG(DEBUG) << Name() << " performing a self-reset.";
115   RETURN_IF_NOT_OK(sampler_->ResetSampler());
116   curr_row_ = 0;
117   return Status::OK();
118 }
119 
120 // hand shake with Sampler, allow Sampler to call RandomAccessOp's functions to get NumRows
InitSampler()121 Status MappableLeafOp::InitSampler() {
122   // Let the sampler know if we are resetting the pipeline to a specific epoch (op_current_repeats_ > 0)
123   // to mimic the behaviour in that state and have repeatability.
124   // Note that number of repeats is used since in each epoch we may reset sampler multiple times.
125   return sampler_->HandshakeRandomAccessOp(this, op_current_repeats_);
126 }
127 
128 // contains the main logic of pulling a IOBlock from IOBlockQueue, load a row and push the row to out_connector_
129 // IMPORTANT: 1 IOBlock produces 1 row
WorkerEntry(int32_t worker_id)130 Status MappableLeafOp::WorkerEntry(int32_t worker_id) {
131   TaskManager::FindMe()->Post();
132   std::unique_ptr<IOBlock> io_block;
133 
134   RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerGet"));
135   RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&io_block));
136   RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerGet", {{"TensorRowFlags", io_block->FlagName()}}));
137   RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerProcess"));
138 
139   while (io_block != nullptr) {
140     if (io_block->wait()) {
141       RETURN_IF_NOT_OK(
142         CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
143       RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagWait)));
144       RETURN_IF_NOT_OK(TaskManager::FindMe()->Wait());  // wait for auto tune update workers successful
145       TaskManager::FindMe()->Clear();
146     } else if (io_block->eoe()) {
147       RETURN_IF_NOT_OK(
148         CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
149       RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagEOE)));
150     } else if (io_block->eof()) {
151       RETURN_IF_NOT_OK(
152         CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
153       RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagEOF)));
154     } else {
155       std::vector<int64_t> keys;
156       RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
157       if (keys.empty()) {
158         RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerProcess",
159                                           {{"TensorRowFlags", IOBlock(IOBlock::kFlagQuit).FlagName()}}));
160         return Status::OK();  // empty key is a quit signal for workers
161       }
162       TensorRow trow;
163       RETURN_IF_NOT_OK(this->LoadTensorRow(keys[0], &trow));
164       RETURN_IF_NOT_OK(
165         CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
166       RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(std::move(trow)));
167     }
168     RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerGet"));
169     RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&io_block));
170     RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerGet", {{"TensorRowFlags", io_block->FlagName()}}));
171     RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerProcess"));
172   }
173   RETURN_STATUS_UNEXPECTED("[Internal ERROR] Unexpected nullptr received in worker.");
174 }
175 
SendWaitFlagToWorker(int32_t worker_id)176 Status MappableLeafOp::SendWaitFlagToWorker(int32_t worker_id) {
177   RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->Add(std::make_unique<IOBlock>(IOBlock::kFlagWait)));
178   return Status::OK();
179 }
180 
SendQuitFlagToWorker(int32_t worker_id)181 Status MappableLeafOp::SendQuitFlagToWorker(int32_t worker_id) {
182   RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->Add(std::make_unique<IOBlock>(IOBlock::kFlagQuit)));
183   return Status::OK();
184 }
185 
GetNextRowPullMode(TensorRow * const row)186 Status MappableLeafOp::GetNextRowPullMode(TensorRow *const row) {
187   RETURN_UNEXPECTED_IF_NULL(row);
188   row->clear();
189   if (!prepared_data_) {
190     RETURN_IF_NOT_OK(InitPullMode());
191     prepared_data_ = true;
192   }
193   if (eof_handled_) {
194     *row = TensorRow(TensorRow::kFlagEOF);
195     return Status::OK();
196   }
197   TensorRow sample_row;
198   if (sample_ids_ == nullptr) {
199     RETURN_IF_NOT_OK(this->InitSampler());
200     RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row));
201     CHECK_FAIL_RETURN_UNEXPECTED(sample_row.size() > 0, "GetNextRowPullMode: Expect at least one sample in sampler.");
202     sample_ids_ = sample_row[0];
203     MS_LOG(DEBUG) << "Set sample_ids_=" << (*sample_ids_);
204   }
205   if (curr_row_ + 1 > sample_ids_->Size()) {
206     *row = TensorRow(TensorRow::kFlagEOE);
207     RETURN_IF_NOT_OK(ResetAndUpdateRepeat());
208     return Status::OK();
209   }
210   int64_t key;
211   RETURN_IF_NOT_OK(sample_ids_->GetItemAt(&key, {curr_row_}));
212   MS_LOG(DEBUG) << "Got key=" << key << " with curr_row_=" << curr_row_;
213   RETURN_IF_NOT_OK(LoadTensorRowPullMode(key, row));
214   curr_row_++;
215   return Status::OK();
216 }
217 
ResetAndUpdateRepeat()218 Status MappableLeafOp::ResetAndUpdateRepeat() {
219   if (!IsLastIteration()) {
220     RETURN_IF_NOT_OK(MappableLeafOp::Reset());
221     TensorRow sample_row;
222     RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row));
223     if (sample_row.eoe()) {
224       return Status::OK();
225     }
226     CHECK_FAIL_RETURN_UNEXPECTED(sample_row.size() > 0, "GetNextRowPullMode: Expect at least one sample in sampler.");
227     // Get sample_ids
228     sample_ids_ = sample_row[0];
229     MS_LOG(DEBUG) << "Set sample_ids_=" << (*sample_ids_);
230     UpdateRepeatAndEpochCounter();
231   } else {
232     eof_handled_ = true;
233   }
234   return Status::OK();
235 }
236 }  // namespace dataset
237 }  // namespace mindspore
238