/**
 * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "minddata/dataset/engine/cache/perf/cache_pipeline_run.h"

#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/util/random.h"
#include "minddata/dataset/util/services.h"

namespace mindspore {
namespace dataset {
void CachePipelineRun::PrintHelp() { std::cout << "Please run the executable cache_perf instead." << std::endl; }

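// Parse the comma-separated pipeline configuration string handed over by the parent
// cache_perf process. The eleven fields, in order: pipeline id, session id, crc,
// receive queue id, send queue id, number of pipelines, number of epochs, number of
// rows, row size in bytes, number of parallel workers, and the shuffle flag.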
int32_t CachePipelineRun::ProcessPipelineArgs(char *argv) {
  try {
    std::stringstream cfg_ss(argv);
    std::string s;
    int32_t numArgs = 0;
    while (std::getline(cfg_ss, s, ',')) {
      if (numArgs == 0) {
        my_pipeline_ = std::stoi(s);
      } else if (numArgs == 1) {
        session_ = std::stoul(s);
        cache_builder_.SetSessionId(session_);
      } else if (numArgs == 2) {
        crc_ = std::stoi(s);
      } else if (numArgs == 3) {
        recv_id_ = std::stoi(s);
      } else if (numArgs == 4) {
        send_id_ = std::stoi(s);
      } else if (numArgs == 5) {
        num_pipelines_ = std::stoi(s);
      } else if (numArgs == 6) {
        num_epoches_ = std::stoi(s);
      } else if (numArgs == 7) {
        num_rows_ = std::stol(s);
      } else if (numArgs == 8) {
        row_size_ = std::stoi(s);
      } else if (numArgs == 9) {
        cfg_.set_num_parallel_workers(std::stol(s));
      } else if (numArgs == 10) {
        shuffle_ = strcmp(s.data(), "true") == 0;
      }
      ++numArgs;
    }
    if (numArgs != 11) {
      std::cerr << "Incomplete arguments: expected 11 but got " << numArgs << "." << std::endl;
      return -1;
    }
  } catch (const std::exception &e) {
    std::cerr << "Parse error: " << e.what() << std::endl;
    return -1;
  }
  return 0;
}

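// Parse the comma-separated cache-client configuration string: server hostname, port,
// prefetch size, cache memory size, number of connections, and the spill-to-disk flag.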
int32_t CachePipelineRun::ProcessClientArgs(char *argv) {
  try {
    std::stringstream client_ss(argv);
    std::string s;
    int32_t numArgs = 0;
    while (std::getline(client_ss, s, ',')) {
      if (numArgs == 0) {
        cache_builder_.SetHostname(s);
      } else if (numArgs == 1) {
        cache_builder_.SetPort(std::stoi(s));
      } else if (numArgs == 2) {
        cache_builder_.SetPrefetchSize(std::stoi(s));
      } else if (numArgs == 3) {
        cache_builder_.SetCacheMemSz(std::stoi(s));
      } else if (numArgs == 4) {
        cache_builder_.SetNumConnections(std::stoi(s));
      } else if (numArgs == 5) {
        cache_builder_.SetSpill(strcmp(s.data(), "true") == 0);
      }
      ++numArgs;
    }
    if (numArgs != 6) {
      std::cerr << "Incomplete arguments: expected 6 but got " << numArgs << "." << std::endl;
      return -1;
    }
  } catch (const std::exception &e) {
    std::cerr << "Parse error: " << e.what() << std::endl;
    return -1;
  }
  return 0;
}

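// argv[1] carries the pipeline configuration and argv[2] the cache-client
// configuration; both strings are produced by the parent cache_perf process.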
int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
  if (argc != 3) {
    PrintHelp();
    return -1;
  }
  int32_t rc = ProcessPipelineArgs(argv[1]);
  if (rc < 0) return rc;
  rc = ProcessClientArgs(argv[2]);
  return rc;
}

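// Start from the kDft* defaults; the queue ids and the row range are initialized
// to -1 until ProcessArgs and RunFirstEpoch fill them in.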
CachePipelineRun::CachePipelineRun()
    : my_pipeline_(-1),
      num_pipelines_(kDftNumOfPipelines),
      num_epoches_(kDftNumberOfEpochs),
      num_rows_(0),
      row_size_(0),
      shuffle_(kDftShuffle),
      session_(0),
      crc_(0),
      send_id_(-1),
      recv_id_(-1),
      start_row_(-1),
      end_row_(-1) {
  cache_builder_.SetSpill(kDftSpill).SetCacheMemSz(kDftCacheSize);
}

CachePipelineRun::~CachePipelineRun() {
  CachePerfMsg msg;
  (void)SendMessage<ErrorMsg>(&msg, CachePerfMsg::MessageType::kInterrupt, nullptr);
}

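// Block on the shared message queue for commands from the parent process:
// kInterrupt shuts this listener down, while kEpochStart releases the pipeline
// to run the next read epoch.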
Status CachePipelineRun::ListenToParent() {
  TaskManager::FindMe()->Post();
  do {
    RETURN_IF_INTERRUPTED();
    CachePerfMsg msg;
    RETURN_IF_NOT_OK(msg.Receive(recv_id_));
    // Decode the messages.
    auto type = msg.GetType();
    switch (type) {
      case CachePerfMsg::MessageType::kInterrupt: {
        TaskManager::WakeUpWatchDog();
        return Status::OK();
      }
      case CachePerfMsg::MessageType::kEpochStart: {
        pipeline_wp_.Set();
        break;
      }
      default:
        std::string errMsg = "Unknown request type: " + std::to_string(type);
        MS_LOG(ERROR) << errMsg;
        RETURN_STATUS_UNEXPECTED(errMsg);
        break;
    }
  } while (true);

  return Status::OK();
}

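// Top-level flow for one pipeline process: connect to the cache server, create
// (or attach to) the cache, spawn the parent listener, write all rows in the
// first epoch, then re-read them for each remaining epoch when the parent says go.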
Status CachePipelineRun::Run() {
  RETURN_IF_NOT_OK(cache_builder_.Build(&cc_));
  RETURN_IF_NOT_OK(vg_.ServiceStart());

  auto num_workers = cfg_.num_parallel_workers();
  io_block_queues_.Init(num_workers, cfg_.op_connector_size());

  RETURN_IF_NOT_OK(io_block_queues_.Register(&vg_));

  Status rc = cc_->CreateCache(crc_, false);
  // Duplicate key is fine.
  if (rc.IsError() && rc != StatusCode::kMDDuplicateKey) {
    return rc;
  }

  // Log a warning level message so we can see it in the log file when it starts.
  MS_LOG(WARNING) << "Pipeline number " << (my_pipeline_ + 1) << " successfully created its cache service."
                  << std::endl;

  // Spawn a thread to listen to the parent process.
  RETURN_IF_NOT_OK(vg_.CreateAsyncTask("Queue listener", std::bind(&CachePipelineRun::ListenToParent, this)));

  RETURN_IF_NOT_OK(RunFirstEpoch());

  // The rest of the epochs are just fetching.
  auto remaining_epochs = num_epoches_ - 1;
  while (remaining_epochs > 0) {
    // Wait for the parent process signal to start.
    pipeline_wp_.Wait();
    pipeline_wp_.Clear();
    RETURN_IF_NOT_OK(RunReadEpoch());
    --remaining_epochs;
  }

  // The listener thread is blocked on the shared message queue. It will be woken up
  // by the parent process, which sends it an interrupt message, and then this
  // program will exit.
  RETURN_IF_NOT_OK(vg_.join_all());
  return Status::OK();
}

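// First epoch: every pipeline writes its own contiguous slice of the row id space
// into the cache. Slices are equal-sized; the last pipeline absorbs the remainder.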
Status CachePipelineRun::RunFirstEpoch() {
  auto sz = num_rows_ / num_pipelines_;
  start_row_ = my_pipeline_ * sz;
  end_row_ = (my_pipeline_ + 1) * sz - 1;
  if (my_pipeline_ + 1 == num_pipelines_) {
    end_row_ = num_rows_ - 1;
  }
  std::cout << "Pipeline number " << (my_pipeline_ + 1) << " row id range: [" << start_row_ << "," << end_row_ << "]"
            << std::endl;

  // Spawn the worker threads.
  auto f = std::bind(&CachePipelineRun::WriterWorkerEntry, this, std::placeholders::_1);
  std::vector<Task *> worker_threads;
  auto num_workers = cfg_.num_parallel_workers();
  worker_threads.reserve(num_workers);
  for (int32_t i = 0; i < num_workers; ++i) {
    Task *pTask;
    RETURN_IF_NOT_OK(vg_.CreateAsyncTask("Parallel worker", std::bind(f, i), &pTask));
    worker_threads.push_back(pTask);
  }

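  // Hand out one row id per IOBlock, round-robin across the worker queues. The
  // leftover check after the loop is defensive; with a batch size of one, keys is
  // always cleared inside the loop.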
  std::vector<row_id_type> keys;
  keys.reserve(1);
  int32_t worker_id = 0;
  for (auto i = start_row_; i <= end_row_; ++i) {
    keys.push_back(i);
    auto blk = std::make_unique<IOBlock>(keys, IOBlock::kDeIoBlockNone);
    RETURN_IF_NOT_OK(io_block_queues_[worker_id++ % num_workers]->Add(std::move(blk)));
    keys.clear();
  }
  if (!keys.empty()) {
    auto blk = std::make_unique<IOBlock>(keys, IOBlock::kDeIoBlockNone);
    RETURN_IF_NOT_OK(io_block_queues_[worker_id++ % num_workers]->Add(std::move(blk)));
    keys.clear();
  }


  // Shut down the worker threads and wait for them to come back.
  for (int32_t i = 0; i < num_workers; i++) {
    RETURN_IF_NOT_OK(
      io_block_queues_[i]->Add(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone)));
  }
  for (auto *pTask : worker_threads) {
    RETURN_IF_NOT_OK(pTask->Join(Task::WaitFlag::kBlocking));
  }

  // Final flush: AsyncWriteRow buffers rows on the client side, so push whatever
  // is still buffered to the server before reporting the epoch as done.
  cc_->FlushAsyncWriteBuffer();

  // Send a message saying epoch one is done for this pipeline.
  EpochDone proto;
  proto.set_pipeline(my_pipeline_);
  CachePerfMsg msg;
  RETURN_IF_NOT_OK(SendMessage(&msg, CachePerfMsg::MessageType::kEpochEnd, &proto));

  return Status::OK();
}

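// Writer worker: pop IOBlocks off this worker's queue, materialize each row as a
// single int64 tensor of roughly row_size_ bytes, time every AsyncWriteRow call,
// and report min/max/median/average latency (in microseconds) back to the parent.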
Status CachePipelineRun::WriterWorkerEntry(int32_t worker_id) {
  Status rc;
  TaskManager::FindMe()->Post();
  int64_t min_val = std::numeric_limits<int64_t>::max();
  int64_t max_val = 0;
  int64_t total_val = 0;
  std::vector<int64_t> duration;
  duration.reserve(num_rows_ / num_pipelines_ / cfg_.num_parallel_workers());
  bool resource_err = false;
  auto col_desc = std::make_unique<ColDescriptor>("int64", DataType(DataType::DE_INT64), TensorImpl::kFlexible, 1);
  auto num_elements = row_size_ / sizeof(int64_t);
  TensorShape shape(std::vector<dsize_t>(1, num_elements));
  std::unique_ptr<IOBlock> blk;
  auto epoch_start = std::chrono::steady_clock::now();
  do {
    RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&blk));
    std::vector<int64_t> keys;
    RETURN_IF_NOT_OK(blk->GetKeys(&keys));
    if (keys.empty()) {
      // An empty key list is the quit signal for workers.
      break;
    }
    // Once we hit a resource error, we just drain the io blocks. There is no point
    // sending anything more to the server.
    if (!resource_err) {
      for (auto id : keys) {
        TensorRow row;
        std::shared_ptr<Tensor> element;
        RETURN_IF_NOT_OK(Tensor::CreateEmpty(shape, col_desc->type(), &element));
        row.setId(id);
        // CreateEmpty allocates the memory but only in virtual address space. Touch
        // every element to commit the memory so we get an accurate timing.
        auto it = element->begin<int64_t>();
        for (auto i = 0; i < num_elements; ++i, ++it) {
          *it = i;
        }
        row.push_back(std::move(element));
        // Measure the time it takes to hand the row to the async writer.
        auto start_tick = std::chrono::steady_clock::now();
        rc = cc_->AsyncWriteRow(std::move(row));
        auto end_tick = std::chrono::steady_clock::now();
        if (rc.IsError()) {
          if (rc == StatusCode::kMDOutOfMemory || rc == StatusCode::kMDNoSpace) {
            MS_LOG(WARNING) << "Pipeline number " << (my_pipeline_ + 1) << " worker id " << worker_id << ": "
                            << rc.ToString();
            resource_err = true;
            cc_->ServerRunningOutOfResources();
            continue;
          } else {
            return rc;
          }
        } else {
          // Record the latency in microseconds.
          int64_t us = std::chrono::duration_cast<std::chrono::microseconds>(end_tick - start_tick).count();
          min_val = std::min(min_val, us);
          max_val = std::max(max_val, us);
          duration.push_back(us);
          total_val += us;
        }
      }
    }
  } while (true);

  auto epoch_end = std::chrono::steady_clock::now();
  int64_t elapse_time = std::chrono::duration_cast<std::chrono::seconds>(epoch_end - epoch_start).count();

  PipelineWorkerEpochSummary proto;
  proto.set_pipeline(my_pipeline_);
  proto.set_worker(worker_id);
  proto.set_min(min_val);
  proto.set_max(max_val);
  proto.set_elapse(elapse_time);
  auto sz = duration.size();
  proto.set_cnt(sz);
  if (sz > 0) {
    // median
    auto n = sz / 2;
    std::nth_element(duration.begin(), duration.begin() + n, duration.end());
    auto median = duration[n];
    proto.set_med(median);
    // average
    int64_t avg = total_val / sz;
    proto.set_avg(avg);
  }
  CachePerfMsg msg;
  RETURN_IF_NOT_OK(SendMessage(&msg, CachePerfMsg::MessageType::kEpochResult, &proto));
  return Status::OK();
}

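// Read epoch: redistribute the same row id range (shuffled if requested) to the
// reader workers, which fetch the rows back from the server and time each fetch.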
Status CachePipelineRun::RunReadEpoch() {
  std::vector<row_id_type> keys;
  auto num_workers = cfg_.num_parallel_workers();
  keys.reserve(1);
  // Spawn the worker threads.
  auto f = std::bind(&CachePipelineRun::ReaderWorkerEntry, this, std::placeholders::_1);
  std::vector<Task *> worker_threads;
  worker_threads.reserve(num_workers);
  for (int32_t i = 0; i < num_workers; ++i) {
    Task *pTask;
    RETURN_IF_NOT_OK(vg_.CreateAsyncTask("Parallel worker", std::bind(f, i), &pTask));
    worker_threads.push_back(pTask);
  }

  std::vector<row_id_type> all_keys;
  all_keys.reserve(end_row_ - start_row_ + 1);
  for (auto i = start_row_; i <= end_row_; ++i) {
    all_keys.push_back(i);
  }
  // Shuffle the keys if requested.
  if (shuffle_) {
    std::shuffle(all_keys.begin(), all_keys.end(), GetRandomDevice());
  }

  int32_t worker_id = 0;
  for (auto id : all_keys) {
    keys.push_back(id);
    auto blk = std::make_unique<IOBlock>(keys, IOBlock::kDeIoBlockNone);
    RETURN_IF_NOT_OK(io_block_queues_[worker_id++ % num_workers]->Add(std::move(blk)));
    keys.clear();
  }
  if (!keys.empty()) {
    auto blk = std::make_unique<IOBlock>(keys, IOBlock::kDeIoBlockNone);
    RETURN_IF_NOT_OK(io_block_queues_[worker_id++ % num_workers]->Add(std::move(blk)));
    keys.clear();
  }

  // Shut down the worker threads and wait for them to come back.
  for (int32_t i = 0; i < num_workers; i++) {
    RETURN_IF_NOT_OK(
      io_block_queues_[i]->Add(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone)));
  }
  for (auto *pTask : worker_threads) {
    RETURN_IF_NOT_OK(pTask->Join(Task::WaitFlag::kBlocking));
  }

  // Send a message saying this epoch is done for this pipeline.
  EpochDone proto;
  proto.set_pipeline(my_pipeline_);
  CachePerfMsg msg;
  RETURN_IF_NOT_OK(SendMessage(&msg, CachePerfMsg::MessageType::kEpochEnd, &proto));
  return Status::OK();
}

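// Reader worker: pop IOBlocks, drop any keys the client already knows are cache
// misses, then batch-fetch the rest with GetRows and record per-fetch latency in
// microseconds.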
Status CachePipelineRun::ReaderWorkerEntry(int32_t worker_id) {
  Status rc;
  TaskManager::FindMe()->Post();
  int64_t min_val = std::numeric_limits<int64_t>::max();
  int64_t max_val = 0;
  int64_t total_val = 0;
  std::vector<int64_t> duration;
  duration.reserve(num_rows_ / num_pipelines_ / cfg_.num_parallel_workers());
  std::unique_ptr<IOBlock> blk;
  auto epoch_start = std::chrono::steady_clock::now();
  do {
    RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&blk));
    std::vector<int64_t> keys;
    RETURN_IF_NOT_OK(blk->GetKeys(&keys));
    if (keys.empty()) {
      // An empty key list is the quit signal for workers.
      break;
    }
    std::vector<row_id_type> prefetch_keys;
    prefetch_keys.reserve(keys.size());

    // Filter out the keys we are unlikely to find at the server.
    for (auto row_id : keys) {
      if (!cc_->KeyIsCacheMiss(row_id)) {
        prefetch_keys.push_back(row_id);
      }
    }
    // Early exit if there is nothing to fetch.
    if (prefetch_keys.empty()) {
      continue;
    }
    // Get the rows from the server and measure how long it takes for them to come back.
    TensorTable ttbl;
    auto start_tick = std::chrono::steady_clock::now();
    RETURN_IF_NOT_OK(cc_->GetRows(prefetch_keys, &ttbl));
    auto end_tick = std::chrono::steady_clock::now();
    // Record the latency in microseconds.
    int64_t us = std::chrono::duration_cast<std::chrono::microseconds>(end_tick - start_tick).count();
    min_val = std::min(min_val, us);
    max_val = std::max(max_val, us);
    duration.push_back(us);
    total_val += us;
  } while (true);

  auto epoch_end = std::chrono::steady_clock::now();
  int64_t elapse_time = std::chrono::duration_cast<std::chrono::seconds>(epoch_end - epoch_start).count();

  PipelineWorkerEpochSummary proto;
  proto.set_pipeline(my_pipeline_);
  proto.set_worker(worker_id);
  proto.set_min(min_val);
  proto.set_max(max_val);
  proto.set_elapse(elapse_time);
  auto sz = duration.size();
  proto.set_cnt(sz);
  if (sz > 0) {
    // median
    auto n = sz / 2;
    std::nth_element(duration.begin(), duration.begin() + n, duration.end());
    auto median = duration[n];
    proto.set_med(median);
    // average
    int64_t avg = total_val / sz;
    proto.set_avg(avg);
  }
  CachePerfMsg msg;
  RETURN_IF_NOT_OK(SendMessage(&msg, CachePerfMsg::MessageType::kEpochResult, &proto));
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore