/**
 * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "minddata/dataset/engine/cache/perf/cache_pipeline_run.h"

#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/util/random.h"
#include "minddata/dataset/util/services.h"

namespace mindspore {
namespace dataset {
void CachePipelineRun::PrintHelp() { std::cout << "Please run the executable cache_perf instead." << std::endl; }

int32_t CachePipelineRun::ProcessPipelineArgs(char *argv) {
  try {
    std::stringstream cfg_ss(argv);
    std::string s;
    int32_t numArgs = 0;
    while (std::getline(cfg_ss, s, ',')) {
      if (numArgs == 0) {
        my_pipeline_ = std::stoi(s);
      } else if (numArgs == 1) {
        session_ = std::stoul(s);
        cache_builder_.SetSessionId(session_);
      } else if (numArgs == 2) {
        crc_ = std::stoi(s);
      } else if (numArgs == 3) {
        recv_id_ = std::stoi(s);
      } else if (numArgs == 4) {
        send_id_ = std::stoi(s);
      } else if (numArgs == 5) {
        num_pipelines_ = std::stoi(s);
      } else if (numArgs == 6) {
        num_epoches_ = std::stoi(s);
      } else if (numArgs == 7) {
        num_rows_ = std::stol(s);
      } else if (numArgs == 8) {
        row_size_ = std::stoi(s);
      } else if (numArgs == 9) {
        cfg_.set_num_parallel_workers(std::stol(s));
      } else if (numArgs == 10) {
        shuffle_ = strcmp(s.data(), "true") == 0;
      }
      ++numArgs;
    }
    if (numArgs != 11) {
      std::cerr << "Incomplete arguments. Expected 11 but got " << numArgs << std::endl;
      return -1;
    }
  } catch (const std::exception &e) {
    std::cerr << "Parse error: " << e.what() << std::endl;
    return -1;
  }
  return 0;
}
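
// For illustration, a hypothetical pipeline argument string matching the
// eleven fields parsed above, in order:
//   my_pipeline,session,crc,recv_id,send_id,num_pipelines,num_epochs,
//   num_rows,row_size,num_parallel_workers,shuffle
// e.g. "0,1234,5678,3,4,2,10,100000,4096,8,true"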

int32_t CachePipelineRun::ProcessClientArgs(char *argv) {
  try {
    std::stringstream client_ss(argv);
    std::string s;
    int32_t numArgs = 0;
    while (std::getline(client_ss, s, ',')) {
      if (numArgs == 0) {
        cache_builder_.SetHostname(s);
      } else if (numArgs == 1) {
        cache_builder_.SetPort(std::stoi(s));
      } else if (numArgs == 2) {
        cache_builder_.SetPrefetchSize(std::stoi(s));
      } else if (numArgs == 3) {
        cache_builder_.SetCacheMemSz(std::stoi(s));
      } else if (numArgs == 4) {
        cache_builder_.SetNumConnections(std::stoi(s));
      } else if (numArgs == 5) {
        cache_builder_.SetSpill(strcmp(s.data(), "true") == 0);
      }
      ++numArgs;
    }
    if (numArgs != 6) {
      std::cerr << "Incomplete arguments. Expected 6 but got " << numArgs << std::endl;
      return -1;
    }
  } catch (const std::exception &e) {
    std::cerr << "Parse error: " << e.what() << std::endl;
    return -1;
  }
  return 0;
}
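
// Likewise, a hypothetical client argument string matching the six fields
// parsed above: hostname,port,prefetch_size,cache_mem_sz,num_connections,spill
// e.g. "127.0.0.1,50052,20,0,2,false"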

int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
  if (argc != 3) {
    PrintHelp();
    return -1;
  }
  int32_t rc = ProcessPipelineArgs(argv[1]);
  if (rc < 0) return rc;
  rc = ProcessClientArgs(argv[2]);
  return rc;
}
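
// Note: per PrintHelp above, this program is meant to be launched by the
// cache_perf driver, which passes exactly two packed arguments:
//   <executable> <pipeline-args> <client-args>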

CachePipelineRun::CachePipelineRun()
    : my_pipeline_(-1),
      num_pipelines_(kDftNumOfPipelines),
      num_epoches_(kDftNumberOfEpochs),
      num_rows_(0),
      row_size_(0),
      shuffle_(kDftShuffle),
      session_(0),
      crc_(0),
      send_id_(-1),
      recv_id_(-1),
      start_row_(-1),
      end_row_(-1) {
  cache_builder_.SetSpill(kDftSpill).SetCacheMemSz(kDftCacheSize);
}

CachePipelineRun::~CachePipelineRun() {
  CachePerfMsg msg;
  (void)SendMessage<ErrorMsg>(&msg, CachePerfMsg::MessageType::kInterrupt, nullptr);
}

Status CachePipelineRun::ListenToParent() {
  TaskManager::FindMe()->Post();
  do {
    RETURN_IF_INTERRUPTED();
    CachePerfMsg msg;
    RETURN_IF_NOT_OK(msg.Receive(recv_id_));
    // Decode the messages.
    auto type = msg.GetType();
    switch (type) {
      case CachePerfMsg::MessageType::kInterrupt: {
        TaskManager::WakeUpWatchDog();
        return Status::OK();
      }
      case CachePerfMsg::MessageType::kEpochStart: {
        pipeline_wp_.Set();
        break;
      }
      default:
        std::string errMsg = "Unknown request type: " + std::to_string(type);
        MS_LOG(ERROR) << errMsg;
        RETURN_STATUS_UNEXPECTED(errMsg);
        break;
    }
  } while (true);

  return Status::OK();
}

Status CachePipelineRun::Run() {
  RETURN_IF_NOT_OK(cache_builder_.Build(&cc_));
  RETURN_IF_NOT_OK(vg_.ServiceStart());

  auto num_workers = cfg_.num_parallel_workers();
  io_block_queues_.Init(num_workers, cfg_.op_connector_size());

  RETURN_IF_NOT_OK(io_block_queues_.Register(&vg_));

  Status rc = cc_->CreateCache(crc_, false);
  // Duplicate key is fine.
  if (rc.IsError() && rc != StatusCode::kMDDuplicateKey) {
    return rc;
  }

  // Log a warning level message so we can see it in the log file when it starts.
  MS_LOG(WARNING) << "Pipeline number " << (my_pipeline_ + 1) << " successfully created the cache service."
                  << std::endl;

  // Spawn a thread to listen to the parent process.
  RETURN_IF_NOT_OK(vg_.CreateAsyncTask("Queue listener", std::bind(&CachePipelineRun::ListenToParent, this)));

  RETURN_IF_NOT_OK(RunFirstEpoch());

  // The rest of the epochs are just fetching.
  auto remaining_epochs = num_epoches_ - 1;
  while (remaining_epochs > 0) {
    // Wait for the parent process to signal the start of the epoch.
    pipeline_wp_.Wait();
    pipeline_wp_.Clear();
    RETURN_IF_NOT_OK(RunReadEpoch());
    --remaining_epochs;
  }

  // The listener thread is blocked on a shared message queue. It will be woken up by
  // the parent process, which will send an interrupt message to it, and this program
  // will exit.
  RETURN_IF_NOT_OK(vg_.join_all());
  return Status::OK();
}
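
// Control flow in summary: epoch one writes rows into the cache via
// RunFirstEpoch(). For every remaining epoch the parent sends kEpochStart,
// ListenToParent() sets pipeline_wp_, RunReadEpoch() fetches the rows back,
// and each epoch reports kEpochEnd to the parent when it finishes.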

Status CachePipelineRun::RunFirstEpoch() {
  auto sz = num_rows_ / num_pipelines_;
  start_row_ = my_pipeline_ * sz;
  end_row_ = (my_pipeline_ + 1) * sz - 1;
  if (my_pipeline_ + 1 == num_pipelines_) {
    end_row_ = num_rows_ - 1;
  }
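  // Worked example with hypothetical numbers: num_rows_ = 1000 and
  // num_pipelines_ = 3 give sz = 333, so the pipelines cover the ranges
  // [0,332], [333,665] and, after the adjustment above, [666,999].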
  std::cout << "Pipeline number " << (my_pipeline_ + 1) << " row id range: [" << start_row_ << "," << end_row_ << "]"
            << std::endl;

  // Spawn the worker threads.
  auto f = std::bind(&CachePipelineRun::WriterWorkerEntry, this, std::placeholders::_1);
  std::vector<Task *> worker_threads;
  auto num_workers = cfg_.num_parallel_workers();
  worker_threads.reserve(num_workers);
  for (int32_t i = 0; i < num_workers; ++i) {
    Task *pTask;
    RETURN_IF_NOT_OK(vg_.CreateAsyncTask("Parallel worker", std::bind(f, i), &pTask));
    worker_threads.push_back(pTask);
  }

  // Hand out one row id per IOBlock, round-robin across the worker queues.
  std::vector<row_id_type> keys;
  keys.reserve(1);
  int32_t worker_id = 0;
  for (auto i = start_row_; i <= end_row_; ++i) {
    keys.push_back(i);
    auto blk = std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone));
    RETURN_IF_NOT_OK(io_block_queues_[worker_id++ % num_workers]->Add(std::move(blk)));
    keys.clear();
  }
  if (!keys.empty()) {
    auto blk = std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone));
    RETURN_IF_NOT_OK(io_block_queues_[worker_id++ % num_workers]->Add(std::move(blk)));
    keys.clear();
  }

  // Shutdown threads and wait for them to come back. An empty IOBlock is the quit signal.
  for (int32_t i = 0; i < num_workers; i++) {
    RETURN_IF_NOT_OK(
      io_block_queues_[i]->Add(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone)));
  }
  for (auto *pTask : worker_threads) {
    RETURN_IF_NOT_OK(pTask->Join(Task::WaitFlag::kBlocking));
  }

  // Final flush.
  cc_->FlushAsyncWriteBuffer();

  // Send a message saying epoch one is done for this pipeline.
  EpochDone proto;
  proto.set_pipeline(my_pipeline_);
  CachePerfMsg msg;
  RETURN_IF_NOT_OK(SendMessage(&msg, CachePerfMsg::MessageType::kEpochEnd, &proto));

  return Status::OK();
}

Status CachePipelineRun::WriterWorkerEntry(int32_t worker_id) {
  Status rc;
  TaskManager::FindMe()->Post();
  // Per-row write latencies, measured in microseconds.
  int64_t min_val = std::numeric_limits<int64_t>::max();
  int64_t max_val = 0;
  int64_t total_val = 0;
  std::vector<int64_t> duration;
  duration.reserve(num_rows_ / num_pipelines_ / cfg_.num_parallel_workers());
  bool resource_err = false;
  auto col_desc = std::make_unique<ColDescriptor>("int64", DataType(DataType::DE_INT64), TensorImpl::kFlexible, 1);
  auto num_elements = row_size_ / sizeof(int64_t);
  TensorShape shape(std::vector<dsize_t>(1, num_elements));
  std::unique_ptr<IOBlock> blk;
  auto epoch_start = std::chrono::steady_clock::now();
  do {
    RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&blk));
    std::vector<int64_t> keys;
    RETURN_IF_NOT_OK(blk->GetKeys(&keys));
    if (keys.empty()) {
      // An empty key list is the quit signal for workers.
      break;
    }
    // Once we hit a resource error, we just drain the io blocks. There is no point
    // in sending anything more to the server.
    if (!resource_err) {
      for (auto id : keys) {
        TensorRow row;
        std::shared_ptr<Tensor> element;
        RETURN_IF_NOT_OK(Tensor::CreateEmpty(shape, col_desc->type(), &element));
        row.setId(id);
        // CreateEmpty allocates the memory, but only as virtual addresses. Touch every
        // element to commit the memory so we get an accurate timing.
        auto it = element->begin<int64_t>();
        for (auto i = 0; i < num_elements; ++i, ++it) {
          *it = i;
        }
        row.push_back(std::move(element));
        // Measure the time to call WriteBuffer.
        auto start_tick = std::chrono::steady_clock::now();
        rc = cc_->AsyncWriteRow(std::move(row));
        auto end_tick = std::chrono::steady_clock::now();
        if (rc.IsError()) {
          if (rc == StatusCode::kMDOutOfMemory || rc == StatusCode::kMDNoSpace) {
            MS_LOG(WARNING) << "Pipeline number " << (my_pipeline_ + 1) << " worker id " << worker_id << ": "
                            << rc.ToString();
            resource_err = true;
            cc_->ServerRunningOutOfResources();
            continue;
          } else {
            return rc;
          }
        } else {
          // Note: despite the name, `ms` holds microseconds.
          int64_t ms = std::chrono::duration_cast<std::chrono::microseconds>(end_tick - start_tick).count();
          min_val = std::min(min_val, ms);
          max_val = std::max(max_val, ms);
          duration.push_back(ms);
          total_val += ms;
        }
      }
    }
  } while (true);

  auto epoch_end = std::chrono::steady_clock::now();
  int64_t elapse_time = std::chrono::duration_cast<std::chrono::seconds>(epoch_end - epoch_start).count();

  PipelineWorkerEpochSummary proto;
  proto.set_pipeline(my_pipeline_);
  proto.set_worker(worker_id);
  proto.set_min(min_val);
  proto.set_max(max_val);
  proto.set_elapse(elapse_time);
  auto sz = duration.size();
  proto.set_cnt(sz);
  if (sz > 0) {
    // Median: nth_element partially sorts so duration[n] holds the n-th smallest value.
    auto n = sz / 2;
    std::nth_element(duration.begin(), duration.begin() + n, duration.end());
    auto median = duration[n];
    proto.set_med(median);
    // Average.
    int64_t avg = total_val / sz;
    proto.set_avg(avg);
  }
  CachePerfMsg msg;
  RETURN_IF_NOT_OK(SendMessage(&msg, CachePerfMsg::MessageType::kEpochResult, &proto));
  return Status::OK();
}

Status CachePipelineRun::RunReadEpoch() {
  std::vector<row_id_type> keys;
  auto num_workers = cfg_.num_parallel_workers();
  keys.reserve(1);
  // Spawn the worker threads.
  auto f = std::bind(&CachePipelineRun::ReaderWorkerEntry, this, std::placeholders::_1);
  std::vector<Task *> worker_threads;
  worker_threads.reserve(num_workers);
  for (int32_t i = 0; i < num_workers; ++i) {
    Task *pTask;
    RETURN_IF_NOT_OK(vg_.CreateAsyncTask("Parallel worker", std::bind(f, i), &pTask));
    worker_threads.push_back(pTask);
  }

  std::vector<row_id_type> all_keys;
  all_keys.reserve(end_row_ - start_row_ + 1);
  for (auto i = start_row_; i <= end_row_; ++i) {
    all_keys.push_back(i);
  }
  // Shuffle the keys if requested.
  if (shuffle_) {
    std::shuffle(all_keys.begin(), all_keys.end(), GetRandomDevice());
  }

  // Hand out one row id per IOBlock, round-robin across the worker queues.
  int32_t worker_id = 0;
  for (auto id : all_keys) {
    keys.push_back(id);
    auto blk = std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone));
    RETURN_IF_NOT_OK(io_block_queues_[worker_id++ % num_workers]->Add(std::move(blk)));
    keys.clear();
  }
  if (!keys.empty()) {
    auto blk = std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone));
    RETURN_IF_NOT_OK(io_block_queues_[worker_id++ % num_workers]->Add(std::move(blk)));
    keys.clear();
  }

  // Shutdown threads and wait for them to come back. An empty IOBlock is the quit signal.
  for (int32_t i = 0; i < num_workers; i++) {
    RETURN_IF_NOT_OK(
      io_block_queues_[i]->Add(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone)));
  }
  for (auto *pTask : worker_threads) {
    RETURN_IF_NOT_OK(pTask->Join(Task::WaitFlag::kBlocking));
  }

  // Send a message saying this epoch is done for this pipeline.
  EpochDone proto;
  proto.set_pipeline(my_pipeline_);
  CachePerfMsg msg;
  RETURN_IF_NOT_OK(SendMessage(&msg, CachePerfMsg::MessageType::kEpochEnd, &proto));
  return Status::OK();
}

Status CachePipelineRun::ReaderWorkerEntry(int32_t worker_id) {
  Status rc;
  TaskManager::FindMe()->Post();
  // Per-batch fetch latencies, measured in microseconds.
  int64_t min_val = std::numeric_limits<int64_t>::max();
  int64_t max_val = 0;
  int64_t total_val = 0;
  int64_t cnt = 0;
  std::vector<int64_t> duration;
  duration.reserve(num_rows_ / num_pipelines_ / cfg_.num_parallel_workers());
  std::unique_ptr<IOBlock> blk;
  auto epoch_start = std::chrono::steady_clock::now();
  do {
    RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&blk));
    std::vector<int64_t> keys;
    RETURN_IF_NOT_OK(blk->GetKeys(&keys));
    if (keys.empty()) {
      // An empty key list is the quit signal for workers.
      break;
    }
    std::vector<row_id_type> prefetch_keys;
    prefetch_keys.reserve(keys.size());

    // Filter out the keys we are unlikely to find at the server.
    for (auto row_id : keys) {
      if (!cc_->KeyIsCacheMiss(row_id)) {
        prefetch_keys.push_back(row_id);
      }
    }
    // Early exit if there is nothing to fetch.
    if (prefetch_keys.empty()) {
      continue;
    }
    // Get the rows from the server.
    TensorTable ttbl;
    // Measure how long it takes for the rows to come back.
    auto start_tick = std::chrono::steady_clock::now();
    RETURN_IF_NOT_OK(cc_->GetRows(prefetch_keys, &ttbl));
    auto end_tick = std::chrono::steady_clock::now();
    // Note: despite the name, `ms` holds microseconds.
    int64_t ms = std::chrono::duration_cast<std::chrono::microseconds>(end_tick - start_tick).count();
    min_val = std::min(min_val, ms);
    max_val = std::max(max_val, ms);
    duration.push_back(ms);
    total_val += ms;
    ++cnt;
  } while (true);

  auto epoch_end = std::chrono::steady_clock::now();
  int64_t elapse_time = std::chrono::duration_cast<std::chrono::seconds>(epoch_end - epoch_start).count();

  PipelineWorkerEpochSummary proto;
  proto.set_pipeline(my_pipeline_);
  proto.set_worker(worker_id);
  proto.set_min(min_val);
  proto.set_max(max_val);
  proto.set_elapse(elapse_time);
  auto sz = duration.size();
  proto.set_cnt(sz);
  if (sz > 0) {
    // Median: nth_element partially sorts so duration[n] holds the n-th smallest value.
    auto n = sz / 2;
    std::nth_element(duration.begin(), duration.begin() + n, duration.end());
    auto median = duration[n];
    proto.set_med(median);
    // Average.
    int64_t avg = total_val / sz;
    proto.set_avg(avg);
  }
  CachePerfMsg msg;
  RETURN_IF_NOT_OK(SendMessage(&msg, CachePerfMsg::MessageType::kEpochResult, &proto));
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore