/**
 * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_CACHE_MERGE_OP_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_CACHE_MERGE_OP_H_

#include <algorithm>
#include <atomic>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include "minddata/dataset/core/tensor_row.h"
#include "minddata/dataset/engine/cache/cache_client.h"
#include "minddata/dataset/engine/datasetops/parallel_op.h"
#include "minddata/dataset/engine/dataset_iterator.h"
#include "minddata/dataset/util/queue.h"
#include "minddata/dataset/util/queue_map.h"
#include "minddata/dataset/util/semaphore.h"

35 namespace mindspore {
36 namespace dataset {
37 /// \brief Provides method to merge two streams (one from CacheLookup and one from cache miss stream) into one single
38 /// stream
39 class CacheMergeOp : public ParallelOp<TensorRow, TensorRow> {
40  public:
41   // Some handshake structures between CacheMissWorkerEntry and Cleaner
42   class TensorRowCacheRequest {
43    public:
44     enum class State : uint8_t {
45       kEmpty = 0,  // Initial state. Row hasn't arrived from cache miss stream yet.
46       kDirty = 1,  // Cleaner hasn't flushed it to the cache server yet.
47       kClean = 2   // The row has been flushed already.
48     };
TensorRowCacheRequest()49     TensorRowCacheRequest() : st_(State::kEmpty) {}
50     ~TensorRowCacheRequest() = default;
51     /// Getter and Setter of the state
GetState()52     State GetState() const { return st_; }
SetState(State newState)53     void SetState(State newState) { st_ = newState; }
54     /// Take a tensor row and send rpc call to the server async
55     /// \param cc Cache client of the CacheMergeOp
56     /// \param row TensorRow to be sent to the server
57     /// \return Status object
58     /// \note Thread safe
59     Status AsyncSendCacheRequest(const std::shared_ptr<CacheClient> &cc, const TensorRow &row);
60 
61     /// \brief We send the row to the server async so the CacheMissWorkerEntry can continue.
62     /// It is the cleaner that will check the result.
63     /// \return Status object
64     Status CheckCacheResult();
65 
66    private:
67     std::atomic<State> st_;
68     std::shared_ptr<CacheRowRequest> cleaner_copy_;
69   };
70 
71   constexpr static int kNumChildren = 2;        // CacheMergeOp has 2 children
72   constexpr static int kCacheHitChildIdx = 0;   // Cache hit stream
73   constexpr static int kCacheMissChildIdx = 1;  // Cache miss stream
74 
75   /// \brief Constructor
76   /// \param numWorkers Number of parallel workers as a derived class of ParallelOp
77   /// \param opConnector Size Connector size as a derived class of ParallelOp
78   /// \param numCleaners Number of cleaners to move cache miss rows into the cache server
79   /// \param cache_client CacheClient to communicate with the Cache server
80   CacheMergeOp(int32_t numWorkers, int32_t opConnectorSize, int32_t numCleaners,
81                std::shared_ptr<CacheClient> cache_client);
82   ~CacheMergeOp();
83   void Print(std::ostream &out, bool show_all) const override;
Name()84   std::string Name() const override { return kCacheMergeOp; }
85 
86   friend std::ostream &operator<<(std::ostream &out, const CacheMergeOp &mo) {
87     mo.Print(out, false);
88     return out;
89   }
90   /// \brief Master thread responsible to spawn all the necessary worker threads for the two streams and
91   ///     the threads for the cleaners.
92   /// \return
93   Status operator()() override;
94 
95   /// \brief Entry function for worker thread that fetch rows from CacheLookupOp
96   /// \param workerId
97   /// \return Status object
98   Status WorkerEntry(int32_t workerId) override;
99 
100   /// \brief Perform specific post-operations on CacheOp
101   /// \return Status The status code returned
102   Status PrepareOperator() override;
103 
104   /// \brief Main thread to fetch rows from the miss child and assign it to workers
105   /// \return Status The status code returned
106   Status CacheMissMaster();
107 
108   /// \brief Entry function for worker thread that fetch rows from the cache miss stream
109   /// \param workerId
110   /// \return Status object
111   Status CacheMissWorkerEntry(int32_t workerId);
112 
113   /// \brief Base-class override for eoe handling
114   /// \param worker_id
115   /// \return Status object
116   Status EoeReceived(int32_t worker_id) override;
117 
118   /// \brief Base-class override for handling cases when an eof is received.
119   /// \param worker_id - The worker id
120   /// \return Status The status code returned
121   Status EofReceived(int32_t worker_id) override;
122 
123  protected:
124   Status ComputeColMap() override;
125 
126  private:
127   std::mutex mux_;
128   QueueMap<row_id_type, TensorRow> cache_miss_;
129   std::map<row_id_type, MemGuard<TensorRowCacheRequest, Allocator<TensorRowCacheRequest>>> io_request_;
130   std::unique_ptr<Queue<row_id_type>> io_que_;
131   int32_t num_cleaners_;
132   std::shared_ptr<CacheClient> cache_client_;
133   std::atomic<bool> cache_missing_rows_;
134 
135   QueueList<TensorRow> missWorkers_in_queues_;
136 
137   /// \brief Locate the cache request from the io_request_ map
138   /// \param row_id
139   /// \param out pointer to the cache request
140   /// \return Status object
141   Status GetRq(row_id_type row_id, TensorRowCacheRequest **out);
142 
143   /// \brief These are the entry functions for the cleaner threads. Each cleaner is responsible for
144   /// moving cache miss TensorRow into the CacheServer.
145   /// \return Status object
146   Status Cleaner();
147 };
148 }  // namespace dataset
149 }  // namespace mindspore
150 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_CACHE_MERGE_OP_H_
151