• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020-2023 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/datasetops/cache_op.h"
17 
18 #include "minddata/dataset/core/config_manager.h"
19 #include "minddata/dataset/core/global_context.h"
20 #include "minddata/dataset/engine/datasetops/repeat_op.h"
21 #include "minddata/dataset/include/dataset/constants.h"
22 #include "minddata/dataset/engine/dataset_iterator.h"
23 #include "minddata/dataset/util/log_adapter.h"
24 #include "minddata/dataset/util/task_manager.h"
25 
26 namespace mindspore {
27 namespace dataset {
// Constructor of CacheOp.
// Ownership of the cache client and sampler is handed off (moved) to the CacheBase parent.
// The op starts in the build phase (filling the cache) with zero workers checked in.
CacheOp::CacheOp(int32_t num_workers, int32_t op_connector_size, std::shared_ptr<CacheClient> cache_client,
                 std::shared_ptr<SamplerRT> sampler)
    : CacheBase(num_workers, op_connector_size, std::move(cache_client), std::move(sampler)),
      num_guys_in_(0),
      phase_(Phase::kBuildPhase) {}
34 
// Destructor: default is sufficient since all members are managed (RAII) types.
CacheOp::~CacheOp() = default;
37 
// This class functor will provide the master loop that drives the logic for performing the work.
// It first drives the build phase (workers caching all child rows), then re-drives the
// pipeline out of the cache using the sampler.
Status CacheOp::operator()() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  if (!sampler_) {
    RETURN_STATUS_UNEXPECTED("Invalid sampler, CacheOp requires a sampler before it can be executed, but got nullptr.");
  }
  RETURN_IF_NOT_OK(RegisterResources());

  // required task group sync after launching workers
  TaskManager::FindMe()->Post();
  // Wait for the workers to finish caching the rows.
  RETURN_IF_NOT_OK(WaitForCachingAllRows());
  // Current repeats and current epochs may have increased when caching all rows with DatasetOp::GetNextInput.
  // But they shouldn't be increased because now cache op is starting to act as a leaf and its epoch hasn't started.
  op_current_repeats_ = 0;
  op_current_epochs_ = 0;
  // From here on, rows are served out of the cache driven by the sampler.
  RETURN_IF_NOT_OK(FetchSamplesToWorkers());
  return Status::OK();
}
57 
// Worker-side build phase: pop rows from this worker's input queue and write them to the
// cache server until eof, then rendezvous with the other workers on rows_cache_done_.
Status CacheOp::CacheAllRows(int32_t worker_id) {
  // If the current phase is to fill the cache, do it then.
  if (phase_ == Phase::kBuildPhase) {
    MS_LOG(INFO) << "CacheOp first epoch SAVE mode started. Worker: " << worker_id;
    // SAVE mode loop
    TensorRow row;
    RETURN_IF_NOT_OK(cache_workers_in_queue_[worker_id]->PopFront(&row));
    while (!row.eof()) {
      if (!row.eoe()) {
        Status rc;
        // Do the Async write if we attach to the shared memory.
        rc = cache_client_->AsyncWriteRow(row);
        if (rc.StatusCode() == StatusCode::kMDNotImplementedYet) {
          // Async path is unavailable; fall back to a synchronous write.
          RETURN_IF_NOT_OK(cache_client_->WriteRow(row));
        } else if (rc.IsError()) {
          return rc;
        }
      } else {
        // In a repeat-over-cache scenario, any of the "real" leaf operators below us have been set up
        // as non-repeating leaf ops.  As such, they only do one epoch and then quit.  Since we got
        // the eoe to indicate the end of the epoch, we should next expect to get the eof.
        // Drain this eof so that we don't leave it sitting there on a connector that we'll never fetch
        // from again.
        RETURN_IF_NOT_OK(cache_workers_in_queue_[worker_id]->PopFront(&row));
        if (!row.eof()) {
          RETURN_STATUS_UNEXPECTED("[Internal ERROR] Cache op expects to get an eof after eoe from child.");
        }
        break;
      }
      RETURN_IF_NOT_OK(cache_workers_in_queue_[worker_id]->PopFront(&row));
    }
  }
  // Let the main guy know we are done.
  auto last_guy_in = num_guys_in_.fetch_add(1);
  if ((last_guy_in + 1) == num_workers_) {
    // Last worker to arrive releases everyone blocked on the sync point below.
    rows_cache_done_.Set();
  } else {
    // Let's do a sync up here.
    RETURN_IF_NOT_OK(rows_cache_done_.Wait());
  }
  return Status::OK();
}
WaitForCachingAllRows()100 Status CacheOp::WaitForCachingAllRows() {
101   // Fetch all rows and wait for workers to cache them
102   if (phase_ == Phase::kBuildPhase) {
103     // We will take the chance to cache the schema at the server.
104     RETURN_IF_NOT_OK(cache_client_->CacheSchema(column_name_id_map()));
105     // SAVE mode loop
106     TensorRow new_row;
107     auto child_iterator = std::make_unique<ChildIterator>(this, 0, 0);
108     int64_t ctr = 0;
109     do {
110       RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&new_row));
111       RETURN_IF_NOT_OK(cache_workers_in_queue_[ctr++ % num_workers_]->EmplaceBack(std::move(new_row)));
112     } while (!new_row.eof());
113 
114     for (int32_t i = 1; i < num_workers_; i++) {
115       RETURN_IF_NOT_OK(cache_workers_in_queue_[ctr++ % num_workers_]->EmplaceBack(TensorRow(TensorRow::kFlagEOF)));
116     }
117   }
118 
119   // Wait for the workers to finish caching the rows.
120   RETURN_IF_NOT_OK(rows_cache_done_.Wait());
121   // Move from build phase to fetch phase if we are the one to fill the cache
122   if (phase_ == Phase::kBuildPhase) {
123     RETURN_IF_NOT_OK(cache_client_->FlushAsyncWriteBuffer());  // One more flush
124     RETURN_IF_NOT_OK(cache_client_->BuildPhaseDone());
125     // Move to the next phase
126     phase_ = Phase::kFetchPhase;
127   }
128   // If we are not the one to create the cache,
129   // wait until the state changed from build phase to fetch base.
130   bool BuildPhaseDone = true;
131   do {
132     int8_t out;
133     RETURN_IF_NOT_OK(cache_client_->GetState(&out));
134     auto state = static_cast<CacheServiceState>(out);
135     switch (state) {
136       case CacheServiceState::kBuildPhase:
137         // Do nothing. Continue to wait.
138         BuildPhaseDone = false;
139         std::this_thread::sleep_for(std::chrono::milliseconds(kPhaseCheckIntervalInMilliSec));
140         break;
141       case CacheServiceState::kFetchPhase:
142         BuildPhaseDone = true;
143         break;
144       case CacheServiceState::kOutOfMemory:
145         RETURN_STATUS_OOM("Out of memory.");
146       case CacheServiceState::kNoSpace:
147         RETURN_STATUS_ERROR(StatusCode::kMDNoSpace,
148                             "Cache server is running of out spill storage, check memory usage.");
149       case CacheServiceState::kNone:
150       case CacheServiceState::kError:
151       default:
152         RETURN_STATUS_UNEXPECTED("Unexpected Cache server state: " + std::to_string(out));
153     }
154   } while (!BuildPhaseDone);
155   // Get statistics from the server, and if we are not the one to create the cache,
156   // wait until the state changed from build phase to fetch base.
157   CacheServiceStat stat{};
158   RETURN_IF_NOT_OK(cache_client_->GetStat(&stat));
159   const row_id_type min_key = stat.min_row_id;
160   const row_id_type max_key = stat.max_row_id;
161   num_rows_ = max_key - min_key + 1;
162   MS_LOG(INFO) << "Number of rows cached: " << num_rows_;
163   MS_LOG(INFO) << "Number of rows cached in memory : " << stat.num_mem_cached;
164   MS_LOG(INFO) << "Number of rows spilled to disk : " << stat.num_disk_cached;
165   MS_LOG(INFO) << "Average cache size : " << stat.avg_cache_sz;
166   // Now all rows are cached and we have done a sync point check up. Next phase is
167   // is pick up fetch input from sampler and pass up to the caller.
168   RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this));
169   return Status::OK();
170 }
// Worker thread entry point: first help cache all rows (build phase), then serve
// rows back out of the cache (fetch phase).
Status CacheOp::WorkerEntry(int32_t worker_id) {
  // required task group sync with the master before doing any work
  TaskManager::FindMe()->Post();
  RETURN_IF_NOT_OK(CacheAllRows(worker_id));
  RETURN_IF_NOT_OK(FetchFromCache(worker_id));
  return Status::OK();
}
// Registers this op's queues and sync primitives with the tree's task group so they
// participate in pipeline-wide interrupt/cleanup.
Status CacheOp::RegisterResources() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  // Input queues carrying rows from the master thread to the caching workers.
  cache_workers_in_queue_.Init(num_workers_, oc_queue_size_);
  RETURN_IF_NOT_OK(cache_workers_in_queue_.Register(tree_->AllTasks()));
  RETURN_IF_NOT_OK(CacheBase::RegisterResources());
  // Sync point set once all workers complete the build phase.
  RETURN_IF_NOT_OK(rows_cache_done_.Register(tree_->AllTasks()));
  RETURN_IF_NOT_OK(keys_miss_->Register(tree_->AllTasks()));
  return Status::OK();
}
186 
// Base-class override for special eoe handler.
// CacheOp must override this because it shall not perform default handling of eoe. Instead
// the CacheOp manages actions related to the end of the epoch.
Status CacheOp::EoeReceived(int32_t worker_id) {
  // Mark this op idle; the eoe is deliberately not propagated here.
  state_ = OpState::kDeOpIdle;
  return Status::OK();
}
// Base-class override for handling cases when an eof is received.
Status CacheOp::EofReceived(int32_t worker_id) {
  // eofReceived is overloaded because we want to manually handle this eof.
  // Specifically, the default behavior is to pack it and flow it up to the next connection.
  // In this case, we want a no-op behavior so that we can perform correct action.
  return Status::OK();
}
201 
PrepareOperator()202 Status CacheOp::PrepareOperator() {
203   // Run any common code from super class first before adding our own
204   RETURN_IF_NOT_OK(DatasetOp::PrepareOperator());
205   // Get the computed check sum from all ops in our cache path below us and ask the cache op to create it's cache
206   uint32_t cache_crc = DatasetOp::GenerateCRC(shared_from_this());
207   // This is a non-mappable cache op so the id's need to be generated.
208   // Construct the cache
209   const bool generate_ids = true;
210   Status rc = cache_client_->CreateCache(cache_crc, generate_ids);
211   if (rc.StatusCode() == StatusCode::kMDDuplicateKey) {
212     // We are told the cache has been created already. So we skip the build phase.
213     phase_ = Phase::kFetchPhase;
214     rc = Status::OK();
215   }
216   RETURN_IF_NOT_OK(rc);
217   return Status::OK();
218 }
219 }  // namespace dataset
220 }  // namespace mindspore
221