/**
 * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/datasetops/cache_op.h"

#include <memory>
#include <utility>
#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/include/dataset/constants.h"
#include "minddata/dataset/core/global_context.h"
#include "minddata/dataset/engine/datasetops/repeat_op.h"
#include "minddata/dataset/engine/dataset_iterator.h"

#include "minddata/dataset/util/log_adapter.h"
#include "minddata/dataset/util/task_manager.h"

namespace mindspore {
namespace dataset {
// Constructor of CacheOp
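// num_guys_in_ counts the workers that have finished caching rows. The op always starts in the
// build phase and only moves to the fetch phase once the cache has been filled, or if the server
// reports that the cache already exists (see PrepareOperator).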
CacheOp::CacheOp(int32_t num_workers, int32_t op_connector_size, std::shared_ptr<CacheClient> cache_client,
                 std::shared_ptr<SamplerRT> sampler)
    : CacheBase(num_workers, op_connector_size, std::move(cache_client), std::move(sampler)),
      num_guys_in_(0),
      phase_(Phase::kBuildPhase) {}

// Destructor
CacheOp::~CacheOp() = default;

// This class functor will provide the master loop that drives the logic for performing the work
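// The flow is: launch the workers, wait for them to cache every row from the child (build phase),
// reset the repeat/epoch counters, then fetch samples to the workers via the sampler (fetch phase).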
Status CacheOp::operator()() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  if (!sampler_) {
    return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
                  "Invalid parameter, CacheOp requires a sampler before it can be executed, but got nullptr.");
  }
  RETURN_IF_NOT_OK(RegisterResources());
  // Kick off the workers
  RETURN_IF_NOT_OK(
    tree_->LaunchWorkers(num_workers_, std::bind(&CacheOp::WorkerEntry, this, std::placeholders::_1), Name()));
  // Required task group sync after launching workers
  TaskManager::FindMe()->Post();
  // Wait for the workers to finish caching the rows.
  RETURN_IF_NOT_OK(WaitForCachingAllRows());
  // Current repeats and current epochs may have increased while caching all rows with DatasetOp::GetNextInput.
  // They should not have been, because the cache op is now starting to act as a leaf and its own epoch has not
  // started yet, so reset both counters here.
  op_current_repeats_ = 0;
  op_current_epochs_ = 0;
  RETURN_IF_NOT_OK(FetchSamplesToWorkers());
  return Status::OK();
}
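// Build phase: each worker drains rows from its child iterator and writes them to the cache server.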
Status CacheOp::CacheAllRows(int32_t worker_id) {
  // If the current phase is to fill the cache, do it then.
  if (phase_ == Phase::kBuildPhase) {
    // We will take the chance to cache the schema at the server.
    // Just do it once and pick one worker to do it.
    if (worker_id == 0) {
      RETURN_IF_NOT_OK(cache_client_->CacheSchema(column_name_id_map()));
    }
    MS_LOG(INFO) << "CacheOp first epoch SAVE mode started. Worker: " << worker_id;
    // SAVE mode loop
    TensorRow row;
    auto child_iterator = std::make_unique<ChildIterator>(this, worker_id, 0);
    RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&row));
    while (!row.eof()) {
      if (!row.eoe()) {
        Status rc;
        // Do the Async write if we attach to the shared memory.
        rc = cache_client_->AsyncWriteRow(row);
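        // AsyncWriteRow reports kMDNotImplementedYet when the async (shared-memory) path is not
        // available; in that case fall back to a plain synchronous WriteRow.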
        if (rc.StatusCode() == StatusCode::kMDNotImplementedYet) {
          RETURN_IF_NOT_OK(cache_client_->WriteRow(row));
        } else if (rc.IsError()) {
          return rc;
        }
      } else {
        // In a repeat-over-cache scenario, any of the "real" leaf operators below us have been set up
        // as non-repeating leaf ops.  As such, they only do one epoch and then quit.  Since we got the
        // eoe to indicate the end of the epoch, we should next expect to get the eof.
        // Drain this eof so that we don't leave it sitting there on a connector that we'll never fetch
        // from again.
        RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&row));
        if (!row.eof()) {
          RETURN_STATUS_UNEXPECTED("[Internal ERROR] Cache op expects to get an eof after eoe from child.");
        }
        break;
      }
      RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&row));
    }
  }
  // Let the main guy know we are done.
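  // The last worker to arrive sets rows_cache_done_; every other worker blocks on it so that all
  // workers leave the build phase together.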
  auto last_guy_in = num_guys_in_.fetch_add(1);
  if ((last_guy_in + 1) == num_workers_) {
    rows_cache_done_.Set();
  } else {
    // Let's do a sync up here.
    RETURN_IF_NOT_OK(rows_cache_done_.Wait());
  }
  return Status::OK();
}
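// Block until all workers have cached their rows; if we were the one filling the cache, tell the
// server the build phase is done, then poll the server state until it reports the fetch phase.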
Status CacheOp::WaitForCachingAllRows() {
  // Wait for the workers to finish caching the rows.
  RETURN_IF_NOT_OK(rows_cache_done_.Wait());
  // Move from build phase to fetch phase if we are the one to fill the cache
  if (phase_ == Phase::kBuildPhase) {
    RETURN_IF_NOT_OK(cache_client_->FlushAsyncWriteBuffer());  // One more flush
    RETURN_IF_NOT_OK(cache_client_->BuildPhaseDone());
    // Move to the next phase
    phase_ = Phase::kFetchPhase;
  }
  // If we are not the one to create the cache,
  // wait until the state changes from build phase to fetch phase.
  bool BuildPhaseDone = true;
  do {
    int8_t out;
    RETURN_IF_NOT_OK(cache_client_->GetState(&out));
    auto state = static_cast<CacheServiceState>(out);
    switch (state) {
      case CacheServiceState::kBuildPhase:
        // Do nothing. Continue to wait.
        BuildPhaseDone = false;
        std::this_thread::sleep_for(std::chrono::milliseconds(kPhaseCheckIntervalInMilliSec));
        break;
      case CacheServiceState::kFetchPhase:
        BuildPhaseDone = true;
        break;
      case CacheServiceState::kOutOfMemory:
        return Status(StatusCode::kMDOutOfMemory, "Cache server is running out of memory");
      case CacheServiceState::kNoSpace:
        return Status(StatusCode::kMDNoSpace, "Cache server is running out of spill storage");
      case CacheServiceState::kNone:
      case CacheServiceState::kError:
      default:
        RETURN_STATUS_UNEXPECTED("Unexpected Cache server state: " + std::to_string(out));
    }
  } while (!BuildPhaseDone);
  // Now that the build phase is done everywhere, get cache statistics from the server.
  CacheServiceStat stat{};
  RETURN_IF_NOT_OK(cache_client_->GetStat(&stat));
  const row_id_type min_key = stat.min_row_id;
  const row_id_type max_key = stat.max_row_id;
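  // Row ids were generated by the cache server (generate_ids in PrepareOperator), so the range
  // [min_row_id, max_row_id] is assumed to be contiguous when computing the row count below.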
  num_rows_ = max_key - min_key + 1;
  MS_LOG(INFO) << "Number of rows cached: " << num_rows_;
  MS_LOG(INFO) << "Number of rows cached in memory : " << stat.num_mem_cached;
  MS_LOG(INFO) << "Number of rows spilled to disk : " << stat.num_disk_cached;
  MS_LOG(INFO) << "Average cache size : " << stat.avg_cache_sz;
  // Now all rows are cached and we have done a sync point check up. The next phase is to pick up
  // fetch requests from the sampler and pass the rows up to the caller.
  RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this));
  return Status::OK();
}
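// Worker entry point: take part in the build phase first, then serve fetch requests from the cache.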
Status CacheOp::WorkerEntry(int32_t worker_id) {
  TaskManager::FindMe()->Post();
  RETURN_IF_NOT_OK(CacheAllRows(worker_id));
  RETURN_IF_NOT_OK(FetchFromCache(worker_id));
  return Status::OK();
}
Status CacheOp::RegisterResources() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  RETURN_IF_NOT_OK(CacheBase::RegisterResources());
  RETURN_IF_NOT_OK(rows_cache_done_.Register(tree_->AllTasks()));
  RETURN_IF_NOT_OK(keys_miss_->Register(tree_->AllTasks()));
  return Status::OK();
}

// Base-class override for special eoe handler.
// CacheOp must override this because it shall not perform default handling of eoe. Instead
// the CacheOp manages actions related to the end of the epoch.
Status CacheOp::EoeReceived(int32_t worker_id) {
  state_ = OpState::kDeOpIdle;
  return Status::OK();
}
// Base-class override for handling cases when an eof is received.
Status CacheOp::EofReceived(int32_t worker_id) {
  // EofReceived is overridden because we want to handle this eof manually.
  // The default behaviour is to pack it and flow it up to the next connector.
  // Here we want a no-op so that the cache op can take the correct action itself.
  return Status::OK();
}
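// Compute a CRC over the ops in the cache path below us and ask the cache server to create the cache;
// if the server reports the cache already exists, skip straight to the fetch phase.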
Status CacheOp::PrepareOperator() {
  // Run any common code from super class first before adding our own
  RETURN_IF_NOT_OK(DatasetOp::PrepareOperator());
  // Get the computed checksum from all ops in our cache path below us and ask the cache op to create its cache
  uint32_t cache_crc = DatasetOp::GenerateCRC(shared_from_this());
  // This is a non-mappable cache op so the ids need to be generated.
  // Construct the cache
  const bool generate_ids = true;
  Status rc = cache_client_->CreateCache(cache_crc, generate_ids);
  if (rc.StatusCode() == StatusCode::kMDDuplicateKey) {
    // We are told the cache has been created already. So we skip the build phase.
    phase_ = Phase::kFetchPhase;
    rc = Status::OK();
  }
  RETURN_IF_NOT_OK(rc);
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore