• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_CACHE_OP_H_
17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_CACHE_OP_H_
18 
19 #include <atomic>
20 #include <string>
21 #include <utility>
22 #include <memory>
23 #include "minddata/dataset/engine/datasetops/cache_base_op.h"
24 
25 namespace mindspore {
26 namespace dataset {
27 /// \brief CacheOp provides a memory/disk cache that acts as a save-point within a non-mappable dataset.
28 /// \note For mappable dataset, please see CacheLookupOp.
29 /// \see CacheLookupOp
30 class CacheOp : public CacheBase, public RandomAccessOp {
31  public:
32   // This CacheOp is for non-mappable case where it is divided into two phases.
33   // The first phase is we cache all the rows from the child (and let the cache server
34   // assigns row id). No read access in the first phase. Once the cache is fully built,
35   // we switch to second phase and fetch requests from the sampler.
36   enum class Phase : uint8_t { kBuildPhase = 0, kFetchPhase = 1 };
37   constexpr static int32_t kPhaseCheckIntervalInMilliSec = 100;
38 
39   /// \brief Constructor of CacheOp
40   /// \note The builder class should be used to call it.
41   /// \param num_workers The number of worker threads.
42   /// \param op_connector_size The size of each queue in the connector.
43   CacheOp(int32_t num_workers, int32_t op_connector_size, std::shared_ptr<CacheClient> cache_client,
44           std::shared_ptr<SamplerRT> sampler);
45 
46   // Destructor
47   ~CacheOp();
48 
49   /// \brief Base-class override for special eoe handler.
50   /// \notes CacheOp must override this because it shall not perform default handling of eoe. Instead
51   ///     the CacheOp manages actions related to the end of the epoch.
52   /// \return Status The status code returned
53   Status EoeReceived(int32_t worker_id) override;
54 
55   /// \brief Base-class override for handling cases when an eof is received.
56   /// \param worker_id - The worker id
57   /// \return Status The status code returned
58   Status EofReceived(int32_t worker_id) override;
59 
60   // \brief Class functor operator ().
61   /// \return Status The status code returned
62   Status operator()() override;
63 
64   /// \brief Entry function for worker thread that fetch rows from CacheLookupOp
65   /// \param workerId
66   /// \return Status The status code returned
67   Status WorkerEntry(int32_t worker_id) override;
68 
69   /// \brief Base-class override for handling cases if we allow cache miss.
AllowCacheMiss()70   bool AllowCacheMiss() override { return false; }
71 
72   /// \brief Base-class override for the name of this operator.
Name()73   std::string Name() const override { return kCacheOp; }
74 
75   /// \brief Perform specific post-operations on CacheOp
76   /// \return Status The status code returned
77   Status PrepareOperator() override;
78 
79  private:
80   WaitPost rows_cache_done_;
81   std::atomic<int64_t> num_guys_in_;
82   Phase phase_;
83 
84   QueueList<TensorRow> cache_workers_in_queue_;
85   /// \brief The main thread will wait until all the rows are cached and will start the handshake with the sampler.
86   /// \return Status object
87   Status WaitForCachingAllRows();
88 
89   Status CacheAllRowsMaster();
90 
91   /// \brief For non-mappable dataset, there is a build phase where we cache all the rows.
92   /// \return Status object
93   Status CacheAllRows(int32_t worker_id);
94   Status RegisterResources() override;
95 };
96 }  // namespace dataset
97 }  // namespace mindspore
98 
99 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_CACHE_OP_H_
100