• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_MAPPABLE_LEAF_OP_H_
17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_MAPPABLE_LEAF_OP_H_
18 
19 #include <deque>
20 #include <memory>
21 #include <queue>
22 #include <string>
23 #include <algorithm>
24 #include <map>
25 #include <set>
26 #include <utility>
27 #include <vector>
28 #include "minddata/dataset/core/tensor.h"
29 
30 #include "minddata/dataset/engine/data_schema.h"
31 #include "minddata/dataset/engine/datasetops/parallel_op.h"
32 #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
33 #ifndef ENABLE_ANDROID
34 #include "minddata/dataset/kernels/image/image_utils.h"
35 #else
36 #include "minddata/dataset/kernels/image/lite_image_utils.h"
37 #endif
38 #include "minddata/dataset/util/path.h"
39 #include "minddata/dataset/util/queue.h"
40 #include "minddata/dataset/util/services.h"
41 #include "minddata/dataset/util/status.h"
42 #include "minddata/dataset/util/wait_post.h"
43 
44 namespace mindspore {
45 namespace dataset {
// Forward declaration of the Queue class template (declared here, defined elsewhere).
template <class T>
class Queue;
49 
50 class MappableLeafOp : public ParallelOp, public RandomAccessOp {
51  public:
52   /// Constructor
53   /// \param int32_t num_wkrs - Num of workers reading images in parallel
54   /// \param int32_t queue_size - connector queue size
55   /// \param td::unique_ptr<Sampler> sampler - sampler tells the source  what to read
56   MappableLeafOp(int32_t num_wkrs, int32_t queue_size, std::shared_ptr<SamplerRT> sampler);
57 
58   /// Destructor.
59   ~MappableLeafOp() = default;
60 
61   /// Main Loop of MappableLeaf
62   /// Master thread: Fill IOBlockQueue, then goes to sleep
63   /// Worker thread: pulls IOBlock from IOBlockQueue, work on it then put row to out_connector_
64   /// \return Status The status code returned
65   Status operator()() override;
66 
67   /// Op name getter
68   /// @return Name of the current Op
Name()69   std::string Name() const override { return "MappableLeafPp"; }
70 
71  protected:
72   /// Initialize Sampler, calls sampler->Init() within
73   /// @return Status The status code returned
74   Status InitSampler();
75 
76   /// Called first when function is called
77   /// \return Status The status code returned
78   virtual Status LaunchThreadsAndInitOp() = 0;
79 
80   /// Worker thread pulls a number of IOBlock from IOBlock Queue, make a row and push it to Connector
81   /// \param int32_t workerId - id of each worker
82   /// \return Status The status code returned
83   Status WorkerEntry(int32_t workerId) override;
84 
85   /// Virtual function to Load a tensor row at location row_id
86   /// \param row_id_type row_id - id for this tensor row
87   /// \param TensorRow row - loaded row
88   /// \return Status The status code returned
89   virtual Status LoadTensorRow(row_id_type row_id, TensorRow *row) = 0;
90 
91   /// Reset function to be called after every epoch to reset the source op after
92   /// \return Status The status code returned
93   Status Reset() override;
94 };
95 }  // namespace dataset
96 }  // namespace mindspore
97 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_MAPPABLE_LEAF_OP_H_
98