1 /**
2 * Copyright 2020-2021 Huawei Technologies Co., Ltd
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
#include "minddata/dataset/engine/cache/cache_admin_arg.h"
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <algorithm>
#include <cerrno>
#include <cmath>
#include <cstdlib>
#include <iomanip>
#include <iostream>
#include <limits>
#include <memory>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include "minddata/dataset/engine/cache/cache_request.h"
#include "minddata/dataset/engine/cache/cache_client.h"
#include "minddata/dataset/engine/cache/cache_server.h"
#include "minddata/dataset/engine/cache/cache_ipc.h"
#include "minddata/dataset/util/path.h"
#include "minddata/dataset/include/dataset/constants.h"
33
34 namespace mindspore {
35 namespace dataset {
36 const char CacheAdminArgHandler::kServerBinary[] = "cache_server";
37
CacheAdminArgHandler()38 CacheAdminArgHandler::CacheAdminArgHandler()
39 : command_id_(CommandId::kCmdUnknown),
40 num_workers_(kDefaultNumWorkers),
41 shm_mem_sz_(kDefaultSharedMemorySize),
42 log_level_(kDefaultLogLevel),
43 memory_cap_ratio_(kDefaultMemoryCapRatio),
44 hostname_(kCfgDefaultCacheHost),
45 port_(kCfgDefaultCachePort),
46 spill_dir_("") {
47 std::string env_cache_host = common::GetEnv("MS_CACHE_HOST");
48 std::string env_cache_port = common::GetEnv("MS_CACHE_PORT");
49 if (!env_cache_host.empty()) {
50 hostname_ = env_cache_host;
51 }
52 if (!env_cache_port.empty()) {
53 char *end = nullptr;
54 port_ = static_cast<int32_t>(strtol(env_cache_port.c_str(), &end, kDecimal));
55 if (*end != '\0') {
56 std::cerr << "Cache port from env variable MS_CACHE_PORT is invalid\n";
57 port_ = 0; // cause the port range validation to generate an error during the validation checks
58 }
59 }
60 std::string env_log_level = common::GetEnv("GLOG_v");
61 if (!env_log_level.empty()) {
62 char *end = nullptr;
63 log_level_ = static_cast<int32_t>(strtol(env_log_level.c_str(), &end, kDecimal));
64 if (*end != '\0') {
65 std::cerr << "Log level from env variable GLOG_v is invalid\n";
66 log_level_ = -1; // cause the log level range validation to generate an error during the validation checks
67 }
68 }
69 // Initialize the command mappings
70 arg_map_["-h"] = ArgValue::kArgHost;
71 arg_map_["--hostname"] = ArgValue::kArgHost;
72 arg_map_["-p"] = ArgValue::kArgPort;
73 arg_map_["--port"] = ArgValue::kArgPort;
74 arg_map_["--start"] = ArgValue::kArgStart;
75 arg_map_["--stop"] = ArgValue::kArgStop;
76 arg_map_["--help"] = ArgValue::kArgHelp;
77 arg_map_["--generate_session"] = ArgValue::kArgGenerateSession;
78 arg_map_["-g"] = ArgValue::kArgGenerateSession;
79 arg_map_["--destroy_session"] = ArgValue::kArgDestroySession;
80 arg_map_["-d"] = ArgValue::kArgDestroySession;
81 arg_map_["--spilldir"] = ArgValue::kArgSpillDir;
82 arg_map_["-s"] = ArgValue::kArgSpillDir;
83 arg_map_["-w"] = ArgValue::kArgNumWorkers;
84 arg_map_["--workers"] = ArgValue::kArgNumWorkers;
85 arg_map_["-m"] = ArgValue::kArgSharedMemorySize;
86 arg_map_["--shared_memory_size"] = ArgValue::kArgSharedMemorySize;
87 arg_map_["-l"] = ArgValue::kArgLogLevel;
88 arg_map_["--loglevel"] = ArgValue::kArgLogLevel;
89 arg_map_["-r"] = ArgValue::kArgMemoryCapRatio;
90 arg_map_["--memory_cap_ratio"] = ArgValue::kArgMemoryCapRatio;
91 arg_map_["--list_sessions"] = ArgValue::kArgListSessions;
92 arg_map_["--server_info"] = ArgValue::kArgServerInfo;
93 // Initialize argument tracker with false values
94 for (int16_t i = 0; i < static_cast<int16_t>(ArgValue::kArgNumArgs); ++i) {
95 ArgValue currAV = static_cast<ArgValue>(i);
96 used_args_[currAV] = false;
97 }
98 }
99
100 CacheAdminArgHandler::~CacheAdminArgHandler() = default;
101
AssignArg(const std::string & option,std::vector<uint32_t> * out_arg,std::stringstream * arg_stream,CommandId command_id)102 Status CacheAdminArgHandler::AssignArg(const std::string &option, std::vector<uint32_t> *out_arg,
103 std::stringstream *arg_stream, CommandId command_id) {
104 // Detect if the user tried to provide this argument more than once
105 ArgValue selected_arg = arg_map_[option];
106 if (used_args_[selected_arg]) {
107 std::string err_msg = "The " + option + " argument was given more than once.";
108 return Status(StatusCode::kMDSyntaxError, err_msg);
109 }
110
111 // Flag that this arg is used now
112 used_args_[selected_arg] = true;
113
114 // Some options are just arguments, for example "--port 50052" is not a command, it's just a argument.
115 // Other options are actual commands, for example "--destroy_session 1234". This executes the destroy session.
116 // If this option is also a command, make sure there has not been multiple commands given before assigning it.
117 if (command_id != CommandId::kCmdUnknown) {
118 if (command_id_ != CommandId::kCmdUnknown) {
119 std::string err_msg = "Only one command at a time is allowed. Invalid command: " + option;
120 return Status(StatusCode::kMDSyntaxError, err_msg);
121 } else {
122 command_id_ = command_id;
123 }
124 }
125
126 uint32_t value_as_uint;
127 while (arg_stream->rdbuf()->in_avail() != 0) {
128 std::stringstream::pos_type pos = arg_stream->tellg();
129 *arg_stream >> value_as_uint;
130 if (arg_stream->fail()) {
131 arg_stream->clear();
132 (void)arg_stream->seekg(pos, std::ios::beg);
133 break;
134 } else {
135 out_arg->push_back(value_as_uint);
136 }
137 }
138
139 if (out_arg->empty()) {
140 std::string err_msg = option + " option requires an argument field. Syntax: " + option + " <field>";
141 return Status(StatusCode::kMDSyntaxError, err_msg);
142 }
143
144 return Status::OK();
145 }
146
AssignArg(const std::string & option,int32_t * out_arg,std::stringstream * arg_stream,CommandId command_id)147 Status CacheAdminArgHandler::AssignArg(const std::string &option, int32_t *out_arg, std::stringstream *arg_stream,
148 CommandId command_id) {
149 // Detect if the user tried to provide this argument more than once
150 ArgValue selected_arg = arg_map_[option];
151 if (used_args_[selected_arg]) {
152 std::string err_msg = "The " + option + " argument was given more than once.";
153 return Status(StatusCode::kMDSyntaxError, err_msg);
154 }
155
156 // Flag that this arg is used now
157 used_args_[selected_arg] = true;
158
159 // Some options are just arguments, for example "--port 50052" is not a command, it's just a argument.
160 // Other options are actual commands, for example "--destroy_session 1234". This executes the destroy session.
161 // If this option is also a command, make sure there has not been multiple commands given before assigning it.
162 if (command_id != CommandId::kCmdUnknown) {
163 if (command_id_ != CommandId::kCmdUnknown) {
164 std::string err_msg = "Only one command at a time is allowed. Invalid command: " + option;
165 return Status(StatusCode::kMDSyntaxError, err_msg);
166 } else {
167 command_id_ = command_id;
168 }
169 }
170
171 std::string value_as_string;
172
173 // Fetch the argument from the arg stream into a string
174 *arg_stream >> value_as_string;
175 if (value_as_string.empty()) {
176 std::string err_msg = option + " option requires an argument field. Syntax: " + option + " <field>";
177 return Status(StatusCode::kMDSyntaxError, err_msg);
178 }
179
180 // Now, attempt to convert the value into it's numeric format for output
181 try {
182 *out_arg = static_cast<int32_t>(std::stoul(value_as_string));
183 } catch (const std::exception &e) {
184 std::string err_msg = "Invalid numeric value: " + value_as_string;
185 return Status(StatusCode::kMDSyntaxError, err_msg);
186 }
187
188 return Status::OK();
189 }
190
AssignArg(const std::string & option,std::string * out_arg,std::stringstream * arg_stream,CommandId command_id)191 Status CacheAdminArgHandler::AssignArg(const std::string &option, std::string *out_arg, std::stringstream *arg_stream,
192 CommandId command_id) {
193 // Detect if the user tried to provide this argument more than once
194 ArgValue selected_arg = arg_map_[option];
195 if (used_args_[selected_arg]) {
196 std::string err_msg = "The " + option + " argument was given more than once.";
197 return Status(StatusCode::kMDSyntaxError, err_msg);
198 }
199
200 // Flag that this arg is used now
201 used_args_[selected_arg] = true;
202
203 // Some options are just arguments, for example "--hostname "127.0.0.1" is not a command, it's just an argument.
204 // Other options are actual commands, for example "--start".
205 // If this option is also a command, make sure there has not been multiple commands given before assigning it.
206 if (command_id != CommandId::kCmdUnknown) {
207 if (command_id_ != CommandId::kCmdUnknown) {
208 std::string err_msg = "Only one command at a time is allowed. Invalid command: " + option;
209 return Status(StatusCode::kMDSyntaxError, err_msg);
210 } else {
211 command_id_ = command_id;
212 }
213 }
214
215 // If there is no argument to get, such as the --start command, then out_arg will be a nullptr.
216 if (out_arg != nullptr) {
217 // Fetch the argument from the arg stream into a string
218 if (arg_stream->rdbuf()->in_avail() != 0) {
219 *arg_stream >> *out_arg;
220 } else {
221 std::string err_msg = option + " option requires an argument field. Syntax: " + option + " <field>";
222 return Status(StatusCode::kMDSyntaxError, err_msg);
223 }
224
225 if (out_arg->empty()) {
226 std::string err_msg = option + " option requires an argument field. Syntax: " + option + " <field>";
227 return Status(StatusCode::kMDSyntaxError, err_msg);
228 }
229 }
230
231 return Status::OK();
232 }
233
AssignArg(const std::string & option,float * out_arg,std::stringstream * arg_stream,CommandId command_id)234 Status CacheAdminArgHandler::AssignArg(const std::string &option, float *out_arg, std::stringstream *arg_stream,
235 CommandId command_id) {
236 // Detect if the user tried to provide this argument more than once
237 ArgValue selected_arg = arg_map_[option];
238 if (used_args_[selected_arg]) {
239 std::string err_msg = "The " + option + " argument was given more than once.";
240 return Status(StatusCode::kMDSyntaxError, err_msg);
241 }
242
243 // Flag that this arg is used now
244 used_args_[selected_arg] = true;
245
246 // Some options are just arguments, for example "--hostname "127.0.0.1" is not a command, it's just an argument.
247 // Other options are actual commands, for example "--start".
248 // If this option is also a command, make sure there has not been multiple commands given before assigning it.
249 if (command_id != CommandId::kCmdUnknown) {
250 if (command_id_ != CommandId::kCmdUnknown) {
251 std::string err_msg = "Only one command at a time is allowed. Invalid command: " + option;
252 return Status(StatusCode::kMDSyntaxError, err_msg);
253 } else {
254 command_id_ = command_id;
255 }
256 }
257
258 std::string value_as_string;
259
260 // Fetch the argument from the arg stream into a string
261 *arg_stream >> value_as_string;
262 if (value_as_string.empty()) {
263 std::string err_msg = option + " option requires an argument field. Syntax: " + option + " <field>";
264 return Status(StatusCode::kMDSyntaxError, err_msg);
265 }
266
267 // Now, attempt to convert the value into it's string format for output
268 try {
269 *out_arg = std::stof(value_as_string, nullptr);
270 } catch (const std::exception &e) {
271 std::string err_msg = "Invalid numeric value: " + value_as_string;
272 return Status(StatusCode::kMDSyntaxError, err_msg);
273 }
274
275 return Status::OK();
276 }
277
ParseArgStream(std::stringstream * arg_stream)278 Status CacheAdminArgHandler::ParseArgStream(std::stringstream *arg_stream) {
279 std::string tok;
280 while (*arg_stream >> tok) {
281 switch (arg_map_[tok]) {
282 case ArgValue::kArgHost: {
283 RETURN_IF_NOT_OK(AssignArg(tok, &hostname_, arg_stream));
284 // Temporary sanity check. We only support localhost for now
285 if (hostname_ != std::string(kCfgDefaultCacheHost)) {
286 std::string err_msg =
287 "Invalid host interface: " + hostname_ + ". Current limitation, only 127.0.0.1 can be used.";
288 return Status(StatusCode::kMDSyntaxError, err_msg);
289 }
290 break;
291 }
292 case ArgValue::kArgPort: {
293 RETURN_IF_NOT_OK(AssignArg(tok, &port_, arg_stream));
294 break;
295 }
296 case ArgValue::kArgStart: {
297 RETURN_IF_NOT_OK(AssignArg(tok, static_cast<std::string *>(nullptr), arg_stream, CommandId::kCmdStart));
298 break;
299 }
300 case ArgValue::kArgStop: {
301 RETURN_IF_NOT_OK(AssignArg(tok, static_cast<std::string *>(nullptr), arg_stream, CommandId::kCmdStop));
302 break;
303 }
304 case ArgValue::kArgGenerateSession: {
305 RETURN_IF_NOT_OK(
306 AssignArg(tok, static_cast<std::string *>(nullptr), arg_stream, CommandId::kCmdGenerateSession));
307 break;
308 }
309 case ArgValue::kArgHelp: {
310 command_id_ = CommandId::kCmdHelp;
311 break;
312 }
313 case ArgValue::kArgDestroySession: {
314 RETURN_IF_NOT_OK(AssignArg(tok, &session_ids_, arg_stream, CommandId::kCmdDestroySession));
315 break;
316 }
317 case ArgValue::kArgNumWorkers: {
318 RETURN_IF_NOT_OK(AssignArg(tok, &num_workers_, arg_stream));
319 break;
320 }
321 case ArgValue::kArgSpillDir: {
322 RETURN_IF_NOT_OK(AssignArg(tok, &spill_dir_, arg_stream));
323 break;
324 }
325 case ArgValue::kArgSharedMemorySize: {
326 RETURN_IF_NOT_OK(AssignArg(tok, &shm_mem_sz_, arg_stream));
327 break;
328 }
329 case ArgValue::kArgLogLevel: {
330 RETURN_IF_NOT_OK(AssignArg(tok, &log_level_, arg_stream));
331 break;
332 }
333 case ArgValue::kArgMemoryCapRatio: {
334 RETURN_IF_NOT_OK(AssignArg(tok, &memory_cap_ratio_, arg_stream));
335 break;
336 }
337 case ArgValue::kArgListSessions: {
338 RETURN_IF_NOT_OK(AssignArg(tok, static_cast<std::string *>(nullptr), arg_stream, CommandId::kCmdListSessions));
339 break;
340 }
341 case ArgValue::kArgServerInfo: {
342 RETURN_IF_NOT_OK(AssignArg(tok, static_cast<std::string *>(nullptr), arg_stream, CommandId::kCmdServerInfo));
343 break;
344 }
345 default: {
346 // Save space delimited trailing arguments
347 trailing_args_ += (" " + tok);
348 break;
349 }
350 }
351 }
352
353 RETURN_IF_NOT_OK(Validate());
354
355 return Status::OK();
356 }
357
Validate()358 Status CacheAdminArgHandler::Validate() {
359 // This sanity check is delayed until now in case there may be valid use-cases of trailing args.
360 // Any unhandled arguments at this point is an error.
361 if (!trailing_args_.empty()) {
362 std::string err_msg = "Invalid arguments provided: " + trailing_args_;
363 err_msg += "\nPlease try `cache_admin --help` for more information";
364 return Status(StatusCode::kMDSyntaxError, err_msg);
365 }
366
367 // The user must pick at least one command. i.e. it's meaningless to just give a hostname or port but no command to
368 // run.
369 if (command_id_ == CommandId::kCmdUnknown) {
370 std::string err_msg = "No command provided";
371 err_msg += "\nPlease try `cache_admin --help` for more information";
372 return Status(StatusCode::kMDSyntaxError, err_msg);
373 }
374
375 // Additional checks here
376 auto max_num_workers = std::max<int32_t>(std::thread::hardware_concurrency(), kMaxNumWorkers);
377 if (used_args_[ArgValue::kArgNumWorkers] && (num_workers_ < 1 || num_workers_ > max_num_workers)) {
378 // Check the value of num_workers only if it's provided by users.
379 return Status(StatusCode::kMDSyntaxError,
380 "Number of workers must be in range of 1 and " + std::to_string(max_num_workers) + ".");
381 }
382
383 if (log_level_ < MsLogLevel::kDebug || log_level_ > MsLogLevel::kException) {
384 return Status(StatusCode::kMDSyntaxError, "Log level must be in range (0..4).");
385 }
386
387 if (memory_cap_ratio_ <= 0 || memory_cap_ratio_ > 1) {
388 return Status(StatusCode::kMDSyntaxError, "Memory cap ratio should be positive and no greater than 1");
389 }
390
391 if (port_ < kMinLegalPort || port_ > kMaxLegalPort) {
392 return Status(StatusCode::kMDSyntaxError, "Port must be in range (1025..65535).");
393 }
394
395 return Status::OK();
396 }
397
RunCommand()398 Status CacheAdminArgHandler::RunCommand() {
399 switch (command_id_) {
400 case CommandId::kCmdHelp: {
401 Help();
402 break;
403 }
404 case CommandId::kCmdStart: {
405 RETURN_IF_NOT_OK(StartServer());
406 break;
407 }
408 case CommandId::kCmdStop: {
409 RETURN_IF_NOT_OK(StopServer());
410 break;
411 }
412 case CommandId::kCmdGenerateSession: {
413 CacheClientGreeter comm(hostname_, port_, 1);
414 RETURN_IF_NOT_OK(comm.ServiceStart());
415 auto rq = std::make_shared<GenerateSessionIdRequest>();
416 RETURN_IF_NOT_OK(comm.HandleRequest(rq));
417 RETURN_IF_NOT_OK(rq->Wait());
418 std::cout << "Session created for server on port " << std::to_string(port_) << ": " << rq->GetSessionId()
419 << std::endl;
420 break;
421 }
422 case CommandId::kCmdDestroySession: {
423 CacheClientGreeter comm(hostname_, port_, 1);
424 RETURN_IF_NOT_OK(comm.ServiceStart());
425 CacheClientInfo cinfo;
426 for (session_id_type id : session_ids_) {
427 cinfo.set_session_id(id);
428 auto rq = std::make_shared<DropSessionRequest>(cinfo);
429 RETURN_IF_NOT_OK(comm.HandleRequest(rq));
430 RETURN_IF_NOT_OK(rq->Wait());
431 std::cout << "Drop session " << id << " successfully for server on port " << std::to_string(port_) << std::endl;
432 }
433 break;
434 }
435 case CommandId::kCmdListSessions: {
436 CacheClientGreeter comm(hostname_, port_, 1);
437 RETURN_IF_NOT_OK(comm.ServiceStart());
438 auto rq = std::make_shared<ListSessionsRequest>();
439 std::cout << "Listing sessions for server on port " << port_ << "\n" << std::endl;
440 RETURN_IF_NOT_OK(comm.HandleRequest(rq));
441 RETURN_IF_NOT_OK(rq->Wait());
442 std::vector<SessionCacheInfo> session_info = rq->GetSessionCacheInfo();
443 if (!session_info.empty()) {
444 std::cout << std::setw(12) << "Session" << std::setw(12) << "Cache Id" << std::setw(12) << "Mem cached"
445 << std::setw(12) << "Disk cached" << std::setw(16) << "Avg cache size" << std::setw(10) << "Numa hit"
446 << std::endl;
447 for (auto curr_session : session_info) {
448 std::string cache_id;
449 std::string stat_mem_cached;
450 std::string stat_disk_cached;
451 std::string stat_avg_cached;
452 std::string stat_numa_hit;
453 uint32_t crc = (curr_session.connection_id & 0x00000000FFFFFFFF);
454 cache_id = (curr_session.connection_id == 0) ? "n/a" : std::to_string(crc);
455 stat_mem_cached =
456 (curr_session.stats.num_mem_cached == 0) ? "n/a" : std::to_string(curr_session.stats.num_mem_cached);
457 stat_disk_cached =
458 (curr_session.stats.num_disk_cached == 0) ? "n/a" : std::to_string(curr_session.stats.num_disk_cached);
459 stat_avg_cached =
460 (curr_session.stats.avg_cache_sz == 0) ? "n/a" : std::to_string(curr_session.stats.avg_cache_sz);
461 stat_numa_hit =
462 (curr_session.stats.num_numa_hit == 0) ? "n/a" : std::to_string(curr_session.stats.num_numa_hit);
463
464 std::cout << std::setw(12) << curr_session.session_id << std::setw(12) << cache_id << std::setw(12)
465 << stat_mem_cached << std::setw(12) << stat_disk_cached << std::setw(16) << stat_avg_cached
466 << std::setw(10) << stat_numa_hit << std::endl;
467 }
468 } else {
469 std::cout << "No active sessions." << std::endl;
470 }
471 break;
472 }
473 case CommandId::kCmdServerInfo: {
474 RETURN_IF_NOT_OK(ShowServerInfo());
475 break;
476 }
477 default: {
478 RETURN_STATUS_UNEXPECTED("Invalid cache admin command id.");
479 break;
480 }
481 }
482
483 return Status::OK();
484 }
485
ShowServerInfo()486 Status CacheAdminArgHandler::ShowServerInfo() {
487 CacheClientGreeter comm(hostname_, port_, 1);
488 RETURN_IF_NOT_OK(comm.ServiceStart());
489 auto rq = std::make_shared<ListSessionsRequest>();
490 RETURN_IF_NOT_OK(comm.HandleRequest(rq));
491 RETURN_IF_NOT_OK(rq->Wait());
492
493 auto session_ids = rq->GetSessionIds();
494 auto server_cfg_info = rq->GetServerStat();
495 int32_t num_workers = server_cfg_info.num_workers;
496 int8_t log_level = server_cfg_info.log_level;
497 std::string spill_dir = server_cfg_info.spill_dir;
498 if (spill_dir.empty()) {
499 spill_dir = "None";
500 }
501
502 int name_w = 20;
503 int value_w = 50;
504 std::cout << "Cache Server Configuration: " << std::endl;
505 std::cout << std::string(name_w + value_w, '-') << std::endl;
506 std::cout << std::left << std::setw(name_w) << "config name" << std::setw(value_w) << "value" << std::endl;
507 std::cout << std::string(name_w + value_w, '-') << std::endl;
508 std::cout << std::left << std::setw(name_w) << "hostname" << std::setw(value_w) << hostname_ << std::endl;
509 std::cout << std::left << std::setw(name_w) << "port" << std::setw(value_w) << port_ << std::endl;
510 std::cout << std::left << std::setw(name_w) << "number of workers" << std::setw(value_w)
511 << std::to_string(num_workers) << std::endl;
512 std::cout << std::left << std::setw(name_w) << "log level" << std::setw(value_w) << std::to_string(log_level)
513 << std::endl;
514 std::cout << std::left << std::setw(name_w) << "spill dir" << std::setw(value_w) << spill_dir << std::endl;
515 std::cout << std::string(name_w + value_w, '-') << std::endl;
516
517 std::cout << "Active sessions: " << std::endl;
518 if (!session_ids.empty()) {
519 for (auto session_id : session_ids) {
520 std::cout << session_id << " ";
521 }
522 std::cout << std::endl << "(Please use 'cache_admin --list_sessions' to get detailed info of sessions.)\n";
523 } else {
524 std::cout << "No active sessions." << std::endl;
525 }
526 return Status::OK();
527 }
528
StopServer()529 Status CacheAdminArgHandler::StopServer() {
530 CacheClientGreeter comm(hostname_, port_, 1);
531 RETURN_IF_NOT_OK(comm.ServiceStart());
532 SharedMessage msg;
533 RETURN_IF_NOT_OK(msg.Create());
534 auto rq = std::make_shared<ServerStopRequest>(msg.GetMsgQueueId());
535 RETURN_IF_NOT_OK(comm.HandleRequest(rq));
536 Status rc = rq->Wait();
537 if (rc.IsError()) {
538 msg.RemoveResourcesOnExit();
539 if (rc == StatusCode::kMDNetWorkError) {
540 std::string errMsg =
541 "Server on port " + std::to_string(port_) + " is not reachable or has been shutdown already.";
542 return Status(StatusCode::kMDNetWorkError, errMsg);
543 }
544 return rc;
545 }
546 // OK return code only means the server acknowledge our request but we still
547 // have to wait for its complete shutdown because the server will shutdown
548 // the comm layer as soon as the request is received, and we need to wait
549 // on the message queue instead.
550 // The server will send a message back and remove the queue and we will then wake up. But on the safe
551 // side, we will also set up an alarm and kill this process if we hang on
552 // the message queue.
553 (void)alarm(kAlarmDeadline);
554 Status dummy_rc;
555 (void)msg.ReceiveStatus(&dummy_rc);
556 std::cout << "Cache server on port " << std::to_string(port_) << " has been stopped successfully." << std::endl;
557 return Status::OK();
558 }
559
StartServer()560 Status CacheAdminArgHandler::StartServer() {
561 // There currently does not exist any "install path" or method to identify which path the installed binaries will
562 // exist in. As a temporary approach, we will assume that the server binary shall exist in the same path as the
563 // cache_admin binary (this process).
564 const std::string self_proc = "/proc/self/exe";
565 std::string canonical_path;
566 canonical_path.resize(400); // PATH_MAX is large. This value should be big enough for our use.
567 // Some lower level OS library calls are needed here to determine the binary path.
568 // Fetch the path of this binary for admin_cache into C character array and then truncate off the binary name so that
569 // we are left with only the absolute path
570 if (realpath(self_proc.data(), canonical_path.data()) == nullptr) {
571 std::string err_msg = "Failed to identify cache admin binary path: " + std::to_string(errno);
572 RETURN_STATUS_UNEXPECTED(err_msg);
573 }
574 canonical_path.resize(strlen(canonical_path.data()));
575 uint64_t last_separator = canonical_path.find_last_of('/');
576 CHECK_FAIL_RETURN_UNEXPECTED(last_separator != std::string::npos, "No / found");
577 // truncate the binary name so we are left with the absolute path of cache_admin binary
578 canonical_path.resize(last_separator + 1);
579 std::string cache_server_binary = canonical_path + std::string(kServerBinary);
580
581 // Create a pipe before we fork. If all goes well, the child will run as a daemon in the background
582 // and never returns until shutdown. If there is any error, the child will notify us through the pipe.
583 int fd[2];
584 if (pipe(fd) == -1) {
585 std::string err_msg = "Failed to create a pipe for communication " + std::to_string(errno);
586 RETURN_STATUS_UNEXPECTED(err_msg);
587 }
588
589 // fork the child process to become the daemon
590 pid_t pid = fork();
591 // failed to fork
592 if (pid < 0) {
593 std::string err_msg = "Failed to fork process for cache server: " + std::to_string(errno);
594 RETURN_STATUS_UNEXPECTED(err_msg);
595 } else if (pid > 0) {
596 // As a parent, we close the write end. We only listen.
597 close(fd[1]);
598 (void)dup2(fd[0], STDIN_FILENO);
599 close(fd[0]);
600 std::string msg;
601 std::string buf;
602 const uint32_t buf_sz = 1024;
603 buf.resize(buf_sz);
604 auto n = read(0, buf.data(), buf_sz);
605 // keep reading until we drain the pipe
606 while (n > 0) {
607 msg += buf.substr(0, n);
608 n = read(0, buf.data(), buf_sz);
609 }
610 if (n < 0) {
611 std::string err_msg = "Failed to read from pipeline " + std::to_string(errno);
612 RETURN_STATUS_UNEXPECTED(err_msg);
613 }
614
615 int status;
616 if (waitpid(pid, &status, 0) == -1) {
617 RETURN_STATUS_UNEXPECTED("waitpid fails. errno = " + std::to_string(errno));
618 }
619 if (WIFEXITED(status)) {
620 auto exit_status = WEXITSTATUS(status);
621 if (exit_status) {
622 return Status(StatusCode::kMDUnexpectedError, msg);
623 } else {
624 // Not an error, some info message goes to stdout
625 std::cout << msg;
626 }
627 }
628 return Status::OK();
629 } else {
630 // Child here ...
631 // Close all stdin, redirect stdout and stderr to the write end of the pipe.
632 (void)close(fd[0]);
633 (void)dup2(fd[1], STDOUT_FILENO);
634 (void)dup2(fd[1], STDERR_FILENO);
635 (void)close(STDIN_FILENO);
636 (void)close(fd[1]);
637 // exec the cache server binary in this process
638 // If the user did not provide the value of num_workers, we pass -1 to cache server to allow it assign the default.
639 // So that the server knows if the number is provided by users or by default.
640 std::string workers_string = used_args_[ArgValue::kArgNumWorkers] ? std::to_string(num_workers_) : "-1";
641 std::string port_string = std::to_string(port_);
642 std::string shared_memory_string = std::to_string(shm_mem_sz_);
643 std::string minloglevel_string = std::to_string(log_level_);
644 std::string daemonize_string = "true";
645 std::string memory_cap_ratio_string = std::to_string(memory_cap_ratio_);
646
647 char *argv[9];
648 argv[0] = cache_server_binary.data();
649 argv[1] = spill_dir_.data();
650 argv[2] = workers_string.data();
651 argv[3] = port_string.data();
652 argv[4] = shared_memory_string.data();
653 argv[5] = minloglevel_string.data();
654 argv[6] = daemonize_string.data();
655 argv[7] = memory_cap_ratio_string.data();
656 argv[8] = nullptr;
657
658 // Now exec the binary
659 execv(cache_server_binary.data(), argv);
660 // If the exec was successful, this line will never be reached due to process image being replaced.
661 // ..unless exec failed.
662 std::string err_msg = "Failed to exec cache server: " + cache_server_binary;
663 std::cerr << err_msg << std::endl;
664 RETURN_STATUS_UNEXPECTED(err_msg);
665 }
666 }
667
Help()668 void CacheAdminArgHandler::Help() {
669 std::cerr << "Syntax:\n";
670 std::cerr << "cache_admin [--start | --stop]\n";
671 std::cerr << " [[-h | --hostname] <hostname>] Default is " << kCfgDefaultCacheHost << ".\n";
672 std::cerr << " [[-p | --port] <port number>] Default is " << kCfgDefaultCachePort << ".\n";
673 std::cerr << " [[-w | --workers] <number of workers>] Default is " << kDefaultNumWorkers << ".\n";
674 std::cerr << " [[-s | --spilldir] <spilling directory>] Default is no spilling.\n";
675 std::cerr << " [[-l | --loglevel] <log level>] Default is 1 (INFO level).\n";
676 std::cerr << " [--destroy_session | -d] <session id>\n";
677 std::cerr << " [[-p | --port] <port number>]\n";
678 std::cerr << " [--generate_session | -g]\n";
679 std::cerr << " [[-p | --port] <port number>]\n";
680 std::cerr << " [--list_sessions]\n";
681 std::cerr << " [[-p | --port] <port number>]\n";
682 std::cerr << " [--server_info]\n";
683 std::cerr << " [[-p | --port] <port number>]\n";
684 std::cerr << " [--help]" << std::endl;
685 // Do not expose these option to the user via help or documentation, but the options do exist to aid with
686 // development and tuning.
687 // [ [-m | --shared_memory_size] <shared memory size> ]
688 // Default is: kDefaultSharedMemorySizeInGB (Gb in unit)
689 // [ [-r | --memory_cap_ratio] <float percent value>]
690 // Default is kMemoryCapRatio
691 }
692 } // namespace dataset
693 } // namespace mindspore
694