1 /**
2 * Copyright 2021-2022 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h"
17 #include "utils/ms_utils.h"
18 #include "minddata/dataset/core/config_manager.h"
19 #include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h"
20 #include "minddata/dataset/engine/execution_tree.h"
21
22 namespace mindspore {
23 namespace dataset {
MappableLeafOp(int32_t num_wkrs,int32_t queue_size,std::shared_ptr<SamplerRT> sampler)24 MappableLeafOp::MappableLeafOp(int32_t num_wkrs, int32_t queue_size, std::shared_ptr<SamplerRT> sampler)
25 : ParallelOp(num_wkrs, queue_size, std::move(sampler)),
26 sample_ids_(nullptr),
27 curr_row_(0),
28 prepared_data_{false},
29 eof_handled_{false} {}
30
31 #ifdef ENABLE_PYTHON
ImageDecrypt(const std::string & path,std::shared_ptr<Tensor> * tensor,const py::function & decrypt)32 Status MappableLeafOp::ImageDecrypt(const std::string &path, std::shared_ptr<Tensor> *tensor,
33 const py::function &decrypt) {
34 RETURN_UNEXPECTED_IF_NULL(tensor);
35 if (decrypt == nullptr || py::isinstance<py::none>(decrypt)) {
36 RETURN_IF_NOT_OK(Tensor::CreateFromFile(path, tensor));
37 } else {
38 // Acquire Python GIL
39 py::gil_scoped_acquire gil_acquire;
40 if (Py_IsInitialized() == 0) {
41 RETURN_STATUS_ERROR(StatusCode::kMDPythonInterpreterFailure, "[Internal ERROR] Python Interpreter is finalized.");
42 }
43 try {
44 py::bytes ret_py_obj = decrypt(path);
45 int64_t num_bytes = static_cast<int64_t>(len(ret_py_obj));
46 CHECK_FAIL_RETURN_UNEXPECTED(num_bytes < kDeMaxDim,
47 "The length of decrypted bytes returned by the decryption function exceeds the "
48 "maximum value of int64, check path: " +
49 path);
50 std::string ret_str = ret_py_obj;
51 RETURN_IF_NOT_OK(Tensor::CreateFromMemory(TensorShape{num_bytes}, DataType(DataType::DE_UINT8),
52 reinterpret_cast<const uchar *>(ret_str.c_str()), num_bytes, tensor));
53 } catch (const py::error_already_set &e) {
54 RETURN_STATUS_ERROR(StatusCode::kMDPyFuncException, e.what());
55 }
56 }
57 return Status::OK();
58 }
59 #endif
60
61 // Main logic, Register Queue with TaskGroup, launch all threads and do the functor's work
operator ()()62 Status MappableLeafOp::operator()() {
63 // Registering and launching worker threads have to be before in sync with caller (i.e., before FindMe()::Post())
64 RETURN_IF_NOT_OK(RegisterAndLaunchThreads());
65 // Initialize callback
66 RETURN_IF_NOT_OK(callback_manager_.Init(this));
67 // Synchronize with TaskManager
68 TaskManager::FindMe()->Post();
69 RETURN_IF_NOT_OK(InitOp());
70
71 int64_t ep_step = 0, total_step = 0;
72 RETURN_IF_NOT_OK(callback_manager_.Begin(CallbackParam(0, ep_step, total_step)));
73 TensorRow sample_row;
74 RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row));
75 for (;;) { // each iteration is 1 repeat (usually =1 epoch, unless we have a repeat node above us), breaks when
76 // IsLastIteration() is true
77 if (op_current_repeats_ % GetOpNumRepeatsPerEpoch() == 0) {
78 ep_step = 0;
79 RETURN_IF_NOT_OK(callback_manager_.EpochBegin(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));
80 }
81 while (sample_row.eoe() == false) {
82 std::shared_ptr<Tensor> sample_ids = sample_row[0];
83 for (auto itr = sample_ids->begin<int64_t>(); itr != sample_ids->end<int64_t>(); ++itr) {
84 if ((*itr) >= num_rows_) {
85 MS_LOG(WARNING) << "Skipping sample with ID: " << *itr << " since it is out of bound: " << num_rows_;
86 continue; // index out of bound, skipping
87 }
88 ep_step++;
89 total_step++;
90 RETURN_IF_NOT_OK(callback_manager_.StepBegin(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));
91 RETURN_IF_NOT_OK(worker_in_queues_[NextWorkerID()]->Add(std::make_unique<IOBlock>(*itr, IOBlock::kFlagNone)));
92 }
93 RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row));
94 }
95 RETURN_IF_NOT_OK(worker_in_queues_[NextWorkerID()]->Add(std::make_unique<IOBlock>(IOBlock::kFlagEOE)));
96 if (!IsLastIteration()) {
97 // If not the last repeat, self-reset and go to loop again.
98 RETURN_IF_NOT_OK(Reset());
99 RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row));
100 } else {
101 break;
102 }
103 UpdateRepeatAndEpochCounter();
104 }
105 RETURN_IF_NOT_OK(worker_in_queues_[NextWorkerID()]->Add(std::make_unique<IOBlock>(IOBlock::kFlagEOF)));
106 for (int32_t i = 0; i < num_workers_; ++i) {
107 RETURN_IF_NOT_OK(SendQuitFlagToWorker(NextWorkerID()));
108 }
109 return Status::OK();
110 }
111
112 // Reset Sampler and wakeup Master thread (functor)
Reset()113 Status MappableLeafOp::Reset() {
114 MS_LOG(DEBUG) << Name() << " performing a self-reset.";
115 RETURN_IF_NOT_OK(sampler_->ResetSampler());
116 curr_row_ = 0;
117 return Status::OK();
118 }
119
120 // hand shake with Sampler, allow Sampler to call RandomAccessOp's functions to get NumRows
InitSampler()121 Status MappableLeafOp::InitSampler() {
122 // Let the sampler know if we are resetting the pipeline to a specific epoch (op_current_repeats_ > 0)
123 // to mimic the behaviour in that state and have repeatability.
124 // Note that number of repeats is used since in each epoch we may reset sampler multiple times.
125 return sampler_->HandshakeRandomAccessOp(this, op_current_repeats_);
126 }
127
128 // contains the main logic of pulling a IOBlock from IOBlockQueue, load a row and push the row to out_connector_
129 // IMPORTANT: 1 IOBlock produces 1 row
WorkerEntry(int32_t worker_id)130 Status MappableLeafOp::WorkerEntry(int32_t worker_id) {
131 TaskManager::FindMe()->Post();
132 std::unique_ptr<IOBlock> io_block;
133
134 RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerGet"));
135 RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&io_block));
136 RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerGet", {{"TensorRowFlags", io_block->FlagName()}}));
137 RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerProcess"));
138
139 while (io_block != nullptr) {
140 if (io_block->wait()) {
141 RETURN_IF_NOT_OK(
142 CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
143 RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagWait)));
144 RETURN_IF_NOT_OK(TaskManager::FindMe()->Wait()); // wait for auto tune update workers successful
145 TaskManager::FindMe()->Clear();
146 } else if (io_block->eoe()) {
147 RETURN_IF_NOT_OK(
148 CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
149 RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagEOE)));
150 } else if (io_block->eof()) {
151 RETURN_IF_NOT_OK(
152 CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
153 RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagEOF)));
154 } else {
155 std::vector<int64_t> keys;
156 RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
157 if (keys.empty()) {
158 RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerProcess",
159 {{"TensorRowFlags", IOBlock(IOBlock::kFlagQuit).FlagName()}}));
160 return Status::OK(); // empty key is a quit signal for workers
161 }
162 TensorRow trow;
163 RETURN_IF_NOT_OK(this->LoadTensorRow(keys[0], &trow));
164 RETURN_IF_NOT_OK(
165 CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
166 RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(std::move(trow)));
167 }
168 RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerGet"));
169 RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&io_block));
170 RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerGet", {{"TensorRowFlags", io_block->FlagName()}}));
171 RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerProcess"));
172 }
173 RETURN_STATUS_UNEXPECTED("[Internal ERROR] Unexpected nullptr received in worker.");
174 }
175
SendWaitFlagToWorker(int32_t worker_id)176 Status MappableLeafOp::SendWaitFlagToWorker(int32_t worker_id) {
177 RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->Add(std::make_unique<IOBlock>(IOBlock::kFlagWait)));
178 return Status::OK();
179 }
180
SendQuitFlagToWorker(int32_t worker_id)181 Status MappableLeafOp::SendQuitFlagToWorker(int32_t worker_id) {
182 RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->Add(std::make_unique<IOBlock>(IOBlock::kFlagQuit)));
183 return Status::OK();
184 }
185
GetNextRowPullMode(TensorRow * const row)186 Status MappableLeafOp::GetNextRowPullMode(TensorRow *const row) {
187 RETURN_UNEXPECTED_IF_NULL(row);
188 row->clear();
189 if (!prepared_data_) {
190 RETURN_IF_NOT_OK(InitPullMode());
191 prepared_data_ = true;
192 }
193 if (eof_handled_) {
194 *row = TensorRow(TensorRow::kFlagEOF);
195 return Status::OK();
196 }
197 TensorRow sample_row;
198 if (sample_ids_ == nullptr) {
199 RETURN_IF_NOT_OK(this->InitSampler());
200 RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row));
201 CHECK_FAIL_RETURN_UNEXPECTED(sample_row.size() > 0, "GetNextRowPullMode: Expect at least one sample in sampler.");
202 sample_ids_ = sample_row[0];
203 MS_LOG(DEBUG) << "Set sample_ids_=" << (*sample_ids_);
204 }
205 if (curr_row_ + 1 > sample_ids_->Size()) {
206 *row = TensorRow(TensorRow::kFlagEOE);
207 RETURN_IF_NOT_OK(ResetAndUpdateRepeat());
208 return Status::OK();
209 }
210 int64_t key;
211 RETURN_IF_NOT_OK(sample_ids_->GetItemAt(&key, {curr_row_}));
212 MS_LOG(DEBUG) << "Got key=" << key << " with curr_row_=" << curr_row_;
213 RETURN_IF_NOT_OK(LoadTensorRowPullMode(key, row));
214 curr_row_++;
215 return Status::OK();
216 }
217
ResetAndUpdateRepeat()218 Status MappableLeafOp::ResetAndUpdateRepeat() {
219 if (!IsLastIteration()) {
220 RETURN_IF_NOT_OK(MappableLeafOp::Reset());
221 TensorRow sample_row;
222 RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row));
223 if (sample_row.eoe()) {
224 return Status::OK();
225 }
226 CHECK_FAIL_RETURN_UNEXPECTED(sample_row.size() > 0, "GetNextRowPullMode: Expect at least one sample in sampler.");
227 // Get sample_ids
228 sample_ids_ = sample_row[0];
229 MS_LOG(DEBUG) << "Set sample_ids_=" << (*sample_ids_);
230 UpdateRepeatAndEpochCounter();
231 } else {
232 eof_handled_ = true;
233 }
234 return Status::OK();
235 }
236 } // namespace dataset
237 } // namespace mindspore
238