1 /**
2 * Copyright 2020-2023 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/datasetops/cache_op.h"
17
18 #include "minddata/dataset/core/config_manager.h"
19 #include "minddata/dataset/core/global_context.h"
20 #include "minddata/dataset/engine/datasetops/repeat_op.h"
21 #include "minddata/dataset/include/dataset/constants.h"
22 #include "minddata/dataset/engine/dataset_iterator.h"
23 #include "minddata/dataset/util/log_adapter.h"
24 #include "minddata/dataset/util/task_manager.h"
25
26 namespace mindspore {
27 namespace dataset {
// Constructor of CacheOp
// A CacheOp always starts in the build phase; PrepareOperator may flip it to the
// fetch phase if the server reports the cache was already created elsewhere.
CacheOp::CacheOp(int32_t num_workers, int32_t op_connector_size, std::shared_ptr<CacheClient> cache_client,
                 std::shared_ptr<SamplerRT> sampler)
    : CacheBase(num_workers, op_connector_size, std::move(cache_client), std::move(sampler)),
      num_guys_in_(0),  // counts workers that finished caching; used as a rendezvous counter
      phase_(Phase::kBuildPhase) {}
34
// Destructor (defaulted; all members are RAII-managed)
CacheOp::~CacheOp() = default;
37
// This class functor will provide the master loop that drives the logic for performing the work
// Phase 1: drive the child pipeline and wait for workers to cache every row (build phase).
// Phase 2: act as a leaf op, feeding sampler ids to workers that fetch rows from the cache.
Status CacheOp::operator()() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  // The fetch phase is sampler-driven, so a sampler is mandatory for this op.
  if (!sampler_) {
    RETURN_STATUS_UNEXPECTED("Invalid sampler, CacheOp requires a sampler before it can be executed, but got nullptr.");
  }
  RETURN_IF_NOT_OK(RegisterResources());

  // required task group sync after launching workers
  TaskManager::FindMe()->Post();
  // Wait for the workers to finish caching the rows.
  RETURN_IF_NOT_OK(WaitForCachingAllRows());
  // Current repeats and current epochs may have increased when caching all rows with DatasetOp::GetNextInput.
  // But they shouldn't be increased because now cache op is starting to act as a leaf and its epoch hasn't started.
  op_current_repeats_ = 0;
  op_current_epochs_ = 0;
  RETURN_IF_NOT_OK(FetchSamplesToWorkers());
  return Status::OK();
}
57
// Per-worker build-phase routine: drains this worker's input queue and writes each row
// to the cache server, then synchronizes with the other workers before returning.
// Rows are distributed to the queues by WaitForCachingAllRows on the main thread.
Status CacheOp::CacheAllRows(int32_t worker_id) {
  // If the current phase is to fill the cache, do it then.
  if (phase_ == Phase::kBuildPhase) {
    MS_LOG(INFO) << "CacheOp first epoch SAVE mode started. Worker: " << worker_id;
    // SAVE mode loop
    TensorRow row;
    RETURN_IF_NOT_OK(cache_workers_in_queue_[worker_id]->PopFront(&row));
    while (!row.eof()) {
      if (!row.eoe()) {
        Status rc;
        // Do the Async write if we attach to the shared memory.
        rc = cache_client_->AsyncWriteRow(row);
        if (rc.StatusCode() == StatusCode::kMDNotImplementedYet) {
          // Async path unavailable (no shared memory attach) — fall back to the sync write.
          RETURN_IF_NOT_OK(cache_client_->WriteRow(row));
        } else if (rc.IsError()) {
          return rc;
        }
      } else {
        // In a repeat-over-cache scenario, any of the "real" leaf operators below us have been set up
        // as non-repeating leaf ops. As such, they only do one epoch and then quit. Since we got the
        // the eoe to indicate the end of the epoch, we should next expect to get the eof.
        // Drain this eof so that we don't leave it sitting there on a connector that we'll never fetch
        // from again.
        RETURN_IF_NOT_OK(cache_workers_in_queue_[worker_id]->PopFront(&row));
        if (!row.eof()) {
          RETURN_STATUS_UNEXPECTED("[Internal ERROR] Cache op expects to get an eof after eoe from child.");
        }
        break;
      }
      RETURN_IF_NOT_OK(cache_workers_in_queue_[worker_id]->PopFront(&row));
    }
  }
  // Let the main guy know we are done.
  // fetch_add returns the pre-increment value, so exactly one worker — the last one in —
  // observes (last_guy_in + 1) == num_workers_ and releases everybody else.
  auto last_guy_in = num_guys_in_.fetch_add(1);
  if ((last_guy_in + 1) == num_workers_) {
    rows_cache_done_.Set();
  } else {
    // Let's do a sync up here.
    RETURN_IF_NOT_OK(rows_cache_done_.Wait());
  }
  return Status::OK();
}
WaitForCachingAllRows()100 Status CacheOp::WaitForCachingAllRows() {
101 // Fetch all rows and wait for workers to cache them
102 if (phase_ == Phase::kBuildPhase) {
103 // We will take the chance to cache the schema at the server.
104 RETURN_IF_NOT_OK(cache_client_->CacheSchema(column_name_id_map()));
105 // SAVE mode loop
106 TensorRow new_row;
107 auto child_iterator = std::make_unique<ChildIterator>(this, 0, 0);
108 int64_t ctr = 0;
109 do {
110 RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&new_row));
111 RETURN_IF_NOT_OK(cache_workers_in_queue_[ctr++ % num_workers_]->EmplaceBack(std::move(new_row)));
112 } while (!new_row.eof());
113
114 for (int32_t i = 1; i < num_workers_; i++) {
115 RETURN_IF_NOT_OK(cache_workers_in_queue_[ctr++ % num_workers_]->EmplaceBack(TensorRow(TensorRow::kFlagEOF)));
116 }
117 }
118
119 // Wait for the workers to finish caching the rows.
120 RETURN_IF_NOT_OK(rows_cache_done_.Wait());
121 // Move from build phase to fetch phase if we are the one to fill the cache
122 if (phase_ == Phase::kBuildPhase) {
123 RETURN_IF_NOT_OK(cache_client_->FlushAsyncWriteBuffer()); // One more flush
124 RETURN_IF_NOT_OK(cache_client_->BuildPhaseDone());
125 // Move to the next phase
126 phase_ = Phase::kFetchPhase;
127 }
128 // If we are not the one to create the cache,
129 // wait until the state changed from build phase to fetch base.
130 bool BuildPhaseDone = true;
131 do {
132 int8_t out;
133 RETURN_IF_NOT_OK(cache_client_->GetState(&out));
134 auto state = static_cast<CacheServiceState>(out);
135 switch (state) {
136 case CacheServiceState::kBuildPhase:
137 // Do nothing. Continue to wait.
138 BuildPhaseDone = false;
139 std::this_thread::sleep_for(std::chrono::milliseconds(kPhaseCheckIntervalInMilliSec));
140 break;
141 case CacheServiceState::kFetchPhase:
142 BuildPhaseDone = true;
143 break;
144 case CacheServiceState::kOutOfMemory:
145 RETURN_STATUS_OOM("Out of memory.");
146 case CacheServiceState::kNoSpace:
147 RETURN_STATUS_ERROR(StatusCode::kMDNoSpace,
148 "Cache server is running of out spill storage, check memory usage.");
149 case CacheServiceState::kNone:
150 case CacheServiceState::kError:
151 default:
152 RETURN_STATUS_UNEXPECTED("Unexpected Cache server state: " + std::to_string(out));
153 }
154 } while (!BuildPhaseDone);
155 // Get statistics from the server, and if we are not the one to create the cache,
156 // wait until the state changed from build phase to fetch base.
157 CacheServiceStat stat{};
158 RETURN_IF_NOT_OK(cache_client_->GetStat(&stat));
159 const row_id_type min_key = stat.min_row_id;
160 const row_id_type max_key = stat.max_row_id;
161 num_rows_ = max_key - min_key + 1;
162 MS_LOG(INFO) << "Number of rows cached: " << num_rows_;
163 MS_LOG(INFO) << "Number of rows cached in memory : " << stat.num_mem_cached;
164 MS_LOG(INFO) << "Number of rows spilled to disk : " << stat.num_disk_cached;
165 MS_LOG(INFO) << "Average cache size : " << stat.avg_cache_sz;
166 // Now all rows are cached and we have done a sync point check up. Next phase is
167 // is pick up fetch input from sampler and pass up to the caller.
168 RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this));
169 return Status::OK();
170 }
// Worker thread entry point: first participates in the build phase (caching rows),
// then serves the fetch phase (pulling cached rows for sampled ids).
Status CacheOp::WorkerEntry(int32_t worker_id) {
  // Required task group sync; lets the launcher proceed once this worker is running.
  TaskManager::FindMe()->Post();
  RETURN_IF_NOT_OK(CacheAllRows(worker_id));
  RETURN_IF_NOT_OK(FetchFromCache(worker_id));
  return Status::OK();
}
// Registers this op's queues and wait posts with the tree's task group so they are
// interruptible when the pipeline shuts down.
Status CacheOp::RegisterResources() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  // One input queue per worker for the build-phase row distribution.
  cache_workers_in_queue_.Init(num_workers_, oc_queue_size_);
  RETURN_IF_NOT_OK(cache_workers_in_queue_.Register(tree_->AllTasks()));
  RETURN_IF_NOT_OK(CacheBase::RegisterResources());
  RETURN_IF_NOT_OK(rows_cache_done_.Register(tree_->AllTasks()));
  RETURN_IF_NOT_OK(keys_miss_->Register(tree_->AllTasks()));
  return Status::OK();
}
186
// Base-class override for special eoe handler.
// CacheOp must override this because it shall not perform default handling of eoe. Instead
// the CacheOp manages actions related to the end of the epoch.
Status CacheOp::EoeReceived(int32_t worker_id) {
  // Mark this op idle instead of propagating the eoe downstream.
  state_ = OpState::kDeOpIdle;
  return Status::OK();
}
// Base-class override for handling cases when an eof is received.
Status CacheOp::EofReceived(int32_t worker_id) {
  // eofReceived is overloaded because we want to manually handle this eof.
  // Specifically, the default behavior is to pack it and flow it up to the next connection.
  // In this case, we want a no-op behavior so that we can perform correct action.
  return Status::OK();
}
201
PrepareOperator()202 Status CacheOp::PrepareOperator() {
203 // Run any common code from super class first before adding our own
204 RETURN_IF_NOT_OK(DatasetOp::PrepareOperator());
205 // Get the computed check sum from all ops in our cache path below us and ask the cache op to create it's cache
206 uint32_t cache_crc = DatasetOp::GenerateCRC(shared_from_this());
207 // This is a non-mappable cache op so the id's need to be generated.
208 // Construct the cache
209 const bool generate_ids = true;
210 Status rc = cache_client_->CreateCache(cache_crc, generate_ids);
211 if (rc.StatusCode() == StatusCode::kMDDuplicateKey) {
212 // We are told the cache has been created already. So we skip the build phase.
213 phase_ = Phase::kFetchPhase;
214 rc = Status::OK();
215 }
216 RETURN_IF_NOT_OK(rc);
217 return Status::OK();
218 }
219 } // namespace dataset
220 } // namespace mindspore
221