/**
 * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/datasetops/cache_op.h"

#include <memory>
#include <utility>
#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/include/dataset/constants.h"
#include "minddata/dataset/core/global_context.h"
#include "minddata/dataset/engine/datasetops/repeat_op.h"
#include "minddata/dataset/engine/dataset_iterator.h"

#include "minddata/dataset/util/log_adapter.h"
#include "minddata/dataset/util/task_manager.h"

namespace mindspore {
namespace dataset {
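// CacheOp operates in two phases. In the build phase, worker threads drain the child pipeline
// and write every row into the cache server. Once all rows are cached, the op switches to the
// fetch phase, where it acts as a leaf and serves rows from the cache according to the sampler.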
// Constructor of CacheOp
CacheOp::CacheOp(int32_t num_workers, int32_t op_connector_size, std::shared_ptr<CacheClient> cache_client,
                 std::shared_ptr<SamplerRT> sampler)
    : CacheBase(num_workers, op_connector_size, std::move(cache_client), std::move(sampler)),
      num_guys_in_(0),
      phase_(Phase::kBuildPhase) {}

// Destructor
CacheOp::~CacheOp() = default;

// This class functor will provide the master loop that drives the logic for performing the work
Status CacheOp::operator()() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  if (!sampler_) {
    return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
                  "Invalid parameter, CacheOp requires a sampler before it can be executed, but got nullptr.");
  }
  RETURN_IF_NOT_OK(RegisterResources());
  // Kick off the workers
  RETURN_IF_NOT_OK(
    tree_->LaunchWorkers(num_workers_, std::bind(&CacheOp::WorkerEntry, this, std::placeholders::_1), Name()));
  // required task group sync after launching workers
  TaskManager::FindMe()->Post();
  // Wait for the workers to finish caching the rows.
  RETURN_IF_NOT_OK(WaitForCachingAllRows());
  // Current repeats and current epochs may have increased while caching all rows with DatasetOp::GetNextInput.
  // Reset them here because the cache op is about to act as a leaf and its own epoch has not started yet.
  op_current_repeats_ = 0;
  op_current_epochs_ = 0;
  RETURN_IF_NOT_OK(FetchSamplesToWorkers());
  return Status::OK();
}
Status CacheOp::CacheAllRows(int32_t worker_id) {
  // If the current phase is to fill the cache, do it then.
  if (phase_ == Phase::kBuildPhase) {
    // We will take the chance to cache the schema at the server.
    // Just do it once and pick one worker to do it.
    if (worker_id == 0) {
      RETURN_IF_NOT_OK(cache_client_->CacheSchema(column_name_id_map()));
    }
    MS_LOG(INFO) << "CacheOp first epoch SAVE mode started. Worker: " << worker_id;
    // SAVE mode loop
    TensorRow row;
    auto child_iterator = std::make_unique<ChildIterator>(this, worker_id, 0);
    RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&row));
    while (!row.eof()) {
      if (!row.eoe()) {
        Status rc;
        // Do the Async write if we attach to the shared memory.
        rc = cache_client_->AsyncWriteRow(row);
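        // A return of kMDNotImplementedYet means the shared-memory async path is not available,
        // so fall back to a blocking WriteRow below.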
        if (rc.StatusCode() == StatusCode::kMDNotImplementedYet) {
          RETURN_IF_NOT_OK(cache_client_->WriteRow(row));
        } else if (rc.IsError()) {
          return rc;
        }
      } else {
        // In a repeat-over-cache scenario, any of the "real" leaf operators below us have been set up
        // as non-repeating leaf ops. As such, they only do one epoch and then quit. Since we got the
        // eoe to indicate the end of the epoch, we should next expect to get the eof.
        // Drain this eof so that we don't leave it sitting there on a connector that we'll never fetch
        // from again.
        RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&row));
        if (!row.eof()) {
          RETURN_STATUS_UNEXPECTED("[Internal ERROR] Cache op expects to get an eof after eoe from child.");
        }
        break;
      }
      RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&row));
    }
  }
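  // Barrier: fetch_add returns the previous count, so the last worker to arrive sets rows_cache_done_
  // and releases the other workers, which block on it below.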
  // Let the main guy know we are done.
  auto last_guy_in = num_guys_in_.fetch_add(1);
  if ((last_guy_in + 1) == num_workers_) {
    rows_cache_done_.Set();
  } else {
    // Let's do a sync up here.
    RETURN_IF_NOT_OK(rows_cache_done_.Wait());
  }
  return Status::OK();
}
Status CacheOp::WaitForCachingAllRows() {
  // Wait for the workers to finish caching the rows.
  RETURN_IF_NOT_OK(rows_cache_done_.Wait());
  // Move from build phase to fetch phase if we are the one to fill the cache
  if (phase_ == Phase::kBuildPhase) {
    RETURN_IF_NOT_OK(cache_client_->FlushAsyncWriteBuffer());  // One more flush
    RETURN_IF_NOT_OK(cache_client_->BuildPhaseDone());
    // Move to the next phase
    phase_ = Phase::kFetchPhase;
  }
  // If we are not the one to create the cache,
  // wait until the state changes from build phase to fetch phase.
  bool BuildPhaseDone = true;
  do {
    int8_t out;
    RETURN_IF_NOT_OK(cache_client_->GetState(&out));
    auto state = static_cast<CacheServiceState>(out);
    switch (state) {
      case CacheServiceState::kBuildPhase:
        // Do nothing. Continue to wait.
        BuildPhaseDone = false;
        std::this_thread::sleep_for(std::chrono::milliseconds(kPhaseCheckIntervalInMilliSec));
        break;
      case CacheServiceState::kFetchPhase:
        BuildPhaseDone = true;
        break;
      case CacheServiceState::kOutOfMemory:
        return Status(StatusCode::kMDOutOfMemory, "Cache server is running out of memory");
      case CacheServiceState::kNoSpace:
        return Status(StatusCode::kMDNoSpace, "Cache server is running out of spill storage");
      case CacheServiceState::kNone:
      case CacheServiceState::kError:
      default:
        RETURN_STATUS_UNEXPECTED("Unexpected Cache server state: " + std::to_string(out));
    }
  } while (!BuildPhaseDone);
  // Now that the build phase is done, get the cache statistics from the server.
  CacheServiceStat stat{};
  RETURN_IF_NOT_OK(cache_client_->GetStat(&stat));
  const row_id_type min_key = stat.min_row_id;
  const row_id_type max_key = stat.max_row_id;
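  // Row ids generated during the build phase are assumed to be contiguous,
  // so the row count is the span between the smallest and largest id.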
  num_rows_ = max_key - min_key + 1;
  MS_LOG(INFO) << "Number of rows cached: " << num_rows_;
  MS_LOG(INFO) << "Number of rows cached in memory : " << stat.num_mem_cached;
  MS_LOG(INFO) << "Number of rows spilled to disk : " << stat.num_disk_cached;
  MS_LOG(INFO) << "Average cache size : " << stat.avg_cache_sz;
  // Now all rows are cached and we have done a sync point check up. The next phase is to
  // pick up fetch requests from the sampler and pass the rows up to the caller.
  RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this));
  return Status::OK();
}
Status CacheOp::WorkerEntry(int32_t worker_id) {
  TaskManager::FindMe()->Post();
  RETURN_IF_NOT_OK(CacheAllRows(worker_id));
  RETURN_IF_NOT_OK(FetchFromCache(worker_id));
  return Status::OK();
}
Status CacheOp::RegisterResources() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  RETURN_IF_NOT_OK(CacheBase::RegisterResources());
  RETURN_IF_NOT_OK(rows_cache_done_.Register(tree_->AllTasks()));
  RETURN_IF_NOT_OK(keys_miss_->Register(tree_->AllTasks()));
  return Status::OK();
}

// Base-class override for special eoe handler.
// CacheOp must override this because it shall not perform default handling of eoe. Instead
// the CacheOp manages actions related to the end of the epoch.
Status CacheOp::EoeReceived(int32_t worker_id) {
  state_ = OpState::kDeOpIdle;
  return Status::OK();
}
// Base-class override for handling cases when an eof is received.
Status CacheOp::EofReceived(int32_t worker_id) {
  // EofReceived is overridden because we want to handle this eof manually.
  // The default behaviour is to pack it and flow it up to the next connector,
  // whereas here we want a no-op so that the cache op can take the correct action itself.
  return Status::OK();
}

Status CacheOp::PrepareOperator() {
  // Run any common code from super class first before adding our own
  RETURN_IF_NOT_OK(DatasetOp::PrepareOperator());
  // Get the computed checksum from all ops in our cache path below us and ask the cache op to create its cache
  uint32_t cache_crc = DatasetOp::GenerateCRC(shared_from_this());
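  // Pipelines whose cache path produces the same crc share the same cache on the server,
  // which is why CreateCache below may report that the cache already exists.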
  // This is a non-mappable cache op so the row ids need to be generated.
  // Construct the cache
  const bool generate_ids = true;
  Status rc = cache_client_->CreateCache(cache_crc, generate_ids);
  if (rc.StatusCode() == StatusCode::kMDDuplicateKey) {
    // We are told the cache has been created already. So we skip the build phase.
    phase_ = Phase::kFetchPhase;
    rc = Status::OK();
  }
  RETURN_IF_NOT_OK(rc);
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore