• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020-2021 Huawei Technologies Co., Ltd
3 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7 
8  * http://www.apache.org/licenses/LICENSE-2.0
9 
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15 */
16 #include "minddata/dataset/engine/cache/cache_admin_arg.h"
17 #include <sys/types.h>
18 #include <sys/wait.h>
19 #include <unistd.h>
20 #include <algorithm>
21 #include <cerrno>
22 #include <iomanip>
23 #include <iostream>
24 #include <string>
25 #include <cstdlib>
26 #include <vector>
27 #include "minddata/dataset/engine/cache/cache_request.h"
28 #include "minddata/dataset/engine/cache/cache_client.h"
29 #include "minddata/dataset/engine/cache/cache_server.h"
30 #include "minddata/dataset/engine/cache/cache_ipc.h"
31 #include "minddata/dataset/util/path.h"
32 #include "minddata/dataset/include/dataset/constants.h"
33 
34 namespace mindspore {
35 namespace dataset {
36 const char CacheAdminArgHandler::kServerBinary[] = "cache_server";
37 
CacheAdminArgHandler()38 CacheAdminArgHandler::CacheAdminArgHandler()
39     : command_id_(CommandId::kCmdUnknown),
40       num_workers_(kDefaultNumWorkers),
41       shm_mem_sz_(kDefaultSharedMemorySize),
42       log_level_(kDefaultLogLevel),
43       memory_cap_ratio_(kDefaultMemoryCapRatio),
44       hostname_(kCfgDefaultCacheHost),
45       port_(kCfgDefaultCachePort),
46       spill_dir_("") {
47   std::string env_cache_host = common::GetEnv("MS_CACHE_HOST");
48   std::string env_cache_port = common::GetEnv("MS_CACHE_PORT");
49   if (!env_cache_host.empty()) {
50     hostname_ = env_cache_host;
51   }
52   if (!env_cache_port.empty()) {
53     char *end = nullptr;
54     port_ = static_cast<int32_t>(strtol(env_cache_port.c_str(), &end, kDecimal));
55     if (*end != '\0') {
56       std::cerr << "Cache port from env variable MS_CACHE_PORT is invalid\n";
57       port_ = 0;  // cause the port range validation to generate an error during the validation checks
58     }
59   }
60   std::string env_log_level = common::GetEnv("GLOG_v");
61   if (!env_log_level.empty()) {
62     char *end = nullptr;
63     log_level_ = static_cast<int32_t>(strtol(env_log_level.c_str(), &end, kDecimal));
64     if (*end != '\0') {
65       std::cerr << "Log level from env variable GLOG_v is invalid\n";
66       log_level_ = -1;  // cause the log level range validation to generate an error during the validation checks
67     }
68   }
69   // Initialize the command mappings
70   arg_map_["-h"] = ArgValue::kArgHost;
71   arg_map_["--hostname"] = ArgValue::kArgHost;
72   arg_map_["-p"] = ArgValue::kArgPort;
73   arg_map_["--port"] = ArgValue::kArgPort;
74   arg_map_["--start"] = ArgValue::kArgStart;
75   arg_map_["--stop"] = ArgValue::kArgStop;
76   arg_map_["--help"] = ArgValue::kArgHelp;
77   arg_map_["--generate_session"] = ArgValue::kArgGenerateSession;
78   arg_map_["-g"] = ArgValue::kArgGenerateSession;
79   arg_map_["--destroy_session"] = ArgValue::kArgDestroySession;
80   arg_map_["-d"] = ArgValue::kArgDestroySession;
81   arg_map_["--spilldir"] = ArgValue::kArgSpillDir;
82   arg_map_["-s"] = ArgValue::kArgSpillDir;
83   arg_map_["-w"] = ArgValue::kArgNumWorkers;
84   arg_map_["--workers"] = ArgValue::kArgNumWorkers;
85   arg_map_["-m"] = ArgValue::kArgSharedMemorySize;
86   arg_map_["--shared_memory_size"] = ArgValue::kArgSharedMemorySize;
87   arg_map_["-l"] = ArgValue::kArgLogLevel;
88   arg_map_["--loglevel"] = ArgValue::kArgLogLevel;
89   arg_map_["-r"] = ArgValue::kArgMemoryCapRatio;
90   arg_map_["--memory_cap_ratio"] = ArgValue::kArgMemoryCapRatio;
91   arg_map_["--list_sessions"] = ArgValue::kArgListSessions;
92   arg_map_["--server_info"] = ArgValue::kArgServerInfo;
93   // Initialize argument tracker with false values
94   for (int16_t i = 0; i < static_cast<int16_t>(ArgValue::kArgNumArgs); ++i) {
95     ArgValue currAV = static_cast<ArgValue>(i);
96     used_args_[currAV] = false;
97   }
98 }
99 
// Defined out of line as defaulted: the handler performs no explicit cleanup of its own.
CacheAdminArgHandler::~CacheAdminArgHandler() = default;
101 
AssignArg(const std::string & option,std::vector<uint32_t> * out_arg,std::stringstream * arg_stream,CommandId command_id)102 Status CacheAdminArgHandler::AssignArg(const std::string &option, std::vector<uint32_t> *out_arg,
103                                        std::stringstream *arg_stream, CommandId command_id) {
104   // Detect if the user tried to provide this argument more than once
105   ArgValue selected_arg = arg_map_[option];
106   if (used_args_[selected_arg]) {
107     std::string err_msg = "The " + option + " argument was given more than once.";
108     return Status(StatusCode::kMDSyntaxError, err_msg);
109   }
110 
111   // Flag that this arg is used now
112   used_args_[selected_arg] = true;
113 
114   // Some options are just arguments, for example "--port 50052" is not a command, it's just a argument.
115   // Other options are actual commands, for example "--destroy_session 1234".  This executes the destroy session.
116   // If this option is also a command, make sure there has not been multiple commands given before assigning it.
117   if (command_id != CommandId::kCmdUnknown) {
118     if (command_id_ != CommandId::kCmdUnknown) {
119       std::string err_msg = "Only one command at a time is allowed.  Invalid command: " + option;
120       return Status(StatusCode::kMDSyntaxError, err_msg);
121     } else {
122       command_id_ = command_id;
123     }
124   }
125 
126   uint32_t value_as_uint;
127   while (arg_stream->rdbuf()->in_avail() != 0) {
128     std::stringstream::pos_type pos = arg_stream->tellg();
129     *arg_stream >> value_as_uint;
130     if (arg_stream->fail()) {
131       arg_stream->clear();
132       (void)arg_stream->seekg(pos, std::ios::beg);
133       break;
134     } else {
135       out_arg->push_back(value_as_uint);
136     }
137   }
138 
139   if (out_arg->empty()) {
140     std::string err_msg = option + " option requires an argument field.  Syntax: " + option + " <field>";
141     return Status(StatusCode::kMDSyntaxError, err_msg);
142   }
143 
144   return Status::OK();
145 }
146 
AssignArg(const std::string & option,int32_t * out_arg,std::stringstream * arg_stream,CommandId command_id)147 Status CacheAdminArgHandler::AssignArg(const std::string &option, int32_t *out_arg, std::stringstream *arg_stream,
148                                        CommandId command_id) {
149   // Detect if the user tried to provide this argument more than once
150   ArgValue selected_arg = arg_map_[option];
151   if (used_args_[selected_arg]) {
152     std::string err_msg = "The " + option + " argument was given more than once.";
153     return Status(StatusCode::kMDSyntaxError, err_msg);
154   }
155 
156   // Flag that this arg is used now
157   used_args_[selected_arg] = true;
158 
159   // Some options are just arguments, for example "--port 50052" is not a command, it's just a argument.
160   // Other options are actual commands, for example "--destroy_session 1234".  This executes the destroy session.
161   // If this option is also a command, make sure there has not been multiple commands given before assigning it.
162   if (command_id != CommandId::kCmdUnknown) {
163     if (command_id_ != CommandId::kCmdUnknown) {
164       std::string err_msg = "Only one command at a time is allowed.  Invalid command: " + option;
165       return Status(StatusCode::kMDSyntaxError, err_msg);
166     } else {
167       command_id_ = command_id;
168     }
169   }
170 
171   std::string value_as_string;
172 
173   // Fetch the argument from the arg stream into a string
174   *arg_stream >> value_as_string;
175   if (value_as_string.empty()) {
176     std::string err_msg = option + " option requires an argument field.  Syntax: " + option + " <field>";
177     return Status(StatusCode::kMDSyntaxError, err_msg);
178   }
179 
180   // Now, attempt to convert the value into it's numeric format for output
181   try {
182     *out_arg = static_cast<int32_t>(std::stoul(value_as_string));
183   } catch (const std::exception &e) {
184     std::string err_msg = "Invalid numeric value: " + value_as_string;
185     return Status(StatusCode::kMDSyntaxError, err_msg);
186   }
187 
188   return Status::OK();
189 }
190 
AssignArg(const std::string & option,std::string * out_arg,std::stringstream * arg_stream,CommandId command_id)191 Status CacheAdminArgHandler::AssignArg(const std::string &option, std::string *out_arg, std::stringstream *arg_stream,
192                                        CommandId command_id) {
193   // Detect if the user tried to provide this argument more than once
194   ArgValue selected_arg = arg_map_[option];
195   if (used_args_[selected_arg]) {
196     std::string err_msg = "The " + option + " argument was given more than once.";
197     return Status(StatusCode::kMDSyntaxError, err_msg);
198   }
199 
200   // Flag that this arg is used now
201   used_args_[selected_arg] = true;
202 
203   // Some options are just arguments, for example "--hostname "127.0.0.1" is not a command, it's just an argument.
204   // Other options are actual commands, for example "--start".
205   // If this option is also a command, make sure there has not been multiple commands given before assigning it.
206   if (command_id != CommandId::kCmdUnknown) {
207     if (command_id_ != CommandId::kCmdUnknown) {
208       std::string err_msg = "Only one command at a time is allowed.  Invalid command: " + option;
209       return Status(StatusCode::kMDSyntaxError, err_msg);
210     } else {
211       command_id_ = command_id;
212     }
213   }
214 
215   // If there is no argument to get, such as the --start command, then out_arg will be a nullptr.
216   if (out_arg != nullptr) {
217     // Fetch the argument from the arg stream into a string
218     if (arg_stream->rdbuf()->in_avail() != 0) {
219       *arg_stream >> *out_arg;
220     } else {
221       std::string err_msg = option + " option requires an argument field.  Syntax: " + option + " <field>";
222       return Status(StatusCode::kMDSyntaxError, err_msg);
223     }
224 
225     if (out_arg->empty()) {
226       std::string err_msg = option + " option requires an argument field.  Syntax: " + option + " <field>";
227       return Status(StatusCode::kMDSyntaxError, err_msg);
228     }
229   }
230 
231   return Status::OK();
232 }
233 
AssignArg(const std::string & option,float * out_arg,std::stringstream * arg_stream,CommandId command_id)234 Status CacheAdminArgHandler::AssignArg(const std::string &option, float *out_arg, std::stringstream *arg_stream,
235                                        CommandId command_id) {
236   // Detect if the user tried to provide this argument more than once
237   ArgValue selected_arg = arg_map_[option];
238   if (used_args_[selected_arg]) {
239     std::string err_msg = "The " + option + " argument was given more than once.";
240     return Status(StatusCode::kMDSyntaxError, err_msg);
241   }
242 
243   // Flag that this arg is used now
244   used_args_[selected_arg] = true;
245 
246   // Some options are just arguments, for example "--hostname "127.0.0.1" is not a command, it's just an argument.
247   // Other options are actual commands, for example "--start".
248   // If this option is also a command, make sure there has not been multiple commands given before assigning it.
249   if (command_id != CommandId::kCmdUnknown) {
250     if (command_id_ != CommandId::kCmdUnknown) {
251       std::string err_msg = "Only one command at a time is allowed.  Invalid command: " + option;
252       return Status(StatusCode::kMDSyntaxError, err_msg);
253     } else {
254       command_id_ = command_id;
255     }
256   }
257 
258   std::string value_as_string;
259 
260   // Fetch the argument from the arg stream into a string
261   *arg_stream >> value_as_string;
262   if (value_as_string.empty()) {
263     std::string err_msg = option + " option requires an argument field.  Syntax: " + option + " <field>";
264     return Status(StatusCode::kMDSyntaxError, err_msg);
265   }
266 
267   // Now, attempt to convert the value into it's string format for output
268   try {
269     *out_arg = std::stof(value_as_string, nullptr);
270   } catch (const std::exception &e) {
271     std::string err_msg = "Invalid numeric value: " + value_as_string;
272     return Status(StatusCode::kMDSyntaxError, err_msg);
273   }
274 
275   return Status::OK();
276 }
277 
ParseArgStream(std::stringstream * arg_stream)278 Status CacheAdminArgHandler::ParseArgStream(std::stringstream *arg_stream) {
279   std::string tok;
280   while (*arg_stream >> tok) {
281     switch (arg_map_[tok]) {
282       case ArgValue::kArgHost: {
283         RETURN_IF_NOT_OK(AssignArg(tok, &hostname_, arg_stream));
284         // Temporary sanity check. We only support localhost for now
285         if (hostname_ != std::string(kCfgDefaultCacheHost)) {
286           std::string err_msg =
287             "Invalid host interface: " + hostname_ + ". Current limitation, only 127.0.0.1 can be used.";
288           return Status(StatusCode::kMDSyntaxError, err_msg);
289         }
290         break;
291       }
292       case ArgValue::kArgPort: {
293         RETURN_IF_NOT_OK(AssignArg(tok, &port_, arg_stream));
294         break;
295       }
296       case ArgValue::kArgStart: {
297         RETURN_IF_NOT_OK(AssignArg(tok, static_cast<std::string *>(nullptr), arg_stream, CommandId::kCmdStart));
298         break;
299       }
300       case ArgValue::kArgStop: {
301         RETURN_IF_NOT_OK(AssignArg(tok, static_cast<std::string *>(nullptr), arg_stream, CommandId::kCmdStop));
302         break;
303       }
304       case ArgValue::kArgGenerateSession: {
305         RETURN_IF_NOT_OK(
306           AssignArg(tok, static_cast<std::string *>(nullptr), arg_stream, CommandId::kCmdGenerateSession));
307         break;
308       }
309       case ArgValue::kArgHelp: {
310         command_id_ = CommandId::kCmdHelp;
311         break;
312       }
313       case ArgValue::kArgDestroySession: {
314         RETURN_IF_NOT_OK(AssignArg(tok, &session_ids_, arg_stream, CommandId::kCmdDestroySession));
315         break;
316       }
317       case ArgValue::kArgNumWorkers: {
318         RETURN_IF_NOT_OK(AssignArg(tok, &num_workers_, arg_stream));
319         break;
320       }
321       case ArgValue::kArgSpillDir: {
322         RETURN_IF_NOT_OK(AssignArg(tok, &spill_dir_, arg_stream));
323         break;
324       }
325       case ArgValue::kArgSharedMemorySize: {
326         RETURN_IF_NOT_OK(AssignArg(tok, &shm_mem_sz_, arg_stream));
327         break;
328       }
329       case ArgValue::kArgLogLevel: {
330         RETURN_IF_NOT_OK(AssignArg(tok, &log_level_, arg_stream));
331         break;
332       }
333       case ArgValue::kArgMemoryCapRatio: {
334         RETURN_IF_NOT_OK(AssignArg(tok, &memory_cap_ratio_, arg_stream));
335         break;
336       }
337       case ArgValue::kArgListSessions: {
338         RETURN_IF_NOT_OK(AssignArg(tok, static_cast<std::string *>(nullptr), arg_stream, CommandId::kCmdListSessions));
339         break;
340       }
341       case ArgValue::kArgServerInfo: {
342         RETURN_IF_NOT_OK(AssignArg(tok, static_cast<std::string *>(nullptr), arg_stream, CommandId::kCmdServerInfo));
343         break;
344       }
345       default: {
346         // Save space delimited trailing arguments
347         trailing_args_ += (" " + tok);
348         break;
349       }
350     }
351   }
352 
353   RETURN_IF_NOT_OK(Validate());
354 
355   return Status::OK();
356 }
357 
Validate()358 Status CacheAdminArgHandler::Validate() {
359   // This sanity check is delayed until now in case there may be valid use-cases of trailing args.
360   // Any unhandled arguments at this point is an error.
361   if (!trailing_args_.empty()) {
362     std::string err_msg = "Invalid arguments provided: " + trailing_args_;
363     err_msg += "\nPlease try `cache_admin --help` for more information";
364     return Status(StatusCode::kMDSyntaxError, err_msg);
365   }
366 
367   // The user must pick at least one command.  i.e. it's meaningless to just give a hostname or port but no command to
368   // run.
369   if (command_id_ == CommandId::kCmdUnknown) {
370     std::string err_msg = "No command provided";
371     err_msg += "\nPlease try `cache_admin --help` for more information";
372     return Status(StatusCode::kMDSyntaxError, err_msg);
373   }
374 
375   // Additional checks here
376   auto max_num_workers = std::max<int32_t>(std::thread::hardware_concurrency(), kMaxNumWorkers);
377   if (used_args_[ArgValue::kArgNumWorkers] && (num_workers_ < 1 || num_workers_ > max_num_workers)) {
378     // Check the value of num_workers only if it's provided by users.
379     return Status(StatusCode::kMDSyntaxError,
380                   "Number of workers must be in range of 1 and " + std::to_string(max_num_workers) + ".");
381   }
382 
383   if (log_level_ < MsLogLevel::kDebug || log_level_ > MsLogLevel::kException) {
384     return Status(StatusCode::kMDSyntaxError, "Log level must be in range (0..4).");
385   }
386 
387   if (memory_cap_ratio_ <= 0 || memory_cap_ratio_ > 1) {
388     return Status(StatusCode::kMDSyntaxError, "Memory cap ratio should be positive and no greater than 1");
389   }
390 
391   if (port_ < kMinLegalPort || port_ > kMaxLegalPort) {
392     return Status(StatusCode::kMDSyntaxError, "Port must be in range (1025..65535).");
393   }
394 
395   return Status::OK();
396 }
397 
RunCommand()398 Status CacheAdminArgHandler::RunCommand() {
399   switch (command_id_) {
400     case CommandId::kCmdHelp: {
401       Help();
402       break;
403     }
404     case CommandId::kCmdStart: {
405       RETURN_IF_NOT_OK(StartServer());
406       break;
407     }
408     case CommandId::kCmdStop: {
409       RETURN_IF_NOT_OK(StopServer());
410       break;
411     }
412     case CommandId::kCmdGenerateSession: {
413       CacheClientGreeter comm(hostname_, port_, 1);
414       RETURN_IF_NOT_OK(comm.ServiceStart());
415       auto rq = std::make_shared<GenerateSessionIdRequest>();
416       RETURN_IF_NOT_OK(comm.HandleRequest(rq));
417       RETURN_IF_NOT_OK(rq->Wait());
418       std::cout << "Session created for server on port " << std::to_string(port_) << ": " << rq->GetSessionId()
419                 << std::endl;
420       break;
421     }
422     case CommandId::kCmdDestroySession: {
423       CacheClientGreeter comm(hostname_, port_, 1);
424       RETURN_IF_NOT_OK(comm.ServiceStart());
425       CacheClientInfo cinfo;
426       for (session_id_type id : session_ids_) {
427         cinfo.set_session_id(id);
428         auto rq = std::make_shared<DropSessionRequest>(cinfo);
429         RETURN_IF_NOT_OK(comm.HandleRequest(rq));
430         RETURN_IF_NOT_OK(rq->Wait());
431         std::cout << "Drop session " << id << " successfully for server on port " << std::to_string(port_) << std::endl;
432       }
433       break;
434     }
435     case CommandId::kCmdListSessions: {
436       CacheClientGreeter comm(hostname_, port_, 1);
437       RETURN_IF_NOT_OK(comm.ServiceStart());
438       auto rq = std::make_shared<ListSessionsRequest>();
439       std::cout << "Listing sessions for server on port " << port_ << "\n" << std::endl;
440       RETURN_IF_NOT_OK(comm.HandleRequest(rq));
441       RETURN_IF_NOT_OK(rq->Wait());
442       std::vector<SessionCacheInfo> session_info = rq->GetSessionCacheInfo();
443       if (!session_info.empty()) {
444         std::cout << std::setw(12) << "Session" << std::setw(12) << "Cache Id" << std::setw(12) << "Mem cached"
445                   << std::setw(12) << "Disk cached" << std::setw(16) << "Avg cache size" << std::setw(10) << "Numa hit"
446                   << std::endl;
447         for (auto curr_session : session_info) {
448           std::string cache_id;
449           std::string stat_mem_cached;
450           std::string stat_disk_cached;
451           std::string stat_avg_cached;
452           std::string stat_numa_hit;
453           uint32_t crc = (curr_session.connection_id & 0x00000000FFFFFFFF);
454           cache_id = (curr_session.connection_id == 0) ? "n/a" : std::to_string(crc);
455           stat_mem_cached =
456             (curr_session.stats.num_mem_cached == 0) ? "n/a" : std::to_string(curr_session.stats.num_mem_cached);
457           stat_disk_cached =
458             (curr_session.stats.num_disk_cached == 0) ? "n/a" : std::to_string(curr_session.stats.num_disk_cached);
459           stat_avg_cached =
460             (curr_session.stats.avg_cache_sz == 0) ? "n/a" : std::to_string(curr_session.stats.avg_cache_sz);
461           stat_numa_hit =
462             (curr_session.stats.num_numa_hit == 0) ? "n/a" : std::to_string(curr_session.stats.num_numa_hit);
463 
464           std::cout << std::setw(12) << curr_session.session_id << std::setw(12) << cache_id << std::setw(12)
465                     << stat_mem_cached << std::setw(12) << stat_disk_cached << std::setw(16) << stat_avg_cached
466                     << std::setw(10) << stat_numa_hit << std::endl;
467         }
468       } else {
469         std::cout << "No active sessions." << std::endl;
470       }
471       break;
472     }
473     case CommandId::kCmdServerInfo: {
474       RETURN_IF_NOT_OK(ShowServerInfo());
475       break;
476     }
477     default: {
478       RETURN_STATUS_UNEXPECTED("Invalid cache admin command id.");
479       break;
480     }
481   }
482 
483   return Status::OK();
484 }
485 
ShowServerInfo()486 Status CacheAdminArgHandler::ShowServerInfo() {
487   CacheClientGreeter comm(hostname_, port_, 1);
488   RETURN_IF_NOT_OK(comm.ServiceStart());
489   auto rq = std::make_shared<ListSessionsRequest>();
490   RETURN_IF_NOT_OK(comm.HandleRequest(rq));
491   RETURN_IF_NOT_OK(rq->Wait());
492 
493   auto session_ids = rq->GetSessionIds();
494   auto server_cfg_info = rq->GetServerStat();
495   int32_t num_workers = server_cfg_info.num_workers;
496   int8_t log_level = server_cfg_info.log_level;
497   std::string spill_dir = server_cfg_info.spill_dir;
498   if (spill_dir.empty()) {
499     spill_dir = "None";
500   }
501 
502   int name_w = 20;
503   int value_w = 50;
504   std::cout << "Cache Server Configuration: " << std::endl;
505   std::cout << std::string(name_w + value_w, '-') << std::endl;
506   std::cout << std::left << std::setw(name_w) << "config name" << std::setw(value_w) << "value" << std::endl;
507   std::cout << std::string(name_w + value_w, '-') << std::endl;
508   std::cout << std::left << std::setw(name_w) << "hostname" << std::setw(value_w) << hostname_ << std::endl;
509   std::cout << std::left << std::setw(name_w) << "port" << std::setw(value_w) << port_ << std::endl;
510   std::cout << std::left << std::setw(name_w) << "number of workers" << std::setw(value_w)
511             << std::to_string(num_workers) << std::endl;
512   std::cout << std::left << std::setw(name_w) << "log level" << std::setw(value_w) << std::to_string(log_level)
513             << std::endl;
514   std::cout << std::left << std::setw(name_w) << "spill dir" << std::setw(value_w) << spill_dir << std::endl;
515   std::cout << std::string(name_w + value_w, '-') << std::endl;
516 
517   std::cout << "Active sessions: " << std::endl;
518   if (!session_ids.empty()) {
519     for (auto session_id : session_ids) {
520       std::cout << session_id << "  ";
521     }
522     std::cout << std::endl << "(Please use 'cache_admin --list_sessions' to get detailed info of sessions.)\n";
523   } else {
524     std::cout << "No active sessions." << std::endl;
525   }
526   return Status::OK();
527 }
528 
StopServer()529 Status CacheAdminArgHandler::StopServer() {
530   CacheClientGreeter comm(hostname_, port_, 1);
531   RETURN_IF_NOT_OK(comm.ServiceStart());
532   SharedMessage msg;
533   RETURN_IF_NOT_OK(msg.Create());
534   auto rq = std::make_shared<ServerStopRequest>(msg.GetMsgQueueId());
535   RETURN_IF_NOT_OK(comm.HandleRequest(rq));
536   Status rc = rq->Wait();
537   if (rc.IsError()) {
538     msg.RemoveResourcesOnExit();
539     if (rc == StatusCode::kMDNetWorkError) {
540       std::string errMsg =
541         "Server on port " + std::to_string(port_) + " is not reachable or has been shutdown already.";
542       return Status(StatusCode::kMDNetWorkError, errMsg);
543     }
544     return rc;
545   }
546   // OK return code only means the server acknowledge our request but we still
547   // have to wait for its complete shutdown because the server will shutdown
548   // the comm layer as soon as the request is received, and we need to wait
549   // on the message queue instead.
550   // The server will send a message back and remove the queue and we will then wake up. But on the safe
551   // side, we will also set up an alarm and kill this process if we hang on
552   // the message queue.
553   (void)alarm(kAlarmDeadline);
554   Status dummy_rc;
555   (void)msg.ReceiveStatus(&dummy_rc);
556   std::cout << "Cache server on port " << std::to_string(port_) << " has been stopped successfully." << std::endl;
557   return Status::OK();
558 }
559 
StartServer()560 Status CacheAdminArgHandler::StartServer() {
561   // There currently does not exist any "install path" or method to identify which path the installed binaries will
562   // exist in. As a temporary approach, we will assume that the server binary shall exist in the same path as the
563   // cache_admin binary (this process).
564   const std::string self_proc = "/proc/self/exe";
565   std::string canonical_path;
566   canonical_path.resize(400);  // PATH_MAX is large. This value should be big enough for our use.
567   // Some lower level OS library calls are needed here to determine the binary path.
568   // Fetch the path of this binary for admin_cache into C character array and then truncate off the binary name so that
569   // we are left with only the absolute path
570   if (realpath(self_proc.data(), canonical_path.data()) == nullptr) {
571     std::string err_msg = "Failed to identify cache admin binary path: " + std::to_string(errno);
572     RETURN_STATUS_UNEXPECTED(err_msg);
573   }
574   canonical_path.resize(strlen(canonical_path.data()));
575   uint64_t last_separator = canonical_path.find_last_of('/');
576   CHECK_FAIL_RETURN_UNEXPECTED(last_separator != std::string::npos, "No / found");
577   // truncate the binary name so we are left with the absolute path of cache_admin binary
578   canonical_path.resize(last_separator + 1);
579   std::string cache_server_binary = canonical_path + std::string(kServerBinary);
580 
581   // Create a pipe before we fork. If all goes well, the child will run as a daemon in the background
582   // and never returns until shutdown. If there is any error, the child will notify us through the pipe.
583   int fd[2];
584   if (pipe(fd) == -1) {
585     std::string err_msg = "Failed to create a pipe for communication " + std::to_string(errno);
586     RETURN_STATUS_UNEXPECTED(err_msg);
587   }
588 
589   // fork the child process to become the daemon
590   pid_t pid = fork();
591   // failed to fork
592   if (pid < 0) {
593     std::string err_msg = "Failed to fork process for cache server: " + std::to_string(errno);
594     RETURN_STATUS_UNEXPECTED(err_msg);
595   } else if (pid > 0) {
596     // As a parent, we close the write end. We only listen.
597     close(fd[1]);
598     (void)dup2(fd[0], STDIN_FILENO);
599     close(fd[0]);
600     std::string msg;
601     std::string buf;
602     const uint32_t buf_sz = 1024;
603     buf.resize(buf_sz);
604     auto n = read(0, buf.data(), buf_sz);
605     // keep reading until we drain the pipe
606     while (n > 0) {
607       msg += buf.substr(0, n);
608       n = read(0, buf.data(), buf_sz);
609     }
610     if (n < 0) {
611       std::string err_msg = "Failed to read from pipeline " + std::to_string(errno);
612       RETURN_STATUS_UNEXPECTED(err_msg);
613     }
614 
615     int status;
616     if (waitpid(pid, &status, 0) == -1) {
617       RETURN_STATUS_UNEXPECTED("waitpid fails. errno = " + std::to_string(errno));
618     }
619     if (WIFEXITED(status)) {
620       auto exit_status = WEXITSTATUS(status);
621       if (exit_status) {
622         return Status(StatusCode::kMDUnexpectedError, msg);
623       } else {
624         // Not an error, some info message goes to stdout
625         std::cout << msg;
626       }
627     }
628     return Status::OK();
629   } else {
630     // Child here ...
631     // Close all stdin, redirect stdout and stderr to the write end of the pipe.
632     (void)close(fd[0]);
633     (void)dup2(fd[1], STDOUT_FILENO);
634     (void)dup2(fd[1], STDERR_FILENO);
635     (void)close(STDIN_FILENO);
636     (void)close(fd[1]);
637     // exec the cache server binary in this process
638     // If the user did not provide the value of num_workers, we pass -1 to cache server to allow it assign the default.
639     // So that the server knows if the number is provided by users or by default.
640     std::string workers_string = used_args_[ArgValue::kArgNumWorkers] ? std::to_string(num_workers_) : "-1";
641     std::string port_string = std::to_string(port_);
642     std::string shared_memory_string = std::to_string(shm_mem_sz_);
643     std::string minloglevel_string = std::to_string(log_level_);
644     std::string daemonize_string = "true";
645     std::string memory_cap_ratio_string = std::to_string(memory_cap_ratio_);
646 
647     char *argv[9];
648     argv[0] = cache_server_binary.data();
649     argv[1] = spill_dir_.data();
650     argv[2] = workers_string.data();
651     argv[3] = port_string.data();
652     argv[4] = shared_memory_string.data();
653     argv[5] = minloglevel_string.data();
654     argv[6] = daemonize_string.data();
655     argv[7] = memory_cap_ratio_string.data();
656     argv[8] = nullptr;
657 
658     // Now exec the binary
659     execv(cache_server_binary.data(), argv);
660     // If the exec was successful, this line will never be reached due to process image being replaced.
661     // ..unless exec failed.
662     std::string err_msg = "Failed to exec cache server: " + cache_server_binary;
663     std::cerr << err_msg << std::endl;
664     RETURN_STATUS_UNEXPECTED(err_msg);
665   }
666 }
667 
Help()668 void CacheAdminArgHandler::Help() {
669   std::cerr << "Syntax:\n";
670   std::cerr << "cache_admin [--start | --stop]\n";
671   std::cerr << "                [[-h | --hostname] <hostname>]            Default is " << kCfgDefaultCacheHost << ".\n";
672   std::cerr << "                [[-p | --port] <port number>]             Default is " << kCfgDefaultCachePort << ".\n";
673   std::cerr << "                [[-w | --workers] <number of workers>]    Default is " << kDefaultNumWorkers << ".\n";
674   std::cerr << "                [[-s | --spilldir] <spilling directory>]  Default is no spilling.\n";
675   std::cerr << "                [[-l | --loglevel] <log level>]           Default is 1 (INFO level).\n";
676   std::cerr << "            [--destroy_session  | -d] <session id>\n";
677   std::cerr << "                [[-p | --port] <port number>]\n";
678   std::cerr << "            [--generate_session | -g]\n";
679   std::cerr << "                [[-p | --port] <port number>]\n";
680   std::cerr << "            [--list_sessions]\n";
681   std::cerr << "                [[-p | --port] <port number>]\n";
682   std::cerr << "            [--server_info]\n";
683   std::cerr << "                [[-p | --port] <port number>]\n";
684   std::cerr << "            [--help]" << std::endl;
685   // Do not expose these option to the user via help or documentation, but the options do exist to aid with
686   // development and tuning.
687   // [ [-m | --shared_memory_size] <shared memory size> ]
688   //    Default is: kDefaultSharedMemorySizeInGB (Gb in unit)
689   // [ [-r | --memory_cap_ratio] <float percent value>]
690   //    Default is kMemoryCapRatio
691 }
692 }  // namespace dataset
693 }  // namespace mindspore
694