1 /** 2 * Copyright 2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_MAPPABLE_LEAF_OP_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_MAPPABLE_LEAF_OP_H_ 18 19 #include <deque> 20 #include <memory> 21 #include <queue> 22 #include <string> 23 #include <algorithm> 24 #include <map> 25 #include <set> 26 #include <utility> 27 #include <vector> 28 #include "minddata/dataset/core/tensor.h" 29 30 #include "minddata/dataset/engine/data_schema.h" 31 #include "minddata/dataset/engine/datasetops/parallel_op.h" 32 #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" 33 #ifndef ENABLE_ANDROID 34 #include "minddata/dataset/kernels/image/image_utils.h" 35 #else 36 #include "minddata/dataset/kernels/image/lite_image_utils.h" 37 #endif 38 #include "minddata/dataset/util/path.h" 39 #include "minddata/dataset/util/queue.h" 40 #include "minddata/dataset/util/services.h" 41 #include "minddata/dataset/util/status.h" 42 #include "minddata/dataset/util/wait_post.h" 43 44 namespace mindspore { 45 namespace dataset { 46 // Forward declares 47 template <typename T> 48 class Queue; 49 50 class MappableLeafOp : public ParallelOp, public RandomAccessOp { 51 public: 52 /// Constructor 53 /// \param int32_t num_wkrs - Num of workers reading images in parallel 54 /// \param int32_t queue_size - connector queue size 55 /// \param td::unique_ptr<Sampler> sampler - sampler tells the source what to read 56 MappableLeafOp(int32_t num_wkrs, int32_t queue_size, std::shared_ptr<SamplerRT> sampler); 57 58 /// Destructor. 59 ~MappableLeafOp() = default; 60 61 /// Main Loop of MappableLeaf 62 /// Master thread: Fill IOBlockQueue, then goes to sleep 63 /// Worker thread: pulls IOBlock from IOBlockQueue, work on it then put row to out_connector_ 64 /// \return Status The status code returned 65 Status operator()() override; 66 67 /// Op name getter 68 /// @return Name of the current Op Name()69 std::string Name() const override { return "MappableLeafPp"; } 70 71 protected: 72 /// Initialize Sampler, calls sampler->Init() within 73 /// @return Status The status code returned 74 Status InitSampler(); 75 76 /// Called first when function is called 77 /// \return Status The status code returned 78 virtual Status LaunchThreadsAndInitOp() = 0; 79 80 /// Worker thread pulls a number of IOBlock from IOBlock Queue, make a row and push it to Connector 81 /// \param int32_t workerId - id of each worker 82 /// \return Status The status code returned 83 Status WorkerEntry(int32_t workerId) override; 84 85 /// Virtual function to Load a tensor row at location row_id 86 /// \param row_id_type row_id - id for this tensor row 87 /// \param TensorRow row - loaded row 88 /// \return Status The status code returned 89 virtual Status LoadTensorRow(row_id_type row_id, TensorRow *row) = 0; 90 91 /// Reset function to be called after every epoch to reset the source op after 92 /// \return Status The status code returned 93 Status Reset() override; 94 }; 95 } // namespace dataset 96 } // namespace mindspore 97 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_MAPPABLE_LEAF_OP_H_ 98