1 /** 2 * Copyright 2020-2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_CACHE_OP_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_CACHE_OP_H_ 18 19 #include <atomic> 20 #include <string> 21 #include <utility> 22 #include <memory> 23 #include "minddata/dataset/engine/datasetops/cache_base_op.h" 24 25 namespace mindspore { 26 namespace dataset { 27 /// \brief CacheOp provides a memory/disk cache that acts as a save-point within a non-mappable dataset. 28 /// \note For mappable dataset, please see CacheLookupOp. 29 /// \see CacheLookupOp 30 class CacheOp : public CacheBase, public RandomAccessOp { 31 public: 32 // This CacheOp is for non-mappable case where it is divided into two phases. 33 // The first phase is we cache all the rows from the child (and let the cache server 34 // assigns row id). No read access in the first phase. Once the cache is fully built, 35 // we switch to second phase and fetch requests from the sampler. 36 enum class Phase : uint8_t { kBuildPhase = 0, kFetchPhase = 1 }; 37 constexpr static int32_t kPhaseCheckIntervalInMilliSec = 100; 38 39 /// \brief Constructor of CacheOp 40 /// \note The builder class should be used to call it. 41 /// \param num_workers The number of worker threads. 42 /// \param op_connector_size The size of each queue in the connector. 43 CacheOp(int32_t num_workers, int32_t op_connector_size, std::shared_ptr<CacheClient> cache_client, 44 std::shared_ptr<SamplerRT> sampler); 45 46 // Destructor 47 ~CacheOp(); 48 49 /// \brief Base-class override for special eoe handler. 50 /// \notes CacheOp must override this because it shall not perform default handling of eoe. Instead 51 /// the CacheOp manages actions related to the end of the epoch. 52 /// \return Status The status code returned 53 Status EoeReceived(int32_t worker_id) override; 54 55 /// \brief Base-class override for handling cases when an eof is received. 56 /// \param worker_id - The worker id 57 /// \return Status The status code returned 58 Status EofReceived(int32_t worker_id) override; 59 60 // \brief Class functor operator (). 61 /// \return Status The status code returned 62 Status operator()() override; 63 64 /// \brief Entry function for worker thread that fetch rows from CacheLookupOp 65 /// \param workerId 66 /// \return Status The status code returned 67 Status WorkerEntry(int32_t worker_id) override; 68 69 /// \brief Base-class override for handling cases if we allow cache miss. AllowCacheMiss()70 bool AllowCacheMiss() override { return false; } 71 72 /// \brief Base-class override for the name of this operator. Name()73 std::string Name() const override { return kCacheOp; } 74 75 /// \brief Perform specific post-operations on CacheOp 76 /// \return Status The status code returned 77 Status PrepareOperator() override; 78 79 private: 80 WaitPost rows_cache_done_; 81 std::atomic<int64_t> num_guys_in_; 82 Phase phase_; 83 84 QueueList<TensorRow> cache_workers_in_queue_; 85 /// \brief The main thread will wait until all the rows are cached and will start the handshake with the sampler. 86 /// \return Status object 87 Status WaitForCachingAllRows(); 88 89 Status CacheAllRowsMaster(); 90 91 /// \brief For non-mappable dataset, there is a build phase where we cache all the rows. 92 /// \return Status object 93 Status CacheAllRows(int32_t worker_id); 94 Status RegisterResources() override; 95 }; 96 } // namespace dataset 97 } // namespace mindspore 98 99 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_CACHE_OP_H_ 100