1 /** 2 * Copyright 2020-2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_CACHE_MERGE_OP_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_CACHE_MERGE_OP_H_ 18 19 #include <algorithm> 20 #include <atomic> 21 #include <deque> 22 #include <map> 23 #include <memory> 24 #include <mutex> 25 #include <string> 26 #include <utility> 27 #include "minddata/dataset/core/tensor_row.h" 28 #include "minddata/dataset/engine/cache/cache_client.h" 29 #include "minddata/dataset/engine/datasetops/parallel_op.h" 30 #include "minddata/dataset/engine/dataset_iterator.h" 31 #include "minddata/dataset/util/queue.h" 32 #include "minddata/dataset/util/queue_map.h" 33 #include "minddata/dataset/util/semaphore.h" 34 35 namespace mindspore { 36 namespace dataset { 37 /// \brief Provides method to merge two streams (one from CacheLookup and one from cache miss stream) into one single 38 /// stream 39 class CacheMergeOp : public ParallelOp<TensorRow, TensorRow> { 40 public: 41 // Some handshake structures between CacheMissWorkerEntry and Cleaner 42 class TensorRowCacheRequest { 43 public: 44 enum class State : uint8_t { 45 kEmpty = 0, // Initial state. Row hasn't arrived from cache miss stream yet. 46 kDirty = 1, // Cleaner hasn't flushed it to the cache server yet. 47 kClean = 2 // The row has been flushed already. 48 }; TensorRowCacheRequest()49 TensorRowCacheRequest() : st_(State::kEmpty) {} 50 ~TensorRowCacheRequest() = default; 51 /// Getter and Setter of the state GetState()52 State GetState() const { return st_; } SetState(State newState)53 void SetState(State newState) { st_ = newState; } 54 /// Take a tensor row and send rpc call to the server async 55 /// \param cc Cache client of the CacheMergeOp 56 /// \param row TensorRow to be sent to the server 57 /// \return Status object 58 /// \note Thread safe 59 Status AsyncSendCacheRequest(const std::shared_ptr<CacheClient> &cc, const TensorRow &row); 60 61 /// \brief We send the row to the server async so the CacheMissWorkerEntry can continue. 62 /// It is the cleaner that will check the result. 63 /// \return Status object 64 Status CheckCacheResult(); 65 66 private: 67 std::atomic<State> st_; 68 std::shared_ptr<CacheRowRequest> cleaner_copy_; 69 }; 70 71 constexpr static int kNumChildren = 2; // CacheMergeOp has 2 children 72 constexpr static int kCacheHitChildIdx = 0; // Cache hit stream 73 constexpr static int kCacheMissChildIdx = 1; // Cache miss stream 74 75 /// \brief Constructor 76 /// \param numWorkers Number of parallel workers as a derived class of ParallelOp 77 /// \param opConnector Size Connector size as a derived class of ParallelOp 78 /// \param numCleaners Number of cleaners to move cache miss rows into the cache server 79 /// \param cache_client CacheClient to communicate with the Cache server 80 CacheMergeOp(int32_t numWorkers, int32_t opConnectorSize, int32_t numCleaners, 81 std::shared_ptr<CacheClient> cache_client); 82 ~CacheMergeOp(); 83 void Print(std::ostream &out, bool show_all) const override; Name()84 std::string Name() const override { return kCacheMergeOp; } 85 86 friend std::ostream &operator<<(std::ostream &out, const CacheMergeOp &mo) { 87 mo.Print(out, false); 88 return out; 89 } 90 /// \brief Master thread responsible to spawn all the necessary worker threads for the two streams and 91 /// the threads for the cleaners. 92 /// \return 93 Status operator()() override; 94 95 /// \brief Entry function for worker thread that fetch rows from CacheLookupOp 96 /// \param workerId 97 /// \return Status object 98 Status WorkerEntry(int32_t workerId) override; 99 100 /// \brief Perform specific post-operations on CacheOp 101 /// \return Status The status code returned 102 Status PrepareOperator() override; 103 104 /// \brief Main thread to fetch rows from the miss child and assign it to workers 105 /// \return Status The status code returned 106 Status CacheMissMaster(); 107 108 /// \brief Entry function for worker thread that fetch rows from the cache miss stream 109 /// \param workerId 110 /// \return Status object 111 Status CacheMissWorkerEntry(int32_t workerId); 112 113 /// \brief Base-class override for eoe handling 114 /// \param worker_id 115 /// \return Status object 116 Status EoeReceived(int32_t worker_id) override; 117 118 /// \brief Base-class override for handling cases when an eof is received. 119 /// \param worker_id - The worker id 120 /// \return Status The status code returned 121 Status EofReceived(int32_t worker_id) override; 122 123 protected: 124 Status ComputeColMap() override; 125 126 private: 127 std::mutex mux_; 128 QueueMap<row_id_type, TensorRow> cache_miss_; 129 std::map<row_id_type, MemGuard<TensorRowCacheRequest, Allocator<TensorRowCacheRequest>>> io_request_; 130 std::unique_ptr<Queue<row_id_type>> io_que_; 131 int32_t num_cleaners_; 132 std::shared_ptr<CacheClient> cache_client_; 133 std::atomic<bool> cache_missing_rows_; 134 135 QueueList<TensorRow> missWorkers_in_queues_; 136 137 /// \brief Locate the cache request from the io_request_ map 138 /// \param row_id 139 /// \param out pointer to the cache request 140 /// \return Status object 141 Status GetRq(row_id_type row_id, TensorRowCacheRequest **out); 142 143 /// \brief These are the entry functions for the cleaner threads. Each cleaner is responsible for 144 /// moving cache miss TensorRow into the CacheServer. 145 /// \return Status object 146 Status Cleaner(); 147 }; 148 } // namespace dataset 149 } // namespace mindspore 150 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_CACHE_MERGE_OP_H_ 151